diff options
| author | Shulhan <ms@kilabit.info> | 2023-07-01 01:34:25 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2023-07-01 17:25:15 +0700 |
| commit | b9f69e549f1ecde42f34cc2dab6437bb62c42fe6 (patch) | |
| tree | 9a83faa9b24eab3b5456ce3b7f6f7e04f8201204 /lib/websocket | |
| parent | 091751a2e0e4f162454275e511a718d15ebf3886 (diff) | |
| download | pakakeh.go-b9f69e549f1ecde42f34cc2dab6437bb62c42fe6.tar.xz | |
lib/websocket: handle concurrent ping using goroutines
The maximum goroutines is quarter of max queue.
The new goroutine for pinger will be dispatched when no goroutine can
consume the current processed connection.
Diffstat (limited to 'lib/websocket')
| -rw-r--r-- | lib/websocket/server.go | 72 | ||||
| -rw-r--r-- | lib/websocket/server_options.go | 8 |
2 files changed, 67 insertions, 13 deletions
diff --git a/lib/websocket/server.go b/lib/websocket/server.go index b872ad76..30985268 100644 --- a/lib/websocket/server.go +++ b/lib/websocket/server.go @@ -52,6 +52,7 @@ type Server struct { // cause undefined effects. Options *ServerOptions + qpinger chan int chUpgrade chan int qreader chan int running chan struct{} @@ -65,6 +66,7 @@ type Server struct { sock int + numGoPinger atomic.Int32 numGoUpgrade atomic.Int32 numGoReader atomic.Int32 @@ -83,6 +85,7 @@ func NewServer(opts *ServerOptions) (serv *Server) { Options: opts, Clients: newClientManager(), routes: newRootRoute(), + qpinger: make(chan int), chUpgrade: make(chan int), qreader: make(chan int), running: make(chan struct{}, 1), @@ -832,17 +835,15 @@ func (serv *Server) delayReader(conn int) { serv.handleClose(conn, &req) } -// pinger is a routine that send control PING frame to all client connections +// pollPinger is a routine that send control PING frame to all client connections // every N seconds. -func (serv *Server) pinger() { +func (serv *Server) pollPinger() { var ( - logp = `pinger` pingTicker *time.Ticker = time.NewTicker(16 * time.Second) - framePing []byte = NewFramePing(false, nil) - all []int - conn int - err error + all []int + conn int + numPinger int32 ) for { @@ -851,13 +852,19 @@ func (serv *Server) pinger() { all = serv.Clients.All() for _, conn = range all { - err = Send(conn, framePing, serv.Options.ReadWriteTimeout) - if err != nil { - // Error on sending PING will be - // assumed as bad connection. - log.Printf(`%s: %s`, logp, err) - serv.ClientRemove(conn) + select { + case serv.qpinger <- conn: + default: + numPinger = serv.numGoPinger.Load() + if numPinger < serv.Options.maxGoroutinePinger { + go serv.pinger() + serv.numGoPinger.Add(1) + serv.qpinger <- conn + } else { + go serv.delayPinger(conn) + } } + } case <-serv.running: return @@ -865,6 +872,43 @@ func (serv *Server) pinger() { } } +func (serv *Server) pinger() { + var ( + framePing = NewFramePing(false, nil) + + conn int + err error + ) + for conn = range serv.qpinger { + err = Send(conn, framePing, serv.Options.ReadWriteTimeout) + if err != nil { + // Error on sending PING will be assumed as bad + // connection. + serv.ClientRemove(conn) + } + } +} + +func (serv *Server) delayPinger(conn int) { + var ( + delay = 300 * time.Millisecond + total time.Duration + ) + for total < serv.Options.ReadWriteTimeout { + time.Sleep(delay) + select { + case serv.qpinger <- conn: + return + default: + total += delay + } + } + var req = Frame{ + closeCode: StatusInternalError, + } + serv.handleClose(conn, &req) +} + // Start accepting incoming connection from clients. func (serv *Server) Start() (err error) { var logp = `Start` @@ -886,7 +930,9 @@ func (serv *Server) Start() (err error) { go serv.reader() serv.numGoReader.Add(1) + go serv.pollPinger() go serv.pinger() + serv.numGoPinger.Add(1) var ( conn int diff --git a/lib/websocket/server_options.go b/lib/websocket/server_options.go index dc8399a7..0c169188 100644 --- a/lib/websocket/server_options.go +++ b/lib/websocket/server_options.go @@ -15,6 +15,7 @@ const ( defServerStatusPath = "/status" defServerReadWriteTimeout = 30 * time.Second + defServerMaxGoroutinePinger = _maxQueue / 4 defServerMaxGoroutineReader = 1024 defServerMaxGoroutineUpgrader int32 = 128 ) @@ -72,6 +73,10 @@ type ServerOptions struct { // Default to 30 seconds. ReadWriteTimeout time.Duration + // maxGoroutinePinger define maximum number of goroutines to ping each + // connected clients at the same time. + maxGoroutinePinger int32 + maxGoroutineReader int32 // maxGoroutineUpgrader define maximum goroutines running at the same @@ -95,6 +100,9 @@ func (opts *ServerOptions) init() { if opts.ReadWriteTimeout <= 0 { opts.ReadWriteTimeout = defServerReadWriteTimeout } + if opts.maxGoroutinePinger <= 0 { + opts.maxGoroutinePinger = defServerMaxGoroutinePinger + } if opts.maxGoroutineReader <= 0 { opts.maxGoroutineReader = defServerMaxGoroutineReader } |
