aboutsummaryrefslogtreecommitdiff
path: root/src/net/http
diff options
context:
space:
mode:
authorDmitri Shuralyov <dmitshur@golang.org>2024-11-21 19:54:12 -0500
committerGopher Robot <gobot@golang.org>2024-11-22 01:35:05 +0000
commitca14eaf77c86bd5492329d2be6f1a82afe7802f5 (patch)
tree1b6daab038ea1094e63584a460aebbec56491b30 /src/net/http
parente8d95619978c4602d4446f113b3b69b7a22308fa (diff)
downloadgo-ca14eaf77c86bd5492329d2be6f1a82afe7802f5.tar.xz
all: update golang.org/x/net [generated]
A part of the keeping Go's vendored dependencies and generated code up to date. This updates h2_bundle.go with unencrypted HTTP/2 support. For #36905. For #67816. [git-generate] cd src go get golang.org/x/net@v0.31.0 go mod tidy go mod vendor cd cmd go get golang.org/x/net@v0.31.0 go mod tidy go mod vendor go generate -run=bundle std Change-Id: I2b77f651b990f260fbe7d551c7a819518f1c983f Reviewed-on: https://go-review.googlesource.com/c/go/+/631035 Reviewed-by: Dmitri Shuralyov <dmitshur@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Damien Neil <dneil@google.com> Auto-Submit: Dmitri Shuralyov <dmitshur@golang.org>
Diffstat (limited to 'src/net/http')
-rw-r--r--src/net/http/h2_bundle.go849
1 files changed, 633 insertions, 216 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go
index af86c1430d..6b40923a86 100644
--- a/src/net/http/h2_bundle.go
+++ b/src/net/http/h2_bundle.go
@@ -883,7 +883,7 @@ func (c *http2dialCall) dial(ctx context.Context, addr string) {
// This code decides which ones live or die.
// The return value used is whether c was used.
// c is never closed.
-func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {
+func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c net.Conn) (used bool, err error) {
p.mu.Lock()
for _, cc := range p.conns[key] {
if cc.CanTakeNewRequest() {
@@ -919,8 +919,8 @@ type http2addConnCall struct {
err error
}
-func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
- cc, err := t.NewClientConn(tc)
+func (c *http2addConnCall) run(t *http2Transport, key string, nc net.Conn) {
+ cc, err := t.NewClientConn(nc)
p := c.p
p.mu.Lock()
@@ -1035,6 +1035,169 @@ func http2shouldRetryDial(call *http2dialCall, req *Request) bool {
return call.ctx.Err() != nil
}
+// http2Config is a package-internal version of net/http.HTTP2Config.
+//
+// http.HTTP2Config was added in Go 1.24.
+// When running with a version of net/http that includes HTTP2Config,
+// we merge the configuration with the fields in Transport or Server
+// to produce an http2Config.
+//
+// Zero valued fields in http2Config are interpreted as in the
+// net/http.HTTPConfig documentation.
+//
+// Precedence order for reconciling configurations is:
+//
+// - Use the net/http.{Server,Transport}.HTTP2Config value, when non-zero.
+// - Otherwise use the http2.{Server.Transport} value.
+// - If the resulting value is zero or out of range, use a default.
+type http2http2Config struct {
+ MaxConcurrentStreams uint32
+ MaxDecoderHeaderTableSize uint32
+ MaxEncoderHeaderTableSize uint32
+ MaxReadFrameSize uint32
+ MaxUploadBufferPerConnection int32
+ MaxUploadBufferPerStream int32
+ SendPingTimeout time.Duration
+ PingTimeout time.Duration
+ WriteByteTimeout time.Duration
+ PermitProhibitedCipherSuites bool
+ CountError func(errType string)
+}
+
+// configFromServer merges configuration settings from
+// net/http.Server.HTTP2Config and http2.Server.
+func http2configFromServer(h1 *Server, h2 *http2Server) http2http2Config {
+ conf := http2http2Config{
+ MaxConcurrentStreams: h2.MaxConcurrentStreams,
+ MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize,
+ MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize,
+ MaxReadFrameSize: h2.MaxReadFrameSize,
+ MaxUploadBufferPerConnection: h2.MaxUploadBufferPerConnection,
+ MaxUploadBufferPerStream: h2.MaxUploadBufferPerStream,
+ SendPingTimeout: h2.ReadIdleTimeout,
+ PingTimeout: h2.PingTimeout,
+ WriteByteTimeout: h2.WriteByteTimeout,
+ PermitProhibitedCipherSuites: h2.PermitProhibitedCipherSuites,
+ CountError: h2.CountError,
+ }
+ http2fillNetHTTPServerConfig(&conf, h1)
+ http2setConfigDefaults(&conf, true)
+ return conf
+}
+
+// configFromServer merges configuration settings from h2 and h2.t1.HTTP2
+// (the net/http Transport).
+func http2configFromTransport(h2 *http2Transport) http2http2Config {
+ conf := http2http2Config{
+ MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize,
+ MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize,
+ MaxReadFrameSize: h2.MaxReadFrameSize,
+ SendPingTimeout: h2.ReadIdleTimeout,
+ PingTimeout: h2.PingTimeout,
+ WriteByteTimeout: h2.WriteByteTimeout,
+ }
+
+ // Unlike most config fields, where out-of-range values revert to the default,
+ // Transport.MaxReadFrameSize clips.
+ if conf.MaxReadFrameSize < http2minMaxFrameSize {
+ conf.MaxReadFrameSize = http2minMaxFrameSize
+ } else if conf.MaxReadFrameSize > http2maxFrameSize {
+ conf.MaxReadFrameSize = http2maxFrameSize
+ }
+
+ if h2.t1 != nil {
+ http2fillNetHTTPTransportConfig(&conf, h2.t1)
+ }
+ http2setConfigDefaults(&conf, false)
+ return conf
+}
+
+func http2setDefault[T ~int | ~int32 | ~uint32 | ~int64](v *T, minval, maxval, defval T) {
+ if *v < minval || *v > maxval {
+ *v = defval
+ }
+}
+
+func http2setConfigDefaults(conf *http2http2Config, server bool) {
+ http2setDefault(&conf.MaxConcurrentStreams, 1, math.MaxUint32, http2defaultMaxStreams)
+ http2setDefault(&conf.MaxEncoderHeaderTableSize, 1, math.MaxUint32, http2initialHeaderTableSize)
+ http2setDefault(&conf.MaxDecoderHeaderTableSize, 1, math.MaxUint32, http2initialHeaderTableSize)
+ if server {
+ http2setDefault(&conf.MaxUploadBufferPerConnection, http2initialWindowSize, math.MaxInt32, 1<<20)
+ } else {
+ http2setDefault(&conf.MaxUploadBufferPerConnection, http2initialWindowSize, math.MaxInt32, http2transportDefaultConnFlow)
+ }
+ if server {
+ http2setDefault(&conf.MaxUploadBufferPerStream, 1, math.MaxInt32, 1<<20)
+ } else {
+ http2setDefault(&conf.MaxUploadBufferPerStream, 1, math.MaxInt32, http2transportDefaultStreamFlow)
+ }
+ http2setDefault(&conf.MaxReadFrameSize, http2minMaxFrameSize, http2maxFrameSize, http2defaultMaxReadFrameSize)
+ http2setDefault(&conf.PingTimeout, 1, math.MaxInt64, 15*time.Second)
+}
+
+// adjustHTTP1MaxHeaderSize converts a limit in bytes on the size of an HTTP/1 header
+// to an HTTP/2 MAX_HEADER_LIST_SIZE value.
+func http2adjustHTTP1MaxHeaderSize(n int64) int64 {
+ // http2's count is in a slightly different unit and includes 32 bytes per pair.
+ // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
+ const perFieldOverhead = 32 // per http2 spec
+ const typicalHeaders = 10 // conservative
+ return n + typicalHeaders*perFieldOverhead
+}
+
+// fillNetHTTPServerConfig sets fields in conf from srv.HTTP2.
+func http2fillNetHTTPServerConfig(conf *http2http2Config, srv *Server) {
+ http2fillNetHTTPConfig(conf, srv.HTTP2)
+}
+
+// fillNetHTTPServerConfig sets fields in conf from tr.HTTP2.
+func http2fillNetHTTPTransportConfig(conf *http2http2Config, tr *Transport) {
+ http2fillNetHTTPConfig(conf, tr.HTTP2)
+}
+
+func http2fillNetHTTPConfig(conf *http2http2Config, h2 *HTTP2Config) {
+ if h2 == nil {
+ return
+ }
+ if h2.MaxConcurrentStreams != 0 {
+ conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
+ }
+ if h2.MaxEncoderHeaderTableSize != 0 {
+ conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
+ }
+ if h2.MaxDecoderHeaderTableSize != 0 {
+ conf.MaxDecoderHeaderTableSize = uint32(h2.MaxDecoderHeaderTableSize)
+ }
+ if h2.MaxConcurrentStreams != 0 {
+ conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
+ }
+ if h2.MaxReadFrameSize != 0 {
+ conf.MaxReadFrameSize = uint32(h2.MaxReadFrameSize)
+ }
+ if h2.MaxReceiveBufferPerConnection != 0 {
+ conf.MaxUploadBufferPerConnection = int32(h2.MaxReceiveBufferPerConnection)
+ }
+ if h2.MaxReceiveBufferPerStream != 0 {
+ conf.MaxUploadBufferPerStream = int32(h2.MaxReceiveBufferPerStream)
+ }
+ if h2.SendPingTimeout != 0 {
+ conf.SendPingTimeout = h2.SendPingTimeout
+ }
+ if h2.PingTimeout != 0 {
+ conf.PingTimeout = h2.PingTimeout
+ }
+ if h2.WriteByteTimeout != 0 {
+ conf.WriteByteTimeout = h2.WriteByteTimeout
+ }
+ if h2.PermitProhibitedCipherSuites {
+ conf.PermitProhibitedCipherSuites = true
+ }
+ if h2.CountError != nil {
+ conf.CountError = h2.CountError
+ }
+}
+
// Buffer chunks are allocated from a pool to reduce pressure on GC.
// The maximum wasted space per dataBuffer is 2x the largest size class,
// which happens when the dataBuffer has multiple chunks and there is
@@ -3550,13 +3713,19 @@ func (cw http2closeWaiter) Wait() {
// Its buffered writer is lazily allocated as needed, to minimize
// idle memory usage with many connections.
type http2bufferedWriter struct {
- _ http2incomparable
- w io.Writer // immutable
- bw *bufio.Writer // non-nil when data is buffered
+ _ http2incomparable
+ group http2synctestGroupInterface // immutable
+ conn net.Conn // immutable
+ bw *bufio.Writer // non-nil when data is buffered
+ byteTimeout time.Duration // immutable, WriteByteTimeout
}
-func http2newBufferedWriter(w io.Writer) *http2bufferedWriter {
- return &http2bufferedWriter{w: w}
+func http2newBufferedWriter(group http2synctestGroupInterface, conn net.Conn, timeout time.Duration) *http2bufferedWriter {
+ return &http2bufferedWriter{
+ group: group,
+ conn: conn,
+ byteTimeout: timeout,
+ }
}
// bufWriterPoolBufferSize is the size of bufio.Writer's
@@ -3583,7 +3752,7 @@ func (w *http2bufferedWriter) Available() int {
func (w *http2bufferedWriter) Write(p []byte) (n int, err error) {
if w.bw == nil {
bw := http2bufWriterPool.Get().(*bufio.Writer)
- bw.Reset(w.w)
+ bw.Reset((*http2bufferedWriterTimeoutWriter)(w))
w.bw = bw
}
return w.bw.Write(p)
@@ -3601,6 +3770,38 @@ func (w *http2bufferedWriter) Flush() error {
return err
}
+type http2bufferedWriterTimeoutWriter http2bufferedWriter
+
+func (w *http2bufferedWriterTimeoutWriter) Write(p []byte) (n int, err error) {
+ return http2writeWithByteTimeout(w.group, w.conn, w.byteTimeout, p)
+}
+
+// writeWithByteTimeout writes to conn.
+// If more than timeout passes without any bytes being written to the connection,
+// the write fails.
+func http2writeWithByteTimeout(group http2synctestGroupInterface, conn net.Conn, timeout time.Duration, p []byte) (n int, err error) {
+ if timeout <= 0 {
+ return conn.Write(p)
+ }
+ for {
+ var now time.Time
+ if group == nil {
+ now = time.Now()
+ } else {
+ now = group.Now()
+ }
+ conn.SetWriteDeadline(now.Add(timeout))
+ nn, err := conn.Write(p[n:])
+ n += nn
+ if n == len(p) || nn == 0 || !errors.Is(err, os.ErrDeadlineExceeded) {
+ // Either we finished the write, made no progress, or hit the deadline.
+ // Whichever it is, we're done now.
+ conn.SetWriteDeadline(time.Time{})
+ return n, err
+ }
+ }
+}
+
func http2mustUint31(v int32) uint32 {
if v < 0 || v > 2147483647 {
panic("out of range")
@@ -3882,10 +4083,14 @@ func (p *http2pipe) Done() <-chan struct{} {
}
const (
- http2prefaceTimeout = 10 * time.Second
- http2firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
- http2handlerChunkWriteSize = 4 << 10
- http2defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
+ http2prefaceTimeout = 10 * time.Second
+ http2firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
+ http2handlerChunkWriteSize = 4 << 10
+ http2defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
+
+ // maxQueuedControlFrames is the maximum number of control frames like
+ // SETTINGS, PING and RST_STREAM that will be queued for writing before
+ // the connection is closed to prevent memory exhaustion attacks.
http2maxQueuedControlFrames = 10000
)
@@ -3957,6 +4162,22 @@ type http2Server struct {
// If zero or negative, there is no timeout.
IdleTimeout time.Duration
+ // ReadIdleTimeout is the timeout after which a health check using a ping
+ // frame will be carried out if no frame is received on the connection.
+ // If zero, no health check is performed.
+ ReadIdleTimeout time.Duration
+
+ // PingTimeout is the timeout after which the connection will be closed
+ // if a response to a ping is not received.
+ // If zero, a default of 15 seconds is used.
+ PingTimeout time.Duration
+
+ // WriteByteTimeout is the timeout after which a connection will be
+ // closed if no data can be written to it. The timeout begins when data is
+ // available to write, and is extended whenever any bytes are written.
+ // If zero or negative, there is no timeout.
+ WriteByteTimeout time.Duration
+
// MaxUploadBufferPerConnection is the size of the initial flow
// control window for each connections. The HTTP/2 spec does not
// allow this to be smaller than 65535 or larger than 2^32-1.
@@ -4019,57 +4240,6 @@ func (s *http2Server) afterFunc(d time.Duration, f func()) http2timer {
return http2timeTimer{time.AfterFunc(d, f)}
}
-func (s *http2Server) initialConnRecvWindowSize() int32 {
- if s.MaxUploadBufferPerConnection >= http2initialWindowSize {
- return s.MaxUploadBufferPerConnection
- }
- return 1 << 20
-}
-
-func (s *http2Server) initialStreamRecvWindowSize() int32 {
- if s.MaxUploadBufferPerStream > 0 {
- return s.MaxUploadBufferPerStream
- }
- return 1 << 20
-}
-
-func (s *http2Server) maxReadFrameSize() uint32 {
- if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize {
- return v
- }
- return http2defaultMaxReadFrameSize
-}
-
-func (s *http2Server) maxConcurrentStreams() uint32 {
- if v := s.MaxConcurrentStreams; v > 0 {
- return v
- }
- return http2defaultMaxStreams
-}
-
-func (s *http2Server) maxDecoderHeaderTableSize() uint32 {
- if v := s.MaxDecoderHeaderTableSize; v > 0 {
- return v
- }
- return http2initialHeaderTableSize
-}
-
-func (s *http2Server) maxEncoderHeaderTableSize() uint32 {
- if v := s.MaxEncoderHeaderTableSize; v > 0 {
- return v
- }
- return http2initialHeaderTableSize
-}
-
-// maxQueuedControlFrames is the maximum number of control frames like
-// SETTINGS, PING and RST_STREAM that will be queued for writing before
-// the connection is closed to prevent memory exhaustion attacks.
-func (s *http2Server) maxQueuedControlFrames() int {
- // TODO: if anybody asks, add a Server field, and remember to define the
- // behavior of negative values.
- return http2maxQueuedControlFrames
-}
-
type http2serverInternalState struct {
mu sync.Mutex
activeConns map[*http2serverConn]struct{}
@@ -4166,7 +4336,7 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
if s.TLSNextProto == nil {
s.TLSNextProto = map[string]func(*Server, *tls.Conn, Handler){}
}
- protoHandler := func(hs *Server, c *tls.Conn, h Handler) {
+ protoHandler := func(hs *Server, c net.Conn, h Handler, sawClientPreface bool) {
if http2testHookOnConn != nil {
http2testHookOnConn()
}
@@ -4183,12 +4353,31 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
ctx = bc.BaseContext()
}
conf.ServeConn(c, &http2ServeConnOpts{
- Context: ctx,
- Handler: h,
- BaseConfig: hs,
+ Context: ctx,
+ Handler: h,
+ BaseConfig: hs,
+ SawClientPreface: sawClientPreface,
})
}
- s.TLSNextProto[http2NextProtoTLS] = protoHandler
+ s.TLSNextProto[http2NextProtoTLS] = func(hs *Server, c *tls.Conn, h Handler) {
+ protoHandler(hs, c, h, false)
+ }
+ // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
+ //
+ // A connection passed in this method has already had the HTTP/2 preface read from it.
+ s.TLSNextProto[http2nextProtoUnencryptedHTTP2] = func(hs *Server, c *tls.Conn, h Handler) {
+ nc, err := http2unencryptedNetConnFromTLSConn(c)
+ if err != nil {
+ if lg := hs.ErrorLog; lg != nil {
+ lg.Print(err)
+ } else {
+ log.Print(err)
+ }
+ go c.Close()
+ return
+ }
+ protoHandler(hs, nc, h, true)
+ }
return nil
}
@@ -4270,13 +4459,15 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(
baseCtx, cancel := http2serverConnBaseContext(c, opts)
defer cancel()
+ http1srv := opts.baseConfig()
+ conf := http2configFromServer(http1srv, s)
sc := &http2serverConn{
srv: s,
- hs: opts.baseConfig(),
+ hs: http1srv,
conn: c,
baseCtx: baseCtx,
remoteAddrStr: c.RemoteAddr().String(),
- bw: http2newBufferedWriter(c),
+ bw: http2newBufferedWriter(s.group, c, conf.WriteByteTimeout),
handler: opts.handler(),
streams: make(map[uint32]*http2stream),
readFrameCh: make(chan http2readFrameResult),
@@ -4286,9 +4477,12 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(
bodyReadCh: make(chan http2bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
- advMaxStreams: s.maxConcurrentStreams(),
+ advMaxStreams: conf.MaxConcurrentStreams,
initialStreamSendWindowSize: http2initialWindowSize,
+ initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
maxFrameSize: http2initialMaxFrameSize,
+ pingTimeout: conf.PingTimeout,
+ countErrorFunc: conf.CountError,
serveG: http2newGoroutineLock(),
pushEnabled: true,
sawClientPreface: opts.SawClientPreface,
@@ -4321,15 +4515,15 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(
sc.flow.add(http2initialWindowSize)
sc.inflow.init(http2initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
- sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize())
+ sc.hpackEncoder.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
fr := http2NewFramer(sc.bw, c)
- if s.CountError != nil {
- fr.countError = s.CountError
+ if conf.CountError != nil {
+ fr.countError = conf.CountError
}
- fr.ReadMetaHeaders = hpack.NewDecoder(s.maxDecoderHeaderTableSize(), nil)
+ fr.ReadMetaHeaders = hpack.NewDecoder(conf.MaxDecoderHeaderTableSize, nil)
fr.MaxHeaderListSize = sc.maxHeaderListSize()
- fr.SetMaxReadFrameSize(s.maxReadFrameSize())
+ fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
sc.framer = fr
if tc, ok := c.(http2connectionStater); ok {
@@ -4362,7 +4556,7 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(
// So for now, do nothing here again.
}
- if !s.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) {
+ if !conf.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) {
// "Endpoints MAY choose to generate a connection error
// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
// the prohibited cipher suites are negotiated."
@@ -4399,7 +4593,7 @@ func (s *http2Server) serveConn(c net.Conn, opts *http2ServeConnOpts, newf func(
opts.UpgradeRequest = nil
}
- sc.serve()
+ sc.serve(conf)
}
func http2serverConnBaseContext(c net.Conn, opts *http2ServeConnOpts) (ctx context.Context, cancel func()) {
@@ -4439,6 +4633,7 @@ type http2serverConn struct {
tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
writeSched http2WriteScheduler
+ countErrorFunc func(errType string)
// Everything following is owned by the serve loop; use serveG.check():
serveG http2goroutineLock // used to verify funcs are on serve()
@@ -4458,6 +4653,7 @@ type http2serverConn struct {
streams map[uint32]*http2stream
unstartedHandlers []http2unstartedHandler
initialStreamSendWindowSize int32
+ initialStreamRecvWindowSize int32
maxFrameSize int32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
@@ -4468,9 +4664,14 @@ type http2serverConn struct {
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
+ pingSent bool
+ sentPingData [8]byte
goAwayCode http2ErrCode
shutdownTimer http2timer // nil until used
idleTimer http2timer // nil if unused
+ readIdleTimeout time.Duration
+ pingTimeout time.Duration
+ readIdleTimer http2timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -4485,11 +4686,7 @@ func (sc *http2serverConn) maxHeaderListSize() uint32 {
if n <= 0 {
n = DefaultMaxHeaderBytes
}
- // http2's count is in a slightly different unit and includes 32 bytes per pair.
- // So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
- const perFieldOverhead = 32 // per http2 spec
- const typicalHeaders = 10 // conservative
- return uint32(n + typicalHeaders*perFieldOverhead)
+ return uint32(http2adjustHTTP1MaxHeaderSize(int64(n)))
}
func (sc *http2serverConn) curOpenStreams() uint32 {
@@ -4756,7 +4953,7 @@ func (sc *http2serverConn) notePanic() {
}
}
-func (sc *http2serverConn) serve() {
+func (sc *http2serverConn) serve(conf http2http2Config) {
sc.serveG.check()
defer sc.notePanic()
defer sc.conn.Close()
@@ -4770,18 +4967,18 @@ func (sc *http2serverConn) serve() {
sc.writeFrame(http2FrameWriteRequest{
write: http2writeSettings{
- {http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
+ {http2SettingMaxFrameSize, conf.MaxReadFrameSize},
{http2SettingMaxConcurrentStreams, sc.advMaxStreams},
{http2SettingMaxHeaderListSize, sc.maxHeaderListSize()},
- {http2SettingHeaderTableSize, sc.srv.maxDecoderHeaderTableSize()},
- {http2SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
+ {http2SettingHeaderTableSize, conf.MaxDecoderHeaderTableSize},
+ {http2SettingInitialWindowSize, uint32(sc.initialStreamRecvWindowSize)},
},
})
sc.unackedSettings++
// Each connection starts with initialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
- if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 {
+ if diff := conf.MaxUploadBufferPerConnection - http2initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
@@ -4801,11 +4998,18 @@ func (sc *http2serverConn) serve() {
defer sc.idleTimer.Stop()
}
+ if conf.SendPingTimeout > 0 {
+ sc.readIdleTimeout = conf.SendPingTimeout
+ sc.readIdleTimer = sc.srv.afterFunc(conf.SendPingTimeout, sc.onReadIdleTimer)
+ defer sc.readIdleTimer.Stop()
+ }
+
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := sc.srv.afterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
+ lastFrameTime := sc.srv.now()
loopNum := 0
for {
loopNum++
@@ -4819,6 +5023,7 @@ func (sc *http2serverConn) serve() {
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
+ lastFrameTime = sc.srv.now()
// Process any written frames before reading new frames from the client since a
// written frame could have triggered a new stream to be started.
if sc.writingFrameAsync {
@@ -4850,6 +5055,8 @@ func (sc *http2serverConn) serve() {
case http2idleTimerMsg:
sc.vlogf("connection is idle")
sc.goAway(http2ErrCodeNo)
+ case http2readIdleTimerMsg:
+ sc.handlePingTimer(lastFrameTime)
case http2shutdownTimerMsg:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
@@ -4872,7 +5079,7 @@ func (sc *http2serverConn) serve() {
// If the peer is causing us to generate a lot of control frames,
// but not reading them from us, assume they are trying to make us
// run out of memory.
- if sc.queuedControlFrames > sc.srv.maxQueuedControlFrames() {
+ if sc.queuedControlFrames > http2maxQueuedControlFrames {
sc.vlogf("http2: too many control frames in send queue, closing connection")
return
}
@@ -4888,12 +5095,39 @@ func (sc *http2serverConn) serve() {
}
}
+func (sc *http2serverConn) handlePingTimer(lastFrameReadTime time.Time) {
+ if sc.pingSent {
+ sc.vlogf("timeout waiting for PING response")
+ sc.conn.Close()
+ return
+ }
+
+ pingAt := lastFrameReadTime.Add(sc.readIdleTimeout)
+ now := sc.srv.now()
+ if pingAt.After(now) {
+ // We received frames since arming the ping timer.
+ // Reset it for the next possible timeout.
+ sc.readIdleTimer.Reset(pingAt.Sub(now))
+ return
+ }
+
+ sc.pingSent = true
+ // Ignore crypto/rand.Read errors: It generally can't fail, and worse case if it does
+ // is we send a PING frame containing 0s.
+ _, _ = rand.Read(sc.sentPingData[:])
+ sc.writeFrame(http2FrameWriteRequest{
+ write: &http2writePing{data: sc.sentPingData},
+ })
+ sc.readIdleTimer.Reset(sc.pingTimeout)
+}
+
type http2serverMessage int
// Message values sent to serveMsgCh.
var (
http2settingsTimerMsg = new(http2serverMessage)
http2idleTimerMsg = new(http2serverMessage)
+ http2readIdleTimerMsg = new(http2serverMessage)
http2shutdownTimerMsg = new(http2serverMessage)
http2gracefulShutdownMsg = new(http2serverMessage)
http2handlerDoneMsg = new(http2serverMessage)
@@ -4903,6 +5137,8 @@ func (sc *http2serverConn) onSettingsTimer() { sc.sendServeMsg(http2settingsTime
func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) }
+func (sc *http2serverConn) onReadIdleTimer() { sc.sendServeMsg(http2readIdleTimerMsg) }
+
func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) }
func (sc *http2serverConn) sendServeMsg(msg interface{}) {
@@ -5155,6 +5391,10 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) {
sc.writingFrame = false
sc.writingFrameAsync = false
+ if res.err != nil {
+ sc.conn.Close()
+ }
+
wr := res.wr
if http2writeEndsStream(wr.write) {
@@ -5429,6 +5669,11 @@ func (sc *http2serverConn) processFrame(f http2Frame) error {
func (sc *http2serverConn) processPing(f *http2PingFrame) error {
sc.serveG.check()
if f.IsAck() {
+ if sc.pingSent && sc.sentPingData == f.Data {
+ // This is a response to a PING we sent.
+ sc.pingSent = false
+ sc.readIdleTimer.Reset(sc.readIdleTimeout)
+ }
// 6.7 PING: " An endpoint MUST NOT respond to PING frames
// containing this flag."
return nil
@@ -5995,7 +6240,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize)
- st.inflow.init(sc.srv.initialStreamRecvWindowSize())
+ st.inflow.init(sc.initialStreamRecvWindowSize)
if sc.hs.WriteTimeout > 0 {
st.writeDeadline = sc.srv.afterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
}
@@ -6690,6 +6935,11 @@ func (w *http2responseWriter) SetWriteDeadline(deadline time.Time) error {
return nil
}
+func (w *http2responseWriter) EnableFullDuplex() error {
+ // We always support full duplex responses, so this is a no-op.
+ return nil
+}
+
func (w *http2responseWriter) Flush() {
w.FlushError()
}
@@ -7136,7 +7386,7 @@ func (sc *http2serverConn) countError(name string, err error) error {
if sc == nil || sc.srv == nil {
return err
}
- f := sc.srv.CountError
+ f := sc.countErrorFunc
if f == nil {
return err
}
@@ -7339,6 +7589,20 @@ func (t *http2Transport) markNewGoroutine() {
}
}
+func (t *http2Transport) now() time.Time {
+ if t != nil && t.http2transportTestHooks != nil {
+ return t.http2transportTestHooks.group.Now()
+ }
+ return time.Now()
+}
+
+func (t *http2Transport) timeSince(when time.Time) time.Duration {
+ if t != nil && t.http2transportTestHooks != nil {
+ return t.now().Sub(when)
+ }
+ return time.Since(when)
+}
+
// newTimer creates a new time.Timer, or a synthetic timer in tests.
func (t *http2Transport) newTimer(d time.Duration) http2timer {
if t.http2transportTestHooks != nil {
@@ -7363,40 +7627,26 @@ func (t *http2Transport) contextWithTimeout(ctx context.Context, d time.Duration
}
func (t *http2Transport) maxHeaderListSize() uint32 {
- if t.MaxHeaderListSize == 0 {
+ n := int64(t.MaxHeaderListSize)
+ if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 {
+ n = t.t1.MaxResponseHeaderBytes
+ if n > 0 {
+ n = http2adjustHTTP1MaxHeaderSize(n)
+ }
+ }
+ if n <= 0 {
return 10 << 20
}
- if t.MaxHeaderListSize == 0xffffffff {
+ if n >= 0xffffffff {
return 0
}
- return t.MaxHeaderListSize
-}
-
-func (t *http2Transport) maxFrameReadSize() uint32 {
- if t.MaxReadFrameSize == 0 {
- return 0 // use the default provided by the peer
- }
- if t.MaxReadFrameSize < http2minMaxFrameSize {
- return http2minMaxFrameSize
- }
- if t.MaxReadFrameSize > http2maxFrameSize {
- return http2maxFrameSize
- }
- return t.MaxReadFrameSize
+ return uint32(n)
}
func (t *http2Transport) disableCompression() bool {
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}
-func (t *http2Transport) pingTimeout() time.Duration {
- if t.PingTimeout == 0 {
- return 15 * time.Second
- }
- return t.PingTimeout
-
-}
-
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns an error if t1 has already been HTTP/2-enabled.
//
@@ -7432,8 +7682,8 @@ func http2configureTransports(t1 *Transport) (*http2Transport, error) {
if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
}
- upgradeFn := func(authority string, c *tls.Conn) RoundTripper {
- addr := http2authorityAddr("https", authority)
+ upgradeFn := func(scheme, authority string, c net.Conn) RoundTripper {
+ addr := http2authorityAddr(scheme, authority)
if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
go c.Close()
return http2erringRoundTripper{err}
@@ -7444,18 +7694,37 @@ func http2configureTransports(t1 *Transport) (*http2Transport, error) {
// was unknown)
go c.Close()
}
+ if scheme == "http" {
+ return (*http2unencryptedTransport)(t2)
+ }
return t2
}
- if m := t1.TLSNextProto; len(m) == 0 {
- t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{
- "h2": upgradeFn,
+ if t1.TLSNextProto == nil {
+ t1.TLSNextProto = make(map[string]func(string, *tls.Conn) RoundTripper)
+ }
+ t1.TLSNextProto[http2NextProtoTLS] = func(authority string, c *tls.Conn) RoundTripper {
+ return upgradeFn("https", authority, c)
+ }
+ // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
+ t1.TLSNextProto[http2nextProtoUnencryptedHTTP2] = func(authority string, c *tls.Conn) RoundTripper {
+ nc, err := http2unencryptedNetConnFromTLSConn(c)
+ if err != nil {
+ go c.Close()
+ return http2erringRoundTripper{err}
}
- } else {
- m["h2"] = upgradeFn
+ return upgradeFn("http", authority, nc)
}
return t2, nil
}
+// unencryptedTransport is a Transport with a RoundTrip method that
+// always permits http:// URLs.
+type http2unencryptedTransport http2Transport
+
+func (t *http2unencryptedTransport) RoundTrip(req *Request) (*Response, error) {
+ return (*http2Transport)(t).RoundTripOpt(req, http2RoundTripOpt{allowHTTP: true})
+}
+
func (t *http2Transport) connPool() http2ClientConnPool {
t.connPoolOnce.Do(t.initConnPool)
return t.connPoolOrDef
@@ -7475,7 +7744,7 @@ type http2ClientConn struct {
t *http2Transport
tconn net.Conn // usually *tls.Conn, except specialized impls
tlsState *tls.ConnectionState // nil only for specialized impls
- reused uint32 // whether conn is being reused; atomic
+ atomicReused uint32 // whether conn is being reused; atomic
singleUse bool // whether being used for a single http.Request
getConnCalled bool // used by clientConnPool
@@ -7506,11 +7775,22 @@ type http2ClientConn struct {
lastActive time.Time
lastIdle time.Time // time last idle
// Settings from peer: (also guarded by wmu)
- maxFrameSize uint32
- maxConcurrentStreams uint32
- peerMaxHeaderListSize uint64
- peerMaxHeaderTableSize uint32
- initialWindowSize uint32
+ maxFrameSize uint32
+ maxConcurrentStreams uint32
+ peerMaxHeaderListSize uint64
+ peerMaxHeaderTableSize uint32
+ initialWindowSize uint32
+ initialStreamRecvWindowSize int32
+ readIdleTimeout time.Duration
+ pingTimeout time.Duration
+
+ // pendingResets is the number of RST_STREAM frames we have sent to the peer,
+ // without confirming that the peer has received them. When we send a RST_STREAM,
+ // we bundle it with a PING frame, unless a PING is already in flight. We count
+ // the reset stream against the connection's concurrency limit until we get
+ // a PING response. This limits the number of requests we'll try to send to a
+ // completely unresponsive connection.
+ pendingResets int
// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
// Write to reqHeaderMu to lock it, read from it to unlock.
@@ -7568,12 +7848,12 @@ type http2clientStream struct {
sentHeaders bool
// owned by clientConnReadLoop:
- firstByte bool // got the first response byte
- pastHeaders bool // got first MetaHeadersFrame (actual headers)
- pastTrailers bool // got optional second MetaHeadersFrame (trailers)
- num1xx uint8 // number of 1xx responses seen
- readClosed bool // peer sent an END_STREAM flag
- readAborted bool // read loop reset the stream
+ firstByte bool // got the first response byte
+ pastHeaders bool // got first MetaHeadersFrame (actual headers)
+ pastTrailers bool // got optional second MetaHeadersFrame (trailers)
+ readClosed bool // peer sent an END_STREAM flag
+ readAborted bool // read loop reset the stream
+ totalHeaderSize int64 // total size of 1xx headers seen
trailer Header // accumulated trailers
resTrailer *Header // client's Response.Trailer
@@ -7635,6 +7915,7 @@ func (cs *http2clientStream) closeReqBodyLocked() {
}
type http2stickyErrWriter struct {
+ group http2synctestGroupInterface
conn net.Conn
timeout time.Duration
err *error
@@ -7644,22 +7925,9 @@ func (sew http2stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
- for {
- if sew.timeout != 0 {
- sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
- }
- nn, err := sew.conn.Write(p[n:])
- n += nn
- if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
- // Keep extending the deadline so long as we're making progress.
- continue
- }
- if sew.timeout != 0 {
- sew.conn.SetWriteDeadline(time.Time{})
- }
- *sew.err = err
- return n, err
- }
+ n, err = http2writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
+ *sew.err = err
+ return n, err
}
// noCachedConnError is the concrete type of ErrNoCachedConn, which
@@ -7691,6 +7959,8 @@ type http2RoundTripOpt struct {
// no cached connection is available, RoundTripOpt
// will return ErrNoCachedConn.
OnlyCachedConn bool
+
+ allowHTTP bool // allow http:// URLs
}
func (t *http2Transport) RoundTrip(req *Request) (*Response, error) {
@@ -7723,7 +7993,14 @@ func http2authorityAddr(scheme string, authority string) (addr string) {
// RoundTripOpt is like RoundTrip, but takes options.
func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Response, error) {
- if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
+ switch req.URL.Scheme {
+ case "https":
+ // Always okay.
+ case "http":
+ if !t.AllowHTTP && !opt.allowHTTP {
+ return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
+ }
+ default:
return nil, errors.New("http2: unsupported scheme")
}
@@ -7734,7 +8011,7 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
return nil, err
}
- reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1)
+ reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
http2traceGotConn(req, cc, reused)
res, err := cc.RoundTrip(req)
if err != nil && retry <= 6 {
@@ -7759,6 +8036,22 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
}
}
}
+ if err == http2errClientConnNotEstablished {
+ // This ClientConn was created recently,
+ // this is the first request to use it,
+ // and the connection is closed and not usable.
+ //
+ // In this state, cc.idleTimer will remove the conn from the pool
+ // when it fires. Stop the timer and remove it here so future requests
+ // won't try to use this connection.
+ //
+ // If the timer has already fired and we're racing it, the redundant
+ // call to MarkDead is harmless.
+ if cc.idleTimer != nil {
+ cc.idleTimer.Stop()
+ }
+ t.connPool().MarkDead(cc)
+ }
if err != nil {
t.vlogf("RoundTrip failure: %v", err)
return nil, err
@@ -7777,9 +8070,10 @@ func (t *http2Transport) CloseIdleConnections() {
}
var (
- http2errClientConnClosed = errors.New("http2: client conn is closed")
- http2errClientConnUnusable = errors.New("http2: client conn not usable")
- http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
+ http2errClientConnClosed = errors.New("http2: client conn is closed")
+ http2errClientConnUnusable = errors.New("http2: client conn not usable")
+ http2errClientConnNotEstablished = errors.New("http2: client conn could not be established")
+ http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
// shouldRetryRequest is called by RoundTrip when a request fails to get
@@ -7895,44 +8189,37 @@ func (t *http2Transport) expectContinueTimeout() time.Duration {
return t.t1.ExpectContinueTimeout
}
-func (t *http2Transport) maxDecoderHeaderTableSize() uint32 {
- if v := t.MaxDecoderHeaderTableSize; v > 0 {
- return v
- }
- return http2initialHeaderTableSize
-}
-
-func (t *http2Transport) maxEncoderHeaderTableSize() uint32 {
- if v := t.MaxEncoderHeaderTableSize; v > 0 {
- return v
- }
- return http2initialHeaderTableSize
-}
-
func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
return t.newClientConn(c, t.disableKeepAlives())
}
func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) {
+ conf := http2configFromTransport(t)
cc := &http2ClientConn{
- t: t,
- tconn: c,
- readerDone: make(chan struct{}),
- nextStreamID: 1,
- maxFrameSize: 16 << 10, // spec default
- initialWindowSize: 65535, // spec default
- maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
- peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
- streams: make(map[uint32]*http2clientStream),
- singleUse: singleUse,
- wantSettingsAck: true,
- pings: make(map[[8]byte]chan struct{}),
- reqHeaderMu: make(chan struct{}, 1),
+ t: t,
+ tconn: c,
+ readerDone: make(chan struct{}),
+ nextStreamID: 1,
+ maxFrameSize: 16 << 10, // spec default
+ initialWindowSize: 65535, // spec default
+ initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
+ maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
+ streams: make(map[uint32]*http2clientStream),
+ singleUse: singleUse,
+ wantSettingsAck: true,
+ readIdleTimeout: conf.SendPingTimeout,
+ pingTimeout: conf.PingTimeout,
+ pings: make(map[[8]byte]chan struct{}),
+ reqHeaderMu: make(chan struct{}, 1),
+ lastActive: t.now(),
}
+ var group http2synctestGroupInterface
if t.http2transportTestHooks != nil {
t.markNewGoroutine()
t.http2transportTestHooks.newclientconn(cc)
c = cc.tconn
+ group = t.group
}
if http2VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -7944,24 +8231,23 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(http2stickyErrWriter{
+ group: group,
conn: c,
- timeout: t.WriteByteTimeout,
+ timeout: conf.WriteByteTimeout,
err: &cc.werr,
})
cc.br = bufio.NewReader(c)
cc.fr = http2NewFramer(cc.bw, cc.br)
- if t.maxFrameReadSize() != 0 {
- cc.fr.SetMaxReadFrameSize(t.maxFrameReadSize())
- }
+ cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
if t.CountError != nil {
cc.fr.countError = t.CountError
}
- maxHeaderTableSize := t.maxDecoderHeaderTableSize()
+ maxHeaderTableSize := conf.MaxDecoderHeaderTableSize
cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
cc.henc = hpack.NewEncoder(&cc.hbuf)
- cc.henc.SetMaxDynamicTableSizeLimit(t.maxEncoderHeaderTableSize())
+ cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
cc.peerMaxHeaderTableSize = http2initialHeaderTableSize
if cs, ok := c.(http2connectionStater); ok {
@@ -7971,11 +8257,9 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
initialSettings := []http2Setting{
{ID: http2SettingEnablePush, Val: 0},
- {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
- }
- if max := t.maxFrameReadSize(); max != 0 {
- initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxFrameSize, Val: max})
+ {ID: http2SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
}
+ initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxFrameSize, Val: conf.MaxReadFrameSize})
if max := t.maxHeaderListSize(); max != 0 {
initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
}
@@ -7985,8 +8269,8 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
cc.bw.Write(http2clientPreface)
cc.fr.WriteSettings(initialSettings...)
- cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
- cc.inflow.init(http2transportDefaultConnFlow + http2initialWindowSize)
+ cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection))
+ cc.inflow.init(conf.MaxUploadBufferPerConnection + http2initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
cc.Close()
@@ -8004,7 +8288,7 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
}
func (cc *http2ClientConn) healthCheck() {
- pingTimeout := cc.t.pingTimeout()
+ pingTimeout := cc.pingTimeout
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
@@ -8132,7 +8416,7 @@ func (cc *http2ClientConn) State() http2ClientConnState {
return http2ClientConnState{
Closed: cc.closed,
Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
- StreamsActive: len(cc.streams),
+ StreamsActive: len(cc.streams) + cc.pendingResets,
StreamsReserved: cc.streamsReserved,
StreamsPending: cc.pendingRequests,
LastIdle: cc.lastIdle,
@@ -8164,16 +8448,38 @@ func (cc *http2ClientConn) idleStateLocked() (st http2clientConnIdleState) {
// writing it.
maxConcurrentOkay = true
} else {
- maxConcurrentOkay = int64(len(cc.streams)+cc.streamsReserved+1) <= int64(cc.maxConcurrentStreams)
+ // We can take a new request if the total of
+ // - active streams;
+ // - reservation slots for new streams; and
+ // - streams for which we have sent a RST_STREAM and a PING,
+ // but received no subsequent frame
+ // is less than the concurrency limit.
+ maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
}
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
!cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
+
+ // If this connection has never been used for a request and is closed,
+ // then let it take a request (which will fail).
+ //
+ // This avoids a situation where an error early in a connection's lifetime
+ // goes unreported.
+ if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed {
+ st.canTakeNewRequest = true
+ }
+
return
}
+// currentRequestCountLocked reports the number of concurrency slots currently in use,
+// including active streams, reserved slots, and reset streams waiting for acknowledgement.
+func (cc *http2ClientConn) currentRequestCountLocked() int {
+ return len(cc.streams) + cc.streamsReserved + cc.pendingResets
+}
+
func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
st := cc.idleStateLocked()
return st.canTakeNewRequest
@@ -8186,7 +8492,7 @@ func (cc *http2ClientConn) tooIdleLocked() bool {
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
- return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
+ return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -8750,6 +9056,7 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) {
cs.reqBodyClosed = make(chan struct{})
}
bodyClosed := cs.reqBodyClosed
+ closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
cc.mu.Unlock()
if mustCloseBody {
cs.reqBody.Close()
@@ -8774,16 +9081,40 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) {
if cs.sentHeaders {
if se, ok := err.(http2StreamError); ok {
if se.Cause != http2errFromPeer {
- cc.writeStreamReset(cs.ID, se.Code, err)
+ cc.writeStreamReset(cs.ID, se.Code, false, err)
}
} else {
- cc.writeStreamReset(cs.ID, http2ErrCodeCancel, err)
+ // We're cancelling an in-flight request.
+ //
+ // This could be due to the server becoming unresponsive.
+ // To avoid sending too many requests on a dead connection,
+ // we let the request continue to consume a concurrency slot
+ // until we can confirm the server is still responding.
+ // We do this by sending a PING frame along with the RST_STREAM
+ // (unless a ping is already in flight).
+ //
+ // For simplicity, we don't bother tracking the PING payload:
+ // We reset cc.pendingResets any time we receive a PING ACK.
+ //
+ // We skip this if the conn is going to be closed on idle,
+ // because it's short lived and will probably be closed before
+ // we get the ping response.
+ ping := false
+ if !closeOnIdle {
+ cc.mu.Lock()
+ if cc.pendingResets == 0 {
+ ping = true
+ }
+ cc.pendingResets++
+ cc.mu.Unlock()
+ }
+ cc.writeStreamReset(cs.ID, http2ErrCodeCancel, ping, err)
}
}
cs.bufPipe.CloseWithError(err) // no-op if already closed
} else {
if cs.sentHeaders && !cs.sentEndStream {
- cc.writeStreamReset(cs.ID, http2ErrCodeNo, nil)
+ cc.writeStreamReset(cs.ID, http2ErrCodeNo, false, nil)
}
cs.bufPipe.CloseWithError(http2errRequestCanceled)
}
@@ -8805,12 +9136,17 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) {
// Must hold cc.mu.
func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) error {
for {
- cc.lastActive = time.Now()
+ if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
+ // This is the very first request sent to this connection.
+ // Return a fatal error which aborts the retry loop.
+ return http2errClientConnNotEstablished
+ }
+ cc.lastActive = cc.t.now()
if cc.closed || !cc.canTakeNewRequestLocked() {
return http2errClientConnUnusable
}
cc.lastIdle = time.Time{}
- if int64(len(cc.streams)) < int64(cc.maxConcurrentStreams) {
+ if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
return nil
}
cc.pendingRequests++
@@ -9337,7 +9673,7 @@ type http2resAndError struct {
func (cc *http2ClientConn) addStreamLocked(cs *http2clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
- cs.inflow.init(http2transportDefaultStreamFlow)
+ cs.inflow.init(cc.initialStreamRecvWindowSize)
cs.ID = cc.nextStreamID
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
@@ -9353,10 +9689,10 @@ func (cc *http2ClientConn) forgetStreamID(id uint32) {
if len(cc.streams) != slen-1 {
panic("forgetting unknown stream id")
}
- cc.lastActive = time.Now()
+ cc.lastActive = cc.t.now()
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
- cc.lastIdle = time.Now()
+ cc.lastIdle = cc.t.now()
}
// Wake up writeRequestBody via clientStream.awaitFlowControl and
// wake up RoundTrip if there is a pending request.
@@ -9416,7 +9752,6 @@ func http2isEOFOrNetReadError(err error) bool {
func (rl *http2clientConnReadLoop) cleanup() {
cc := rl.cc
- cc.t.connPool().MarkDead(cc)
defer cc.closeConn()
defer close(cc.readerDone)
@@ -9440,6 +9775,24 @@ func (rl *http2clientConnReadLoop) cleanup() {
}
cc.closed = true
+ // If the connection has never been used, and has been open for only a short time,
+ // leave it in the connection pool for a little while.
+ //
+ // This avoids a situation where new connections are constantly created,
+ // added to the pool, fail, and are removed from the pool, without any error
+ // being surfaced to the user.
+ const unusedWaitTime = 5 * time.Second
+ idleTime := cc.t.now().Sub(cc.lastActive)
+ if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime {
+ cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
+ cc.t.connPool().MarkDead(cc)
+ })
+ } else {
+ cc.mu.Unlock() // avoid any deadlocks in MarkDead
+ cc.t.connPool().MarkDead(cc)
+ cc.mu.Lock()
+ }
+
for _, cs := range cc.streams {
select {
case <-cs.peerClosed:
@@ -9483,7 +9836,7 @@ func (cc *http2ClientConn) countReadFrameError(err error) {
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
gotSettings := false
- readIdleTimeout := cc.t.ReadIdleTimeout
+ readIdleTimeout := cc.readIdleTimeout
var t http2timer
if readIdleTimeout != 0 {
t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
@@ -9667,15 +10020,34 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
if f.StreamEnded() {
return nil, errors.New("1xx informational response with END_STREAM flag")
}
- cs.num1xx++
- const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http
- if cs.num1xx > max1xxResponses {
- return nil, errors.New("http2: too many 1xx informational responses")
- }
if fn := cs.get1xxTraceFunc(); fn != nil {
+ // If the 1xx response is being delivered to the user,
+ // then they're responsible for limiting the number
+ // of responses.
if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
return nil, err
}
+ } else {
+ // If the user didn't examine the 1xx response, then we
+ // limit the size of all 1xx headers.
+ //
+ // This differs a bit from the HTTP/1 implementation, which
+ // limits the size of all 1xx headers plus the final response.
+ // Use the larger limit of MaxHeaderListSize and
+ // net/http.Transport.MaxResponseHeaderBytes.
+ limit := int64(cs.cc.t.maxHeaderListSize())
+ if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes > limit {
+ limit = t1.MaxResponseHeaderBytes
+ }
+ for _, h := range f.Fields {
+ cs.totalHeaderSize += int64(h.Size())
+ }
+ if cs.totalHeaderSize > limit {
+ if http2VerboseLogs {
+ log.Printf("http2: 1xx informational responses too large")
+ }
+ return nil, errors.New("header list too large")
+ }
}
if statusCode == 100 {
http2traceGot100Continue(cs.trace)
@@ -10219,6 +10591,11 @@ func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error {
close(c)
delete(cc.pings, f.Data)
}
+ if cc.pendingResets > 0 {
+ // See clientStream.cleanupWriteRequest.
+ cc.pendingResets = 0
+ cc.cond.Broadcast()
+ }
return nil
}
cc := rl.cc
@@ -10241,13 +10618,20 @@ func (rl *http2clientConnReadLoop) processPushPromise(f *http2PushPromiseFrame)
return http2ConnectionError(http2ErrCodeProtocol)
}
-func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, err error) {
+// writeStreamReset sends a RST_STREAM frame.
+// When ping is true, it also sends a PING frame with a random payload.
+func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, ping bool, err error) {
// TODO: map err to more interesting error codes, once the
// HTTP community comes up with some. But currently for
// RST_STREAM there's no equivalent to GOAWAY frame's debug
// data, and the error codes are all pretty vague ("cancel").
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
+ if ping {
+ var payload [8]byte
+ rand.Read(payload[:])
+ cc.fr.WritePing(false, payload)
+ }
cc.bw.Flush()
cc.wmu.Unlock()
}
@@ -10404,7 +10788,7 @@ func http2traceGotConn(req *Request, cc *http2ClientConn, reused bool) {
cc.mu.Lock()
ci.WasIdle = len(cc.streams) == 0 && reused
if ci.WasIdle && !cc.lastActive.IsZero() {
- ci.IdleTime = time.Since(cc.lastActive)
+ ci.IdleTime = cc.t.timeSince(cc.lastActive)
}
cc.mu.Unlock()
@@ -10472,6 +10856,27 @@ func (t *http2Transport) dialTLSWithContext(ctx context.Context, network, addr s
return tlsCn, nil
}
+const http2nextProtoUnencryptedHTTP2 = "unencrypted_http2"
+
+// unencryptedNetConnFromTLSConn retrieves a net.Conn wrapped in a *tls.Conn.
+//
+// TLSNextProto functions accept a *tls.Conn.
+//
+// When passing an unencrypted HTTP/2 connection to a TLSNextProto function,
+// we pass a *tls.Conn with an underlying net.Conn containing the unencrypted connection.
+// To be extra careful about mistakes (accidentally dropping TLS encryption in a place
+// where we want it), the tls.Conn contains a net.Conn with an UnencryptedNetConn method
+// that returns the actual connection we want to use.
+func http2unencryptedNetConnFromTLSConn(tc *tls.Conn) (net.Conn, error) {
+ conner, ok := tc.NetConn().(interface {
+ UnencryptedNetConn() net.Conn
+ })
+ if !ok {
+ return nil, errors.New("http2: TLS conn unexpectedly found in unencrypted handoff")
+ }
+ return conner.UnencryptedNetConn(), nil
+}
+
// writeFramer is implemented by any type that is used to write frames.
type http2writeFramer interface {
writeFrame(http2writeContext) error
@@ -10588,6 +10993,18 @@ func (se http2StreamError) writeFrame(ctx http2writeContext) error {
func (se http2StreamError) staysWithinBuffer(max int) bool { return http2frameHeaderLen+4 <= max }
+type http2writePing struct {
+ data [8]byte
+}
+
+func (w http2writePing) writeFrame(ctx http2writeContext) error {
+ return ctx.Framer().WritePing(false, w.data)
+}
+
+func (w http2writePing) staysWithinBuffer(max int) bool {
+ return http2frameHeaderLen+len(w.data) <= max
+}
+
type http2writePingAck struct{ pf *http2PingFrame }
func (w http2writePingAck) writeFrame(ctx http2writeContext) error {