diff options
| -rw-r--r-- | src/net/http/http.go | 5 | ||||
| -rw-r--r-- | src/net/http/serve_test.go | 12 | ||||
| -rw-r--r-- | src/net/http/server.go | 249 |
3 files changed, 170 insertions, 96 deletions
diff --git a/src/net/http/http.go b/src/net/http/http.go index 258efbb152..b2130b11a8 100644 --- a/src/net/http/http.go +++ b/src/net/http/http.go @@ -6,6 +6,7 @@ package http import ( "strings" + "time" "unicode/utf8" "golang_org/x/net/lex/httplex" @@ -15,6 +16,10 @@ import ( // Transport's byte-limiting readers. const maxInt64 = 1<<63 - 1 +// aLongTimeAgo is a non-zero time, far in the past, used for +// immediate cancelation of network operations. +var aLongTimeAgo = time.Unix(233431200, 0) + // TODO(bradfitz): move common stuff here. The other files have accumulated // generic http stuff in random places. diff --git a/src/net/http/serve_test.go b/src/net/http/serve_test.go index db72e70e35..5e12902ba7 100644 --- a/src/net/http/serve_test.go +++ b/src/net/http/serve_test.go @@ -4202,13 +4202,11 @@ func testServerRequestContextCancel_ServeHTTPDone(t *testing.T, h2 bool) { } } +// Tests that the Request.Context available to the Handler is canceled +// if the peer closes their TCP connection. This requires that the server +// is always blocked in a Read call so it notices the EOF from the client. +// See issues 15927 and 15224. func TestServerRequestContextCancel_ConnClose(t *testing.T) { - // Currently the context is not canceled when the connection - // is closed because we're not reading from the connection - // until after ServeHTTP for the previous handler is done. - // Until the server code is modified to always be in a read - // (Issue 15224), this test doesn't work yet. - t.Skip("TODO(bradfitz): this test doesn't yet work; golang.org/issue/15224") defer afterTest(t) inHandler := make(chan struct{}) handlerDone := make(chan struct{}) @@ -4237,7 +4235,7 @@ func TestServerRequestContextCancel_ConnClose(t *testing.T) { select { case <-handlerDone: - case <-time.After(3 * time.Second): + case <-time.After(4 * time.Second): t.Fatalf("timeout waiting to see ServeHTTP exit") } } diff --git a/src/net/http/server.go b/src/net/http/server.go index f102488310..3c6b96c5be 100644 --- a/src/net/http/server.go +++ b/src/net/http/server.go @@ -208,6 +208,9 @@ type conn struct { // Immutable; never nil. server *Server + // cancelCtx cancels the connection-level context. + cancelCtx context.CancelFunc + // rwc is the underlying network connection. // This is never wrapped by other types and is the value given out // to CloseNotifier callers. It is usually of type *net.TCPConn or @@ -247,6 +250,8 @@ type conn struct { // mu guards hijackedv, use of bufr, (*response).closeNotifyCh. mu sync.Mutex + curReq atomic.Value // of *response (which has a Request in it) + // hijackedv is whether this connection has been hijacked // by a Handler with the Hijacker interface. // It is guarded by mu. @@ -264,8 +269,12 @@ func (c *conn) hijackLocked() (rwc net.Conn, buf *bufio.ReadWriter, err error) { if c.hijackedv { return nil, nil, ErrHijacked } + c.r.abortPendingRead() + c.hijackedv = true rwc = c.rwc + rwc.SetDeadline(time.Time{}) + buf = bufio.NewReadWriter(c.bufr, bufio.NewWriter(rwc)) c.setState(rwc, StateHijacked) return @@ -415,9 +424,11 @@ type response struct { dateBuf [len(TimeFormat)]byte clenBuf [10]byte - // closeNotifyCh is non-nil once CloseNotify is called. - // Guarded by conn.mu - closeNotifyCh <-chan bool + // closeNotifyCh is the channel returned by CloseNotify. + // TODO(bradfitz): this is currently (for Go 1.8) always + // non-nil. Make this lazily-created again as it used to be. + closeNotifyCh chan bool + didCloseNotify int32 // atomic (only 0->1 winner should send) } type atomicBool int32 @@ -550,60 +561,148 @@ type readResult struct { // call blocked in a background goroutine to wait for activity and // trigger a CloseNotifier channel. type connReader struct { - r io.Reader - remain int64 // bytes remaining + conn *conn + + mu sync.Mutex // guards following + hasByte bool + byteBuf [1]byte + bgErr error // non-nil means error happened on background read + cond *sync.Cond + inRead bool + aborted bool // set true before conn.rwc deadline is set to past + remain int64 // bytes remaining +} + +func (cr *connReader) lock() { + cr.mu.Lock() + if cr.cond == nil { + cr.cond = sync.NewCond(&cr.mu) + } +} + +func (cr *connReader) unlock() { cr.mu.Unlock() } - // ch is non-nil if a background read is in progress. - // It is guarded by conn.mu. - ch chan readResult +func (cr *connReader) startBackgroundRead() { + cr.lock() + defer cr.unlock() + if cr.inRead { + panic("invalid concurrent Body.Read call") + } + cr.inRead = true + go cr.backgroundRead() +} + +func (cr *connReader) backgroundRead() { + n, err := cr.conn.rwc.Read(cr.byteBuf[:]) + cr.lock() + if n == 1 { + cr.hasByte = true + // We were at EOF already (since we wouldn't be in a + // background read otherwise), so this is a pipelined + // HTTP request. + cr.closeNotifyFromPipelinedRequest() + } + if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() { + // Ignore this error. It's the expected error from + // another goroutine calling abortPendingRead. + } else if err != nil { + cr.handleReadError(err) + } + cr.aborted = false + cr.inRead = false + cr.unlock() + cr.cond.Broadcast() +} + +func (cr *connReader) abortPendingRead() { + cr.lock() + defer cr.unlock() + if !cr.inRead { + return + } + cr.aborted = true + cr.conn.rwc.SetReadDeadline(aLongTimeAgo) + for cr.inRead { + cr.cond.Wait() + } + cr.conn.rwc.SetReadDeadline(time.Time{}) } func (cr *connReader) setReadLimit(remain int64) { cr.remain = remain } func (cr *connReader) setInfiniteReadLimit() { cr.remain = maxInt64 } func (cr *connReader) hitReadLimit() bool { return cr.remain <= 0 } +// may be called from multiple goroutines. +func (cr *connReader) handleReadError(err error) { + cr.conn.cancelCtx() + cr.closeNotify() +} + +// closeNotifyFromPipelinedRequest simply calls closeNotify. +// +// This method wrapper is here for documentation. The callers are the +// cases where we send on the closenotify channel because of a +// pipelined HTTP request, per the previous Go behavior and +// documentation (that this "MAY" happen). +// +// TODO: consider changing this behavior and making context +// cancelation and closenotify work the same. +func (cr *connReader) closeNotifyFromPipelinedRequest() { + cr.closeNotify() +} + +// may be called from multiple goroutines. +func (cr *connReader) closeNotify() { + res, _ := cr.conn.curReq.Load().(*response) + if res != nil { + if atomic.CompareAndSwapInt32(&res.didCloseNotify, 0, 1) { + res.closeNotifyCh <- true + } + } +} + func (cr *connReader) Read(p []byte) (n int, err error) { + cr.lock() + if cr.inRead { + cr.unlock() + panic("invalid concurrent Body.Read call") + } if cr.hitReadLimit() { + cr.unlock() return 0, io.EOF } + if cr.bgErr != nil { + err = cr.bgErr + cr.unlock() + return 0, err + } if len(p) == 0 { - return + cr.unlock() + return 0, nil } if int64(len(p)) > cr.remain { p = p[:cr.remain] } - - // Is a background read (started by CloseNotifier) already in - // flight? If so, wait for it and use its result. - ch := cr.ch - if ch != nil { - cr.ch = nil - res := <-ch - if res.n == 1 { - p[0] = res.b - cr.remain -= 1 - } - return res.n, res.err + if cr.hasByte { + p[0] = cr.byteBuf[0] + cr.hasByte = false + cr.unlock() + return 1, nil } - n, err = cr.r.Read(p) - cr.remain -= int64(n) - return -} + cr.inRead = true + cr.unlock() + n, err = cr.conn.rwc.Read(p) -func (cr *connReader) startBackgroundRead(onReadComplete func()) { - if cr.ch != nil { - // Background read already started. - return + cr.lock() + cr.inRead = false + if err != nil { + cr.handleReadError(err) } - cr.ch = make(chan readResult, 1) - go cr.closeNotifyAwaitActivityRead(cr.ch, onReadComplete) -} + cr.remain -= int64(n) + cr.unlock() -func (cr *connReader) closeNotifyAwaitActivityRead(ch chan<- readResult, onReadComplete func()) { - var buf [1]byte - n, err := cr.r.Read(buf[:1]) - onReadComplete() - ch <- readResult{n, err, buf[0]} + cr.cond.Broadcast() + return n, err } var ( @@ -818,6 +917,7 @@ func (c *conn) readRequest(ctx context.Context) (w *response, err error) { reqBody: req.Body, handlerHeader: make(Header), contentLength: -1, + closeNotifyCh: make(chan bool, 1), // We populate these ahead of time so we're not // reading from req.Header after their Handler starts @@ -1358,6 +1458,8 @@ func (w *response) finishRequest() { w.cw.close() w.conn.bufw.Flush() + w.conn.r.abortPendingRead() + // Close the body (regardless of w.closeAfterReply) so we can // re-use its bufio.Reader later safely. w.reqBody.Close() @@ -1525,13 +1627,14 @@ func (c *conn) serve(ctx context.Context) { // HTTP/1.x from here on. - c.r = &connReader{r: c.rwc} - c.bufr = newBufioReader(c.r) - c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10) - ctx, cancelCtx := context.WithCancel(ctx) + c.cancelCtx = cancelCtx defer cancelCtx() + c.r = &connReader{conn: c} + c.bufr = newBufioReader(c.r) + c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10) + for { w, err := c.readRequest(ctx) if c.r.remain != c.server.initialReadLimitSize() { @@ -1575,11 +1678,24 @@ func (c *conn) serve(ctx context.Context) { return } + c.curReq.Store(w) + + if requestBodyRemains(req.Body) { + registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead) + } else { + if w.conn.bufr.Buffered() > 0 { + w.conn.r.closeNotifyFromPipelinedRequest() + } + w.conn.r.startBackgroundRead() + } + // HTTP cannot have multiple simultaneous active requests.[*] // Until the server replies to this request, it can't read another, // so we might as well run the handler in this goroutine. // [*] Not strictly true: HTTP pipelining. We could let them all process // in parallel even if their responses need to be serialized. + // But we're not going to implement HTTP pipelining because it + // was never deployed in the wild and the answer is HTTP/2. serverHandler{c.server}.ServeHTTP(w, w.req) w.cancelCtx() if c.hijacked() { @@ -1593,6 +1709,7 @@ func (c *conn) serve(ctx context.Context) { return } c.setState(c.rwc, StateIdle) + c.curReq.Store((*response)(nil)) } } @@ -1628,10 +1745,6 @@ func (w *response) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) { c.mu.Lock() defer c.mu.Unlock() - if w.closeNotifyCh != nil { - return nil, nil, errors.New("http: Hijack is incompatible with use of CloseNotifier in same ServeHTTP call") - } - // Release the bufioWriter that writes to the chunk writer, it is not // used after a connection has been hijacked. rwc, buf, err = c.hijackLocked() @@ -1646,50 +1759,7 @@ func (w *response) CloseNotify() <-chan bool { if w.handlerDone.isSet() { panic("net/http: CloseNotify called after ServeHTTP finished") } - c := w.conn - c.mu.Lock() - defer c.mu.Unlock() - - if w.closeNotifyCh != nil { - return w.closeNotifyCh - } - ch := make(chan bool, 1) - w.closeNotifyCh = ch - - if w.conn.hijackedv { - // CloseNotify is undefined after a hijack, but we have - // no place to return an error, so just return a channel, - // even though it'll never receive a value. - return ch - } - - var once sync.Once - notify := func() { once.Do(func() { ch <- true }) } - - if requestBodyRemains(w.reqBody) { - // They're still consuming the request body, so we - // shouldn't notify yet. - registerOnHitEOF(w.reqBody, func() { - c.mu.Lock() - defer c.mu.Unlock() - startCloseNotifyBackgroundRead(c, notify) - }) - } else { - startCloseNotifyBackgroundRead(c, notify) - } - return ch -} - -// c.mu must be held. -func startCloseNotifyBackgroundRead(c *conn, notify func()) { - if c.bufr.Buffered() > 0 { - // They've consumed the request body, so anything - // remaining is a pipelined request, which we - // document as firing on. - notify() - } else { - c.r.startBackgroundRead(notify) - } + return w.closeNotifyCh } func registerOnHitEOF(rc io.ReadCloser, fn func()) { @@ -2725,6 +2795,7 @@ func (w checkConnErrorWriter) Write(p []byte) (n int, err error) { n, err = w.c.rwc.Write(p) if err != nil && w.c.werr == nil { w.c.werr = err + w.c.cancelCtx() } return } |
