diff options
| author | Dmitri Shuralyov <dmitshur@golang.org> | 2024-11-21 19:54:12 -0500 |
|---|---|---|
| committer | Gopher Robot <gobot@golang.org> | 2024-11-22 01:35:05 +0000 |
| commit | ca14eaf77c86bd5492329d2be6f1a82afe7802f5 (patch) | |
| tree | 1b6daab038ea1094e63584a460aebbec56491b30 /src/net/http | |
| parent | e8d95619978c4602d4446f113b3b69b7a22308fa (diff) | |
| download | go-ca14eaf77c86bd5492329d2be6f1a82afe7802f5.tar.xz | |
all: update golang.org/x/net [generated]
A part of the keeping Go's vendored dependencies and generated code
up to date.
This updates h2_bundle.go with unencrypted HTTP/2 support.
For #36905.
For #67816.
[git-generate]
cd src
go get golang.org/x/net@v0.31.0
go mod tidy
go mod vendor
cd cmd
go get golang.org/x/net@v0.31.0
go mod tidy
go mod vendor
go generate -run=bundle std
Change-Id: I2b77f651b990f260fbe7d551c7a819518f1c983f
Reviewed-on: https://go-review.googlesource.com/c/go/+/631035
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Auto-Submit: Dmitri Shuralyov <dmitshur@golang.org>
Diffstat (limited to 'src/net/http')
| -rw-r--r-- | src/net/http/h2_bundle.go | 849 |
1 files changed, 633 insertions, 216 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index af86c1430d..6b40923a86 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -883,7 +883,7 @@ func (c *http2dialCall) dial(ctx context.Context, addr string) { // This code decides which ones live or die. // The return value used is whether c was used. // c is never closed. -func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) { +func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c net.Conn) (used bool, err error) { p.mu.Lock() for _, cc := range p.conns[key] { if cc.CanTakeNewRequest() { @@ -919,8 +919,8 @@ type http2addConnCall struct { err error } -func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { - cc, err := t.NewClientConn(tc) +func (c *http2addConnCall) run(t *http2Transport, key string, nc net.Conn) { + cc, err := t.NewClientConn(nc) p := c.p p.mu.Lock() @@ -1035,6 +1035,169 @@ func http2shouldRetryDial(call *http2dialCall, req *Request) bool { return call.ctx.Err() != nil } +// http2Config is a package-internal version of net/http.HTTP2Config. +// +// http.HTTP2Config was added in Go 1.24. +// When running with a version of net/http that includes HTTP2Config, +// we merge the configuration with the fields in Transport or Server +// to produce an http2Config. +// +// Zero valued fields in http2Config are interpreted as in the +// net/http.HTTPConfig documentation. +// +// Precedence order for reconciling configurations is: +// +// - Use the net/http.{Server,Transport}.HTTP2Config value, when non-zero. +// - Otherwise use the http2.{Server.Transport} value. +// - If the resulting value is zero or out of range, use a default. +type http2http2Config struct { + MaxConcurrentStreams uint32 + MaxDecoderHeaderTableSize uint32 + MaxEncoderHeaderTableSize uint32 + MaxReadFrameSize uint32 + MaxUploadBufferPerConnection int32 + MaxUploadBufferPerStream int32 + SendPingTimeout time.Duration + PingTimeout time.Duration + WriteByteTimeout time.Duration + PermitProhibitedCipherSuites bool + CountError func(errType string) +} + +// configFromServer merges configuration settings from +// net/http.Server.HTTP2Config and http2.Server. +func http2configFromServer(h1 *Server, h2 *http2Server) http2http2Config { + conf := http2http2Config{ + MaxConcurrentStreams: h2.MaxConcurrentStreams, + MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize, + MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize, + MaxReadFrameSize: h2.MaxReadFrameSize, + MaxUploadBufferPerConnection: h2.MaxUploadBufferPerConnection, + MaxUploadBufferPerStream: h2.MaxUploadBufferPerStream, + SendPingTimeout: h2.ReadIdleTimeout, + PingTimeout: h2.PingTimeout, + WriteByteTimeout: h2.WriteByteTimeout, + PermitProhibitedCipherSuites: h2.PermitProhibitedCipherSuites, + CountError: h2.CountError, + } + http2fillNetHTTPServerConfig(&conf, h1) + http2setConfigDefaults(&conf, true) + return conf +} + +// configFromServer merges configuration settings from h2 and h2.t1.HTTP2 +// (the net/http Transport). +func http2configFromTransport(h2 *http2Transport) http2http2Config { + conf := http2http2Config{ + MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize, + MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize, + MaxReadFrameSize: h2.MaxReadFrameSize, + SendPingTimeout: h2.ReadIdleTimeout, + PingTimeout: h2.PingTimeout, + WriteByteTimeout: h2.WriteByteTimeout, + } + + // Unlike most config fields, where out-of-range values revert to the default, + // Transport.MaxReadFrameSize clips. + if conf.MaxReadFrameSize < http2minMaxFrameSize { + conf.MaxReadFrameSize = http2minMaxFrameSize + } else if conf.MaxReadFrameSize > http2maxFrameSize { + conf.MaxReadFrameSize = http2maxFrameSize + } + + if h2.t1 != nil { + http2fillNetHTTPTransportConfig(&conf, h2.t1) + } + http2setConfigDefaults(&conf, false) + return conf +} + +func http2setDefault[T ~int | ~int32 | ~uint32 | ~int64](v *T, minval, maxval, defval T) { + if *v < minval || *v > maxval { + *v = defval + } +} + +func http2setConfigDefaults(conf *http2http2Config, server bool) { + http2setDefault(&conf.MaxConcurrentStreams, 1, math.MaxUint32, http2defaultMaxStreams) + http2setDefault(&conf.MaxEncoderHeaderTableSize, 1, math.MaxUint32, http2initialHeaderTableSize) + http2setDefault(&conf.MaxDecoderHeaderTableSize, 1, math.MaxUint32, http2initialHeaderTableSize) + if server { + http2setDefault(&conf.MaxUploadBufferPerConnection, http2initialWindowSize, math.MaxInt32, 1<<20) + } else { + http2setDefault(&conf.MaxUploadBufferPerConnection, http2initialWindowSize, math.MaxInt32, http2transportDefaultConnFlow) + } + if server { + http2setDefault(&conf.MaxUploadBufferPerStream, 1, math.MaxInt32, 1<<20) + } else { + http2setDefault(&conf.MaxUploadBufferPerStream, 1, math.MaxInt32, http2transportDefaultStreamFlow) + } + http2setDefault(&conf.MaxReadFrameSize, http2minMaxFrameSize, http2maxFrameSize, http2defaultMaxReadFrameSize) + http2setDefault(&conf.PingTimeout, 1, math.MaxInt64, 15*time.Second) +} + +// adjustHTTP1MaxHeaderSize converts a limit in bytes on the size of an HTTP/1 header +// to an HTTP/2 MAX_HEADER_LIST_SIZE value. +func http2adjustHTTP1MaxHeaderSize(n int64) int64 { + // http2's count is in a slightly different unit and includes 32 bytes per pair. + // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. + const perFieldOverhead = 32 // per http2 spec + const typicalHeaders = 10 // conservative + return n + typicalHeaders*perFieldOverhead +} + +// fillNetHTTPServerConfig sets fields in conf from srv.HTTP2. +func http2fillNetHTTPServerConfig(conf *http2http2Config, srv *Server) { + http2fillNetHTTPConfig(conf, srv.HTTP2) +} + +// fillNetHTTPServerConfig sets fields in conf from tr.HTTP2. +func http2fillNetHTTPTransportConfig(conf *http2http2Config, tr *Transport) { + http2fillNetHTTPConfig(conf, tr.HTTP2) +} + +func http2fillNetHTTPConfig(conf *http2http2Config, h2 *HTTP2Config) { + if h2 == nil { + return + } + if h2.MaxConcurrentStreams != 0 { + conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams) + } + if h2.MaxEncoderHeaderTableSize != 0 { + conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize) + } + if h2.MaxDecoderHeaderTableSize != 0 { + conf.MaxDecoderHeaderTableSize = uint32(h2.MaxDecoderHeaderTableSize) + } + if h2.MaxConcurrentStreams != 0 { + conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams) + } + if h2.MaxReadFrameSize != 0 { + conf.MaxReadFrameSize = uint32(h2.MaxReadFrameSize) + } + if h2.MaxReceiveBufferPerConnection != 0 { + conf.MaxUploadBufferPerConnection = int32(h2.MaxReceiveBufferPerConnection) + } + if h2.MaxReceiveBufferPerStream != 0 { + conf.MaxUploadBufferPerStream = int32(h2.MaxReceiveBufferPerStream) + } + if h2.SendPingTimeout != 0 { + conf.SendPingTimeout = h2.SendPingTimeout + } + if h2.PingTimeout != 0 { + conf.PingTimeout = h2.PingTimeout + } + if h2.WriteByteTimeout != 0 { + conf.WriteByteTimeout = h2.WriteByteTimeout + } + if h2.PermitProhibitedCipherSuites { + conf.PermitProhibitedCipherSuites = true + } + if h2.CountError != nil { + conf.CountError = h2.CountError + } +} + // Buffer chunks are allocated from a pool to reduce pressure on GC. // The maximum wasted space per dataBuffer is 2x the largest size class, // which happens when the dataBuffer has multiple chunks and there is @@ -3550,13 +3713,19 @@ func (cw http2closeWaiter) Wait() { // Its buffered writer is lazily allocated as needed, to minimize // idle memory usage with many connections. type http2bufferedWriter struct { - _ http2incomparable - w io.Writer // immutable - bw *bufio.Writer // non-nil when data is buffered + _ http2incomparable + group http2synctestGroupInterface // immutable + conn net.Conn // immutable + bw *bufio.Writer // non-nil when data is buffered + byteTimeout time.Duration // immutable, WriteByteTimeout } -func http2newBufferedWriter(w io.Writer) *http2bufferedWriter { - return &http2bufferedWriter{w: w} +func http2newBufferedWriter(group http2synctestGroupInterface, conn net.Conn, timeout time.Duration) *http2bufferedWriter { + return &http2bufferedWriter{ + group: group, + conn: conn, + byteTimeout: timeout, + } } // bufWriterPoolBufferSize is the size of bufio.Writer's @@ -3583,7 +3752,7 @@ func (w *http2bufferedWriter) Available() int { func (w *http2bufferedWriter) Write(p []byte) (n int, err error) { if w.bw == nil { bw := http2bufWriterPool.Get().(*bufio.Writer) - bw.Reset(w.w) + bw.Reset((*http2bufferedWriterTimeoutWriter)(w)) w.bw = bw } return w.bw.Write(p) @@ -3601,6 +3770,38 @@ func (w *http2bufferedWriter) Flush() error { return err } +type http2bufferedWriterTimeoutWriter http2bufferedWriter + +func (w *http2bufferedWriterTimeoutWriter) Write(p []byte) (n int, err error) { + return http2writeWithByteTimeout(w.group, w.conn, w.byteTimeout, p) +} + +// writeWithByteTimeout writes to conn. +// If more than timeout passes without any bytes being written to the connection, +// the write fails. +func http2writeWithByteTimeout(group http2synctestGroupInterface, conn net.Conn, timeout time.Duration, p []byte) (n int, err error) { + if timeout <= 0 { + return conn.Write(p) + } + for { + var now time.Time + if group == nil { + now = time.Now() + } else { + now = group.Now() + } + conn.SetWriteDeadline(now.Add(timeout)) + nn, err := conn.Write(p[n:]) + n += nn + if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) { + // Either we finished the write, made no progress, or hit the deadline. + // Whichever it is, we're done now. + conn.SetWriteDeadline(time.Time{}) + return n, err + } + } +} + func http2mustUint31(v int32) uint32 { if v < 0 || v > 2147483647 { panic("out of range") @@ -3882,10 +4083,14 @@ func (p *http2pipe) Done() <-chan struct{} { } const ( - http2prefaceTimeout = 10 * time.Second - http2firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway - http2handlerChunkWriteSize = 4 << 10 - http2defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to? + http2prefaceTimeout = 10 * time.Second + http2firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway + http2handlerChunkWriteSize = 4 << 10 + http2defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to? + + // maxQueuedControlFrames is the maximum number of control frames like + // SETTINGS, PING and RST_STREAM that will be queued for writing before + // the connection is closed to prevent memory exhaustion attacks. http2maxQueuedControlFrames = 10000 ) @@ -3957,6 +4162,22 @@ type http2Server struct { // If zero or negative, there is no timeout. IdleTimeout time.Duration + // ReadIdleTimeout is the timeout after which a health check using a ping + // frame will be carried out if no frame is received on the connection. + // If zero, no health check is performed. + ReadIdleTimeout time.Duration + + // PingTimeout is the timeout after which the connection will be closed + // if a response to a ping is not received. + // If zero, a default of 15 seconds is used. + PingTimeout time.Duration + + // WriteByteTimeout is the timeout after which a connection will be + // closed if no data can be written to it. The timeout begins when data is + // available to write, and is extended whenever any bytes are written. + // If zero or negative, there is no timeout. + WriteByteTimeout time.Duration + // MaxUploadBufferPerConnection is the size of the initial flow // control window for each connections. The HTTP/2 spec does not // allow this to be smaller than 65535 or larger than 2^32-1. @@ -4019,57 +4240,6 @@ func (s *http2Server) afterFunc(d time.Duration, f func()) http2timer { return http2timeTimer{time.AfterFunc(d, f)} } -func (s *http2Server) initialConnRecvWindowSize() int32 { - if s.MaxUploadBufferPerConnection >= http2initialWindowSize { - return s.MaxUploadBufferPerConnection - } - return 1 << 20 -} - -func (s *http2Server) initialStreamRecvWindowSize() int32 { - if s.MaxUploadBufferPerStream > 0 { - return s.MaxUploadBufferPerStream - } - return 1 << 20 -} - -func (s *http2Server) maxReadFrameSize() uint32 { - if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize { - return v - } - return http2defaultMaxReadFrameSize -} - -func (s *http2Server) maxConcurrentStreams() uint32 { - if v := s.MaxConcurrentStreams; v > 0 { - return v - } - return http2defaultMaxStreams -} - -func (s *http2Server) maxDecoderHeaderTableSize() uint32 { - if v := s.MaxDecoderHeaderTableSize; v > 0 { - return v - } - return http2initialHeaderTableSize -} - -func (s *http2Server) maxEncoderHeaderTableSize() uint32 { - if v := s.MaxEncoderHeaderTableSize; v > 0 { - return v - } - return http2initialHeaderTableSize -} - -// maxQueuedControlFrames is the maximum number of control frames like -// SETTINGS, PING and RST_STREAM that will be queued for writing before -// the connection is closed to prevent memory exhaustion attacks. -func (s *http2Server) maxQueuedControlFrames() int { - // TODO: if anybody asks, add a Server field, and remember to define the - // behavior of negative values. - return http2maxQueuedControlFrames -} - type http2serverInternalState struct { mu sync.Mutex activeConns map[*http2serverConn]struct{} @@ -4166,7 +4336,7 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { if s.TLSNextProto == nil { s.TLSNextProto = map[string]func(*Server, *tls.Conn, Handler){} } - protoHandler := func(hs *Server, c *tls.Conn, h Handler) { + protoHandler := func(hs *Server, c net.Conn, h Handler, sawClientPreface bool) { if http2testHookOnConn != nil { http2testHookOnConn() } @@ -4183,12 +4353,31 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { ctx = bc.BaseContext() } conf.ServeConn(c, &http2ServeConnOpts{ - Context: ctx, - Handler: h, - BaseConfig: hs, + Context: ctx, + Handler: h, + BaseConfig: hs, + SawClientPreface: sawClientPreface, }) } - s.TLSNextProto[http2NextProtoTLS] = protoHandler + s.TLSNextProto[http2NextProtoTLS] = func(hs *Server, c *tls.Conn, h Handler) { + protoHandler(hs, c, h, false) + } + // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns. + // + // A connection passed in this method has already had the HTTP/2 preface read from it. + s.TLSNextProto[http2nextProtoUnencryptedHTTP2] = func(hs *Server, c *tls.Conn, h Handler) { + nc, err := http2unencryptedNetConnFromTLSConn(c) + if err != nil { + if lg := hs.ErrorLog; lg != nil { + lg.Print(err) + } else { + log.Print(err) + } + go c.Close() + return + } + protoHandler(hs, nc, h, true) + } return nil } @@ -4270,13 +4459,15 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func( baseCtx, cancel := http2serverConnBaseContext(c, opts) defer cancel() + http1srv := opts.baseConfig() + conf := http2configFromServer(http1srv, s) sc := &http2serverConn{ srv: s, - hs: opts.baseConfig(), + hs: http1srv, conn: c, baseCtx: baseCtx, remoteAddrStr: c.RemoteAddr().String(), - bw: http2newBufferedWriter(c), + bw: http2newBufferedWriter(s.group, c, conf.WriteByteTimeout), handler: opts.handler(), streams: make(map[uint32]*http2stream), readFrameCh: make(chan http2readFrameResult), @@ -4286,9 +4477,12 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func( bodyReadCh: make(chan http2bodyReadMsg), // buffering doesn't matter either way doneServing: make(chan struct{}), clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" - advMaxStreams: s.maxConcurrentStreams(), + advMaxStreams: conf.MaxConcurrentStreams, initialStreamSendWindowSize: http2initialWindowSize, + initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream, maxFrameSize: http2initialMaxFrameSize, + pingTimeout: conf.PingTimeout, + countErrorFunc: conf.CountError, serveG: http2newGoroutineLock(), pushEnabled: true, sawClientPreface: opts.SawClientPreface, @@ -4321,15 +4515,15 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func( sc.flow.add(http2initialWindowSize) sc.inflow.init(http2initialWindowSize) sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) - sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize()) + sc.hpackEncoder.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize) fr := http2NewFramer(sc.bw, c) - if s.CountError != nil { - fr.countError = s.CountError + if conf.CountError != nil { + fr.countError = conf.CountError } - fr.ReadMetaHeaders = hpack.NewDecoder(s.maxDecoderHeaderTableSize(), nil) + fr.ReadMetaHeaders = hpack.NewDecoder(conf.MaxDecoderHeaderTableSize, nil) fr.MaxHeaderListSize = sc.maxHeaderListSize() - fr.SetMaxReadFrameSize(s.maxReadFrameSize()) + fr.SetMaxReadFrameSize(conf.MaxReadFrameSize) sc.framer = fr if tc, ok := c.(http2connectionStater); ok { @@ -4362,7 +4556,7 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func( // So for now, do nothing here again. } - if !s.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) { + if !conf.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) { // "Endpoints MAY choose to generate a connection error // (Section 5.4.1) of type INADEQUATE_SECURITY if one of // the prohibited cipher suites are negotiated." @@ -4399,7 +4593,7 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func( opts.UpgradeRequest = nil } - sc.serve() + sc.serve(conf) } func http2serverConnBaseContext(c net.Conn, opts *http2ServeConnOpts) (ctx context.Context, cancel func()) { @@ -4439,6 +4633,7 @@ type http2serverConn struct { tlsState *tls.ConnectionState // shared by all handlers, like net/http remoteAddrStr string writeSched http2WriteScheduler + countErrorFunc func(errType string) // Everything following is owned by the serve loop; use serveG.check(): serveG http2goroutineLock // used to verify funcs are on serve() @@ -4458,6 +4653,7 @@ type http2serverConn struct { streams map[uint32]*http2stream unstartedHandlers []http2unstartedHandler initialStreamSendWindowSize int32 + initialStreamRecvWindowSize int32 maxFrameSize int32 peerMaxHeaderListSize uint32 // zero means unknown (default) canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case @@ -4468,9 +4664,14 @@ type http2serverConn struct { inGoAway bool // we've started to or sent GOAWAY inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop needToSendGoAway bool // we need to schedule a GOAWAY frame write + pingSent bool + sentPingData [8]byte goAwayCode http2ErrCode shutdownTimer http2timer // nil until used idleTimer http2timer // nil if unused + readIdleTimeout time.Duration + pingTimeout time.Duration + readIdleTimer http2timer // nil if unused // Owned by the writeFrameAsync goroutine: headerWriteBuf bytes.Buffer @@ -4485,11 +4686,7 @@ func (sc *http2serverConn) maxHeaderListSize() uint32 { if n <= 0 { n = DefaultMaxHeaderBytes } - // http2's count is in a slightly different unit and includes 32 bytes per pair. - // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. - const perFieldOverhead = 32 // per http2 spec - const typicalHeaders = 10 // conservative - return uint32(n + typicalHeaders*perFieldOverhead) + return uint32(http2adjustHTTP1MaxHeaderSize(int64(n))) } func (sc *http2serverConn) curOpenStreams() uint32 { @@ -4756,7 +4953,7 @@ func (sc *http2serverConn) notePanic() { } } -func (sc *http2serverConn) serve() { +func (sc *http2serverConn) serve(conf http2http2Config) { sc.serveG.check() defer sc.notePanic() defer sc.conn.Close() @@ -4770,18 +4967,18 @@ func (sc *http2serverConn) serve() { sc.writeFrame(http2FrameWriteRequest{ write: http2writeSettings{ - {http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, + {http2SettingMaxFrameSize, conf.MaxReadFrameSize}, {http2SettingMaxConcurrentStreams, sc.advMaxStreams}, {http2SettingMaxHeaderListSize, sc.maxHeaderListSize()}, - {http2SettingHeaderTableSize, sc.srv.maxDecoderHeaderTableSize()}, - {http2SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, + {http2SettingHeaderTableSize, conf.MaxDecoderHeaderTableSize}, + {http2SettingInitialWindowSize, uint32(sc.initialStreamRecvWindowSize)}, }, }) sc.unackedSettings++ // Each connection starts with initialWindowSize inflow tokens. // If a higher value is configured, we add more tokens. - if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 { + if diff := conf.MaxUploadBufferPerConnection - http2initialWindowSize; diff > 0 { sc.sendWindowUpdate(nil, int(diff)) } @@ -4801,11 +4998,18 @@ func (sc *http2serverConn) serve() { defer sc.idleTimer.Stop() } + if conf.SendPingTimeout > 0 { + sc.readIdleTimeout = conf.SendPingTimeout + sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer) + defer sc.readIdleTimer.Stop() + } + go sc.readFrames() // closed by defer sc.conn.Close above settingsTimer := sc.srv.afterFunc(http2firstSettingsTimeout, sc.onSettingsTimer) defer settingsTimer.Stop() + lastFrameTime := sc.srv.now() loopNum := 0 for { loopNum++ @@ -4819,6 +5023,7 @@ func (sc *http2serverConn) serve() { case res := <-sc.wroteFrameCh: sc.wroteFrame(res) case res := <-sc.readFrameCh: + lastFrameTime = sc.srv.now() // Process any written frames before reading new frames from the client since a // written frame could have triggered a new stream to be started. if sc.writingFrameAsync { @@ -4850,6 +5055,8 @@ func (sc *http2serverConn) serve() { case http2idleTimerMsg: sc.vlogf("connection is idle") sc.goAway(http2ErrCodeNo) + case http2readIdleTimerMsg: + sc.handlePingTimer(lastFrameTime) case http2shutdownTimerMsg: sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) return @@ -4872,7 +5079,7 @@ func (sc *http2serverConn) serve() { // If the peer is causing us to generate a lot of control frames, // but not reading them from us, assume they are trying to make us // run out of memory. - if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() { + if sc.queuedControlFrames > http2maxQueuedControlFrames { sc.vlogf("http2: too many control frames in send queue, closing connection") return } @@ -4888,12 +5095,39 @@ func (sc *http2serverConn) serve() { } } +func (sc *http2serverConn) handlePingTimer(lastFrameReadTime time.Time) { + if sc.pingSent { + sc.vlogf("timeout waiting for PING response") + sc.conn.Close() + return + } + + pingAt := lastFrameReadTime.Add(sc.readIdleTimeout) + now := sc.srv.now() + if pingAt.After(now) { + // We received frames since arming the ping timer. + // Reset it for the next possible timeout. + sc.readIdleTimer.Reset(pingAt.Sub(now)) + return + } + + sc.pingSent = true + // Ignore crypto/rand.Read errors: It generally can't fail, and worse case if it does + // is we send a PING frame containing 0s. + _, _ = rand.Read(sc.sentPingData[:]) + sc.writeFrame(http2FrameWriteRequest{ + write: &http2writePing{data: sc.sentPingData}, + }) + sc.readIdleTimer.Reset(sc.pingTimeout) +} + type http2serverMessage int // Message values sent to serveMsgCh. var ( http2settingsTimerMsg = new(http2serverMessage) http2idleTimerMsg = new(http2serverMessage) + http2readIdleTimerMsg = new(http2serverMessage) http2shutdownTimerMsg = new(http2serverMessage) http2gracefulShutdownMsg = new(http2serverMessage) http2handlerDoneMsg = new(http2serverMessage) @@ -4903,6 +5137,8 @@ func (sc *http2serverConn) onSettingsTimer() { sc.sendServeMsg(http2settingsTime func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) } +func (sc *http2serverConn) onReadIdleTimer() { sc.sendServeMsg(http2readIdleTimerMsg) } + func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) } func (sc *http2serverConn) sendServeMsg(msg interface{}) { @@ -5155,6 +5391,10 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { sc.writingFrame = false sc.writingFrameAsync = false + if res.err != nil { + sc.conn.Close() + } + wr := res.wr if http2writeEndsStream(wr.write) { @@ -5429,6 +5669,11 @@ func (sc *http2serverConn) processFrame(f http2Frame) error { func (sc *http2serverConn) processPing(f *http2PingFrame) error { sc.serveG.check() if f.IsAck() { + if sc.pingSent && sc.sentPingData == f.Data { + // This is a response to a PING we sent. + sc.pingSent = false + sc.readIdleTimer.Reset(sc.readIdleTimeout) + } // 6.7 PING: " An endpoint MUST NOT respond to PING frames // containing this flag." return nil @@ -5995,7 +6240,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState st.cw.Init() st.flow.conn = &sc.flow // link to conn-level counter st.flow.add(sc.initialStreamSendWindowSize) - st.inflow.init(sc.srv.initialStreamRecvWindowSize()) + st.inflow.init(sc.initialStreamRecvWindowSize) if sc.hs.WriteTimeout > 0 { st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) } @@ -6690,6 +6935,11 @@ func (w *http2responseWriter) SetWriteDeadline(deadline time.Time) error { return nil } +func (w *http2responseWriter) EnableFullDuplex() error { + // We always support full duplex responses, so this is a no-op. + return nil +} + func (w *http2responseWriter) Flush() { w.FlushError() } @@ -7136,7 +7386,7 @@ func (sc *http2serverConn) countError(name string, err error) error { if sc == nil || sc.srv == nil { return err } - f := sc.srv.CountError + f := sc.countErrorFunc if f == nil { return err } @@ -7339,6 +7589,20 @@ func (t *http2Transport) markNewGoroutine() { } } +func (t *http2Transport) now() time.Time { + if t != nil && t.http2transportTestHooks != nil { + return t.http2transportTestHooks.group.Now() + } + return time.Now() +} + +func (t *http2Transport) timeSince(when time.Time) time.Duration { + if t != nil && t.http2transportTestHooks != nil { + return t.now().Sub(when) + } + return time.Since(when) +} + // newTimer creates a new time.Timer, or a synthetic timer in tests. func (t *http2Transport) newTimer(d time.Duration) http2timer { if t.http2transportTestHooks != nil { @@ -7363,40 +7627,26 @@ func (t *http2Transport) contextWithTimeout(ctx context.Context, d time.Duration } func (t *http2Transport) maxHeaderListSize() uint32 { - if t.MaxHeaderListSize == 0 { + n := int64(t.MaxHeaderListSize) + if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 { + n = t.t1.MaxResponseHeaderBytes + if n > 0 { + n = http2adjustHTTP1MaxHeaderSize(n) + } + } + if n <= 0 { return 10 << 20 } - if t.MaxHeaderListSize == 0xffffffff { + if n >= 0xffffffff { return 0 } - return t.MaxHeaderListSize -} - -func (t *http2Transport) maxFrameReadSize() uint32 { - if t.MaxReadFrameSize == 0 { - return 0 // use the default provided by the peer - } - if t.MaxReadFrameSize < http2minMaxFrameSize { - return http2minMaxFrameSize - } - if t.MaxReadFrameSize > http2maxFrameSize { - return http2maxFrameSize - } - return t.MaxReadFrameSize + return uint32(n) } func (t *http2Transport) disableCompression() bool { return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) } -func (t *http2Transport) pingTimeout() time.Duration { - if t.PingTimeout == 0 { - return 15 * time.Second - } - return t.PingTimeout - -} - // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. // It returns an error if t1 has already been HTTP/2-enabled. // @@ -7432,8 +7682,8 @@ func http2configureTransports(t1 *Transport) (*http2Transport, error) { if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") { t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1") } - upgradeFn := func(authority string, c *tls.Conn) RoundTripper { - addr := http2authorityAddr("https", authority) + upgradeFn := func(scheme, authority string, c net.Conn) RoundTripper { + addr := http2authorityAddr(scheme, authority) if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil { go c.Close() return http2erringRoundTripper{err} @@ -7444,18 +7694,37 @@ func http2configureTransports(t1 *Transport) (*http2Transport, error) { // was unknown) go c.Close() } + if scheme == "http" { + return (*http2unencryptedTransport)(t2) + } return t2 } - if m := t1.TLSNextProto; len(m) == 0 { - t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{ - "h2": upgradeFn, + if t1.TLSNextProto == nil { + t1.TLSNextProto = make(map[string]func(string, *tls.Conn) RoundTripper) + } + t1.TLSNextProto[http2NextProtoTLS] = func(authority string, c *tls.Conn) RoundTripper { + return upgradeFn("https", authority, c) + } + // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns. + t1.TLSNextProto[http2nextProtoUnencryptedHTTP2] = func(authority string, c *tls.Conn) RoundTripper { + nc, err := http2unencryptedNetConnFromTLSConn(c) + if err != nil { + go c.Close() + return http2erringRoundTripper{err} } - } else { - m["h2"] = upgradeFn + return upgradeFn("http", authority, nc) } return t2, nil } +// unencryptedTransport is a Transport with a RoundTrip method that +// always permits http:// URLs. +type http2unencryptedTransport http2Transport + +func (t *http2unencryptedTransport) RoundTrip(req *Request) (*Response, error) { + return (*http2Transport)(t).RoundTripOpt(req, http2RoundTripOpt{allowHTTP: true}) +} + func (t *http2Transport) connPool() http2ClientConnPool { t.connPoolOnce.Do(t.initConnPool) return t.connPoolOrDef @@ -7475,7 +7744,7 @@ type http2ClientConn struct { t *http2Transport tconn net.Conn // usually *tls.Conn, except specialized impls tlsState *tls.ConnectionState // nil only for specialized impls - reused uint32 // whether conn is being reused; atomic + atomicReused uint32 // whether conn is being reused; atomic singleUse bool // whether being used for a single http.Request getConnCalled bool // used by clientConnPool @@ -7506,11 +7775,22 @@ type http2ClientConn struct { lastActive time.Time lastIdle time.Time // time last idle // Settings from peer: (also guarded by wmu) - maxFrameSize uint32 - maxConcurrentStreams uint32 - peerMaxHeaderListSize uint64 - peerMaxHeaderTableSize uint32 - initialWindowSize uint32 + maxFrameSize uint32 + maxConcurrentStreams uint32 + peerMaxHeaderListSize uint64 + peerMaxHeaderTableSize uint32 + initialWindowSize uint32 + initialStreamRecvWindowSize int32 + readIdleTimeout time.Duration + pingTimeout time.Duration + + // pendingResets is the number of RST_STREAM frames we have sent to the peer, + // without confirming that the peer has received them. When we send a RST_STREAM, + // we bundle it with a PING frame, unless a PING is already in flight. We count + // the reset stream against the connection's concurrency limit until we get + // a PING response. This limits the number of requests we'll try to send to a + // completely unresponsive connection. + pendingResets int // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests. // Write to reqHeaderMu to lock it, read from it to unlock. @@ -7568,12 +7848,12 @@ type http2clientStream struct { sentHeaders bool // owned by clientConnReadLoop: - firstByte bool // got the first response byte - pastHeaders bool // got first MetaHeadersFrame (actual headers) - pastTrailers bool // got optional second MetaHeadersFrame (trailers) - num1xx uint8 // number of 1xx responses seen - readClosed bool // peer sent an END_STREAM flag - readAborted bool // read loop reset the stream + firstByte bool // got the first response byte + pastHeaders bool // got first MetaHeadersFrame (actual headers) + pastTrailers bool // got optional second MetaHeadersFrame (trailers) + readClosed bool // peer sent an END_STREAM flag + readAborted bool // read loop reset the stream + totalHeaderSize int64 // total size of 1xx headers seen trailer Header // accumulated trailers resTrailer *Header // client's Response.Trailer @@ -7635,6 +7915,7 @@ func (cs *http2clientStream) closeReqBodyLocked() { } type http2stickyErrWriter struct { + group http2synctestGroupInterface conn net.Conn timeout time.Duration err *error @@ -7644,22 +7925,9 @@ func (sew http2stickyErrWriter) Write(p []byte) (n int, err error) { if *sew.err != nil { return 0, *sew.err } - for { - if sew.timeout != 0 { - sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout)) - } - nn, err := sew.conn.Write(p[n:]) - n += nn - if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) { - // Keep extending the deadline so long as we're making progress. - continue - } - if sew.timeout != 0 { - sew.conn.SetWriteDeadline(time.Time{}) - } - *sew.err = err - return n, err - } + n, err = http2writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p) + *sew.err = err + return n, err } // noCachedConnError is the concrete type of ErrNoCachedConn, which @@ -7691,6 +7959,8 @@ type http2RoundTripOpt struct { // no cached connection is available, RoundTripOpt // will return ErrNoCachedConn. OnlyCachedConn bool + + allowHTTP bool // allow http:// URLs } func (t *http2Transport) RoundTrip(req *Request) (*Response, error) { @@ -7723,7 +7993,14 @@ func http2authorityAddr(scheme string, authority string) (addr string) { // RoundTripOpt is like RoundTrip, but takes options. func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Response, error) { - if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) { + switch req.URL.Scheme { + case "https": + // Always okay. + case "http": + if !t.AllowHTTP && !opt.allowHTTP { + return nil, errors.New("http2: unencrypted HTTP/2 not enabled") + } + default: return nil, errors.New("http2: unsupported scheme") } @@ -7734,7 +8011,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) return nil, err } - reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1) + reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1) http2traceGotConn(req, cc, reused) res, err := cc.RoundTrip(req) if err != nil && retry <= 6 { @@ -7759,6 +8036,22 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } } } + if err == http2errClientConnNotEstablished { + // This ClientConn was created recently, + // this is the first request to use it, + // and the connection is closed and not usable. + // + // In this state, cc.idleTimer will remove the conn from the pool + // when it fires. Stop the timer and remove it here so future requests + // won't try to use this connection. + // + // If the timer has already fired and we're racing it, the redundant + // call to MarkDead is harmless. + if cc.idleTimer != nil { + cc.idleTimer.Stop() + } + t.connPool().MarkDead(cc) + } if err != nil { t.vlogf("RoundTrip failure: %v", err) return nil, err @@ -7777,9 +8070,10 @@ func (t *http2Transport) CloseIdleConnections() { } var ( - http2errClientConnClosed = errors.New("http2: client conn is closed") - http2errClientConnUnusable = errors.New("http2: client conn not usable") - http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") + http2errClientConnClosed = errors.New("http2: client conn is closed") + http2errClientConnUnusable = errors.New("http2: client conn not usable") + http2errClientConnNotEstablished = errors.New("http2: client conn could not be established") + http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") ) // shouldRetryRequest is called by RoundTrip when a request fails to get @@ -7895,44 +8189,37 @@ func (t *http2Transport) expectContinueTimeout() time.Duration { return t.t1.ExpectContinueTimeout } -func (t *http2Transport) maxDecoderHeaderTableSize() uint32 { - if v := t.MaxDecoderHeaderTableSize; v > 0 { - return v - } - return http2initialHeaderTableSize -} - -func (t *http2Transport) maxEncoderHeaderTableSize() uint32 { - if v := t.MaxEncoderHeaderTableSize; v > 0 { - return v - } - return http2initialHeaderTableSize -} - func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { return t.newClientConn(c, t.disableKeepAlives()) } func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { + conf := http2configFromTransport(t) cc := &http2ClientConn{ - t: t, - tconn: c, - readerDone: make(chan struct{}), - nextStreamID: 1, - maxFrameSize: 16 << 10, // spec default - initialWindowSize: 65535, // spec default - maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. - peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. - streams: make(map[uint32]*http2clientStream), - singleUse: singleUse, - wantSettingsAck: true, - pings: make(map[[8]byte]chan struct{}), - reqHeaderMu: make(chan struct{}, 1), + t: t, + tconn: c, + readerDone: make(chan struct{}), + nextStreamID: 1, + maxFrameSize: 16 << 10, // spec default + initialWindowSize: 65535, // spec default + initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream, + maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. + peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. + streams: make(map[uint32]*http2clientStream), + singleUse: singleUse, + wantSettingsAck: true, + readIdleTimeout: conf.SendPingTimeout, + pingTimeout: conf.PingTimeout, + pings: make(map[[8]byte]chan struct{}), + reqHeaderMu: make(chan struct{}, 1), + lastActive: t.now(), } + var group http2synctestGroupInterface if t.http2transportTestHooks != nil { t.markNewGoroutine() t.http2transportTestHooks.newclientconn(cc) c = cc.tconn + group = t.group } if http2VerboseLogs { t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) @@ -7944,24 +8231,23 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client // TODO: adjust this writer size to account for frame size + // MTU + crypto/tls record padding. cc.bw = bufio.NewWriter(http2stickyErrWriter{ + group: group, conn: c, - timeout: t.WriteByteTimeout, + timeout: conf.WriteByteTimeout, err: &cc.werr, }) cc.br = bufio.NewReader(c) cc.fr = http2NewFramer(cc.bw, cc.br) - if t.maxFrameReadSize() != 0 { - cc.fr.SetMaxReadFrameSize(t.maxFrameReadSize()) - } + cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize) if t.CountError != nil { cc.fr.countError = t.CountError } - maxHeaderTableSize := t.maxDecoderHeaderTableSize() + maxHeaderTableSize := conf.MaxDecoderHeaderTableSize cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize() cc.henc = hpack.NewEncoder(&cc.hbuf) - cc.henc.SetMaxDynamicTableSizeLimit(t.maxEncoderHeaderTableSize()) + cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize) cc.peerMaxHeaderTableSize = http2initialHeaderTableSize if cs, ok := c.(http2connectionStater); ok { @@ -7971,11 +8257,9 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client initialSettings := []http2Setting{ {ID: http2SettingEnablePush, Val: 0}, - {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow}, - } - if max := t.maxFrameReadSize(); max != 0 { - initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxFrameSize, Val: max}) + {ID: http2SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)}, } + initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxFrameSize, Val: conf.MaxReadFrameSize}) if max := t.maxHeaderListSize(); max != 0 { initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max}) } @@ -7985,8 +8269,8 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client cc.bw.Write(http2clientPreface) cc.fr.WriteSettings(initialSettings...) - cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow) - cc.inflow.init(http2transportDefaultConnFlow + http2initialWindowSize) + cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection)) + cc.inflow.init(conf.MaxUploadBufferPerConnection + http2initialWindowSize) cc.bw.Flush() if cc.werr != nil { cc.Close() @@ -8004,7 +8288,7 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client } func (cc *http2ClientConn) healthCheck() { - pingTimeout := cc.t.pingTimeout() + pingTimeout := cc.pingTimeout // We don't need to periodically ping in the health check, because the readLoop of ClientConn will // trigger the healthCheck again if there is no frame received. ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout) @@ -8132,7 +8416,7 @@ func (cc *http2ClientConn) State() http2ClientConnState { return http2ClientConnState{ Closed: cc.closed, Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil, - StreamsActive: len(cc.streams), + StreamsActive: len(cc.streams) + cc.pendingResets, StreamsReserved: cc.streamsReserved, StreamsPending: cc.pendingRequests, LastIdle: cc.lastIdle, @@ -8164,16 +8448,38 @@ func (cc *http2ClientConn) idleStateLocked() (st http2clientConnIdleState) { // writing it. maxConcurrentOkay = true } else { - maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams) + // We can take a new request if the total of + // - active streams; + // - reservation slots for new streams; and + // - streams for which we have sent a RST_STREAM and a PING, + // but received no subsequent frame + // is less than the concurrency limit. + maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) } st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && !cc.doNotReuse && int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 && !cc.tooIdleLocked() + + // If this connection has never been used for a request and is closed, + // then let it take a request (which will fail). + // + // This avoids a situation where an error early in a connection's lifetime + // goes unreported. + if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed { + st.canTakeNewRequest = true + } + return } +// currentRequestCountLocked reports the number of concurrency slots currently in use, +// including active streams, reserved slots, and reset streams waiting for acknowledgement. +func (cc *http2ClientConn) currentRequestCountLocked() int { + return len(cc.streams) + cc.streamsReserved + cc.pendingResets +} + func (cc *http2ClientConn) canTakeNewRequestLocked() bool { st := cc.idleStateLocked() return st.canTakeNewRequest @@ -8186,7 +8492,7 @@ func (cc *http2ClientConn) tooIdleLocked() bool { // times are compared based on their wall time. We don't want // to reuse a connection that's been sitting idle during // VM/laptop suspend if monotonic time was also frozen. - return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout + return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout } // onIdleTimeout is called from a time.AfterFunc goroutine. It will @@ -8750,6 +9056,7 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) { cs.reqBodyClosed = make(chan struct{}) } bodyClosed := cs.reqBodyClosed + closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil cc.mu.Unlock() if mustCloseBody { cs.reqBody.Close() @@ -8774,16 +9081,40 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) { if cs.sentHeaders { if se, ok := err.(http2StreamError); ok { if se.Cause != http2errFromPeer { - cc.writeStreamReset(cs.ID, se.Code, err) + cc.writeStreamReset(cs.ID, se.Code, false, err) } } else { - cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err) + // We're cancelling an in-flight request. + // + // This could be due to the server becoming unresponsive. + // To avoid sending too many requests on a dead connection, + // we let the request continue to consume a concurrency slot + // until we can confirm the server is still responding. + // We do this by sending a PING frame along with the RST_STREAM + // (unless a ping is already in flight). + // + // For simplicity, we don't bother tracking the PING payload: + // We reset cc.pendingResets any time we receive a PING ACK. + // + // We skip this if the conn is going to be closed on idle, + // because it's short lived and will probably be closed before + // we get the ping response. + ping := false + if !closeOnIdle { + cc.mu.Lock() + if cc.pendingResets == 0 { + ping = true + } + cc.pendingResets++ + cc.mu.Unlock() + } + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, ping, err) } } cs.bufPipe.CloseWithError(err) // no-op if already closed } else { if cs.sentHeaders && !cs.sentEndStream { - cc.writeStreamReset(cs.ID, http2ErrCodeNo, nil) + cc.writeStreamReset(cs.ID, http2ErrCodeNo, false, nil) } cs.bufPipe.CloseWithError(http2errRequestCanceled) } @@ -8805,12 +9136,17 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) { // Must hold cc.mu. func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) error { for { - cc.lastActive = time.Now() + if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 { + // This is the very first request sent to this connection. + // Return a fatal error which aborts the retry loop. + return http2errClientConnNotEstablished + } + cc.lastActive = cc.t.now() if cc.closed || !cc.canTakeNewRequestLocked() { return http2errClientConnUnusable } cc.lastIdle = time.Time{} - if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) { + if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) { return nil } cc.pendingRequests++ @@ -9337,7 +9673,7 @@ type http2resAndError struct { func (cc *http2ClientConn) addStreamLocked(cs *http2clientStream) { cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) - cs.inflow.init(http2transportDefaultStreamFlow) + cs.inflow.init(cc.initialStreamRecvWindowSize) cs.ID = cc.nextStreamID cc.nextStreamID += 2 cc.streams[cs.ID] = cs @@ -9353,10 +9689,10 @@ func (cc *http2ClientConn) forgetStreamID(id uint32) { if len(cc.streams) != slen-1 { panic("forgetting unknown stream id") } - cc.lastActive = time.Now() + cc.lastActive = cc.t.now() if len(cc.streams) == 0 && cc.idleTimer != nil { cc.idleTimer.Reset(cc.idleTimeout) - cc.lastIdle = time.Now() + cc.lastIdle = cc.t.now() } // Wake up writeRequestBody via clientStream.awaitFlowControl and // wake up RoundTrip if there is a pending request. @@ -9416,7 +9752,6 @@ func http2isEOFOrNetReadError(err error) bool { func (rl *http2clientConnReadLoop) cleanup() { cc := rl.cc - cc.t.connPool().MarkDead(cc) defer cc.closeConn() defer close(cc.readerDone) @@ -9440,6 +9775,24 @@ func (rl *http2clientConnReadLoop) cleanup() { } cc.closed = true + // If the connection has never been used, and has been open for only a short time, + // leave it in the connection pool for a little while. + // + // This avoids a situation where new connections are constantly created, + // added to the pool, fail, and are removed from the pool, without any error + // being surfaced to the user. + const unusedWaitTime = 5 * time.Second + idleTime := cc.t.now().Sub(cc.lastActive) + if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime { + cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() { + cc.t.connPool().MarkDead(cc) + }) + } else { + cc.mu.Unlock() // avoid any deadlocks in MarkDead + cc.t.connPool().MarkDead(cc) + cc.mu.Lock() + } + for _, cs := range cc.streams { select { case <-cs.peerClosed: @@ -9483,7 +9836,7 @@ func (cc *http2ClientConn) countReadFrameError(err error) { func (rl *http2clientConnReadLoop) run() error { cc := rl.cc gotSettings := false - readIdleTimeout := cc.t.ReadIdleTimeout + readIdleTimeout := cc.readIdleTimeout var t http2timer if readIdleTimeout != 0 { t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck) @@ -9667,15 +10020,34 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http if f.StreamEnded() { return nil, errors.New("1xx informational response with END_STREAM flag") } - cs.num1xx++ - const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http - if cs.num1xx > max1xxResponses { - return nil, errors.New("http2: too many 1xx informational responses") - } if fn := cs.get1xxTraceFunc(); fn != nil { + // If the 1xx response is being delivered to the user, + // then they're responsible for limiting the number + // of responses. if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil { return nil, err } + } else { + // If the user didn't examine the 1xx response, then we + // limit the size of all 1xx headers. + // + // This differs a bit from the HTTP/1 implementation, which + // limits the size of all 1xx headers plus the final response. + // Use the larger limit of MaxHeaderListSize and + // net/http.Transport.MaxResponseHeaderBytes. + limit := int64(cs.cc.t.maxHeaderListSize()) + if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes > limit { + limit = t1.MaxResponseHeaderBytes + } + for _, h := range f.Fields { + cs.totalHeaderSize += int64(h.Size()) + } + if cs.totalHeaderSize > limit { + if http2VerboseLogs { + log.Printf("http2: 1xx informational responses too large") + } + return nil, errors.New("header list too large") + } } if statusCode == 100 { http2traceGot100Continue(cs.trace) @@ -10219,6 +10591,11 @@ func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error { close(c) delete(cc.pings, f.Data) } + if cc.pendingResets > 0 { + // See clientStream.cleanupWriteRequest. + cc.pendingResets = 0 + cc.cond.Broadcast() + } return nil } cc := rl.cc @@ -10241,13 +10618,20 @@ func (rl *http2clientConnReadLoop) processPushPromise(f *http2PushPromiseFrame) return http2ConnectionError(http2ErrCodeProtocol) } -func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, err error) { +// writeStreamReset sends a RST_STREAM frame. +// When ping is true, it also sends a PING frame with a random payload. +func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, ping bool, err error) { // TODO: map err to more interesting error codes, once the // HTTP community comes up with some. But currently for // RST_STREAM there's no equivalent to GOAWAY frame's debug // data, and the error codes are all pretty vague ("cancel"). cc.wmu.Lock() cc.fr.WriteRSTStream(streamID, code) + if ping { + var payload [8]byte + rand.Read(payload[:]) + cc.fr.WritePing(false, payload) + } cc.bw.Flush() cc.wmu.Unlock() } @@ -10404,7 +10788,7 @@ func http2traceGotConn(req *Request, cc *http2ClientConn, reused bool) { cc.mu.Lock() ci.WasIdle = len(cc.streams) == 0 && reused if ci.WasIdle && !cc.lastActive.IsZero() { - ci.IdleTime = time.Since(cc.lastActive) + ci.IdleTime = cc.t.timeSince(cc.lastActive) } cc.mu.Unlock() @@ -10472,6 +10856,27 @@ func (t *http2Transport) dialTLSWithContext(ctx context.Context, network, addr s return tlsCn, nil } +const http2nextProtoUnencryptedHTTP2 = "unencrypted_http2" + +// unencryptedNetConnFromTLSConn retrieves a net.Conn wrapped in a *tls.Conn. +// +// TLSNextProto functions accept a *tls.Conn. +// +// When passing an unencrypted HTTP/2 connection to a TLSNextProto function, +// we pass a *tls.Conn with an underlying net.Conn containing the unencrypted connection. +// To be extra careful about mistakes (accidentally dropping TLS encryption in a place +// where we want it), the tls.Conn contains a net.Conn with an UnencryptedNetConn method +// that returns the actual connection we want to use. +func http2unencryptedNetConnFromTLSConn(tc *tls.Conn) (net.Conn, error) { + conner, ok := tc.NetConn().(interface { + UnencryptedNetConn() net.Conn + }) + if !ok { + return nil, errors.New("http2: TLS conn unexpectedly found in unencrypted handoff") + } + return conner.UnencryptedNetConn(), nil +} + // writeFramer is implemented by any type that is used to write frames. type http2writeFramer interface { writeFrame(http2writeContext) error @@ -10588,6 +10993,18 @@ func (se http2StreamError) writeFrame(ctx http2writeContext) error { func (se http2StreamError) staysWithinBuffer(max int) bool { return http2frameHeaderLen+4 <= max } +type http2writePing struct { + data [8]byte +} + +func (w http2writePing) writeFrame(ctx http2writeContext) error { + return ctx.Framer().WritePing(false, w.data) +} + +func (w http2writePing) staysWithinBuffer(max int) bool { + return http2frameHeaderLen+len(w.data) <= max +} + type http2writePingAck struct{ pf *http2PingFrame } func (w http2writePingAck) writeFrame(ctx http2writeContext) error { |
