aboutsummaryrefslogtreecommitdiff
path: root/src/net/http
diff options
context:
space:
mode:
authorMichael Anthony Knyszek <mknyszek@google.com>2024-06-03 21:46:39 +0000
committerGopher Robot <gobot@golang.org>2024-06-04 16:19:04 +0000
commitb5a861782312d2b3a4f71e33d9a0c2b01a40fe5f (patch)
tree27f868c3eba92440b03c9ae8436019bfe0bafdba /src/net/http
parent499de42188ee0b0680aec4c49e25594456fdf15a (diff)
downloadgo-b5a861782312d2b3a4f71e33d9a0c2b01a40fe5f.tar.xz
all: update vendored dependencies
The Go 1.23 code freeze has recently started. This is a time to update all golang.org/x/... module versions that contribute packages to the std and cmd modules in the standard library to latest master versions. For #36905. [git-generate] go install golang.org/x/build/cmd/updatestd@latest go install golang.org/x/tools/cmd/bundle@latest updatestd -goroot=$(pwd) -branch=master Change-Id: I9162f547c148809d6fb1e4157f6f504634cef3b7 Reviewed-on: https://go-review.googlesource.com/c/go/+/589935 Auto-Submit: Michael Knyszek <mknyszek@google.com> Reviewed-by: Matthew Dempsky <mdempsky@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'src/net/http')
-rw-r--r--src/net/http/h2_bundle.go753
1 files changed, 196 insertions, 557 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go
index 5f97a27ac2..0b305844ae 100644
--- a/src/net/http/h2_bundle.go
+++ b/src/net/http/h2_bundle.go
@@ -3525,13 +3525,6 @@ type http2stringWriter interface {
WriteString(s string) (n int, err error)
}
-// A gate lets two goroutines coordinate their activities.
-type http2gate chan struct{}
-
-func (g http2gate) Done() { g <- struct{}{} }
-
-func (g http2gate) Wait() { <-g }
-
// A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed).
type http2closeWaiter chan struct{}
@@ -3704,6 +3697,17 @@ func http2validPseudoPath(v string) bool {
// any size (as long as it's first).
type http2incomparable [0]func()
+// synctestGroupInterface is the methods of synctestGroup used by Server and Transport.
+// It's defined as an interface here to let us keep synctestGroup entirely test-only
+// and not a part of non-test builds.
+type http2synctestGroupInterface interface {
+ Join()
+ Now() time.Time
+ NewTimer(d time.Duration) http2timer
+ AfterFunc(d time.Duration, f func()) http2timer
+ ContextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc)
+}
+
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered)
@@ -3980,6 +3984,39 @@ type http2Server struct {
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *http2serverInternalState
+
+ // Synchronization group used for testing.
+ // Outside of tests, this is nil.
+ group http2synctestGroupInterface
+}
+
+func (s *http2Server) markNewGoroutine() {
+ if s.group != nil {
+ s.group.Join()
+ }
+}
+
+func (s *http2Server) now() time.Time {
+ if s.group != nil {
+ return s.group.Now()
+ }
+ return time.Now()
+}
+
+// newTimer creates a new time.Timer, or a synthetic timer in tests.
+func (s *http2Server) newTimer(d time.Duration) http2timer {
+ if s.group != nil {
+ return s.group.NewTimer(d)
+ }
+ return http2timeTimer{time.NewTimer(d)}
+}
+
+// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
+func (s *http2Server) afterFunc(d time.Duration, f func()) http2timer {
+ if s.group != nil {
+ return s.group.AfterFunc(d, f)
+ }
+ return http2timeTimer{time.AfterFunc(d, f)}
}
func (s *http2Server) initialConnRecvWindowSize() int32 {
@@ -4226,6 +4263,10 @@ func (o *http2ServeConnOpts) handler() Handler {
//
// The opts parameter is optional. If nil, default values are used.
func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
+ s.serveConn(c, opts, nil)
+}
+
+func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(*http2serverConn)) {
baseCtx, cancel := http2serverConnBaseContext(c, opts)
defer cancel()
@@ -4252,6 +4293,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
pushEnabled: true,
sawClientPreface: opts.SawClientPreface,
}
+ if newf != nil {
+ newf(sc)
+ }
s.state.registerConn(sc)
defer s.state.unregisterConn(sc)
@@ -4425,8 +4469,8 @@ type http2serverConn struct {
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode http2ErrCode
- shutdownTimer *time.Timer // nil until used
- idleTimer *time.Timer // nil if unused
+ shutdownTimer http2timer // nil until used
+ idleTimer http2timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -4475,12 +4519,12 @@ type http2stream struct {
flow http2outflow // limits writing from Handler to client
inflow http2inflow // what the client is allowed to POST/etc to us
state http2streamState
- resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
- gotTrailerHeader bool // HEADER frame for trailers was seen
- wroteHeaders bool // whether we wrote headers (not status 100)
- readDeadline *time.Timer // nil if unused
- writeDeadline *time.Timer // nil if unused
- closeErr error // set before cw is closed
+ resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
+ gotTrailerHeader bool // HEADER frame for trailers was seen
+ wroteHeaders bool // whether we wrote headers (not status 100)
+ readDeadline http2timer // nil if unused
+ writeDeadline http2timer // nil if unused
+ closeErr error // set before cw is closed
trailer Header // accumulated trailers
reqTrailer Header // handler's Request.Trailer
@@ -4561,11 +4605,7 @@ func http2isClosedConnError(err error) bool {
return false
}
- // TODO: remove this string search and be more like the Windows
- // case below. That might involve modifying the standard library
- // to return better error types.
- str := err.Error()
- if strings.Contains(str, "use of closed network connection") {
+ if errors.Is(err, net.ErrClosed) {
return true
}
@@ -4644,8 +4684,9 @@ type http2readFrameResult struct {
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *http2serverConn) readFrames() {
- gate := make(http2gate)
- gateDone := gate.Done
+ sc.srv.markNewGoroutine()
+ gate := make(chan struct{})
+ gateDone := func() { gate <- struct{}{} }
for {
f, err := sc.framer.ReadFrame()
select {
@@ -4676,6 +4717,7 @@ type http2frameWriteResult struct {
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *http2serverConn) writeFrameAsync(wr http2FrameWriteRequest, wd *http2writeData) {
+ sc.srv.markNewGoroutine()
var err error
if wd == nil {
err = wr.write.writeFrame(sc)
@@ -4755,13 +4797,13 @@ func (sc *http2serverConn) serve() {
sc.setConnState(StateIdle)
if sc.srv.IdleTimeout > 0 {
- sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
+ sc.idleTimer = sc.srv.afterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
defer sc.idleTimer.Stop()
}
go sc.readFrames() // closed by defer sc.conn.Close above
- settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
+ settingsTimer := sc.srv.afterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
loopNum := 0
@@ -4892,10 +4934,10 @@ func (sc *http2serverConn) readPreface() error {
errc <- nil
}
}()
- timer := time.NewTimer(http2prefaceTimeout) // TODO: configurable on *Server?
+ timer := sc.srv.newTimer(http2prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
- case <-timer.C:
+ case <-timer.C():
return http2errPrefaceTimeout
case err := <-errc:
if err == nil {
@@ -5260,7 +5302,7 @@ func (sc *http2serverConn) goAway(code http2ErrCode) {
func (sc *http2serverConn) shutDownIn(d time.Duration) {
sc.serveG.check()
- sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer)
+ sc.shutdownTimer = sc.srv.afterFunc(d, sc.onShutdownTimer)
}
func (sc *http2serverConn) resetStream(se http2StreamError) {
@@ -5474,7 +5516,7 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) {
delete(sc.streams, st.id)
if len(sc.streams) == 0 {
sc.setConnState(StateIdle)
- if sc.srv.IdleTimeout > 0 {
+ if sc.srv.IdleTimeout > 0 && sc.idleTimer != nil {
sc.idleTimer.Reset(sc.srv.IdleTimeout)
}
if http2h1ServerKeepAlivesDisabled(sc.hs) {
@@ -5496,6 +5538,7 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) {
}
}
st.closeErr = err
+ st.cancelCtx()
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id)
}
@@ -5856,7 +5899,7 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error {
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout > 0 {
sc.conn.SetReadDeadline(time.Time{})
- st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
+ st.readDeadline = sc.srv.afterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
}
return sc.scheduleHandler(id, rw, req, handler)
@@ -5954,7 +5997,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.init(sc.srv.initialStreamRecvWindowSize())
if sc.hs.WriteTimeout > 0 {
- st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
+ st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
sc.streams[id] = st
@@ -6178,6 +6221,7 @@ func (sc *http2serverConn) handlerDone() {
// Run on its own goroutine.
func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) {
+ sc.srv.markNewGoroutine()
defer sc.sendServeMsg(http2handlerDoneMsg)
didPanic := true
defer func() {
@@ -6474,7 +6518,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) {
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
- date = time.Now().UTC().Format(TimeFormat)
+ date = rws.conn.srv.now().UTC().Format(TimeFormat)
}
for _, v := range rws.snapHeader["Trailer"] {
@@ -6596,7 +6640,7 @@ func (rws *http2responseWriterState) promoteUndeclaredTrailers() {
func (w *http2responseWriter) SetReadDeadline(deadline time.Time) error {
st := w.rws.stream
- if !deadline.IsZero() && deadline.Before(time.Now()) {
+ if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onReadTimeout()
@@ -6612,9 +6656,9 @@ func (w *http2responseWriter) SetReadDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.readDeadline = nil
} else if st.readDeadline == nil {
- st.readDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onReadTimeout)
+ st.readDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onReadTimeout)
} else {
- st.readDeadline.Reset(deadline.Sub(time.Now()))
+ st.readDeadline.Reset(deadline.Sub(sc.srv.now()))
}
})
return nil
@@ -6622,7 +6666,7 @@ func (w *http2responseWriter) SetReadDeadline(deadline time.Time) error {
func (w *http2responseWriter) SetWriteDeadline(deadline time.Time) error {
st := w.rws.stream
- if !deadline.IsZero() && deadline.Before(time.Now()) {
+ if !deadline.IsZero() && deadline.Before(w.rws.conn.srv.now()) {
// If we're setting a deadline in the past, reset the stream immediately
// so writes after SetWriteDeadline returns will fail.
st.onWriteTimeout()
@@ -6638,9 +6682,9 @@ func (w *http2responseWriter) SetWriteDeadline(deadline time.Time) error {
if deadline.IsZero() {
st.writeDeadline = nil
} else if st.writeDeadline == nil {
- st.writeDeadline = time.AfterFunc(deadline.Sub(time.Now()), st.onWriteTimeout)
+ st.writeDeadline = sc.srv.afterFunc(deadline.Sub(sc.srv.now()), st.onWriteTimeout)
} else {
- st.writeDeadline.Reset(deadline.Sub(time.Now()))
+ st.writeDeadline.Reset(deadline.Sub(sc.srv.now()))
}
})
return nil
@@ -7116,328 +7160,19 @@ func (sc *http2serverConn) countError(name string, err error) error {
return err
}
-// testSyncHooks coordinates goroutines in tests.
-//
-// For example, a call to ClientConn.RoundTrip involves several goroutines, including:
-// - the goroutine running RoundTrip;
-// - the clientStream.doRequest goroutine, which writes the request; and
-// - the clientStream.readLoop goroutine, which reads the response.
-//
-// Using testSyncHooks, a test can start a RoundTrip and identify when all these goroutines
-// are blocked waiting for some condition such as reading the Request.Body or waiting for
-// flow control to become available.
-//
-// The testSyncHooks also manage timers and synthetic time in tests.
-// This permits us to, for example, start a request and cause it to time out waiting for
-// response headers without resorting to time.Sleep calls.
-type http2testSyncHooks struct {
- // active/inactive act as a mutex and condition variable.
- //
- // - neither chan contains a value: testSyncHooks is locked.
- // - active contains a value: unlocked, and at least one goroutine is not blocked
- // - inactive contains a value: unlocked, and all goroutines are blocked
- active chan struct{}
- inactive chan struct{}
-
- // goroutine counts
- total int // total goroutines
- condwait map[*sync.Cond]int // blocked in sync.Cond.Wait
- blocked []*http2testBlockedGoroutine // otherwise blocked
-
- // fake time
- now time.Time
- timers []*http2fakeTimer
-
- // Transport testing: Report various events.
- newclientconn func(*http2ClientConn)
- newstream func(*http2clientStream)
-}
-
-// testBlockedGoroutine is a blocked goroutine.
-type http2testBlockedGoroutine struct {
- f func() bool // blocked until f returns true
- ch chan struct{} // closed when unblocked
-}
-
-func http2newTestSyncHooks() *http2testSyncHooks {
- h := &http2testSyncHooks{
- active: make(chan struct{}, 1),
- inactive: make(chan struct{}, 1),
- condwait: map[*sync.Cond]int{},
- }
- h.inactive <- struct{}{}
- h.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
- return h
-}
-
-// lock acquires the testSyncHooks mutex.
-func (h *http2testSyncHooks) lock() {
- select {
- case <-h.active:
- case <-h.inactive:
- }
-}
-
-// waitInactive waits for all goroutines to become inactive.
-func (h *http2testSyncHooks) waitInactive() {
- for {
- <-h.inactive
- if !h.unlock() {
- break
- }
- }
-}
-
-// unlock releases the testSyncHooks mutex.
-// It reports whether any goroutines are active.
-func (h *http2testSyncHooks) unlock() (active bool) {
- // Look for a blocked goroutine which can be unblocked.
- blocked := h.blocked[:0]
- unblocked := false
- for _, b := range h.blocked {
- if !unblocked && b.f() {
- unblocked = true
- close(b.ch)
- } else {
- blocked = append(blocked, b)
- }
- }
- h.blocked = blocked
-
- // Count goroutines blocked on condition variables.
- condwait := 0
- for _, count := range h.condwait {
- condwait += count
- }
-
- if h.total > condwait+len(blocked) {
- h.active <- struct{}{}
- return true
- } else {
- h.inactive <- struct{}{}
- return false
- }
-}
-
-// goRun starts a new goroutine.
-func (h *http2testSyncHooks) goRun(f func()) {
- h.lock()
- h.total++
- h.unlock()
- go func() {
- defer func() {
- h.lock()
- h.total--
- h.unlock()
- }()
- f()
- }()
-}
-
-// blockUntil indicates that a goroutine is blocked waiting for some condition to become true.
-// It waits until f returns true before proceeding.
-//
-// Example usage:
-//
-// h.blockUntil(func() bool {
-// // Is the context done yet?
-// select {
-// case <-ctx.Done():
-// default:
-// return false
-// }
-// return true
-// })
-// // Wait for the context to become done.
-// <-ctx.Done()
-//
-// The function f passed to blockUntil must be non-blocking and idempotent.
-func (h *http2testSyncHooks) blockUntil(f func() bool) {
- if f() {
- return
- }
- ch := make(chan struct{})
- h.lock()
- h.blocked = append(h.blocked, &http2testBlockedGoroutine{
- f: f,
- ch: ch,
- })
- h.unlock()
- <-ch
-}
-
-// broadcast is sync.Cond.Broadcast.
-func (h *http2testSyncHooks) condBroadcast(cond *sync.Cond) {
- h.lock()
- delete(h.condwait, cond)
- h.unlock()
- cond.Broadcast()
-}
-
-// broadcast is sync.Cond.Wait.
-func (h *http2testSyncHooks) condWait(cond *sync.Cond) {
- h.lock()
- h.condwait[cond]++
- h.unlock()
-}
-
-// newTimer creates a new fake timer.
-func (h *http2testSyncHooks) newTimer(d time.Duration) http2timer {
- h.lock()
- defer h.unlock()
- t := &http2fakeTimer{
- hooks: h,
- when: h.now.Add(d),
- c: make(chan time.Time),
- }
- h.timers = append(h.timers, t)
- return t
-}
-
-// afterFunc creates a new fake AfterFunc timer.
-func (h *http2testSyncHooks) afterFunc(d time.Duration, f func()) http2timer {
- h.lock()
- defer h.unlock()
- t := &http2fakeTimer{
- hooks: h,
- when: h.now.Add(d),
- f: f,
- }
- h.timers = append(h.timers, t)
- return t
-}
-
-func (h *http2testSyncHooks) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(ctx)
- t := h.afterFunc(d, cancel)
- return ctx, func() {
- t.Stop()
- cancel()
- }
-}
-
-func (h *http2testSyncHooks) timeUntilEvent() time.Duration {
- h.lock()
- defer h.unlock()
- var next time.Time
- for _, t := range h.timers {
- if next.IsZero() || t.when.Before(next) {
- next = t.when
- }
- }
- if d := next.Sub(h.now); d > 0 {
- return d
- }
- return 0
-}
-
-// advance advances time and causes synthetic timers to fire.
-func (h *http2testSyncHooks) advance(d time.Duration) {
- h.lock()
- defer h.unlock()
- h.now = h.now.Add(d)
- timers := h.timers[:0]
- for _, t := range h.timers {
- t := t // remove after go.mod depends on go1.22
- t.mu.Lock()
- switch {
- case t.when.After(h.now):
- timers = append(timers, t)
- case t.when.IsZero():
- // stopped timer
- default:
- t.when = time.Time{}
- if t.c != nil {
- close(t.c)
- }
- if t.f != nil {
- h.total++
- go func() {
- defer func() {
- h.lock()
- h.total--
- h.unlock()
- }()
- t.f()
- }()
- }
- }
- t.mu.Unlock()
- }
- h.timers = timers
-}
-
-// A timer wraps a time.Timer, or a synthetic equivalent in tests.
-// Unlike time.Timer, timer is single-use: The timer channel is closed when the timer expires.
-type http2timer interface {
+// A timer is a time.Timer, as an interface which can be replaced in tests.
+type http2timer = interface {
C() <-chan time.Time
- Stop() bool
Reset(d time.Duration) bool
+ Stop() bool
}
-// timeTimer implements timer using real time.
+// timeTimer adapts a time.Timer to the timer interface.
type http2timeTimer struct {
- t *time.Timer
- c chan time.Time
+ *time.Timer
}
-// newTimeTimer creates a new timer using real time.
-func http2newTimeTimer(d time.Duration) http2timer {
- ch := make(chan time.Time)
- t := time.AfterFunc(d, func() {
- close(ch)
- })
- return &http2timeTimer{t, ch}
-}
-
-// newTimeAfterFunc creates an AfterFunc timer using real time.
-func http2newTimeAfterFunc(d time.Duration, f func()) http2timer {
- return &http2timeTimer{
- t: time.AfterFunc(d, f),
- }
-}
-
-func (t http2timeTimer) C() <-chan time.Time { return t.c }
-
-func (t http2timeTimer) Stop() bool { return t.t.Stop() }
-
-func (t http2timeTimer) Reset(d time.Duration) bool { return t.t.Reset(d) }
-
-// fakeTimer implements timer using fake time.
-type http2fakeTimer struct {
- hooks *http2testSyncHooks
-
- mu sync.Mutex
- when time.Time // when the timer will fire
- c chan time.Time // closed when the timer fires; mutually exclusive with f
- f func() // called when the timer fires; mutually exclusive with c
-}
-
-func (t *http2fakeTimer) C() <-chan time.Time { return t.c }
-
-func (t *http2fakeTimer) Stop() bool {
- t.mu.Lock()
- defer t.mu.Unlock()
- stopped := t.when.IsZero()
- t.when = time.Time{}
- return stopped
-}
-
-func (t *http2fakeTimer) Reset(d time.Duration) bool {
- if t.c != nil || t.f == nil {
- panic("fakeTimer only supports Reset on AfterFunc timers")
- }
- t.mu.Lock()
- defer t.mu.Unlock()
- t.hooks.lock()
- defer t.hooks.unlock()
- active := !t.when.IsZero()
- t.when = t.hooks.now.Add(d)
- if !active {
- t.hooks.timers = append(t.hooks.timers, t)
- }
- return active
-}
+func (t http2timeTimer) C() <-chan time.Time { return t.Timer.C }
const (
// transportDefaultConnFlow is how many connection-level flow control
@@ -7586,7 +7321,45 @@ type http2Transport struct {
connPoolOnce sync.Once
connPoolOrDef http2ClientConnPool // non-nil version of ConnPool
- syncHooks *http2testSyncHooks
+ *http2transportTestHooks
+}
+
+// Hook points used for testing.
+// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
+// Inside tests, see the testSyncHooks function docs.
+
+type http2transportTestHooks struct {
+ newclientconn func(*http2ClientConn)
+ group http2synctestGroupInterface
+}
+
+func (t *http2Transport) markNewGoroutine() {
+ if t != nil && t.http2transportTestHooks != nil {
+ t.http2transportTestHooks.group.Join()
+ }
+}
+
+// newTimer creates a new time.Timer, or a synthetic timer in tests.
+func (t *http2Transport) newTimer(d time.Duration) http2timer {
+ if t.http2transportTestHooks != nil {
+ return t.http2transportTestHooks.group.NewTimer(d)
+ }
+ return http2timeTimer{time.NewTimer(d)}
+}
+
+// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
+func (t *http2Transport) afterFunc(d time.Duration, f func()) http2timer {
+ if t.http2transportTestHooks != nil {
+ return t.http2transportTestHooks.group.AfterFunc(d, f)
+ }
+ return http2timeTimer{time.AfterFunc(d, f)}
+}
+
+func (t *http2Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
+ if t.http2transportTestHooks != nil {
+ return t.http2transportTestHooks.group.ContextWithTimeout(ctx, d)
+ }
+ return context.WithTimeout(ctx, d)
}
func (t *http2Transport) maxHeaderListSize() uint32 {
@@ -7753,60 +7526,6 @@ type http2ClientConn struct {
werr error // first write error that has occurred
hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder
-
- syncHooks *http2testSyncHooks // can be nil
-}
-
-// Hook points used for testing.
-// Outside of tests, cc.syncHooks is nil and these all have minimal implementations.
-// Inside tests, see the testSyncHooks function docs.
-
-// goRun starts a new goroutine.
-func (cc *http2ClientConn) goRun(f func()) {
- if cc.syncHooks != nil {
- cc.syncHooks.goRun(f)
- return
- }
- go f()
-}
-
-// condBroadcast is cc.cond.Broadcast.
-func (cc *http2ClientConn) condBroadcast() {
- if cc.syncHooks != nil {
- cc.syncHooks.condBroadcast(cc.cond)
- }
- cc.cond.Broadcast()
-}
-
-// condWait is cc.cond.Wait.
-func (cc *http2ClientConn) condWait() {
- if cc.syncHooks != nil {
- cc.syncHooks.condWait(cc.cond)
- }
- cc.cond.Wait()
-}
-
-// newTimer creates a new time.Timer, or a synthetic timer in tests.
-func (cc *http2ClientConn) newTimer(d time.Duration) http2timer {
- if cc.syncHooks != nil {
- return cc.syncHooks.newTimer(d)
- }
- return http2newTimeTimer(d)
-}
-
-// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
-func (cc *http2ClientConn) afterFunc(d time.Duration, f func()) http2timer {
- if cc.syncHooks != nil {
- return cc.syncHooks.afterFunc(d, f)
- }
- return http2newTimeAfterFunc(d, f)
-}
-
-func (cc *http2ClientConn) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
- if cc.syncHooks != nil {
- return cc.syncHooks.contextWithTimeout(ctx, d)
- }
- return context.WithTimeout(ctx, d)
}
// clientStream is the state for a single HTTP/2 stream. One of these
@@ -7888,7 +7607,7 @@ func (cs *http2clientStream) abortStreamLocked(err error) {
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
- cs.cc.condBroadcast()
+ cs.cc.cond.Broadcast()
}
}
@@ -7898,7 +7617,7 @@ func (cs *http2clientStream) abortRequestBodyWrite() {
defer cc.mu.Unlock()
if cs.reqBody != nil && cs.reqBodyClosed == nil {
cs.closeReqBodyLocked()
- cc.condBroadcast()
+ cc.cond.Broadcast()
}
}
@@ -7908,10 +7627,11 @@ func (cs *http2clientStream) closeReqBodyLocked() {
}
cs.reqBodyClosed = make(chan struct{})
reqBodyClosed := cs.reqBodyClosed
- cs.cc.goRun(func() {
+ go func() {
+ cs.cc.t.markNewGoroutine()
cs.reqBody.Close()
close(reqBodyClosed)
- })
+ }()
}
type http2stickyErrWriter struct {
@@ -8028,21 +7748,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
backoff := float64(uint(1) << (uint(retry) - 1))
backoff += backoff * (0.1 * mathrand.Float64())
d := time.Second * time.Duration(backoff)
- var tm http2timer
- if t.syncHooks != nil {
- tm = t.syncHooks.newTimer(d)
- t.syncHooks.blockUntil(func() bool {
- select {
- case <-tm.C():
- case <-req.Context().Done():
- default:
- return false
- }
- return true
- })
- } else {
- tm = http2newTimeTimer(d)
- }
+ tm := t.newTimer(d)
select {
case <-tm.C():
t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
@@ -8127,8 +7833,8 @@ func http2canRetryError(err error) bool {
}
func (t *http2Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*http2ClientConn, error) {
- if t.syncHooks != nil {
- return t.newClientConn(nil, singleUse, t.syncHooks)
+ if t.http2transportTestHooks != nil {
+ return t.newClientConn(nil, singleUse)
}
host, _, err := net.SplitHostPort(addr)
if err != nil {
@@ -8138,7 +7844,7 @@ func (t *http2Transport) dialClientConn(ctx context.Context, addr string, single
if err != nil {
return nil, err
}
- return t.newClientConn(tconn, singleUse, nil)
+ return t.newClientConn(tconn, singleUse)
}
func (t *http2Transport) newTLSConfig(host string) *tls.Config {
@@ -8204,10 +7910,10 @@ func (t *http2Transport) maxEncoderHeaderTableSize() uint32 {
}
func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
- return t.newClientConn(c, t.disableKeepAlives(), nil)
+ return t.newClientConn(c, t.disableKeepAlives())
}
-func (t *http2Transport) newClientConn(c net.Conn, singleUse bool, hooks *http2testSyncHooks) (*http2ClientConn, error) {
+func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) {
cc := &http2ClientConn{
t: t,
tconn: c,
@@ -8222,16 +7928,12 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool, hooks *http2t
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
reqHeaderMu: make(chan struct{}, 1),
- syncHooks: hooks,
}
- if hooks != nil {
- hooks.newclientconn(cc)
+ if t.http2transportTestHooks != nil {
+ t.markNewGoroutine()
+ t.http2transportTestHooks.newclientconn(cc)
c = cc.tconn
}
- if d := t.idleConnTimeout(); d != 0 {
- cc.idleTimeout = d
- cc.idleTimer = cc.afterFunc(d, cc.onIdleTimeout)
- }
if http2VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
}
@@ -8295,7 +7997,13 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool, hooks *http2t
return nil, cc.werr
}
- cc.goRun(cc.readLoop)
+ // Start the idle timer after the connection is fully initialized.
+ if d := t.idleConnTimeout(); d != 0 {
+ cc.idleTimeout = d
+ cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
+ }
+
+ go cc.readLoop()
return cc, nil
}
@@ -8303,7 +8011,7 @@ func (cc *http2ClientConn) healthCheck() {
pingTimeout := cc.t.pingTimeout()
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
- ctx, cancel := cc.contextWithTimeout(context.Background(), pingTimeout)
+ ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
defer cancel()
cc.vlogf("http2: Transport sending health check")
err := cc.Ping(ctx)
@@ -8546,7 +8254,8 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error {
// Wait for all in-flight streams to complete or connection to close
done := make(chan struct{})
cancelled := false // guarded by cc.mu
- cc.goRun(func() {
+ go func() {
+ cc.t.markNewGoroutine()
cc.mu.Lock()
defer cc.mu.Unlock()
for {
@@ -8558,9 +8267,9 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error {
if cancelled {
break
}
- cc.condWait()
+ cc.cond.Wait()
}
- })
+ }()
http2shutdownEnterWaitStateHook()
select {
case <-done:
@@ -8570,7 +8279,7 @@ func (cc *http2ClientConn) Shutdown(ctx context.Context) error {
cc.mu.Lock()
// Free the goroutine above
cancelled = true
- cc.condBroadcast()
+ cc.cond.Broadcast()
cc.mu.Unlock()
return ctx.Err()
}
@@ -8608,7 +8317,7 @@ func (cc *http2ClientConn) closeForError(err error) {
for _, cs := range cc.streams {
cs.abortStreamLocked(err)
}
- cc.condBroadcast()
+ cc.cond.Broadcast()
cc.mu.Unlock()
cc.closeConn()
}
@@ -8723,23 +8432,30 @@ func (cc *http2ClientConn) roundTrip(req *Request, streamf func(*http2clientStre
respHeaderRecv: make(chan struct{}),
donec: make(chan struct{}),
}
- cc.goRun(func() {
- cs.doRequest(req)
- })
+
+ // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
+ if !cc.t.disableCompression() &&
+ req.Header.Get("Accept-Encoding") == "" &&
+ req.Header.Get("Range") == "" &&
+ !cs.isHead {
+ // Request gzip only, not deflate. Deflate is ambiguous and
+ // not as universally supported anyway.
+ // See: https://zlib.net/zlib_faq.html#faq39
+ //
+ // Note that we don't request this for HEAD requests,
+ // due to a bug in nginx:
+ // http://trac.nginx.org/nginx/ticket/358
+ // https://golang.org/issue/5522
+ //
+ // We don't request gzip if the request is for a range, since
+ // auto-decoding a portion of a gzipped document will just fail
+ // anyway. See https://golang.org/issue/8923
+ cs.requestedGzip = true
+ }
+
+ go cs.doRequest(req, streamf)
waitDone := func() error {
- if cc.syncHooks != nil {
- cc.syncHooks.blockUntil(func() bool {
- select {
- case <-cs.donec:
- case <-ctx.Done():
- case <-cs.reqCancel:
- default:
- return false
- }
- return true
- })
- }
select {
case <-cs.donec:
return nil
@@ -8800,24 +8516,7 @@ func (cc *http2ClientConn) roundTrip(req *Request, streamf func(*http2clientStre
return err
}
- if streamf != nil {
- streamf(cs)
- }
-
for {
- if cc.syncHooks != nil {
- cc.syncHooks.blockUntil(func() bool {
- select {
- case <-cs.respHeaderRecv:
- case <-cs.abort:
- case <-ctx.Done():
- case <-cs.reqCancel:
- default:
- return false
- }
- return true
- })
- }
select {
case <-cs.respHeaderRecv:
return handleResponseHeaders()
@@ -8847,8 +8546,9 @@ func (cc *http2ClientConn) roundTrip(req *Request, streamf func(*http2clientStre
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *http2clientStream) doRequest(req *Request) {
- err := cs.writeRequest(req)
+func (cs *http2clientStream) doRequest(req *Request, streamf func(*http2clientStream)) {
+ cs.cc.t.markNewGoroutine()
+ err := cs.writeRequest(req, streamf)
cs.cleanupWriteRequest(err)
}
@@ -8859,7 +8559,7 @@ func (cs *http2clientStream) doRequest(req *Request) {
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *http2clientStream) writeRequest(req *Request) (err error) {
+func (cs *http2clientStream) writeRequest(req *Request, streamf func(*http2clientStream)) (err error) {
cc := cs.cc
ctx := cs.ctx
@@ -8873,21 +8573,6 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) {
if cc.reqHeaderMu == nil {
panic("RoundTrip on uninitialized ClientConn") // for tests
}
- var newStreamHook func(*http2clientStream)
- if cc.syncHooks != nil {
- newStreamHook = cc.syncHooks.newstream
- cc.syncHooks.blockUntil(func() bool {
- select {
- case cc.reqHeaderMu <- struct{}{}:
- <-cc.reqHeaderMu
- case <-cs.reqCancel:
- case <-ctx.Done():
- default:
- return false
- }
- return true
- })
- }
select {
case cc.reqHeaderMu <- struct{}{}:
case <-cs.reqCancel:
@@ -8912,28 +8597,8 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) {
}
cc.mu.Unlock()
- if newStreamHook != nil {
- newStreamHook(cs)
- }
-
- // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
- if !cc.t.disableCompression() &&
- req.Header.Get("Accept-Encoding") == "" &&
- req.Header.Get("Range") == "" &&
- !cs.isHead {
- // Request gzip only, not deflate. Deflate is ambiguous and
- // not as universally supported anyway.
- // See: https://zlib.net/zlib_faq.html#faq39
- //
- // Note that we don't request this for HEAD requests,
- // due to a bug in nginx:
- // http://trac.nginx.org/nginx/ticket/358
- // https://golang.org/issue/5522
- //
- // We don't request gzip if the request is for a range, since
- // auto-decoding a portion of a gzipped document will just fail
- // anyway. See https://golang.org/issue/8923
- cs.requestedGzip = true
+ if streamf != nil {
+ streamf(cs)
}
continueTimeout := cc.t.expectContinueTimeout()
@@ -8996,7 +8661,7 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) {
var respHeaderTimer <-chan time.Time
var respHeaderRecv chan struct{}
if d := cc.responseHeaderTimeout(); d != 0 {
- timer := cc.newTimer(d)
+ timer := cc.t.newTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C()
respHeaderRecv = cs.respHeaderRecv
@@ -9005,21 +8670,6 @@ func (cs *http2clientStream) writeRequest(req *Request) (err error) {
// or until the request is aborted (via context, error, or otherwise),
// whichever comes first.
for {
- if cc.syncHooks != nil {
- cc.syncHooks.blockUntil(func() bool {
- select {
- case <-cs.peerClosed:
- case <-respHeaderTimer:
- case <-respHeaderRecv:
- case <-cs.abort:
- case <-ctx.Done():
- case <-cs.reqCancel:
- default:
- return false
- }
- return true
- })
- }
select {
case <-cs.peerClosed:
return nil
@@ -9168,7 +8818,7 @@ func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) e
return nil
}
cc.pendingRequests++
- cc.condWait()
+ cc.cond.Wait()
cc.pendingRequests--
select {
case <-cs.abort:
@@ -9431,7 +9081,7 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er
cs.flow.take(take)
return take, nil
}
- cc.condWait()
+ cc.cond.Wait()
}
}
@@ -9714,7 +9364,7 @@ func (cc *http2ClientConn) forgetStreamID(id uint32) {
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
- cc.condBroadcast()
+ cc.cond.Broadcast()
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
@@ -9736,6 +9386,7 @@ type http2clientConnReadLoop struct {
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *http2ClientConn) readLoop() {
+ cc.t.markNewGoroutine()
rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
@@ -9802,7 +9453,7 @@ func (rl *http2clientConnReadLoop) cleanup() {
cs.abortStreamLocked(err)
}
}
- cc.condBroadcast()
+ cc.cond.Broadcast()
cc.mu.Unlock()
}
@@ -9839,7 +9490,7 @@ func (rl *http2clientConnReadLoop) run() error {
readIdleTimeout := cc.t.ReadIdleTimeout
var t http2timer
if readIdleTimeout != 0 {
- t = cc.afterFunc(readIdleTimeout, cc.healthCheck)
+ t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
}
for {
f, err := cc.fr.ReadFrame()
@@ -10437,7 +10088,7 @@ func (rl *http2clientConnReadLoop) processSettingsNoWrite(f *http2SettingsFrame)
for _, cs := range cc.streams {
cs.flow.add(delta)
}
- cc.condBroadcast()
+ cc.cond.Broadcast()
cc.initialWindowSize = s.Val
case http2SettingHeaderTableSize:
@@ -10492,7 +10143,7 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame
return http2ConnectionError(http2ErrCodeFlowControl)
}
- cc.condBroadcast()
+ cc.cond.Broadcast()
return nil
}
@@ -10536,7 +10187,8 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error {
}
var pingError error
errc := make(chan struct{})
- cc.goRun(func() {
+ go func() {
+ cc.t.markNewGoroutine()
cc.wmu.Lock()
defer cc.wmu.Unlock()
if pingError = cc.fr.WritePing(false, p); pingError != nil {
@@ -10547,20 +10199,7 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error {
close(errc)
return
}
- })
- if cc.syncHooks != nil {
- cc.syncHooks.blockUntil(func() bool {
- select {
- case <-c:
- case <-errc:
- case <-ctx.Done():
- case <-cc.readerDone:
- default:
- return false
- }
- return true
- })
- }
+ }()
select {
case <-c:
return nil
@@ -11874,8 +11513,8 @@ func (ws *http2priorityWriteScheduler) addClosedOrIdleNode(list *[]*http2priorit
}
func (ws *http2priorityWriteScheduler) removeNode(n *http2priorityNode) {
- for k := n.kids; k != nil; k = k.next {
- k.setParent(n.parent)
+ for n.kids != nil {
+ n.kids.setParent(n.parent)
}
n.setParent(nil)
delete(ws.nodes, n.id)