diff options
| author | Michael Anthony Knyszek <mknyszek@google.com> | 2024-06-03 21:46:39 +0000 |
|---|---|---|
| committer | Gopher Robot <gobot@golang.org> | 2024-06-04 16:19:04 +0000 |
| commit | b5a861782312d2b3a4f71e33d9a0c2b01a40fe5f (patch) | |
| tree | 27f868c3eba92440b03c9ae8436019bfe0bafdba /src/net/http | |
| parent | 499de42188ee0b0680aec4c49e25594456fdf15a (diff) | |
| download | go-b5a861782312d2b3a4f71e33d9a0c2b01a40fe5f.tar.xz | |
all: update vendored dependencies
The Go 1.23 code freeze has recently started. This is a time to update
all golang.org/x/... module versions that contribute packages to the
std and cmd modules in the standard library to latest master versions.
For #36905.
[git-generate]
go install golang.org/x/build/cmd/updatestd@latest
go install golang.org/x/tools/cmd/bundle@latest
updatestd -goroot=$(pwd) -branch=master
Change-Id: I9162f547c148809d6fb1e4157f6f504634cef3b7
Reviewed-on: https://go-review.googlesource.com/c/go/+/589935
Auto-Submit: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Matthew Dempsky <mdempsky@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'src/net/http')
| -rw-r--r-- | src/net/http/h2_bundle.go | 753 |
1 files changed, 196 insertions, 557 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 5f97a27ac2..0b305844ae 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -3525,13 +3525,6 @@ type http2stringWriter interface { WriteString(s string) (n int, err error) } -// A gate lets two goroutines coordinate their activities. -type http2gate chan struct{} - -func (g http2gate) Done() { g <- struct{}{} } - -func (g http2gate) Wait() { <-g } - // A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed). type http2closeWaiter chan struct{} @@ -3704,6 +3697,17 @@ func http2validPseudoPath(v string) bool { // any size (as long as it's first). type http2incomparable [0]func() +// synctestGroupInterface is the methods of synctestGroup used by Server and Transport. +// It's defined as an interface here to let us keep synctestGroup entirely test-only +// and not a part of non-test builds. +type http2synctestGroupInterface interface { + Join() + Now() time.Time + NewTimer(d time.Duration) http2timer + AfterFunc(d time.Duration, f func()) http2timer + ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) +} + // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like // io.Pipe except there are no PipeReader/PipeWriter halves, and the // underlying buffer is an interface. (io.Pipe is always unbuffered) @@ -3980,6 +3984,39 @@ type http2Server struct { // so that we don't embed a Mutex in this struct, which will make the // struct non-copyable, which might break some callers. state *http2serverInternalState + + // Synchronization group used for testing. + // Outside of tests, this is nil. + group http2synctestGroupInterface +} + +func (s *http2Server) markNewGoroutine() { + if s.group != nil { + s.group.Join() + } +} + +func (s *http2Server) now() time.Time { + if s.group != nil { + return s.group.Now() + } + return time.Now() +} + +// newTimer creates a new time.Timer, or a synthetic timer in tests. +func (s *http2Server) newTimer(d time.Duration) http2timer { + if s.group != nil { + return s.group.NewTimer(d) + } + return http2timeTimer{time.NewTimer(d)} +} + +// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests. +func (s *http2Server) afterFunc(d time.Duration, f func()) http2timer { + if s.group != nil { + return s.group.AfterFunc(d, f) + } + return http2timeTimer{time.AfterFunc(d, f)} } func (s *http2Server) initialConnRecvWindowSize() int32 { @@ -4226,6 +4263,10 @@ func (o *http2ServeConnOpts) handler() Handler { // // The opts parameter is optional. If nil, default values are used. func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { + s.serveConn(c, opts, nil) +} + +func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(*http2serverConn)) { baseCtx, cancel := http2serverConnBaseContext(c, opts) defer cancel() @@ -4252,6 +4293,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { pushEnabled: true, sawClientPreface: opts.SawClientPreface, } + if newf != nil { + newf(sc) + } s.state.registerConn(sc) defer s.state.unregisterConn(sc) @@ -4425,8 +4469,8 @@ type http2serverConn struct { inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop needToSendGoAway bool // we need to schedule a GOAWAY frame write goAwayCode http2ErrCode - shutdownTimer *time.Timer // nil until used - idleTimer *time.Timer // nil if unused + shutdownTimer http2timer // nil until used + idleTimer http2timer // nil if unused // Owned by the writeFrameAsync goroutine: headerWriteBuf bytes.Buffer @@ -4475,12 +4519,12 @@ type http2stream struct { flow http2outflow // limits writing from Handler to client inflow http2inflow // what the client is allowed to POST/etc to us state http2streamState - resetQueued bool // RST_STREAM queued for write; set by sc.resetStream - gotTrailerHeader bool // HEADER frame for trailers was seen - wroteHeaders bool // whether we wrote headers (not status 100) - readDeadline *time.Timer // nil if unused - writeDeadline *time.Timer // nil if unused - closeErr error // set before cw is closed + resetQueued bool // RST_STREAM queued for write; set by sc.resetStream + gotTrailerHeader bool // HEADER frame for trailers was seen + wroteHeaders bool // whether we wrote headers (not status 100) + readDeadline http2timer // nil if unused + writeDeadline http2timer // nil if unused + closeErr error // set before cw is closed trailer Header // accumulated trailers reqTrailer Header // handler's Request.Trailer @@ -4561,11 +4605,7 @@ func http2isClosedConnError(err error) bool { return false } - // TODO: remove this string search and be more like the Windows - // case below. That might involve modifying the standard library - // to return better error types. - str := err.Error() - if strings.Contains(str, "use of closed network connection") { + if errors.Is(err, net.ErrClosed) { return true } @@ -4644,8 +4684,9 @@ type http2readFrameResult struct { // consumer is done with the frame. // It's run on its own goroutine. func (sc *http2serverConn) readFrames() { - gate := make(http2gate) - gateDone := gate.Done + sc.srv.markNewGoroutine() + gate := make(chan struct{}) + gateDone := func() { gate <- struct{}{} } for { f, err := sc.framer.ReadFrame() select { @@ -4676,6 +4717,7 @@ type http2frameWriteResult struct { // At most one goroutine can be running writeFrameAsync at a time per // serverConn. func (sc *http2serverConn) writeFrameAsync(wr http2FrameWriteRequest, wd *http2writeData) { + sc.srv.markNewGoroutine() var err error if wd == nil { err = wr.write.writeFrame(sc) @@ -4755,13 +4797,13 @@ func (sc *http2serverConn) serve() { sc.setConnState(StateIdle) if sc.srv.IdleTimeout > 0 { - sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) + sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) defer sc.idleTimer.Stop() } go sc.readFrames() // closed by defer sc.conn.Close above - settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer) + settingsTimer := sc.srv.afterFunc(http2firstSettingsTimeout, sc.onSettingsTimer) defer settingsTimer.Stop() loopNum := 0 @@ -4892,10 +4934,10 @@ func (sc *http2serverConn) readPreface() error { errc <- nil } }() - timer := time.NewTimer(http2prefaceTimeout) // TODO: configurable on *Server? + timer := sc.srv.newTimer(http2prefaceTimeout) // TODO: configurable on *Server? defer timer.Stop() select { - case <-timer.C: + case <-timer.C(): return http2errPrefaceTimeout case err := <-errc: if err == nil { @@ -5260,7 +5302,7 @@ func (sc *http2serverConn) goAway(code http2ErrCode) { func (sc *http2serverConn) shutDownIn(d time.Duration) { sc.serveG.check() - sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) + sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer) } func (sc *http2serverConn) resetStream(se http2StreamError) { @@ -5474,7 +5516,7 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) { delete(sc.streams, st.id) if len(sc.streams) == 0 { sc.setConnState(StateIdle) - if sc.srv.IdleTimeout > 0 { + if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil { sc.idleTimer.Reset(sc.srv.IdleTimeout) } if http2h1ServerKeepAlivesDisabled(sc.hs) { @@ -5496,6 +5538,7 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) { } } st.closeErr = err + st.cancelCtx() st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc sc.writeSched.CloseStream(st.id) } @@ -5856,7 +5899,7 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { // (in Go 1.8), though. That's a more sane option anyway. if sc.hs.ReadTimeout > 0 { sc.conn.SetReadDeadline(time.Time{}) - st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout) + st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout) } return sc.scheduleHandler(id, rw, req, handler) @@ -5954,7 +5997,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState st.flow.add(sc.initialStreamSendWindowSize) st.inflow.init(sc.srv.initialStreamRecvWindowSize()) if sc.hs.WriteTimeout > 0 { - st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) + st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) } sc.streams[id] = st @@ -6178,6 +6221,7 @@ func (sc *http2serverConn) handlerDone() { // Run on its own goroutine. func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) { + sc.srv.markNewGoroutine() defer sc.sendServeMsg(http2handlerDoneMsg) didPanic := true defer func() { @@ -6474,7 +6518,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) { var date string if _, ok := rws.snapHeader["Date"]; !ok { // TODO(bradfitz): be faster here, like net/http? measure. - date = time.Now().UTC().Format(TimeFormat) + date = rws.conn.srv.now().UTC().Format(TimeFormat) } for _, v := range rws.snapHeader["Trailer"] { @@ -6596,7 +6640,7 @@ func (rws *http2responseWriterState) promoteUndeclaredTrailers() { func (w *http2responseWriter) SetReadDeadline(deadline time.Time) error { st := w.rws.stream - if !deadline.IsZero() && deadline.Before(time.Now()) { + if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) { // If we're setting a deadline in the past, reset the stream immediately // so writes after SetWriteDeadline returns will fail. st.onReadTimeout() @@ -6612,9 +6656,9 @@ func (w *http2responseWriter) SetReadDeadline(deadline time.Time) error { if deadline.IsZero() { st.readDeadline = nil } else if st.readDeadline == nil { - st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout) + st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout) } else { - st.readDeadline.Reset(deadline.Sub(time.Now())) + st.readDeadline.Reset(deadline.Sub(sc.srv.now())) } }) return nil @@ -6622,7 +6666,7 @@ func (w *http2responseWriter) SetReadDeadline(deadline time.Time) error { func (w *http2responseWriter) SetWriteDeadline(deadline time.Time) error { st := w.rws.stream - if !deadline.IsZero() && deadline.Before(time.Now()) { + if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) { // If we're setting a deadline in the past, reset the stream immediately // so writes after SetWriteDeadline returns will fail. st.onWriteTimeout() @@ -6638,9 +6682,9 @@ func (w *http2responseWriter) SetWriteDeadline(deadline time.Time) error { if deadline.IsZero() { st.writeDeadline = nil } else if st.writeDeadline == nil { - st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout) + st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout) } else { - st.writeDeadline.Reset(deadline.Sub(time.Now())) + st.writeDeadline.Reset(deadline.Sub(sc.srv.now())) } }) return nil @@ -7116,328 +7160,19 @@ func (sc *http2serverConn) countError(name string, err error) error { return err } -// testSyncHooks coordinates goroutines in tests. -// -// For example, a call to ClientConn.RoundTrip involves several goroutines, including: -// - the goroutine running RoundTrip; -// - the clientStream.doRequest goroutine, which writes the request; and -// - the clientStream.readLoop goroutine, which reads the response. -// -// Using testSyncHooks, a test can start a RoundTrip and identify when all these goroutines -// are blocked waiting for some condition such as reading the Request.Body or waiting for -// flow control to become available. -// -// The testSyncHooks also manage timers and synthetic time in tests. -// This permits us to, for example, start a request and cause it to time out waiting for -// response headers without resorting to time.Sleep calls. -type http2testSyncHooks struct { - // active/inactive act as a mutex and condition variable. - // - // - neither chan contains a value: testSyncHooks is locked. - // - active contains a value: unlocked, and at least one goroutine is not blocked - // - inactive contains a value: unlocked, and all goroutines are blocked - active chan struct{} - inactive chan struct{} - - // goroutine counts - total int // total goroutines - condwait map[*sync.Cond]int // blocked in sync.Cond.Wait - blocked []*http2testBlockedGoroutine // otherwise blocked - - // fake time - now time.Time - timers []*http2fakeTimer - - // Transport testing: Report various events. - newclientconn func(*http2ClientConn) - newstream func(*http2clientStream) -} - -// testBlockedGoroutine is a blocked goroutine. -type http2testBlockedGoroutine struct { - f func() bool // blocked until f returns true - ch chan struct{} // closed when unblocked -} - -func http2newTestSyncHooks() *http2testSyncHooks { - h := &http2testSyncHooks{ - active: make(chan struct{}, 1), - inactive: make(chan struct{}, 1), - condwait: map[*sync.Cond]int{}, - } - h.inactive <- struct{}{} - h.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) - return h -} - -// lock acquires the testSyncHooks mutex. -func (h *http2testSyncHooks) lock() { - select { - case <-h.active: - case <-h.inactive: - } -} - -// waitInactive waits for all goroutines to become inactive. -func (h *http2testSyncHooks) waitInactive() { - for { - <-h.inactive - if !h.unlock() { - break - } - } -} - -// unlock releases the testSyncHooks mutex. -// It reports whether any goroutines are active. -func (h *http2testSyncHooks) unlock() (active bool) { - // Look for a blocked goroutine which can be unblocked. - blocked := h.blocked[:0] - unblocked := false - for _, b := range h.blocked { - if !unblocked && b.f() { - unblocked = true - close(b.ch) - } else { - blocked = append(blocked, b) - } - } - h.blocked = blocked - - // Count goroutines blocked on condition variables. - condwait := 0 - for _, count := range h.condwait { - condwait += count - } - - if h.total > condwait+len(blocked) { - h.active <- struct{}{} - return true - } else { - h.inactive <- struct{}{} - return false - } -} - -// goRun starts a new goroutine. -func (h *http2testSyncHooks) goRun(f func()) { - h.lock() - h.total++ - h.unlock() - go func() { - defer func() { - h.lock() - h.total-- - h.unlock() - }() - f() - }() -} - -// blockUntil indicates that a goroutine is blocked waiting for some condition to become true. -// It waits until f returns true before proceeding. -// -// Example usage: -// -// h.blockUntil(func() bool { -// // Is the context done yet? -// select { -// case <-ctx.Done(): -// default: -// return false -// } -// return true -// }) -// // Wait for the context to become done. -// <-ctx.Done() -// -// The function f passed to blockUntil must be non-blocking and idempotent. -func (h *http2testSyncHooks) blockUntil(f func() bool) { - if f() { - return - } - ch := make(chan struct{}) - h.lock() - h.blocked = append(h.blocked, &http2testBlockedGoroutine{ - f: f, - ch: ch, - }) - h.unlock() - <-ch -} - -// broadcast is sync.Cond.Broadcast. -func (h *http2testSyncHooks) condBroadcast(cond *sync.Cond) { - h.lock() - delete(h.condwait, cond) - h.unlock() - cond.Broadcast() -} - -// broadcast is sync.Cond.Wait. -func (h *http2testSyncHooks) condWait(cond *sync.Cond) { - h.lock() - h.condwait[cond]++ - h.unlock() -} - -// newTimer creates a new fake timer. -func (h *http2testSyncHooks) newTimer(d time.Duration) http2timer { - h.lock() - defer h.unlock() - t := &http2fakeTimer{ - hooks: h, - when: h.now.Add(d), - c: make(chan time.Time), - } - h.timers = append(h.timers, t) - return t -} - -// afterFunc creates a new fake AfterFunc timer. -func (h *http2testSyncHooks) afterFunc(d time.Duration, f func()) http2timer { - h.lock() - defer h.unlock() - t := &http2fakeTimer{ - hooks: h, - when: h.now.Add(d), - f: f, - } - h.timers = append(h.timers, t) - return t -} - -func (h *http2testSyncHooks) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(ctx) - t := h.afterFunc(d, cancel) - return ctx, func() { - t.Stop() - cancel() - } -} - -func (h *http2testSyncHooks) timeUntilEvent() time.Duration { - h.lock() - defer h.unlock() - var next time.Time - for _, t := range h.timers { - if next.IsZero() || t.when.Before(next) { - next = t.when - } - } - if d := next.Sub(h.now); d > 0 { - return d - } - return 0 -} - -// advance advances time and causes synthetic timers to fire. -func (h *http2testSyncHooks) advance(d time.Duration) { - h.lock() - defer h.unlock() - h.now = h.now.Add(d) - timers := h.timers[:0] - for _, t := range h.timers { - t := t // remove after go.mod depends on go1.22 - t.mu.Lock() - switch { - case t.when.After(h.now): - timers = append(timers, t) - case t.when.IsZero(): - // stopped timer - default: - t.when = time.Time{} - if t.c != nil { - close(t.c) - } - if t.f != nil { - h.total++ - go func() { - defer func() { - h.lock() - h.total-- - h.unlock() - }() - t.f() - }() - } - } - t.mu.Unlock() - } - h.timers = timers -} - -// A timer wraps a time.Timer, or a synthetic equivalent in tests. -// Unlike time.Timer, timer is single-use: The timer channel is closed when the timer expires. -type http2timer interface { +// A timer is a time.Timer, as an interface which can be replaced in tests. +type http2timer = interface { C() <-chan time.Time - Stop() bool Reset(d time.Duration) bool + Stop() bool } -// timeTimer implements timer using real time. +// timeTimer adapts a time.Timer to the timer interface. type http2timeTimer struct { - t *time.Timer - c chan time.Time + *time.Timer } -// newTimeTimer creates a new timer using real time. -func http2newTimeTimer(d time.Duration) http2timer { - ch := make(chan time.Time) - t := time.AfterFunc(d, func() { - close(ch) - }) - return &http2timeTimer{t, ch} -} - -// newTimeAfterFunc creates an AfterFunc timer using real time. -func http2newTimeAfterFunc(d time.Duration, f func()) http2timer { - return &http2timeTimer{ - t: time.AfterFunc(d, f), - } -} - -func (t http2timeTimer) C() <-chan time.Time { return t.c } - -func (t http2timeTimer) Stop() bool { return t.t.Stop() } - -func (t http2timeTimer) Reset(d time.Duration) bool { return t.t.Reset(d) } - -// fakeTimer implements timer using fake time. -type http2fakeTimer struct { - hooks *http2testSyncHooks - - mu sync.Mutex - when time.Time // when the timer will fire - c chan time.Time // closed when the timer fires; mutually exclusive with f - f func() // called when the timer fires; mutually exclusive with c -} - -func (t *http2fakeTimer) C() <-chan time.Time { return t.c } - -func (t *http2fakeTimer) Stop() bool { - t.mu.Lock() - defer t.mu.Unlock() - stopped := t.when.IsZero() - t.when = time.Time{} - return stopped -} - -func (t *http2fakeTimer) Reset(d time.Duration) bool { - if t.c != nil || t.f == nil { - panic("fakeTimer only supports Reset on AfterFunc timers") - } - t.mu.Lock() - defer t.mu.Unlock() - t.hooks.lock() - defer t.hooks.unlock() - active := !t.when.IsZero() - t.when = t.hooks.now.Add(d) - if !active { - t.hooks.timers = append(t.hooks.timers, t) - } - return active -} +func (t http2timeTimer) C() <-chan time.Time { return t.Timer.C } const ( // transportDefaultConnFlow is how many connection-level flow control @@ -7586,7 +7321,45 @@ type http2Transport struct { connPoolOnce sync.Once connPoolOrDef http2ClientConnPool // non-nil version of ConnPool - syncHooks *http2testSyncHooks + *http2transportTestHooks +} + +// Hook points used for testing. +// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations. +// Inside tests, see the testSyncHooks function docs. + +type http2transportTestHooks struct { + newclientconn func(*http2ClientConn) + group http2synctestGroupInterface +} + +func (t *http2Transport) markNewGoroutine() { + if t != nil && t.http2transportTestHooks != nil { + t.http2transportTestHooks.group.Join() + } +} + +// newTimer creates a new time.Timer, or a synthetic timer in tests. +func (t *http2Transport) newTimer(d time.Duration) http2timer { + if t.http2transportTestHooks != nil { + return t.http2transportTestHooks.group.NewTimer(d) + } + return http2timeTimer{time.NewTimer(d)} +} + +// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests. +func (t *http2Transport) afterFunc(d time.Duration, f func()) http2timer { + if t.http2transportTestHooks != nil { + return t.http2transportTestHooks.group.AfterFunc(d, f) + } + return http2timeTimer{time.AfterFunc(d, f)} +} + +func (t *http2Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) { + if t.http2transportTestHooks != nil { + return t.http2transportTestHooks.group.ContextWithTimeout(ctx, d) + } + return context.WithTimeout(ctx, d) } func (t *http2Transport) maxHeaderListSize() uint32 { @@ -7753,60 +7526,6 @@ type http2ClientConn struct { werr error // first write error that has occurred hbuf bytes.Buffer // HPACK encoder writes into this henc *hpack.Encoder - - syncHooks *http2testSyncHooks // can be nil -} - -// Hook points used for testing. -// Outside of tests, cc.syncHooks is nil and these all have minimal implementations. -// Inside tests, see the testSyncHooks function docs. - -// goRun starts a new goroutine. -func (cc *http2ClientConn) goRun(f func()) { - if cc.syncHooks != nil { - cc.syncHooks.goRun(f) - return - } - go f() -} - -// condBroadcast is cc.cond.Broadcast. -func (cc *http2ClientConn) condBroadcast() { - if cc.syncHooks != nil { - cc.syncHooks.condBroadcast(cc.cond) - } - cc.cond.Broadcast() -} - -// condWait is cc.cond.Wait. -func (cc *http2ClientConn) condWait() { - if cc.syncHooks != nil { - cc.syncHooks.condWait(cc.cond) - } - cc.cond.Wait() -} - -// newTimer creates a new time.Timer, or a synthetic timer in tests. -func (cc *http2ClientConn) newTimer(d time.Duration) http2timer { - if cc.syncHooks != nil { - return cc.syncHooks.newTimer(d) - } - return http2newTimeTimer(d) -} - -// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests. -func (cc *http2ClientConn) afterFunc(d time.Duration, f func()) http2timer { - if cc.syncHooks != nil { - return cc.syncHooks.afterFunc(d, f) - } - return http2newTimeAfterFunc(d, f) -} - -func (cc *http2ClientConn) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) { - if cc.syncHooks != nil { - return cc.syncHooks.contextWithTimeout(ctx, d) - } - return context.WithTimeout(ctx, d) } // clientStream is the state for a single HTTP/2 stream. One of these @@ -7888,7 +7607,7 @@ func (cs *http2clientStream) abortStreamLocked(err error) { // TODO(dneil): Clean up tests where cs.cc.cond is nil. if cs.cc.cond != nil { // Wake up writeRequestBody if it is waiting on flow control. - cs.cc.condBroadcast() + cs.cc.cond.Broadcast() } } @@ -7898,7 +7617,7 @@ func (cs *http2clientStream) abortRequestBodyWrite() { defer cc.mu.Unlock() if cs.reqBody != nil && cs.reqBodyClosed == nil { cs.closeReqBodyLocked() - cc.condBroadcast() + cc.cond.Broadcast() } } @@ -7908,10 +7627,11 @@ func (cs *http2clientStream) closeReqBodyLocked() { } cs.reqBodyClosed = make(chan struct{}) reqBodyClosed := cs.reqBodyClosed - cs.cc.goRun(func() { + go func() { + cs.cc.t.markNewGoroutine() cs.reqBody.Close() close(reqBodyClosed) - }) + }() } type http2stickyErrWriter struct { @@ -8028,21 +7748,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res backoff := float64(uint(1) << (uint(retry) - 1)) backoff += backoff * (0.1 * mathrand.Float64()) d := time.Second * time.Duration(backoff) - var tm http2timer - if t.syncHooks != nil { - tm = t.syncHooks.newTimer(d) - t.syncHooks.blockUntil(func() bool { - select { - case <-tm.C(): - case <-req.Context().Done(): - default: - return false - } - return true - }) - } else { - tm = http2newTimeTimer(d) - } + tm := t.newTimer(d) select { case <-tm.C(): t.vlogf("RoundTrip retrying after failure: %v", roundTripErr) @@ -8127,8 +7833,8 @@ func http2canRetryError(err error) bool { } func (t *http2Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*http2ClientConn, error) { - if t.syncHooks != nil { - return t.newClientConn(nil, singleUse, t.syncHooks) + if t.http2transportTestHooks != nil { + return t.newClientConn(nil, singleUse) } host, _, err := net.SplitHostPort(addr) if err != nil { @@ -8138,7 +7844,7 @@ func (t *http2Transport) dialClientConn(ctx context.Context, addr string, single if err != nil { return nil, err } - return t.newClientConn(tconn, singleUse, nil) + return t.newClientConn(tconn, singleUse) } func (t *http2Transport) newTLSConfig(host string) *tls.Config { @@ -8204,10 +7910,10 @@ func (t *http2Transport) maxEncoderHeaderTableSize() uint32 { } func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { - return t.newClientConn(c, t.disableKeepAlives(), nil) + return t.newClientConn(c, t.disableKeepAlives()) } -func (t *http2Transport) newClientConn(c net.Conn, singleUse bool, hooks *http2testSyncHooks) (*http2ClientConn, error) { +func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { cc := &http2ClientConn{ t: t, tconn: c, @@ -8222,16 +7928,12 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool, hooks *http2t wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), reqHeaderMu: make(chan struct{}, 1), - syncHooks: hooks, } - if hooks != nil { - hooks.newclientconn(cc) + if t.http2transportTestHooks != nil { + t.markNewGoroutine() + t.http2transportTestHooks.newclientconn(cc) c = cc.tconn } - if d := t.idleConnTimeout(); d != 0 { - cc.idleTimeout = d - cc.idleTimer = cc.afterFunc(d, cc.onIdleTimeout) - } if http2VerboseLogs { t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) } @@ -8295,7 +7997,13 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool, hooks *http2t return nil, cc.werr } - cc.goRun(cc.readLoop) + // Start the idle timer after the connection is fully initialized. + if d := t.idleConnTimeout(); d != 0 { + cc.idleTimeout = d + cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout) + } + + go cc.readLoop() return cc, nil } @@ -8303,7 +8011,7 @@ func (cc *http2ClientConn) healthCheck() { pingTimeout := cc.t.pingTimeout() // We don't need to periodically ping in the health check, because the readLoop of ClientConn will // trigger the healthCheck again if there is no frame received. - ctx, cancel := cc.contextWithTimeout(context.Background(), pingTimeout) + ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout) defer cancel() cc.vlogf("http2: Transport sending health check") err := cc.Ping(ctx) @@ -8546,7 +8254,8 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error { // Wait for all in-flight streams to complete or connection to close done := make(chan struct{}) cancelled := false // guarded by cc.mu - cc.goRun(func() { + go func() { + cc.t.markNewGoroutine() cc.mu.Lock() defer cc.mu.Unlock() for { @@ -8558,9 +8267,9 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error { if cancelled { break } - cc.condWait() + cc.cond.Wait() } - }) + }() http2shutdownEnterWaitStateHook() select { case <-done: @@ -8570,7 +8279,7 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error { cc.mu.Lock() // Free the goroutine above cancelled = true - cc.condBroadcast() + cc.cond.Broadcast() cc.mu.Unlock() return ctx.Err() } @@ -8608,7 +8317,7 @@ func (cc *http2ClientConn) closeForError(err error) { for _, cs := range cc.streams { cs.abortStreamLocked(err) } - cc.condBroadcast() + cc.cond.Broadcast() cc.mu.Unlock() cc.closeConn() } @@ -8723,23 +8432,30 @@ func (cc *http2ClientConn) roundTrip(req *Request, streamf func(*http2clientStre respHeaderRecv: make(chan struct{}), donec: make(chan struct{}), } - cc.goRun(func() { - cs.doRequest(req) - }) + + // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? + if !cc.t.disableCompression() && + req.Header.Get("Accept-Encoding") == "" && + req.Header.Get("Range") == "" && + !cs.isHead { + // Request gzip only, not deflate. Deflate is ambiguous and + // not as universally supported anyway. + // See: https://zlib.net/zlib_faq.html#faq39 + // + // Note that we don't request this for HEAD requests, + // due to a bug in nginx: + // http://trac.nginx.org/nginx/ticket/358 + // https://golang.org/issue/5522 + // + // We don't request gzip if the request is for a range, since + // auto-decoding a portion of a gzipped document will just fail + // anyway. See https://golang.org/issue/8923 + cs.requestedGzip = true + } + + go cs.doRequest(req, streamf) waitDone := func() error { - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-cs.donec: - case <-ctx.Done(): - case <-cs.reqCancel: - default: - return false - } - return true - }) - } select { case <-cs.donec: return nil @@ -8800,24 +8516,7 @@ func (cc *http2ClientConn) roundTrip(req *Request, streamf func(*http2clientStre return err } - if streamf != nil { - streamf(cs) - } - for { - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-cs.respHeaderRecv: - case <-cs.abort: - case <-ctx.Done(): - case <-cs.reqCancel: - default: - return false - } - return true - }) - } select { case <-cs.respHeaderRecv: return handleResponseHeaders() @@ -8847,8 +8546,9 @@ func (cc *http2ClientConn) roundTrip(req *Request, streamf func(*http2clientStre // doRequest runs for the duration of the request lifetime. // // It sends the request and performs post-request cleanup (closing Request.Body, etc.). -func (cs *http2clientStream) doRequest(req *Request) { - err := cs.writeRequest(req) +func (cs *http2clientStream) doRequest(req *Request, streamf func(*http2clientStream)) { + cs.cc.t.markNewGoroutine() + err := cs.writeRequest(req, streamf) cs.cleanupWriteRequest(err) } @@ -8859,7 +8559,7 @@ func (cs *http2clientStream) doRequest(req *Request) { // // It returns non-nil if the request ends otherwise. // If the returned error is StreamError, the error Code may be used in resetting the stream. -func (cs *http2clientStream) writeRequest(req *Request) (err error) { +func (cs *http2clientStream) writeRequest(req *Request, streamf func(*http2clientStream)) (err error) { cc := cs.cc ctx := cs.ctx @@ -8873,21 +8573,6 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) { if cc.reqHeaderMu == nil { panic("RoundTrip on uninitialized ClientConn") // for tests } - var newStreamHook func(*http2clientStream) - if cc.syncHooks != nil { - newStreamHook = cc.syncHooks.newstream - cc.syncHooks.blockUntil(func() bool { - select { - case cc.reqHeaderMu <- struct{}{}: - <-cc.reqHeaderMu - case <-cs.reqCancel: - case <-ctx.Done(): - default: - return false - } - return true - }) - } select { case cc.reqHeaderMu <- struct{}{}: case <-cs.reqCancel: @@ -8912,28 +8597,8 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) { } cc.mu.Unlock() - if newStreamHook != nil { - newStreamHook(cs) - } - - // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? - if !cc.t.disableCompression() && - req.Header.Get("Accept-Encoding") == "" && - req.Header.Get("Range") == "" && - !cs.isHead { - // Request gzip only, not deflate. Deflate is ambiguous and - // not as universally supported anyway. - // See: https://zlib.net/zlib_faq.html#faq39 - // - // Note that we don't request this for HEAD requests, - // due to a bug in nginx: - // http://trac.nginx.org/nginx/ticket/358 - // https://golang.org/issue/5522 - // - // We don't request gzip if the request is for a range, since - // auto-decoding a portion of a gzipped document will just fail - // anyway. See https://golang.org/issue/8923 - cs.requestedGzip = true + if streamf != nil { + streamf(cs) } continueTimeout := cc.t.expectContinueTimeout() @@ -8996,7 +8661,7 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) { var respHeaderTimer <-chan time.Time var respHeaderRecv chan struct{} if d := cc.responseHeaderTimeout(); d != 0 { - timer := cc.newTimer(d) + timer := cc.t.newTimer(d) defer timer.Stop() respHeaderTimer = timer.C() respHeaderRecv = cs.respHeaderRecv @@ -9005,21 +8670,6 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) { // or until the request is aborted (via context, error, or otherwise), // whichever comes first. for { - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-cs.peerClosed: - case <-respHeaderTimer: - case <-respHeaderRecv: - case <-cs.abort: - case <-ctx.Done(): - case <-cs.reqCancel: - default: - return false - } - return true - }) - } select { case <-cs.peerClosed: return nil @@ -9168,7 +8818,7 @@ func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) e return nil } cc.pendingRequests++ - cc.condWait() + cc.cond.Wait() cc.pendingRequests-- select { case <-cs.abort: @@ -9431,7 +9081,7 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er cs.flow.take(take) return take, nil } - cc.condWait() + cc.cond.Wait() } } @@ -9714,7 +9364,7 @@ func (cc *http2ClientConn) forgetStreamID(id uint32) { } // Wake up writeRequestBody via clientStream.awaitFlowControl and // wake up RoundTrip if there is a pending request. - cc.condBroadcast() + cc.cond.Broadcast() closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 { @@ -9736,6 +9386,7 @@ type http2clientConnReadLoop struct { // readLoop runs in its own goroutine and reads and dispatches frames. func (cc *http2ClientConn) readLoop() { + cc.t.markNewGoroutine() rl := &http2clientConnReadLoop{cc: cc} defer rl.cleanup() cc.readerErr = rl.run() @@ -9802,7 +9453,7 @@ func (rl *http2clientConnReadLoop) cleanup() { cs.abortStreamLocked(err) } } - cc.condBroadcast() + cc.cond.Broadcast() cc.mu.Unlock() } @@ -9839,7 +9490,7 @@ func (rl *http2clientConnReadLoop) run() error { readIdleTimeout := cc.t.ReadIdleTimeout var t http2timer if readIdleTimeout != 0 { - t = cc.afterFunc(readIdleTimeout, cc.healthCheck) + t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck) } for { f, err := cc.fr.ReadFrame() @@ -10437,7 +10088,7 @@ func (rl *http2clientConnReadLoop) processSettingsNoWrite(f *http2SettingsFrame) for _, cs := range cc.streams { cs.flow.add(delta) } - cc.condBroadcast() + cc.cond.Broadcast() cc.initialWindowSize = s.Val case http2SettingHeaderTableSize: @@ -10492,7 +10143,7 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame return http2ConnectionError(http2ErrCodeFlowControl) } - cc.condBroadcast() + cc.cond.Broadcast() return nil } @@ -10536,7 +10187,8 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error { } var pingError error errc := make(chan struct{}) - cc.goRun(func() { + go func() { + cc.t.markNewGoroutine() cc.wmu.Lock() defer cc.wmu.Unlock() if pingError = cc.fr.WritePing(false, p); pingError != nil { @@ -10547,20 +10199,7 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error { close(errc) return } - }) - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-c: - case <-errc: - case <-ctx.Done(): - case <-cc.readerDone: - default: - return false - } - return true - }) - } + }() select { case <-c: return nil @@ -11874,8 +11513,8 @@ func (ws *http2priorityWriteScheduler) addClosedOrIdleNode(list *[]*http2priorit } func (ws *http2priorityWriteScheduler) removeNode(n *http2priorityNode) { - for k := n.kids; k != nil; k = k.next { - k.setParent(n.parent) + for n.kids != nil { + n.kids.setParent(n.parent) } n.setParent(nil) delete(ws.nodes, n.id) |
