aboutsummaryrefslogtreecommitdiff
path: root/lib/websocket
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2023-07-01 01:34:25 +0700
committerShulhan <ms@kilabit.info>2023-07-01 17:25:15 +0700
commitb9f69e549f1ecde42f34cc2dab6437bb62c42fe6 (patch)
tree9a83faa9b24eab3b5456ce3b7f6f7e04f8201204 /lib/websocket
parent091751a2e0e4f162454275e511a718d15ebf3886 (diff)
downloadpakakeh.go-b9f69e549f1ecde42f34cc2dab6437bb62c42fe6.tar.xz
lib/websocket: handle concurrent ping using goroutines
The maximum goroutines is quarter of max queue. The new goroutine for pinger will be dispatched when no goroutine can consume the current processed connection.
Diffstat (limited to 'lib/websocket')
-rw-r--r--lib/websocket/server.go72
-rw-r--r--lib/websocket/server_options.go8
2 files changed, 67 insertions, 13 deletions
diff --git a/lib/websocket/server.go b/lib/websocket/server.go
index b872ad76..30985268 100644
--- a/lib/websocket/server.go
+++ b/lib/websocket/server.go
@@ -52,6 +52,7 @@ type Server struct {
// cause undefined effects.
Options *ServerOptions
+ qpinger chan int
chUpgrade chan int
qreader chan int
running chan struct{}
@@ -65,6 +66,7 @@ type Server struct {
sock int
+ numGoPinger atomic.Int32
numGoUpgrade atomic.Int32
numGoReader atomic.Int32
@@ -83,6 +85,7 @@ func NewServer(opts *ServerOptions) (serv *Server) {
Options: opts,
Clients: newClientManager(),
routes: newRootRoute(),
+ qpinger: make(chan int),
chUpgrade: make(chan int),
qreader: make(chan int),
running: make(chan struct{}, 1),
@@ -832,17 +835,15 @@ func (serv *Server) delayReader(conn int) {
serv.handleClose(conn, &req)
}
-// pinger is a routine that send control PING frame to all client connections
+// pollPinger is a routine that send control PING frame to all client connections
// every N seconds.
-func (serv *Server) pinger() {
+func (serv *Server) pollPinger() {
var (
- logp = `pinger`
pingTicker *time.Ticker = time.NewTicker(16 * time.Second)
- framePing []byte = NewFramePing(false, nil)
- all []int
- conn int
- err error
+ all []int
+ conn int
+ numPinger int32
)
for {
@@ -851,13 +852,19 @@ func (serv *Server) pinger() {
all = serv.Clients.All()
for _, conn = range all {
- err = Send(conn, framePing, serv.Options.ReadWriteTimeout)
- if err != nil {
- // Error on sending PING will be
- // assumed as bad connection.
- log.Printf(`%s: %s`, logp, err)
- serv.ClientRemove(conn)
+ select {
+ case serv.qpinger <- conn:
+ default:
+ numPinger = serv.numGoPinger.Load()
+ if numPinger < serv.Options.maxGoroutinePinger {
+ go serv.pinger()
+ serv.numGoPinger.Add(1)
+ serv.qpinger <- conn
+ } else {
+ go serv.delayPinger(conn)
+ }
}
+
}
case <-serv.running:
return
@@ -865,6 +872,43 @@ func (serv *Server) pinger() {
}
}
+func (serv *Server) pinger() {
+ var (
+ framePing = NewFramePing(false, nil)
+
+ conn int
+ err error
+ )
+ for conn = range serv.qpinger {
+ err = Send(conn, framePing, serv.Options.ReadWriteTimeout)
+ if err != nil {
+ // Error on sending PING will be assumed as bad
+ // connection.
+ serv.ClientRemove(conn)
+ }
+ }
+}
+
+func (serv *Server) delayPinger(conn int) {
+ var (
+ delay = 300 * time.Millisecond
+ total time.Duration
+ )
+ for total < serv.Options.ReadWriteTimeout {
+ time.Sleep(delay)
+ select {
+ case serv.qpinger <- conn:
+ return
+ default:
+ total += delay
+ }
+ }
+ var req = Frame{
+ closeCode: StatusInternalError,
+ }
+ serv.handleClose(conn, &req)
+}
+
// Start accepting incoming connection from clients.
func (serv *Server) Start() (err error) {
var logp = `Start`
@@ -886,7 +930,9 @@ func (serv *Server) Start() (err error) {
go serv.reader()
serv.numGoReader.Add(1)
+ go serv.pollPinger()
go serv.pinger()
+ serv.numGoPinger.Add(1)
var (
conn int
diff --git a/lib/websocket/server_options.go b/lib/websocket/server_options.go
index dc8399a7..0c169188 100644
--- a/lib/websocket/server_options.go
+++ b/lib/websocket/server_options.go
@@ -15,6 +15,7 @@ const (
defServerStatusPath = "/status"
defServerReadWriteTimeout = 30 * time.Second
+ defServerMaxGoroutinePinger = _maxQueue / 4
defServerMaxGoroutineReader = 1024
defServerMaxGoroutineUpgrader int32 = 128
)
@@ -72,6 +73,10 @@ type ServerOptions struct {
// Default to 30 seconds.
ReadWriteTimeout time.Duration
+ // maxGoroutinePinger define maximum number of goroutines to ping each
+ // connected clients at the same time.
+ maxGoroutinePinger int32
+
maxGoroutineReader int32
// maxGoroutineUpgrader define maximum goroutines running at the same
@@ -95,6 +100,9 @@ func (opts *ServerOptions) init() {
if opts.ReadWriteTimeout <= 0 {
opts.ReadWriteTimeout = defServerReadWriteTimeout
}
+ if opts.maxGoroutinePinger <= 0 {
+ opts.maxGoroutinePinger = defServerMaxGoroutinePinger
+ }
if opts.maxGoroutineReader <= 0 {
opts.maxGoroutineReader = defServerMaxGoroutineReader
}