summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2023-11-24 21:40:47 +0700
committerShulhan <ms@kilabit.info>2023-11-26 21:32:52 +0700
commit423c3fde1cd36d55c8fc4cdbf58394b5177dea05 (patch)
tree48c7a3852161bbff37524866cf25f08fb796a051
parent580c6809aa3cc28e9f616f0c365f4d61000d5af4 (diff)
downloadpakakeh.go-423c3fde1cd36d55c8fc4cdbf58394b5177dea05.tar.xz
lib/http: add method WriteRaw to SSEEndpoint
The WriteRaw method write raw event message directly, without any parsing.
-rw-r--r--lib/http/sse_endpoint.go10
-rw-r--r--lib/http/sseclient/sseclient.go13
-rw-r--r--lib/http/sseclient/sseclient_test.go131
-rw-r--r--lib/http/sseclient/testdata/write_raw_test.data42
4 files changed, 194 insertions, 2 deletions
diff --git a/lib/http/sse_endpoint.go b/lib/http/sse_endpoint.go
index 47c19eb7..b453883a 100644
--- a/lib/http/sse_endpoint.go
+++ b/lib/http/sse_endpoint.go
@@ -107,6 +107,16 @@ func (ep *SSEEndpoint) WriteMessage(msg string, id *string) (err error) {
return nil
}
+// WriteRaw write raw event message directly, without any parsing.
+func (ep *SSEEndpoint) WriteRaw(msg []byte) (err error) {
+ _, err = ep.bufrw.Write(msg)
+ if err != nil {
+ return fmt.Errorf(`WriteRaw: %w`, err)
+ }
+ ep.bufrw.Flush()
+ return nil
+}
+
// WriteRetry inform user how long they should wait, after disconnect,
// before re-connecting back to server.
//
diff --git a/lib/http/sseclient/sseclient.go b/lib/http/sseclient/sseclient.go
index 9cfeddd2..71bef4c0 100644
--- a/lib/http/sseclient/sseclient.go
+++ b/lib/http/sseclient/sseclient.go
@@ -4,6 +4,15 @@
// Package sseclient implement HTTP client for Server-Sent Events (SSE).
//
+// # Notes on implementation
+//
+// The SSE specification have inconsistent state when dispatching empty
+// data.
+// In the "9.2.6 Interpreting an event stream", if the data buffer is empty
+// it would return; but in the third example it can dispatch an empty
+// string.
+// In this implement we ignore an empty string in server and client.
+//
// References,
// - [whatwg.org Server-sent events]
//
@@ -342,8 +351,8 @@ func (cl *Client) parseEvent(raw []byte) {
}
data.Reset()
if ev.ID != cl.LastEventID {
- // Only set LastEventID if event
- // message is complete.
+ // Only set LastEventID if message
+ // is complete.
cl.LastEventID = ev.ID
}
}
diff --git a/lib/http/sseclient/sseclient_test.go b/lib/http/sseclient/sseclient_test.go
index fd26442b..cb1385f6 100644
--- a/lib/http/sseclient/sseclient_test.go
+++ b/lib/http/sseclient/sseclient_test.go
@@ -143,6 +143,137 @@ func TestClient(t *testing.T) {
test.Assert(t, `LastEventID`, cl.LastEventID, `1`)
}
+func TestClient_raw(t *testing.T) {
+ type testCase struct {
+ raw []byte
+ exp []Event
+ }
+
+ var (
+ tdata *test.Data
+ err error
+ )
+
+ tdata, err = test.LoadData(`testdata/write_raw_test.data`)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var cases = []testCase{{
+ raw: tdata.Input[`case/1`],
+ exp: []Event{{
+ Type: EventTypeOpen, // The first message always open.
+ }, {
+ Type: EventTypeMessage,
+ Data: "YHOO\n+2\n10",
+ }},
+ }, {
+ raw: tdata.Input[`case/2`],
+ exp: []Event{{
+ Type: EventTypeMessage,
+ Data: `first event`,
+ ID: `1`,
+ }, {
+ Type: EventTypeMessage,
+ Data: `second event`,
+ }, {
+ Type: EventTypeMessage,
+ Data: ` third event`,
+ }},
+ }, {
+ // The SSE specification have inconsistent state when
+ // dispatching empty data.
+ // In the "9.2.6 Interpreting an event stream", if the data
+ // buffer is empty it would return; but in the example as
+ // tested in this case it would dispatch empty string.
+ raw: tdata.Input[`case/3`],
+ exp: []Event{{
+ Type: EventTypeMessage,
+ Data: "\n",
+ }},
+ }, {
+ raw: tdata.Input[`case/4`],
+ exp: []Event{{
+ Type: EventTypeMessage,
+ Data: `test`,
+ }, {
+ Type: EventTypeMessage,
+ Data: `test`,
+ }},
+ }, {
+ raw: tdata.Input[`case/5`],
+ exp: []Event{{
+ Type: `join`,
+ Data: "Named event\n with multiple\n ID",
+ ID: `2`,
+ }},
+ }}
+
+ var expq = make(chan Event)
+
+ var servercb = func(ep *libhttp.SSEEndpoint, _ *http.Request) {
+ var (
+ c testCase
+ ev Event
+ err error
+ x int
+ )
+ for x, c = range cases {
+ err = ep.WriteRaw([]byte(c.raw))
+ if err != nil {
+ t.Fatalf(`WriteRaw #%d: %s`, x, err)
+ }
+ for _, ev = range c.exp {
+ expq <- ev
+ }
+ }
+ }
+
+ var addr string
+
+ addr, err = testRunSSEServer(t, servercb)
+ if err != nil {
+ t.Fatal(`testRunSSEServer:`, err)
+ }
+
+ var cl = Client{
+ Endpoint: fmt.Sprintf(`http://%s/sse`, addr),
+ }
+
+ err = cl.Connect(nil)
+ if err != nil {
+ t.Fatal(`Connect:`, err)
+ }
+
+ var (
+ timeout = 1 * time.Second
+ ticker = time.NewTicker(timeout)
+
+ c testCase
+ expEvent Event
+ gotEvent Event
+ tag string
+ x, y int
+ )
+ for x, c = range cases {
+ for y = range c.exp {
+ tag = fmt.Sprintf(`Case #%d/#%d`, x, y)
+
+ select {
+ case <-ticker.C:
+ t.Fatalf(`%s: timeout`, tag)
+
+ case gotEvent = <-cl.C:
+ expEvent = <-expq
+ test.Assert(t, tag, expEvent, gotEvent)
+ }
+ ticker.Reset(timeout)
+ }
+ }
+ _ = cl.Close()
+ test.Assert(t, `LastEventID`, `2`, cl.LastEventID)
+}
+
// testGenerateAddress generate random port for server address.
func testGenerateAddress() (addr string) {
var port = rand.Int() % 60000
diff --git a/lib/http/sseclient/testdata/write_raw_test.data b/lib/http/sseclient/testdata/write_raw_test.data
new file mode 100644
index 00000000..a689f2aa
--- /dev/null
+++ b/lib/http/sseclient/testdata/write_raw_test.data
@@ -0,0 +1,42 @@
+Test Server write WriteRaw.
+
+>>> case/1
+data: YHOO
+data: +2
+data: 10
+
+
+>>> case/2
+: test stream
+
+data: first event
+id: 1
+
+data:second event
+id
+
+data: third event
+
+
+>>> case/3
+data
+
+data
+data
+
+data:
+
+>>> case/4
+data:test
+
+data: test
+
+
+>>> case/5
+event: join
+data: Named event
+data: with multiple
+data: ID
+id: 1
+id: 2
+