diff options
| author | David Chase <drchase@google.com> | 2023-02-09 13:04:21 -0500 |
|---|---|---|
| committer | David Chase <drchase@google.com> | 2023-02-10 18:59:52 +0000 |
| commit | ae2f1201ed446dcea172173a5f48a35b6fd9e40a (patch) | |
| tree | 4c4ff31019afbd9efecf55452bc61eea517449cc /src/net/http | |
| parent | 851f6fd61425c810959c7ab51e6dc86f8a63c970 (diff) | |
| download | go-ae2f1201ed446dcea172173a5f48a35b6fd9e40a.tar.xz | |
vendor, cmd/vendor: update standard library dependencies
Change-Id: I6facfae14e850f6c9bf3bcb53489c8b475bbb860
Reviewed-on: https://go-review.googlesource.com/c/go/+/467297
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Damien Neil <dneil@google.com>
Run-TryBot: David Chase <drchase@google.com>
Diffstat (limited to 'src/net/http')
| -rw-r--r-- | src/net/http/h2_bundle.go | 269 |
1 files changed, 149 insertions, 120 deletions
diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index 1e0b83d493..eb3fd159f8 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -1303,23 +1303,91 @@ var ( http2errPseudoAfterRegular = errors.New("pseudo header field after regular") ) -// flow is the flow control window's size. -type http2flow struct { +// inflowMinRefresh is the minimum number of bytes we'll send for a +// flow control window update. +const http2inflowMinRefresh = 4 << 10 + +// inflow accounts for an inbound flow control window. +// It tracks both the latest window sent to the peer (used for enforcement) +// and the accumulated unsent window. +type http2inflow struct { + avail int32 + unsent int32 +} + +// init sets the initial window. +func (f *http2inflow) init(n int32) { + f.avail = n +} + +// add adds n bytes to the window, with a maximum window size of max, +// indicating that the peer can now send us more data. +// For example, the user read from a {Request,Response} body and consumed +// some of the buffered data, so the peer can now send more. +// It returns the number of bytes to send in a WINDOW_UPDATE frame to the peer. +// Window updates are accumulated and sent when the unsent capacity +// is at least inflowMinRefresh or will at least double the peer's available window. +func (f *http2inflow) add(n int) (connAdd int32) { + if n < 0 { + panic("negative update") + } + unsent := int64(f.unsent) + int64(n) + // "A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets." + // RFC 7540 Section 6.9.1. + const maxWindow = 1<<31 - 1 + if unsent+int64(f.avail) > maxWindow { + panic("flow control update exceeds maximum window size") + } + f.unsent = int32(unsent) + if f.unsent < http2inflowMinRefresh && f.unsent < f.avail { + // If there aren't at least inflowMinRefresh bytes of window to send, + // and this update won't at least double the window, buffer the update for later. + return 0 + } + f.avail += f.unsent + f.unsent = 0 + return int32(unsent) +} + +// take attempts to take n bytes from the peer's flow control window. +// It reports whether the window has available capacity. +func (f *http2inflow) take(n uint32) bool { + if n > uint32(f.avail) { + return false + } + f.avail -= int32(n) + return true +} + +// takeInflows attempts to take n bytes from two inflows, +// typically connection-level and stream-level flows. +// It reports whether both windows have available capacity. +func http2takeInflows(f1, f2 *http2inflow, n uint32) bool { + if n > uint32(f1.avail) || n > uint32(f2.avail) { + return false + } + f1.avail -= int32(n) + f2.avail -= int32(n) + return true +} + +// outflow is the outbound flow control window's size. +type http2outflow struct { _ http2incomparable // n is the number of DATA bytes we're allowed to send. - // A flow is kept both on a conn and a per-stream. + // An outflow is kept both on a conn and a per-stream. n int32 - // conn points to the shared connection-level flow that is - // shared by all streams on that conn. It is nil for the flow + // conn points to the shared connection-level outflow that is + // shared by all streams on that conn. It is nil for the outflow // that's on the conn directly. - conn *http2flow + conn *http2outflow } -func (f *http2flow) setConnFlow(cf *http2flow) { f.conn = cf } +func (f *http2outflow) setConnFlow(cf *http2outflow) { f.conn = cf } -func (f *http2flow) available() int32 { +func (f *http2outflow) available() int32 { n := f.n if f.conn != nil && f.conn.n < n { n = f.conn.n @@ -1327,7 +1395,7 @@ func (f *http2flow) available() int32 { return n } -func (f *http2flow) take(n int32) { +func (f *http2outflow) take(n int32) { if n > f.available() { panic("internal error: took too much") } @@ -1339,7 +1407,7 @@ func (f *http2flow) take(n int32) { // add adds n bytes (positive or negative) to the flow control window. // It returns false if the sum would exceed 2^31-1. -func (f *http2flow) add(n int32) bool { +func (f *http2outflow) add(n int32) bool { sum := f.n + n if (sum > n) == (f.n > 0) { f.n = sum @@ -4187,7 +4255,7 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) { // configured value for inflow, that will be updated when we send a // WINDOW_UPDATE shortly after sending SETTINGS. sc.flow.add(http2initialWindowSize) - sc.inflow.add(http2initialWindowSize) + sc.inflow.init(http2initialWindowSize) sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) sc.hpackEncoder.SetMaxDynamicTableSizeLimit(s.maxEncoderHeaderTableSize()) @@ -4302,8 +4370,8 @@ type http2serverConn struct { wroteFrameCh chan http2frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes bodyReadCh chan http2bodyReadMsg // from handlers -> serve serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop - flow http2flow // conn-wide (not stream-specific) outbound flow control - inflow http2flow // conn-wide inbound flow control + flow http2outflow // conn-wide (not stream-specific) outbound flow control + inflow http2inflow // conn-wide inbound flow control tlsState *tls.ConnectionState // shared by all handlers, like net/http remoteAddrStr string writeSched http2WriteScheduler @@ -4380,10 +4448,10 @@ type http2stream struct { cancelCtx func() // owned by serverConn's serve loop: - bodyBytes int64 // body bytes seen so far - declBodyBytes int64 // or -1 if undeclared - flow http2flow // limits writing from Handler to client - inflow http2flow // what the client is allowed to POST/etc to us + bodyBytes int64 // body bytes seen so far + declBodyBytes int64 // or -1 if undeclared + flow http2outflow // limits writing from Handler to client + inflow http2inflow // what the client is allowed to POST/etc to us state http2streamState resetQueued bool // RST_STREAM queued for write; set by sc.resetStream gotTrailerHeader bool // HEADER frame for trailers was seen @@ -5247,7 +5315,7 @@ func (sc *http2serverConn) processFrame(f http2Frame) error { if sc.inGoAway && (sc.goAwayCode != http2ErrCodeNo || f.Header().StreamID > sc.maxClientStreamID) { if f, ok := f.(*http2DataFrame); ok { - if sc.inflow.available() < int32(f.Length) { + if !sc.inflow.take(f.Length) { return sc.countError("data_flow", http2streamError(f.Header().StreamID, http2ErrCodeFlowControl)) } sc.sendWindowUpdate(nil, int(f.Length)) // conn-level @@ -5519,14 +5587,9 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // But still enforce their connection-level flow control, // and return any flow control bytes since we're not going // to consume them. - if sc.inflow.available() < int32(f.Length) { + if !sc.inflow.take(f.Length) { return sc.countError("data_flow", http2streamError(id, http2ErrCodeFlowControl)) } - // Deduct the flow control from inflow, since we're - // going to immediately add it back in - // sendWindowUpdate, which also schedules sending the - // frames. - sc.inflow.take(int32(f.Length)) sc.sendWindowUpdate(nil, int(f.Length)) // conn-level if st != nil && st.resetQueued { @@ -5541,10 +5604,9 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // Sender sending more than they'd declared? if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { - if sc.inflow.available() < int32(f.Length) { + if !sc.inflow.take(f.Length) { return sc.countError("data_flow", http2streamError(id, http2ErrCodeFlowControl)) } - sc.inflow.take(int32(f.Length)) sc.sendWindowUpdate(nil, int(f.Length)) // conn-level st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) @@ -5555,10 +5617,9 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { } if f.Length > 0 { // Check whether the client has flow control quota. - if st.inflow.available() < int32(f.Length) { + if !http2takeInflows(&sc.inflow, &st.inflow, f.Length) { return sc.countError("flow_on_data_length", http2streamError(id, http2ErrCodeFlowControl)) } - st.inflow.take(int32(f.Length)) if len(data) > 0 { wrote, err := st.body.Write(data) @@ -5574,10 +5635,12 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error { // Return any padded flow control now, since we won't // refund it later on body reads. - if pad := int32(f.Length) - int32(len(data)); pad > 0 { - sc.sendWindowUpdate32(nil, pad) - sc.sendWindowUpdate32(st, pad) - } + // Call sendWindowUpdate even if there is no padding, + // to return buffered flow control credit if the sent + // window has shrunk. + pad := int32(f.Length) - int32(len(data)) + sc.sendWindowUpdate32(nil, pad) + sc.sendWindowUpdate32(st, pad) } if f.StreamEnded() { st.endStream() @@ -5849,8 +5912,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState st.cw.Init() st.flow.conn = &sc.flow // link to conn-level counter st.flow.add(sc.initialStreamSendWindowSize) - st.inflow.conn = &sc.inflow // link to conn-level counter - st.inflow.add(sc.srv.initialStreamRecvWindowSize()) + st.inflow.init(sc.srv.initialStreamRecvWindowSize()) if sc.hs.WriteTimeout != 0 { st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) } @@ -5942,7 +6004,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re tlsState = sc.tlsState } - needsContinue := rp.header.Get("Expect") == "100-continue" + needsContinue := httpguts.HeaderValuesContainsToken(rp.header["Expect"], "100-continue") if needsContinue { rp.header.Del("Expect") } @@ -6132,47 +6194,28 @@ func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) { } // st may be nil for conn-level -func (sc *http2serverConn) sendWindowUpdate(st *http2stream, n int) { - sc.serveG.check() - // "The legal range for the increment to the flow control - // window is 1 to 2^31-1 (2,147,483,647) octets." - // A Go Read call on 64-bit machines could in theory read - // a larger Read than this. Very unlikely, but we handle it here - // rather than elsewhere for now. - const maxUint31 = 1<<31 - 1 - for n > maxUint31 { - sc.sendWindowUpdate32(st, maxUint31) - n -= maxUint31 - } - sc.sendWindowUpdate32(st, int32(n)) +func (sc *http2serverConn) sendWindowUpdate32(st *http2stream, n int32) { + sc.sendWindowUpdate(st, int(n)) } // st may be nil for conn-level -func (sc *http2serverConn) sendWindowUpdate32(st *http2stream, n int32) { +func (sc *http2serverConn) sendWindowUpdate(st *http2stream, n int) { sc.serveG.check() - if n == 0 { - return - } - if n < 0 { - panic("negative update") - } var streamID uint32 - if st != nil { + var send int32 + if st == nil { + send = sc.inflow.add(n) + } else { streamID = st.id + send = st.inflow.add(n) + } + if send == 0 { + return } sc.writeFrame(http2FrameWriteRequest{ - write: http2writeWindowUpdate{streamID: streamID, n: uint32(n)}, + write: http2writeWindowUpdate{streamID: streamID, n: uint32(send)}, stream: st, }) - var ok bool - if st == nil { - ok = sc.inflow.add(n) - } else { - ok = st.inflow.add(n) - } - if !ok { - panic("internal error; sent too many window updates without decrements?") - } } // requestBody is the Handler's Request.Body type. @@ -7004,10 +7047,6 @@ const ( // we buffer per stream. http2transportDefaultStreamFlow = 4 << 20 - // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send - // a stream-level WINDOW_UPDATE for at a time. - http2transportDefaultStreamMinRefresh = 4 << 10 - http2defaultUserAgent = "Go-http-client/2.0" // initialMaxConcurrentStreams is a connections maxConcurrentStreams until @@ -7265,11 +7304,11 @@ type http2ClientConn struct { idleTimeout time.Duration // or 0 for never idleTimer *time.Timer - mu sync.Mutex // guards following - cond *sync.Cond // hold mu; broadcast on flow/closed changes - flow http2flow // our conn-level flow control quota (cs.flow is per stream) - inflow http2flow // peer's conn-level flow control - doNotReuse bool // whether conn is marked to not be reused for any future requests + mu sync.Mutex // guards following + cond *sync.Cond // hold mu; broadcast on flow/closed changes + flow http2outflow // our conn-level flow control quota (cs.outflow is per stream) + inflow http2inflow // peer's conn-level flow control + doNotReuse bool // whether conn is marked to not be reused for any future requests closing bool closed bool seenSettings bool // true if we've seen a settings frame, false otherwise @@ -7333,10 +7372,10 @@ type http2clientStream struct { respHeaderRecv chan struct{} // closed when headers are received res *Response // set if respHeaderRecv is closed - flow http2flow // guarded by cc.mu - inflow http2flow // guarded by cc.mu - bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read - readErr error // sticky read error; owned by transportResponseBody.Read + flow http2outflow // guarded by cc.mu + inflow http2inflow // guarded by cc.mu + bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read + readErr error // sticky read error; owned by transportResponseBody.Read reqBody io.ReadCloser reqBodyContentLength int64 // -1 means unknown @@ -7769,7 +7808,7 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client cc.bw.Write(http2clientPreface) cc.fr.WriteSettings(initialSettings...) cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow) - cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize) + cc.inflow.init(http2transportDefaultConnFlow + http2initialWindowSize) cc.bw.Flush() if cc.werr != nil { cc.Close() @@ -8531,7 +8570,7 @@ func (cs *http2clientStream) cleanupWriteRequest(err error) { close(cs.donec) } -// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams. +// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams. // Must hold cc.mu. func (cc *http2ClientConn) awaitOpenSlotForStreamLocked(cs *http2clientStream) error { for { @@ -9031,8 +9070,7 @@ type http2resAndError struct { func (cc *http2ClientConn) addStreamLocked(cs *http2clientStream) { cs.flow.add(int32(cc.initialWindowSize)) cs.flow.setConnFlow(&cc.flow) - cs.inflow.add(http2transportDefaultStreamFlow) - cs.inflow.setConnFlow(&cc.inflow) + cs.inflow.init(http2transportDefaultStreamFlow) cs.ID = cc.nextStreamID cc.nextStreamID += 2 cc.streams[cs.ID] = cs @@ -9491,21 +9529,10 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) { } cc.mu.Lock() - var connAdd, streamAdd int32 - // Check the conn-level first, before the stream-level. - if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { - connAdd = http2transportDefaultConnFlow - v - cc.inflow.add(connAdd) - } + connAdd := cc.inflow.add(n) + var streamAdd int32 if err == nil { // No need to refresh if the stream is over or failed. - // Consider any buffered body data (read from the conn but not - // consumed by the client) when computing flow control for this - // stream. - v := int(cs.inflow.available()) + cs.bufPipe.Len() - if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh { - streamAdd = int32(http2transportDefaultStreamFlow - v) - cs.inflow.add(streamAdd) - } + streamAdd = cs.inflow.add(n) } cc.mu.Unlock() @@ -9533,17 +9560,15 @@ func (b http2transportResponseBody) Close() error { if unread > 0 { cc.mu.Lock() // Return connection-level flow control. - if unread > 0 { - cc.inflow.add(int32(unread)) - } + connAdd := cc.inflow.add(unread) cc.mu.Unlock() // TODO(dneil): Acquiring this mutex can block indefinitely. // Move flow control return to a goroutine? cc.wmu.Lock() // Return connection-level flow control. - if unread > 0 { - cc.fr.WriteWindowUpdate(0, uint32(unread)) + if connAdd > 0 { + cc.fr.WriteWindowUpdate(0, uint32(connAdd)) } cc.bw.Flush() cc.wmu.Unlock() @@ -9586,13 +9611,18 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { // But at least return their flow control: if f.Length > 0 { cc.mu.Lock() - cc.inflow.add(int32(f.Length)) + ok := cc.inflow.take(f.Length) + connAdd := cc.inflow.add(int(f.Length)) cc.mu.Unlock() - - cc.wmu.Lock() - cc.fr.WriteWindowUpdate(0, uint32(f.Length)) - cc.bw.Flush() - cc.wmu.Unlock() + if !ok { + return http2ConnectionError(http2ErrCodeFlowControl) + } + if connAdd > 0 { + cc.wmu.Lock() + cc.fr.WriteWindowUpdate(0, uint32(connAdd)) + cc.bw.Flush() + cc.wmu.Unlock() + } } return nil } @@ -9623,9 +9653,7 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { } // Check connection-level flow control. cc.mu.Lock() - if cs.inflow.available() >= int32(f.Length) { - cs.inflow.take(int32(f.Length)) - } else { + if !http2takeInflows(&cc.inflow, &cs.inflow, f.Length) { cc.mu.Unlock() return http2ConnectionError(http2ErrCodeFlowControl) } @@ -9647,19 +9675,20 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { } } - if refund > 0 { - cc.inflow.add(int32(refund)) - if !didReset { - cs.inflow.add(int32(refund)) - } + sendConn := cc.inflow.add(refund) + var sendStream int32 + if !didReset { + sendStream = cs.inflow.add(refund) } cc.mu.Unlock() - if refund > 0 { + if sendConn > 0 || sendStream > 0 { cc.wmu.Lock() - cc.fr.WriteWindowUpdate(0, uint32(refund)) - if !didReset { - cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) + if sendConn > 0 { + cc.fr.WriteWindowUpdate(0, uint32(sendConn)) + } + if sendStream > 0 { + cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream)) } cc.bw.Flush() cc.wmu.Unlock() |
