summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2026-02-06 07:08:29 +0700
committerShulhan <ms@kilabit.info>2026-02-06 07:08:29 +0700
commit54d395f53e344a3cf3861c0e3d2f82ad14ecc602 (patch)
tree9f19525f8b950ddd9d715d9abe71e5545df4d65e
parenta3c27356bfe3e56809f579b617e02829a7ea0a68 (diff)
downloadawwan-54d395f53e344a3cf3861c0e3d2f82ad14ecc602.tar.xz
all: improve the Server-Sent Events (SSE) output
In the ExecResponse, store the event in the Output instead of message data, so the server can iterate the Output directly and pass it to WriteEvent directly. The event ID now start at 1 with type "begin". This is to minimize confusion when comparing empty Last-Event-ID from client, which is equal to 0.
-rw-r--r--awwan.go7
-rw-r--r--exec_response.go26
-rw-r--r--http_server.go43
-rw-r--r--http_server_test.go34
-rw-r--r--testdata/http_server/execute/cancel_test.data23
-rw-r--r--testdata/http_server/execute/local_test.data21
6 files changed, 82 insertions, 72 deletions
diff --git a/awwan.go b/awwan.go
index 82142aa..e14e783 100644
--- a/awwan.go
+++ b/awwan.go
@@ -389,6 +389,7 @@ func (aww *Awwan) Local(ctx context.Context, req *ExecRequest) (err error) {
goto out
}
}
+ req.mlog.Flush()
}
req.mlog.Outf(`=== END: %s %s %s`, req.Mode, req.Script, req.LineRange)
out:
@@ -499,6 +500,12 @@ func (aww *Awwan) Serve(listener net.Listener, address string, isDev bool) (err
return aww.httpd.start()
}
+// Stop the HTTP server from calling [Awwan.Serve].
+func (aww *Awwan) Stop() (err error) {
+ err = aww.httpd.Server.Shutdown(context.Background())
+ return err
+}
+
// loadSSHConfig load all SSH config from user's home and the awwan base
// directory.
func (aww *Awwan) loadSSHConfig() (err error) {
diff --git a/exec_response.go b/exec_response.go
index 92774cf..9f700a1 100644
--- a/exec_response.go
+++ b/exec_response.go
@@ -36,7 +36,9 @@ type ExecResponse struct {
Error string `json:"error"`
- Output []string `json:"output"`
+ Output []sseclient.Event `json:"output"`
+
+ id int
// mtxOutput protect read/write on Output.
mtxOutput sync.Mutex
@@ -53,10 +55,18 @@ func newExecResponse(req *ExecRequest) (execRes *ExecResponse) {
ID: fmt.Sprintf(`%s:%s:%s:%d`, req.Mode, req.Script, req.LineRange, now.Unix()),
BeginAt: now.Format(time.RFC3339),
- Output: make([]string, 0, 8),
+ Output: make([]sseclient.Event, 0, 512),
eventq: make(chan sseclient.Event, 512),
}
+ execRes.id++
+ var ev = sseclient.Event{
+ Type: `begin`,
+ Data: execRes.BeginAt,
+ ID: strconv.Itoa(execRes.id),
+ }
+ execRes.Output = append(execRes.Output, ev)
+
// Use the ExecResponse itself as handler for output.
req.registerLogWriter(`response`, execRes)
@@ -71,18 +81,19 @@ func (execRes *ExecResponse) Write(out []byte) (n int, err error) {
}
execRes.mtxOutput.Lock()
+ defer execRes.mtxOutput.Unlock()
+
+ execRes.id++
var ev = sseclient.Event{
Data: string(out),
- ID: strconv.FormatInt(int64(len(execRes.Output)), 10),
+ ID: strconv.Itoa(execRes.id),
}
-
- execRes.Output = append(execRes.Output, ev.Data)
+ execRes.Output = append(execRes.Output, ev)
select {
case execRes.eventq <- ev:
default:
}
- execRes.mtxOutput.Unlock()
return len(out), nil
}
@@ -106,8 +117,11 @@ func (execRes *ExecResponse) end(execErr error) {
execRes.EndAt = timeNow().UTC().Format(time.RFC3339)
+ execRes.id++
ev.Type = `end`
ev.Data = execRes.EndAt
+ ev.ID = strconv.Itoa(execRes.id)
+ execRes.Output = append(execRes.Output, ev)
select {
case execRes.eventq <- ev:
diff --git a/http_server.go b/http_server.go
index f2a35b4..e0935c5 100644
--- a/http_server.go
+++ b/http_server.go
@@ -714,6 +714,9 @@ func (httpd *httpServer) Execute(epr *libhttp.EndpointRequest) (resb []byte, err
res.Code = http.StatusOK
res.Data = execRes
+ execRes.mtxOutput.Lock()
+ defer execRes.mtxOutput.Unlock()
+
resb, err = json.Marshal(res)
if err != nil {
res.Message = fmt.Sprintf(`%s: %s`, logp, err)
@@ -821,42 +824,24 @@ func (httpd *httpServer) ExecuteTail(sseconn *libhttp.SSEConn) {
var (
lastEventIDStr = sseconn.HTTPRequest.Header.Get(libhttp.HeaderLastEventID)
lastEventID int64
+ ok bool
)
if len(lastEventIDStr) != 0 {
lastEventID, _ = strconv.ParseInt(lastEventIDStr, 10, 64)
}
- if lastEventID == 0 {
- _ = sseconn.WriteEvent(`begin`, execRes.BeginAt, nil)
- }
execRes.mtxOutput.Lock()
- if lastEventID < int64(len(execRes.Output)) {
- // Send out the existing output based on request
- // Last-Event-ID ...
- var (
- idx int
- out string
- idstr string
- )
- for idx, out = range execRes.Output[int(lastEventID):] {
- idstr = strconv.FormatInt(int64(idx), 10)
- _ = sseconn.WriteEvent(``, out, &idstr)
- }
- lastEventID = int64(idx)
+ // Send out the existing output based on request
+ // Last-Event-ID ...
+ var ev sseclient.Event
+ for lastEventID < int64(len(execRes.Output)) {
+ ev = execRes.Output[int(lastEventID)]
+ _ = sseconn.WriteEvent(ev.Type, ev.Data, &ev.ID)
+ lastEventID++
}
- execRes.mtxOutput.Unlock()
-
- var (
- ok bool
- ev sseclient.Event
- evid int64
- )
-
- execRes.mtxOutput.Lock()
if len(execRes.EndAt) != 0 {
// The execution has been completed.
- _ = sseconn.WriteEvent(`end`, execRes.EndAt, nil)
execRes.mtxOutput.Unlock()
goto out
}
@@ -870,13 +855,9 @@ func (httpd *httpServer) ExecuteTail(sseconn *libhttp.SSEConn) {
// Channel has been closed.
break
}
- if len(ev.ID) == 0 {
- _ = sseconn.WriteEvent(ev.Type, ev.Data, nil)
- continue
- }
// Skip event where ID is less than last ID from output.
- evid = ev.IDInt()
+ evid := ev.IDInt()
if evid <= lastEventID {
continue
}
diff --git a/http_server_test.go b/http_server_test.go
index f6a32aa..d5cb270 100644
--- a/http_server_test.go
+++ b/http_server_test.go
@@ -222,6 +222,9 @@ func TestHttpServer_Execute(t *testing.T) {
log.Fatal(err)
}
}()
+ t.Cleanup(func() {
+ aww.Stop()
+ })
err = libnet.WaitAlive(`tcp`, address, 10*time.Second)
if err != nil {
@@ -299,25 +302,19 @@ func TestHttpServer_Execute(t *testing.T) {
for ever {
select {
case ev = <-ssec.C:
- if len(ev.Type) != 0 {
- fmt.Fprintf(&buf, "event: %s\n", ev.Type)
- }
- if len(ev.Data) != 0 {
- fmt.Fprintf(&buf, "data: %q\n", ev.Data)
- }
- if len(ev.ID) != 0 {
- fmt.Fprintf(&buf, "id: %s\n", ev.ID)
- }
- buf.WriteByte('\n')
+ fmt.Fprintf(&buf, "event: %s\ndata: %q\nid: %s\n\n",
+ ev.Type, ev.Data, ev.ID)
if ev.Type == "end" {
ever = false
break
}
+ timeWait.Reset(time.Second)
case <-timeWait.C:
break
}
}
+ timeWait.Stop()
test.Assert(t, `Execute tail`, string(tdata.Output[`local:/local.aww:1-:tail`]), buf.String())
}
@@ -353,6 +350,9 @@ func TestHttpServer_ExecuteCancel(t *testing.T) {
log.Fatal(err)
}
}()
+ t.Cleanup(func() {
+ aww.Stop()
+ })
err = libnet.WaitAlive(`tcp`, address, 10*time.Second)
if err != nil {
@@ -430,16 +430,8 @@ func TestHttpServer_ExecuteCancel(t *testing.T) {
for ever {
select {
case ev = <-ssec.C:
- if len(ev.Type) != 0 {
- fmt.Fprintf(&buf, "event: %s\n", ev.Type)
- }
- if len(ev.Data) != 0 {
- fmt.Fprintf(&buf, "data: %q\n", ev.Data)
- }
- if len(ev.ID) != 0 {
- fmt.Fprintf(&buf, "id: %s\n", ev.ID)
- }
- buf.WriteByte('\n')
+ fmt.Fprintf(&buf, "event: %s\ndata: %q\nid: %s\n\n",
+ ev.Type, ev.Data, ev.ID)
if ev.ID == `1` {
testDoExecuteCancel(t, tdata, cl, execResp.ID)
@@ -449,10 +441,12 @@ func TestHttpServer_ExecuteCancel(t *testing.T) {
ever = false
break
}
+ timeWait.Reset(time.Second)
case <-timeWait.C:
break
}
}
+ timeWait.Stop()
test.Assert(t, `Execute cancel`, string(tdata.Output[`local:/cancel.aww:1-:tail`]), buf.String())
}
diff --git a/testdata/http_server/execute/cancel_test.data b/testdata/http_server/execute/cancel_test.data
index edf21f9..2db8d38 100644
--- a/testdata/http_server/execute/cancel_test.data
+++ b/testdata/http_server/execute/cancel_test.data
@@ -18,36 +18,41 @@ The SSE data is quoted to make the string viewable.
"begin_at": "2023-11-26T15:21:00Z",
"end_at": "",
"error": "",
- "output": []
+ "output": [
+ {
+ "Type": "begin",
+ "Data": "2023-11-26T15:21:00Z",
+ "ID": "1"
+ }
+ ]
},
"code": 200
}
<<< local:/cancel.aww:1-:tail
event: open
+data: ""
+id:
event: begin
data: "2023-11-26T15:21:00Z"
+id: 1
event: message
data: " === BEGIN: local /cancel.aww 1-\n"
-id: 0
+id: 2
event: message
data: " --> 1: sleep 300\n"
-id: 1
+id: 3
event: message
data: " !!! ExecLocal: signal: killed\n"
-id: 2
-
-event: message
-data: "Local: ExecLocal: signal: killed"
-id: 2
+id: 4
event: end
data: "2023-11-26T15:21:00Z"
-id: 2
+id: 5
diff --git a/testdata/http_server/execute/local_test.data b/testdata/http_server/execute/local_test.data
index 5f96637..0c573b8 100644
--- a/testdata/http_server/execute/local_test.data
+++ b/testdata/http_server/execute/local_test.data
@@ -16,35 +16,44 @@
"begin_at": "2023-11-26T15:21:00Z",
"end_at": "",
"error": "",
- "output": []
+ "output": [
+ {
+ "Type": "begin",
+ "Data": "2023-11-26T15:21:00Z",
+ "ID": "1"
+ }
+ ]
},
"code": 200
}
<<< local:/local.aww:1-:tail
event: open
+data: ""
+id:
event: begin
data: "2023-11-26T15:21:00Z"
+id: 1
event: message
data: " === BEGIN: local /local.aww 1-\n"
-id: 0
+id: 2
event: message
data: " --> 1: echo \"test\"\n"
-id: 1
+id: 3
event: message
data: "test\n"
-id: 2
+id: 4
event: message
data: " === END: local /local.aww 1-\n"
-id: 3
+id: 5
event: end
data: "2023-11-26T15:21:00Z"
-id: 3
+id: 6