summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2023-07-01 01:34:25 +0700
committerShulhan <ms@kilabit.info>2023-07-01 17:25:15 +0700
commitb9f69e549f1ecde42f34cc2dab6437bb62c42fe6 (patch)
tree9a83faa9b24eab3b5456ce3b7f6f7e04f8201204
parent091751a2e0e4f162454275e511a718d15ebf3886 (diff)
downloadpakakeh.go-b9f69e549f1ecde42f34cc2dab6437bb62c42fe6.tar.xz
lib/websocket: handle concurrent ping using goroutines
The maximum goroutines is quarter of max queue. The new goroutine for pinger will be dispatched when no goroutine can consume the current processed connection.
-rw-r--r--lib/websocket/server.go72
-rw-r--r--lib/websocket/server_options.go8
2 files changed, 67 insertions, 13 deletions
diff --git a/lib/websocket/server.go b/lib/websocket/server.go
index b872ad76..30985268 100644
--- a/lib/websocket/server.go
+++ b/lib/websocket/server.go
@@ -52,6 +52,7 @@ type Server struct {
// cause undefined effects.
Options *ServerOptions
+ qpinger chan int
chUpgrade chan int
qreader chan int
running chan struct{}
@@ -65,6 +66,7 @@ type Server struct {
sock int
+ numGoPinger atomic.Int32
numGoUpgrade atomic.Int32
numGoReader atomic.Int32
@@ -83,6 +85,7 @@ func NewServer(opts *ServerOptions) (serv *Server) {
Options: opts,
Clients: newClientManager(),
routes: newRootRoute(),
+ qpinger: make(chan int),
chUpgrade: make(chan int),
qreader: make(chan int),
running: make(chan struct{}, 1),
@@ -832,17 +835,15 @@ func (serv *Server) delayReader(conn int) {
serv.handleClose(conn, &req)
}
-// pinger is a routine that send control PING frame to all client connections
+// pollPinger is a routine that send control PING frame to all client connections
// every N seconds.
-func (serv *Server) pinger() {
+func (serv *Server) pollPinger() {
var (
- logp = `pinger`
pingTicker *time.Ticker = time.NewTicker(16 * time.Second)
- framePing []byte = NewFramePing(false, nil)
- all []int
- conn int
- err error
+ all []int
+ conn int
+ numPinger int32
)
for {
@@ -851,13 +852,19 @@ func (serv *Server) pinger() {
all = serv.Clients.All()
for _, conn = range all {
- err = Send(conn, framePing, serv.Options.ReadWriteTimeout)
- if err != nil {
- // Error on sending PING will be
- // assumed as bad connection.
- log.Printf(`%s: %s`, logp, err)
- serv.ClientRemove(conn)
+ select {
+ case serv.qpinger <- conn:
+ default:
+ numPinger = serv.numGoPinger.Load()
+ if numPinger < serv.Options.maxGoroutinePinger {
+ go serv.pinger()
+ serv.numGoPinger.Add(1)
+ serv.qpinger <- conn
+ } else {
+ go serv.delayPinger(conn)
+ }
}
+
}
case <-serv.running:
return
@@ -865,6 +872,43 @@ func (serv *Server) pinger() {
}
}
+func (serv *Server) pinger() {
+ var (
+ framePing = NewFramePing(false, nil)
+
+ conn int
+ err error
+ )
+ for conn = range serv.qpinger {
+ err = Send(conn, framePing, serv.Options.ReadWriteTimeout)
+ if err != nil {
+ // Error on sending PING will be assumed as bad
+ // connection.
+ serv.ClientRemove(conn)
+ }
+ }
+}
+
+func (serv *Server) delayPinger(conn int) {
+ var (
+ delay = 300 * time.Millisecond
+ total time.Duration
+ )
+ for total < serv.Options.ReadWriteTimeout {
+ time.Sleep(delay)
+ select {
+ case serv.qpinger <- conn:
+ return
+ default:
+ total += delay
+ }
+ }
+ var req = Frame{
+ closeCode: StatusInternalError,
+ }
+ serv.handleClose(conn, &req)
+}
+
// Start accepting incoming connection from clients.
func (serv *Server) Start() (err error) {
var logp = `Start`
@@ -886,7 +930,9 @@ func (serv *Server) Start() (err error) {
go serv.reader()
serv.numGoReader.Add(1)
+ go serv.pollPinger()
go serv.pinger()
+ serv.numGoPinger.Add(1)
var (
conn int
diff --git a/lib/websocket/server_options.go b/lib/websocket/server_options.go
index dc8399a7..0c169188 100644
--- a/lib/websocket/server_options.go
+++ b/lib/websocket/server_options.go
@@ -15,6 +15,7 @@ const (
defServerStatusPath = "/status"
defServerReadWriteTimeout = 30 * time.Second
+ defServerMaxGoroutinePinger = _maxQueue / 4
defServerMaxGoroutineReader = 1024
defServerMaxGoroutineUpgrader int32 = 128
)
@@ -72,6 +73,10 @@ type ServerOptions struct {
// Default to 30 seconds.
ReadWriteTimeout time.Duration
+ // maxGoroutinePinger define maximum number of goroutines to ping each
+ // connected clients at the same time.
+ maxGoroutinePinger int32
+
maxGoroutineReader int32
// maxGoroutineUpgrader define maximum goroutines running at the same
@@ -95,6 +100,9 @@ func (opts *ServerOptions) init() {
if opts.ReadWriteTimeout <= 0 {
opts.ReadWriteTimeout = defServerReadWriteTimeout
}
+ if opts.maxGoroutinePinger <= 0 {
+ opts.maxGoroutinePinger = defServerMaxGoroutinePinger
+ }
if opts.maxGoroutineReader <= 0 {
opts.maxGoroutineReader = defServerMaxGoroutineReader
}