diff options
| author | Shulhan <ms@kilabit.info> | 2023-07-01 01:34:25 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2023-07-01 17:25:15 +0700 |
| commit | b9f69e549f1ecde42f34cc2dab6437bb62c42fe6 (patch) | |
| tree | 9a83faa9b24eab3b5456ce3b7f6f7e04f8201204 | |
| parent | 091751a2e0e4f162454275e511a718d15ebf3886 (diff) | |
| download | pakakeh.go-b9f69e549f1ecde42f34cc2dab6437bb62c42fe6.tar.xz | |
lib/websocket: handle concurrent ping using goroutines
The maximum goroutines is quarter of max queue.
The new goroutine for pinger will be dispatched when no goroutine can
consume the current processed connection.
| -rw-r--r-- | lib/websocket/server.go | 72 | ||||
| -rw-r--r-- | lib/websocket/server_options.go | 8 |
2 files changed, 67 insertions, 13 deletions
diff --git a/lib/websocket/server.go b/lib/websocket/server.go index b872ad76..30985268 100644 --- a/lib/websocket/server.go +++ b/lib/websocket/server.go @@ -52,6 +52,7 @@ type Server struct { // cause undefined effects. Options *ServerOptions + qpinger chan int chUpgrade chan int qreader chan int running chan struct{} @@ -65,6 +66,7 @@ type Server struct { sock int + numGoPinger atomic.Int32 numGoUpgrade atomic.Int32 numGoReader atomic.Int32 @@ -83,6 +85,7 @@ func NewServer(opts *ServerOptions) (serv *Server) { Options: opts, Clients: newClientManager(), routes: newRootRoute(), + qpinger: make(chan int), chUpgrade: make(chan int), qreader: make(chan int), running: make(chan struct{}, 1), @@ -832,17 +835,15 @@ func (serv *Server) delayReader(conn int) { serv.handleClose(conn, &req) } -// pinger is a routine that send control PING frame to all client connections +// pollPinger is a routine that send control PING frame to all client connections // every N seconds. -func (serv *Server) pinger() { +func (serv *Server) pollPinger() { var ( - logp = `pinger` pingTicker *time.Ticker = time.NewTicker(16 * time.Second) - framePing []byte = NewFramePing(false, nil) - all []int - conn int - err error + all []int + conn int + numPinger int32 ) for { @@ -851,13 +852,19 @@ func (serv *Server) pinger() { all = serv.Clients.All() for _, conn = range all { - err = Send(conn, framePing, serv.Options.ReadWriteTimeout) - if err != nil { - // Error on sending PING will be - // assumed as bad connection. - log.Printf(`%s: %s`, logp, err) - serv.ClientRemove(conn) + select { + case serv.qpinger <- conn: + default: + numPinger = serv.numGoPinger.Load() + if numPinger < serv.Options.maxGoroutinePinger { + go serv.pinger() + serv.numGoPinger.Add(1) + serv.qpinger <- conn + } else { + go serv.delayPinger(conn) + } } + } case <-serv.running: return @@ -865,6 +872,43 @@ func (serv *Server) pinger() { } } +func (serv *Server) pinger() { + var ( + framePing = NewFramePing(false, nil) + + conn int + err error + ) + for conn = range serv.qpinger { + err = Send(conn, framePing, serv.Options.ReadWriteTimeout) + if err != nil { + // Error on sending PING will be assumed as bad + // connection. + serv.ClientRemove(conn) + } + } +} + +func (serv *Server) delayPinger(conn int) { + var ( + delay = 300 * time.Millisecond + total time.Duration + ) + for total < serv.Options.ReadWriteTimeout { + time.Sleep(delay) + select { + case serv.qpinger <- conn: + return + default: + total += delay + } + } + var req = Frame{ + closeCode: StatusInternalError, + } + serv.handleClose(conn, &req) +} + // Start accepting incoming connection from clients. func (serv *Server) Start() (err error) { var logp = `Start` @@ -886,7 +930,9 @@ func (serv *Server) Start() (err error) { go serv.reader() serv.numGoReader.Add(1) + go serv.pollPinger() go serv.pinger() + serv.numGoPinger.Add(1) var ( conn int diff --git a/lib/websocket/server_options.go b/lib/websocket/server_options.go index dc8399a7..0c169188 100644 --- a/lib/websocket/server_options.go +++ b/lib/websocket/server_options.go @@ -15,6 +15,7 @@ const ( defServerStatusPath = "/status" defServerReadWriteTimeout = 30 * time.Second + defServerMaxGoroutinePinger = _maxQueue / 4 defServerMaxGoroutineReader = 1024 defServerMaxGoroutineUpgrader int32 = 128 ) @@ -72,6 +73,10 @@ type ServerOptions struct { // Default to 30 seconds. ReadWriteTimeout time.Duration + // maxGoroutinePinger define maximum number of goroutines to ping each + // connected clients at the same time. + maxGoroutinePinger int32 + maxGoroutineReader int32 // maxGoroutineUpgrader define maximum goroutines running at the same @@ -95,6 +100,9 @@ func (opts *ServerOptions) init() { if opts.ReadWriteTimeout <= 0 { opts.ReadWriteTimeout = defServerReadWriteTimeout } + if opts.maxGoroutinePinger <= 0 { + opts.maxGoroutinePinger = defServerMaxGoroutinePinger + } if opts.maxGoroutineReader <= 0 { opts.maxGoroutineReader = defServerMaxGoroutineReader } |
