diff options
| author | Shulhan <ms@kilabit.info> | 2023-11-24 21:40:47 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2023-11-26 21:32:52 +0700 |
| commit | 423c3fde1cd36d55c8fc4cdbf58394b5177dea05 (patch) | |
| tree | 48c7a3852161bbff37524866cf25f08fb796a051 | |
| parent | 580c6809aa3cc28e9f616f0c365f4d61000d5af4 (diff) | |
| download | pakakeh.go-423c3fde1cd36d55c8fc4cdbf58394b5177dea05.tar.xz | |
lib/http: add method WriteRaw to SSEEndpoint
The WriteRaw method write raw event message directly, without any
parsing.
| -rw-r--r-- | lib/http/sse_endpoint.go | 10 | ||||
| -rw-r--r-- | lib/http/sseclient/sseclient.go | 13 | ||||
| -rw-r--r-- | lib/http/sseclient/sseclient_test.go | 131 | ||||
| -rw-r--r-- | lib/http/sseclient/testdata/write_raw_test.data | 42 |
4 files changed, 194 insertions, 2 deletions
diff --git a/lib/http/sse_endpoint.go b/lib/http/sse_endpoint.go index 47c19eb7..b453883a 100644 --- a/lib/http/sse_endpoint.go +++ b/lib/http/sse_endpoint.go @@ -107,6 +107,16 @@ func (ep *SSEEndpoint) WriteMessage(msg string, id *string) (err error) { return nil } +// WriteRaw write raw event message directly, without any parsing. +func (ep *SSEEndpoint) WriteRaw(msg []byte) (err error) { + _, err = ep.bufrw.Write(msg) + if err != nil { + return fmt.Errorf(`WriteRaw: %w`, err) + } + ep.bufrw.Flush() + return nil +} + // WriteRetry inform user how long they should wait, after disconnect, // before re-connecting back to server. // diff --git a/lib/http/sseclient/sseclient.go b/lib/http/sseclient/sseclient.go index 9cfeddd2..71bef4c0 100644 --- a/lib/http/sseclient/sseclient.go +++ b/lib/http/sseclient/sseclient.go @@ -4,6 +4,15 @@ // Package sseclient implement HTTP client for Server-Sent Events (SSE). // +// # Notes on implementation +// +// The SSE specification have inconsistent state when dispatching empty +// data. +// In the "9.2.6 Interpreting an event stream", if the data buffer is empty +// it would return; but in the third example it can dispatch an empty +// string. +// In this implement we ignore an empty string in server and client. +// // References, // - [whatwg.org Server-sent events] // @@ -342,8 +351,8 @@ func (cl *Client) parseEvent(raw []byte) { } data.Reset() if ev.ID != cl.LastEventID { - // Only set LastEventID if event - // message is complete. + // Only set LastEventID if message + // is complete. cl.LastEventID = ev.ID } } diff --git a/lib/http/sseclient/sseclient_test.go b/lib/http/sseclient/sseclient_test.go index fd26442b..cb1385f6 100644 --- a/lib/http/sseclient/sseclient_test.go +++ b/lib/http/sseclient/sseclient_test.go @@ -143,6 +143,137 @@ func TestClient(t *testing.T) { test.Assert(t, `LastEventID`, cl.LastEventID, `1`) } +func TestClient_raw(t *testing.T) { + type testCase struct { + raw []byte + exp []Event + } + + var ( + tdata *test.Data + err error + ) + + tdata, err = test.LoadData(`testdata/write_raw_test.data`) + if err != nil { + t.Fatal(err) + } + + var cases = []testCase{{ + raw: tdata.Input[`case/1`], + exp: []Event{{ + Type: EventTypeOpen, // The first message always open. + }, { + Type: EventTypeMessage, + Data: "YHOO\n+2\n10", + }}, + }, { + raw: tdata.Input[`case/2`], + exp: []Event{{ + Type: EventTypeMessage, + Data: `first event`, + ID: `1`, + }, { + Type: EventTypeMessage, + Data: `second event`, + }, { + Type: EventTypeMessage, + Data: ` third event`, + }}, + }, { + // The SSE specification have inconsistent state when + // dispatching empty data. + // In the "9.2.6 Interpreting an event stream", if the data + // buffer is empty it would return; but in the example as + // tested in this case it would dispatch empty string. + raw: tdata.Input[`case/3`], + exp: []Event{{ + Type: EventTypeMessage, + Data: "\n", + }}, + }, { + raw: tdata.Input[`case/4`], + exp: []Event{{ + Type: EventTypeMessage, + Data: `test`, + }, { + Type: EventTypeMessage, + Data: `test`, + }}, + }, { + raw: tdata.Input[`case/5`], + exp: []Event{{ + Type: `join`, + Data: "Named event\n with multiple\n ID", + ID: `2`, + }}, + }} + + var expq = make(chan Event) + + var servercb = func(ep *libhttp.SSEEndpoint, _ *http.Request) { + var ( + c testCase + ev Event + err error + x int + ) + for x, c = range cases { + err = ep.WriteRaw([]byte(c.raw)) + if err != nil { + t.Fatalf(`WriteRaw #%d: %s`, x, err) + } + for _, ev = range c.exp { + expq <- ev + } + } + } + + var addr string + + addr, err = testRunSSEServer(t, servercb) + if err != nil { + t.Fatal(`testRunSSEServer:`, err) + } + + var cl = Client{ + Endpoint: fmt.Sprintf(`http://%s/sse`, addr), + } + + err = cl.Connect(nil) + if err != nil { + t.Fatal(`Connect:`, err) + } + + var ( + timeout = 1 * time.Second + ticker = time.NewTicker(timeout) + + c testCase + expEvent Event + gotEvent Event + tag string + x, y int + ) + for x, c = range cases { + for y = range c.exp { + tag = fmt.Sprintf(`Case #%d/#%d`, x, y) + + select { + case <-ticker.C: + t.Fatalf(`%s: timeout`, tag) + + case gotEvent = <-cl.C: + expEvent = <-expq + test.Assert(t, tag, expEvent, gotEvent) + } + ticker.Reset(timeout) + } + } + _ = cl.Close() + test.Assert(t, `LastEventID`, `2`, cl.LastEventID) +} + // testGenerateAddress generate random port for server address. func testGenerateAddress() (addr string) { var port = rand.Int() % 60000 diff --git a/lib/http/sseclient/testdata/write_raw_test.data b/lib/http/sseclient/testdata/write_raw_test.data new file mode 100644 index 00000000..a689f2aa --- /dev/null +++ b/lib/http/sseclient/testdata/write_raw_test.data @@ -0,0 +1,42 @@ +Test Server write WriteRaw. + +>>> case/1 +data: YHOO +data: +2 +data: 10 + + +>>> case/2 +: test stream + +data: first event +id: 1 + +data:second event +id + +data: third event + + +>>> case/3 +data + +data +data + +data: + +>>> case/4 +data:test + +data: test + + +>>> case/5 +event: join +data: Named event +data: with multiple +data: ID +id: 1 +id: 2 + |
