diff options
Diffstat (limited to 'src/vendor/golang.org/x/net/quic/stream.go')
| -rw-r--r-- | src/vendor/golang.org/x/net/quic/stream.go | 1041 |
1 files changed, 1041 insertions, 0 deletions
diff --git a/src/vendor/golang.org/x/net/quic/stream.go b/src/vendor/golang.org/x/net/quic/stream.go new file mode 100644 index 0000000000..383a6c160a --- /dev/null +++ b/src/vendor/golang.org/x/net/quic/stream.go @@ -0,0 +1,1041 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package quic + +import ( + "context" + "errors" + "fmt" + "io" + "math" + + "golang.org/x/net/internal/quic/quicwire" +) + +// A Stream is an ordered byte stream. +// +// Streams may be bidirectional, read-only, or write-only. +// Methods inappropriate for a stream's direction +// (for example, [Write] to a read-only stream) +// return errors. +// +// It is not safe to perform concurrent reads from or writes to a stream. +// It is safe, however, to read and write at the same time. +// +// Reads and writes are buffered. +// It is generally not necessary to wrap a stream in a [bufio.ReadWriter] +// or otherwise apply additional buffering. +// +// To cancel reads or writes, use the [SetReadContext] and [SetWriteContext] methods. +type Stream struct { + id streamID + conn *Conn + + // Contexts used for read/write operations. + // Intentionally not mutex-guarded, to allow the race detector to catch concurrent access. + inctx context.Context + outctx context.Context + + // ingate's lock guards receive-related state. + // + // The gate condition is set if a read from the stream will not block, + // either because the stream has available data or because the read will fail. + ingate gate + in pipe // received data + inwin int64 // last MAX_STREAM_DATA sent to the peer + insendmax sentVal // set when we should send MAX_STREAM_DATA to the peer + inmaxbuf int64 // maximum amount of data we will buffer + insize int64 // stream final size; -1 before this is known + inset rangeset[int64] // received ranges + inclosed sentVal // set by CloseRead + inresetcode int64 // RESET_STREAM code received from the peer; -1 if not reset + + // outgate's lock guards send-related state. + // + // The gate condition is set if a write to the stream will not block, + // either because the stream has available flow control or because + // the write will fail. + outgate gate + out pipe // buffered data to send + outflushed int64 // offset of last flush call + outwin int64 // maximum MAX_STREAM_DATA received from the peer + outmaxsent int64 // maximum data offset we've sent to the peer + outmaxbuf int64 // maximum amount of data we will buffer + outunsent rangeset[int64] // ranges buffered but not yet sent (only flushed data) + outacked rangeset[int64] // ranges sent and acknowledged + outopened sentVal // set if we should open the stream + outclosed sentVal // set by CloseWrite + outblocked sentVal // set when a write to the stream is blocked by flow control + outreset sentVal // set by Reset + outresetcode uint64 // reset code to send in RESET_STREAM + outdone chan struct{} // closed when all data sent + + // Unsynchronized buffers, used for lock-free fast path. + inbuf []byte // received data + inbufoff int // bytes of inbuf which have been consumed + outbuf []byte // written data + outbufoff int // bytes of outbuf which contain data to write + + // Atomic stream state bits. + // + // These bits provide a fast way to coordinate between the + // send and receive sides of the stream, and the conn's loop. + // + // streamIn* bits must be set with ingate held. + // streamOut* bits must be set with outgate held. + // streamConn* bits are set by the conn's loop. + // streamQueue* bits must be set with streamsState.sendMu held. + state atomicBits[streamState] + + prev, next *Stream // guarded by streamsState.sendMu +} + +type streamState uint32 + +const ( + // streamInSendMeta is set when there are frames to send for the + // inbound side of the stream. For example, MAX_STREAM_DATA. + // Inbound frames are never flow-controlled. + streamInSendMeta = streamState(1 << iota) + + // streamOutSendMeta is set when there are non-flow-controlled frames + // to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED. + // streamOutSendData is set when there are no non-flow-controlled outbound frames + // and the stream has data to send. + // + // At most one of streamOutSendMeta and streamOutSendData is set at any time. + streamOutSendMeta + streamOutSendData + + // streamInDone and streamOutDone are set when the inbound or outbound + // sides of the stream are finished. When both are set, the stream + // can be removed from the Conn and forgotten. + streamInDone + streamOutDone + + // streamConnRemoved is set when the stream has been removed from the conn. + streamConnRemoved + + // streamQueueMeta and streamQueueData indicate which of the streamsState + // send queues the conn is currently on. + streamQueueMeta + streamQueueData +) + +type streamQueue int + +const ( + noQueue = streamQueue(iota) + metaQueue // streamsState.queueMeta + dataQueue // streamsState.queueData +) + +// streamResetByConnClose is assigned to Stream.inresetcode to indicate that a stream +// was implicitly reset when the connection closed. It's out of the range of +// possible reset codes the peer can send. +const streamResetByConnClose = math.MaxInt64 + +// wantQueue returns the send queue the stream should be on. +func (s streamState) wantQueue() streamQueue { + switch { + case s&(streamInSendMeta|streamOutSendMeta) != 0: + return metaQueue + case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone: + return metaQueue + case s&streamOutSendData != 0: + // The stream has no non-flow-controlled frames to send, + // but does have data. Put it on the data queue, which is only + // processed when flow control is available. + return dataQueue + } + return noQueue +} + +// inQueue returns the send queue the stream is currently on. +func (s streamState) inQueue() streamQueue { + switch { + case s&streamQueueMeta != 0: + return metaQueue + case s&streamQueueData != 0: + return dataQueue + } + return noQueue +} + +// newStream returns a new stream. +// +// The stream's ingate and outgate are locked. +// (We create the stream with locked gates so after the caller +// initializes the flow control window, +// unlocking outgate will set the stream writability state.) +func newStream(c *Conn, id streamID) *Stream { + s := &Stream{ + conn: c, + id: id, + insize: -1, // -1 indicates the stream size is unknown + inresetcode: -1, // -1 indicates no RESET_STREAM received + ingate: newLockedGate(), + outgate: newLockedGate(), + inctx: context.Background(), + outctx: context.Background(), + } + if !s.IsReadOnly() { + s.outdone = make(chan struct{}) + } + return s +} + +// ID returns the QUIC stream ID of s. +// +// As specified in RFC 9000, the two least significant bits of a stream ID +// indicate the initiator and directionality of the stream. The upper bits are +// the stream number. +func (s *Stream) ID() int64 { + return int64(s.id) +} + +// SetReadContext sets the context used for reads from the stream. +// +// It is not safe to call SetReadContext concurrently. +func (s *Stream) SetReadContext(ctx context.Context) { + s.inctx = ctx +} + +// SetWriteContext sets the context used for writes to the stream. +// The write context is also used by Close when waiting for writes to be +// received by the peer. +// +// It is not safe to call SetWriteContext concurrently. +func (s *Stream) SetWriteContext(ctx context.Context) { + s.outctx = ctx +} + +// IsReadOnly reports whether the stream is read-only +// (a unidirectional stream created by the peer). +func (s *Stream) IsReadOnly() bool { + return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side +} + +// IsWriteOnly reports whether the stream is write-only +// (a unidirectional stream created locally). +func (s *Stream) IsWriteOnly() bool { + return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side +} + +// Read reads data from the stream. +// +// Read returns as soon as at least one byte of data is available. +// +// If the peer closes the stream cleanly, Read returns io.EOF after +// returning all data sent by the peer. +// If the peer aborts reads on the stream, Read returns +// an error wrapping StreamResetCode. +// +// It is not safe to call Read concurrently. +func (s *Stream) Read(b []byte) (n int, err error) { + if s.IsWriteOnly() { + return 0, errors.New("read from write-only stream") + } + if len(s.inbuf) > s.inbufoff { + // Fast path: If s.inbuf contains unread bytes, return them immediately + // without taking a lock. + n = copy(b, s.inbuf[s.inbufoff:]) + s.inbufoff += n + return n, nil + } + if err := s.ingate.waitAndLock(s.inctx); err != nil { + return 0, err + } + if s.inbufoff > 0 { + // Discard bytes consumed by the fast path above. + s.in.discardBefore(s.in.start + int64(s.inbufoff)) + s.inbufoff = 0 + s.inbuf = nil + } + // bytesRead contains the number of bytes of connection-level flow control to return. + // We return flow control for bytes read by this Read call, as well as bytes moved + // to the fast-path read buffer (s.inbuf). + var bytesRead int64 + defer func() { + s.inUnlock() + s.conn.handleStreamBytesReadOffLoop(bytesRead) // must be done with ingate unlocked + }() + if s.inresetcode != -1 { + if s.inresetcode == streamResetByConnClose { + if err := s.conn.finalError(); err != nil { + return 0, err + } + } + return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode)) + } + if s.inclosed.isSet() { + return 0, errors.New("read from closed stream") + } + if s.insize == s.in.start { + return 0, io.EOF + } + // Getting here indicates the stream contains data to be read. + if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start { + panic("BUG: inconsistent input stream state") + } + if size := int(s.inset[0].end - s.in.start); size < len(b) { + b = b[:size] + } + bytesRead = int64(len(b)) + start := s.in.start + end := start + int64(len(b)) + s.in.copy(start, b) + s.in.discardBefore(end) + if end == s.insize { + // We have read up to the end of the stream. + // No need to update stream flow control. + return len(b), io.EOF + } + if len(s.inset) > 0 && s.inset[0].start <= s.in.start && s.inset[0].end > s.in.start { + // If we have more readable bytes available, put the next chunk of data + // in s.inbuf for lock-free reads. + s.inbuf = s.in.peek(s.inset[0].end - s.in.start) + bytesRead += int64(len(s.inbuf)) + } + if s.insize == -1 || s.insize > s.inwin { + newWindow := s.in.start + int64(len(s.inbuf)) + s.inmaxbuf + addedWindow := newWindow - s.inwin + if shouldUpdateFlowControl(s.inmaxbuf, addedWindow) { + // Update stream flow control with a STREAM_MAX_DATA frame. + s.insendmax.setUnsent() + } + } + return len(b), nil +} + +// ReadByte reads and returns a single byte from the stream. +// +// It is not safe to call ReadByte concurrently. +func (s *Stream) ReadByte() (byte, error) { + if len(s.inbuf) > s.inbufoff { + b := s.inbuf[s.inbufoff] + s.inbufoff++ + return b, nil + } + var b [1]byte + n, err := s.Read(b[:]) + if n > 0 { + return b[0], nil + } + return 0, err +} + +// shouldUpdateFlowControl determines whether to send a flow control window update. +// +// We want to balance keeping the peer well-supplied with flow control with not sending +// many small updates. +func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool { + return addedWindow >= maxWindow/8 +} + +// Write writes data to the stream. +// +// Write writes data to the stream write buffer. +// Buffered data is only sent when the buffer is sufficiently full. +// Call the Flush method to ensure buffered data is sent. +func (s *Stream) Write(b []byte) (n int, err error) { + if s.IsReadOnly() { + return 0, errors.New("write to read-only stream") + } + if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) { + // Fast path: The data to write fits in s.outbuf. + copy(s.outbuf[s.outbufoff:], b) + s.outbufoff += len(b) + return len(b), nil + } + canWrite := s.outgate.lock() + s.flushFastOutputBuffer() + for { + // The first time through this loop, we may or may not be write blocked. + // We exit the loop after writing all data, so on subsequent passes through + // the loop we are always write blocked. + if len(b) > 0 && !canWrite { + // Our send buffer is full. Wait for the peer to ack some data. + s.outUnlock() + if err := s.outgate.waitAndLock(s.outctx); err != nil { + return n, err + } + // Successfully returning from waitAndLockGate means we are no longer + // write blocked. (Unlike traditional condition variables, gates do not + // have spurious wakeups.) + } + if err := s.writeErrorLocked(); err != nil { + s.outUnlock() + return n, err + } + if len(b) == 0 { + break + } + // Write limit is our send buffer limit. + // This is a stream offset. + lim := s.out.start + s.outmaxbuf + // Amount to write is min(the full buffer, data up to the write limit). + // This is a number of bytes. + nn := min(int64(len(b)), lim-s.out.end) + // Copy the data into the output buffer. + s.out.writeAt(b[:nn], s.out.end) + b = b[nn:] + n += int(nn) + // Possibly flush the output buffer. + // We automatically flush if: + // - We have enough data to consume the send window. + // Sending this data may cause the peer to extend the window. + // - We have buffered as much data as we're willing do. + // We need to send data to clear out buffer space. + // - We have enough data to fill a 1-RTT packet using the smallest + // possible maximum datagram size (1200 bytes, less header byte, + // connection ID, packet number, and AEAD overhead). + const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead + shouldFlush := s.out.end >= s.outwin || // peer send window is full + s.out.end >= lim || // local send buffer is full + (s.out.end-s.outflushed) >= autoFlushSize // enough data buffered + if shouldFlush { + s.flushLocked() + } + if s.out.end > s.outwin { + // We're blocked by flow control. + // Send a STREAM_DATA_BLOCKED frame to let the peer know. + s.outblocked.set() + } + // If we have bytes left to send, we're blocked. + canWrite = false + } + if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 { + // If s.out has space allocated and available to be written into, + // then reference it in s.outbuf for fast-path writes. + // + // It's perhaps a bit pointless to limit s.outbuf to the send buffer limit. + // We've already allocated this buffer so we aren't saving any memory + // by not using it. + // For now, we limit it anyway to make it easier to reason about limits. + // + // We set the limit to one less than the send buffer limit (the -1 above) + // so that a write which completely fills the buffer will overflow + // s.outbuf and trigger a flush. + s.outbuf = s.out.availableBuffer() + if int64(len(s.outbuf)) > lim { + s.outbuf = s.outbuf[:lim] + } + } + s.outUnlock() + return n, nil +} + +// WriteByte writes a single byte to the stream. +func (s *Stream) WriteByte(c byte) error { + if s.outbufoff < len(s.outbuf) { + s.outbuf[s.outbufoff] = c + s.outbufoff++ + return nil + } + b := [1]byte{c} + _, err := s.Write(b[:]) + return err +} + +func (s *Stream) flushFastOutputBuffer() { + if s.outbuf == nil { + return + } + // Commit data previously written to s.outbuf. + // s.outbuf is a reference to a buffer in s.out, so we just need to record + // that the output buffer has been extended. + s.out.end += int64(s.outbufoff) + s.outbuf = nil + s.outbufoff = 0 +} + +// Flush flushes data written to the stream. +// It does not wait for the peer to acknowledge receipt of the data. +// Use Close to wait for the peer's acknowledgement. +func (s *Stream) Flush() error { + if s.IsReadOnly() { + return errors.New("flush of read-only stream") + } + s.outgate.lock() + defer s.outUnlock() + if err := s.writeErrorLocked(); err != nil { + return err + } + s.flushLocked() + return nil +} + +// writeErrorLocked returns the error (if any) which should be returned by write operations +// due to the stream being reset or closed. +func (s *Stream) writeErrorLocked() error { + if s.outreset.isSet() { + if s.outresetcode == streamResetByConnClose { + if err := s.conn.finalError(); err != nil { + return err + } + } + return errors.New("write to reset stream") + } + if s.outclosed.isSet() { + return errors.New("write to closed stream") + } + return nil +} + +func (s *Stream) flushLocked() { + s.flushFastOutputBuffer() + s.outopened.set() + if s.outflushed < s.outwin { + s.outunsent.add(s.outflushed, min(s.outwin, s.out.end)) + } + s.outflushed = s.out.end +} + +// Close closes the stream. +// Any blocked stream operations will be unblocked and return errors. +// +// Close flushes any data in the stream write buffer and waits for the peer to +// acknowledge receipt of the data. +// If the stream has been reset, it waits for the peer to acknowledge the reset. +// If the context expires before the peer receives the stream's data, +// Close discards the buffer and returns the context error. +func (s *Stream) Close() error { + s.CloseRead() + if s.IsReadOnly() { + return nil + } + s.CloseWrite() + // TODO: Return code from peer's RESET_STREAM frame? + if err := s.conn.waitOnDone(s.outctx, s.outdone); err != nil { + return err + } + s.outgate.lock() + defer s.outUnlock() + if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) { + return nil + } + return errors.New("stream reset") +} + +// CloseRead aborts reads on the stream. +// Any blocked reads will be unblocked and return errors. +// +// CloseRead notifies the peer that the stream has been closed for reading. +// It does not wait for the peer to acknowledge the closure. +// Use Close to wait for the peer's acknowledgement. +func (s *Stream) CloseRead() { + if s.IsWriteOnly() { + return + } + s.ingate.lock() + if s.inset.isrange(0, s.insize) || s.inresetcode != -1 { + // We've already received all data from the peer, + // so there's no need to send STOP_SENDING. + // This is the same as saying we sent one and they got it. + s.inclosed.setReceived() + } else { + s.inclosed.set() + } + discarded := s.in.end - s.in.start + s.in.discardBefore(s.in.end) + s.inUnlock() + s.conn.handleStreamBytesReadOffLoop(discarded) // must be done with ingate unlocked +} + +// CloseWrite aborts writes on the stream. +// Any blocked writes will be unblocked and return errors. +// +// CloseWrite sends any data in the stream write buffer to the peer. +// It does not wait for the peer to acknowledge receipt of the data. +// Use Close to wait for the peer's acknowledgement. +func (s *Stream) CloseWrite() { + if s.IsReadOnly() { + return + } + s.outgate.lock() + defer s.outUnlock() + s.outclosed.set() + s.flushLocked() +} + +// Reset aborts writes on the stream and notifies the peer +// that the stream was terminated abruptly. +// Any blocked writes will be unblocked and return errors. +// +// Reset sends the application protocol error code, which must be +// less than 2^62, to the peer. +// It does not wait for the peer to acknowledge receipt of the error. +// Use Close to wait for the peer's acknowledgement. +// +// Reset does not affect reads. +// Use CloseRead to abort reads on the stream. +func (s *Stream) Reset(code uint64) { + const userClosed = true + s.resetInternal(code, userClosed) +} + +// resetInternal resets the send side of the stream. +// +// If userClosed is true, this is s.Reset. +// If userClosed is false, this is a reaction to a STOP_SENDING frame. +func (s *Stream) resetInternal(code uint64, userClosed bool) { + s.outgate.lock() + defer s.outUnlock() + if s.IsReadOnly() { + return + } + if userClosed { + // Mark that the user closed the stream. + s.outclosed.set() + } + if s.outreset.isSet() { + return + } + if code > quicwire.MaxVarint { + code = quicwire.MaxVarint + } + // We could check here to see if the stream is closed and the + // peer has acked all the data and the FIN, but sending an + // extra RESET_STREAM in this case is harmless. + s.outreset.set() + s.outresetcode = code + s.outbuf = nil + s.outbufoff = 0 + s.out.discardBefore(s.out.end) + s.outunsent = rangeset[int64]{} + s.outblocked.clear() +} + +// connHasClosed indicates the stream's conn has closed. +func (s *Stream) connHasClosed() { + // If we're in the closing state, the user closed the conn. + // Otherwise, we the peer initiated the close. + // This only matters for the error we're going to return from stream operations. + localClose := s.conn.lifetime.state == connStateClosing + + s.ingate.lock() + if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 { + if localClose { + s.inclosed.set() + } else { + s.inresetcode = streamResetByConnClose + } + } + s.inUnlock() + + s.outgate.lock() + if localClose { + s.outclosed.set() + s.outreset.set() + } else { + s.outresetcode = streamResetByConnClose + s.outreset.setReceived() + } + s.outUnlock() +} + +// inUnlock unlocks s.ingate. +// It sets the gate condition if reads from s will not block. +// If s has receive-related frames to write or if both directions +// are done and the stream should be removed, it notifies the Conn. +func (s *Stream) inUnlock() { + state := s.inUnlockNoQueue() + s.conn.maybeQueueStreamForSend(s, state) +} + +// inUnlockNoQueue is inUnlock, +// but reports whether s has frames to write rather than notifying the Conn. +func (s *Stream) inUnlockNoQueue() streamState { + nextByte := s.in.start + int64(len(s.inbuf)) + canRead := s.inset.contains(nextByte) || // data available to read + s.insize == s.in.start+int64(len(s.inbuf)) || // at EOF + s.inresetcode != -1 || // reset by peer + s.inclosed.isSet() // closed locally + defer s.ingate.unlock(canRead) + var state streamState + switch { + case s.IsWriteOnly(): + state = streamInDone + case s.inresetcode != -1: // reset by peer + fallthrough + case s.in.start == s.insize: // all data received and read + // We don't increase MAX_STREAMS until the user calls ReadClose or Close, + // so the receive side is not finished until inclosed is set. + if s.inclosed.isSet() { + state = streamInDone + } + case s.insendmax.shouldSend(): // STREAM_MAX_DATA + state = streamInSendMeta + case s.inclosed.shouldSend(): // STOP_SENDING + state = streamInSendMeta + } + const mask = streamInDone | streamInSendMeta + return s.state.set(state, mask) +} + +// outUnlock unlocks s.outgate. +// It sets the gate condition if writes to s will not block. +// If s has send-related frames to write or if both directions +// are done and the stream should be removed, it notifies the Conn. +func (s *Stream) outUnlock() { + state := s.outUnlockNoQueue() + s.conn.maybeQueueStreamForSend(s, state) +} + +// outUnlockNoQueue is outUnlock, +// but reports whether s has frames to write rather than notifying the Conn. +func (s *Stream) outUnlockNoQueue() streamState { + isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked + s.outreset.isSet() // reset locally + if isDone { + select { + case <-s.outdone: + default: + if !s.IsReadOnly() { + close(s.outdone) + } + } + } + lim := s.out.start + s.outmaxbuf + canWrite := lim > s.out.end || // available send buffer + s.outclosed.isSet() || // closed locally + s.outreset.isSet() // reset locally + defer s.outgate.unlock(canWrite) + var state streamState + switch { + case s.IsReadOnly(): + state = streamOutDone + case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end): // all data sent and acked + fallthrough + case s.outreset.isReceived(): // RESET_STREAM sent and acked + // We don't increase MAX_STREAMS until the user calls WriteClose or Close, + // so the send side is not finished until outclosed is set. + if s.outclosed.isSet() { + state = streamOutDone + } + case s.outreset.shouldSend(): // RESET_STREAM + state = streamOutSendMeta + case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged + case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED + state = streamOutSendMeta + case len(s.outunsent) > 0: // STREAM frame with data + if s.outunsent.min() < s.outmaxsent { + state = streamOutSendMeta // resent data, will not consume flow control + } else { + state = streamOutSendData // new data, requires flow control + } + case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit + state = streamOutSendMeta + case s.outopened.shouldSend(): // STREAM frame with no data + state = streamOutSendMeta + } + const mask = streamOutDone | streamOutSendMeta | streamOutSendData + return s.state.set(state, mask) +} + +// handleData handles data received in a STREAM frame. +func (s *Stream) handleData(off int64, b []byte, fin bool) error { + s.ingate.lock() + defer s.inUnlock() + end := off + int64(len(b)) + if err := s.checkStreamBounds(end, fin); err != nil { + return err + } + if s.inclosed.isSet() || s.inresetcode != -1 { + // The user read-closed the stream, or the peer reset it. + // Either way, we can discard this frame. + return nil + } + if s.insize == -1 && end > s.in.end { + added := end - s.in.end + if err := s.conn.handleStreamBytesReceived(added); err != nil { + return err + } + } + s.in.writeAt(b, off) + s.inset.add(off, end) + if fin { + s.insize = end + // The peer has enough flow control window to send the entire stream. + s.insendmax.clear() + } + return nil +} + +// handleReset handles a RESET_STREAM frame. +func (s *Stream) handleReset(code uint64, finalSize int64) error { + s.ingate.lock() + defer s.inUnlock() + const fin = true + if err := s.checkStreamBounds(finalSize, fin); err != nil { + return err + } + if s.inresetcode != -1 { + // The stream was already reset. + return nil + } + if s.insize == -1 { + added := finalSize - s.in.end + if err := s.conn.handleStreamBytesReceived(added); err != nil { + return err + } + } + s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start) + s.in.discardBefore(s.in.end) + s.inresetcode = int64(code) + s.insize = finalSize + return nil +} + +// checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame. +func (s *Stream) checkStreamBounds(end int64, fin bool) error { + if end > s.inwin { + // The peer sent us data past the maximum flow control window we gave them. + return localTransportError{ + code: errFlowControl, + reason: "stream flow control window exceeded", + } + } + if s.insize != -1 && end > s.insize { + // The peer sent us data past the final size of the stream they previously gave us. + return localTransportError{ + code: errFinalSize, + reason: "data received past end of stream", + } + } + if fin && s.insize != -1 && end != s.insize { + // The peer changed the final size of the stream. + return localTransportError{ + code: errFinalSize, + reason: "final size of stream changed", + } + } + if fin && end < s.in.end { + // The peer has previously sent us data past the final size. + return localTransportError{ + code: errFinalSize, + reason: "end of stream occurs before prior data", + } + } + return nil +} + +// handleStopSending handles a STOP_SENDING frame. +func (s *Stream) handleStopSending(code uint64) error { + // Peer requests that we reset this stream. + // https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4 + const userReset = false + s.resetInternal(code, userReset) + return nil +} + +// handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame. +func (s *Stream) handleMaxStreamData(maxStreamData int64) error { + s.outgate.lock() + defer s.outUnlock() + if maxStreamData <= s.outwin { + return nil + } + if s.outflushed > s.outwin { + s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed)) + } + s.outwin = maxStreamData + if s.out.end > s.outwin { + // We've still got more data than flow control window. + s.outblocked.setUnsent() + } else { + s.outblocked.clear() + } + return nil +} + +// ackOrLoss handles the fate of stream frames other than STREAM. +func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) { + // Frames which carry new information each time they are sent + // (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked + // as received if the most recent packet carrying this frame is acked. + // + // Frames which are always the same (STOP_SENDING, RESET_STREAM) + // can be marked as received if any packet carrying this frame is acked. + switch ftype { + case frameTypeResetStream: + s.outgate.lock() + s.outreset.ackOrLoss(pnum, fate) + s.outUnlock() + case frameTypeStopSending: + s.ingate.lock() + s.inclosed.ackOrLoss(pnum, fate) + s.inUnlock() + case frameTypeMaxStreamData: + s.ingate.lock() + s.insendmax.ackLatestOrLoss(pnum, fate) + s.inUnlock() + case frameTypeStreamDataBlocked: + s.outgate.lock() + s.outblocked.ackLatestOrLoss(pnum, fate) + s.outUnlock() + default: + panic("unhandled frame type") + } +} + +// ackOrLossData handles the fate of a STREAM frame. +func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) { + s.outgate.lock() + defer s.outUnlock() + s.outopened.ackOrLoss(pnum, fate) + if fin { + s.outclosed.ackOrLoss(pnum, fate) + } + if s.outreset.isSet() { + // If the stream has been reset, we don't care any more. + return + } + switch fate { + case packetAcked: + s.outacked.add(start, end) + s.outunsent.sub(start, end) + // If this ack is for data at the start of the send buffer, we can now discard it. + if s.outacked.contains(s.out.start) { + s.out.discardBefore(s.outacked[0].end) + } + case packetLost: + // Mark everything lost, but not previously acked, as needing retransmission. + // We do this by adding all the lost bytes to outunsent, and then + // removing everything already acked. + s.outunsent.add(start, end) + for _, a := range s.outacked { + s.outunsent.sub(a.start, a.end) + } + } +} + +// appendInFramesLocked appends STOP_SENDING and MAX_STREAM_DATA frames +// to the current packet. +// +// It returns true if no more frames need appending, +// false if not everything fit in the current packet. +func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool { + if s.inclosed.shouldSendPTO(pto) { + // We don't currently have an API for setting the error code. + // Just send zero. + code := uint64(0) + if !w.appendStopSendingFrame(s.id, code) { + return false + } + s.inclosed.setSent(pnum) + } + // TODO: STOP_SENDING + if s.insendmax.shouldSendPTO(pto) { + // MAX_STREAM_DATA + maxStreamData := s.in.start + s.inmaxbuf + if !w.appendMaxStreamDataFrame(s.id, maxStreamData) { + return false + } + s.inwin = maxStreamData + s.insendmax.setSent(pnum) + } + return true +} + +// appendOutFramesLocked appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames +// to the current packet. +// +// It returns true if no more frames need appending, +// false if not everything fit in the current packet. +func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool { + if s.outreset.isSet() { + // RESET_STREAM + if s.outreset.shouldSendPTO(pto) { + if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) { + return false + } + s.outreset.setSent(pnum) + s.frameOpensStream(pnum) + } + return true + } + if s.outblocked.shouldSendPTO(pto) { + // STREAM_DATA_BLOCKED + if !w.appendStreamDataBlockedFrame(s.id, s.outwin) { + return false + } + s.outblocked.setSent(pnum) + s.frameOpensStream(pnum) + } + for { + // STREAM + off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto) + if end := off + size; end > s.outmaxsent { + // This will require connection-level flow control to send. + end = min(end, s.outmaxsent+s.conn.streams.outflow.avail()) + end = max(end, off) + size = end - off + } + fin := s.outclosed.isSet() && off+size == s.out.end + shouldSend := size > 0 || // have data to send + s.outopened.shouldSendPTO(pto) || // should open the stream + (fin && s.outclosed.shouldSendPTO(pto)) // should close the stream + if !shouldSend { + return true + } + b, added := w.appendStreamFrame(s.id, off, int(size), fin) + if !added { + return false + } + s.out.copy(off, b) + end := off + int64(len(b)) + if end > s.outmaxsent { + s.conn.streams.outflow.consume(end - s.outmaxsent) + s.outmaxsent = end + } + s.outunsent.sub(off, end) + s.frameOpensStream(pnum) + if fin { + s.outclosed.setSent(pnum) + } + if pto { + return true + } + if int64(len(b)) < size { + return false + } + } +} + +// frameOpensStream records that we're sending a frame that will open the stream. +// +// If we don't have an acknowledgement from the peer for a previous frame opening the stream, +// record this packet as being the latest one to open it. +func (s *Stream) frameOpensStream(pnum packetNumber) { + if !s.outopened.isReceived() { + s.outopened.setSent(pnum) + } +} + +// dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM. +func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) { + switch { + case pto: + // On PTO, resend unacked data that fits in the probe packet. + // For simplicity, we send the range starting at s.out.start + // (which is definitely unacked, or else we would have discarded it) + // up to the next acked byte (if any). + // + // This may miss unacked data starting after that acked byte, + // but avoids resending data the peer has acked. + for _, r := range outacked { + if r.start > start { + return start, r.start - start + } + } + return start, end - start + case outunsent.numRanges() > 0: + return outunsent.min(), outunsent[0].size() + default: + return end, 0 + } +} |
