aboutsummaryrefslogtreecommitdiff
path: root/src/vendor/golang.org/x/net/quic/endpoint.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/vendor/golang.org/x/net/quic/endpoint.go')
-rw-r--r--src/vendor/golang.org/x/net/quic/endpoint.go472
1 files changed, 472 insertions, 0 deletions
diff --git a/src/vendor/golang.org/x/net/quic/endpoint.go b/src/vendor/golang.org/x/net/quic/endpoint.go
new file mode 100644
index 0000000000..3d68073cd6
--- /dev/null
+++ b/src/vendor/golang.org/x/net/quic/endpoint.go
@@ -0,0 +1,472 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package quic
+
+import (
+ "context"
+ "crypto/rand"
+ "errors"
+ "net"
+ "net/netip"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// An Endpoint handles QUIC traffic on a network address.
+// It can accept inbound connections or create outbound ones.
+//
+// Multiple goroutines may invoke methods on an Endpoint simultaneously.
+type Endpoint struct {
+ listenConfig *Config
+ packetConn packetConn
+ testHooks endpointTestHooks
+ resetGen statelessResetTokenGenerator
+ retry retryState
+
+ acceptQueue queue[*Conn] // new inbound connections
+ connsMap connsMap // only accessed by the listen loop
+
+ connsMu sync.Mutex
+ conns map[*Conn]struct{}
+ closing bool // set when Close is called
+ closec chan struct{} // closed when the listen loop exits
+}
+
+type endpointTestHooks interface {
+ newConn(c *Conn)
+}
+
+// A packetConn is the interface to sending and receiving UDP packets.
+type packetConn interface {
+ Close() error
+ LocalAddr() netip.AddrPort
+ Read(f func(*datagram))
+ Write(datagram) error
+}
+
+// Listen listens on a local network address.
+//
+// The config is used to for connections accepted by the endpoint.
+// If the config is nil, the endpoint will not accept connections.
+func Listen(network, address string, listenConfig *Config) (*Endpoint, error) {
+ if listenConfig != nil && listenConfig.TLSConfig == nil {
+ return nil, errors.New("TLSConfig is not set")
+ }
+ a, err := net.ResolveUDPAddr(network, address)
+ if err != nil {
+ return nil, err
+ }
+ udpConn, err := net.ListenUDP(network, a)
+ if err != nil {
+ return nil, err
+ }
+ pc, err := newNetUDPConn(udpConn)
+ if err != nil {
+ return nil, err
+ }
+ return newEndpoint(pc, listenConfig, nil)
+}
+
+// NewEndpoint creates an endpoint using a net.PacketConn as the underlying transport.
+//
+// If the PacketConn is not a *net.UDPConn, the endpoint may be slower and lack
+// access to some features of the network.
+func NewEndpoint(conn net.PacketConn, config *Config) (*Endpoint, error) {
+ var pc packetConn
+ var err error
+ switch conn := conn.(type) {
+ case *net.UDPConn:
+ pc, err = newNetUDPConn(conn)
+ default:
+ pc, err = newNetPacketConn(conn)
+ }
+ if err != nil {
+ return nil, err
+ }
+ return newEndpoint(pc, config, nil)
+}
+
+func newEndpoint(pc packetConn, config *Config, hooks endpointTestHooks) (*Endpoint, error) {
+ e := &Endpoint{
+ listenConfig: config,
+ packetConn: pc,
+ testHooks: hooks,
+ conns: make(map[*Conn]struct{}),
+ acceptQueue: newQueue[*Conn](),
+ closec: make(chan struct{}),
+ }
+ var statelessResetKey [32]byte
+ if config != nil {
+ statelessResetKey = config.StatelessResetKey
+ }
+ e.resetGen.init(statelessResetKey)
+ e.connsMap.init()
+ if config != nil && config.RequireAddressValidation {
+ if err := e.retry.init(); err != nil {
+ return nil, err
+ }
+ }
+ go e.listen()
+ return e, nil
+}
+
+// LocalAddr returns the local network address.
+func (e *Endpoint) LocalAddr() netip.AddrPort {
+ return e.packetConn.LocalAddr()
+}
+
+// Close closes the Endpoint.
+// Any blocked operations on the Endpoint or associated Conns and Stream will be unblocked
+// and return errors.
+//
+// Close aborts every open connection.
+// Data in stream read and write buffers is discarded.
+// It waits for the peers of any open connection to acknowledge the connection has been closed.
+func (e *Endpoint) Close(ctx context.Context) error {
+ e.acceptQueue.close(errors.New("endpoint closed"))
+
+ // It isn't safe to call Conn.Abort or conn.exit with connsMu held,
+ // so copy the list of conns.
+ var conns []*Conn
+ e.connsMu.Lock()
+ if !e.closing {
+ e.closing = true // setting e.closing prevents new conns from being created
+ for c := range e.conns {
+ conns = append(conns, c)
+ }
+ if len(e.conns) == 0 {
+ e.packetConn.Close()
+ }
+ }
+ e.connsMu.Unlock()
+
+ for _, c := range conns {
+ c.Abort(localTransportError{code: errNo})
+ }
+ select {
+ case <-e.closec:
+ case <-ctx.Done():
+ for _, c := range conns {
+ c.exit()
+ }
+ return ctx.Err()
+ }
+ return nil
+}
+
+// Accept waits for and returns the next connection.
+func (e *Endpoint) Accept(ctx context.Context) (*Conn, error) {
+ return e.acceptQueue.get(ctx)
+}
+
+// Dial creates and returns a connection to a network address.
+// The config cannot be nil.
+func (e *Endpoint) Dial(ctx context.Context, network, address string, config *Config) (*Conn, error) {
+ u, err := net.ResolveUDPAddr(network, address)
+ if err != nil {
+ return nil, err
+ }
+ addr := u.AddrPort()
+ addr = netip.AddrPortFrom(addr.Addr().Unmap(), addr.Port())
+ c, err := e.newConn(time.Now(), config, clientSide, newServerConnIDs{}, address, addr)
+ if err != nil {
+ return nil, err
+ }
+ if err := c.waitReady(ctx); err != nil {
+ c.Abort(nil)
+ return nil, err
+ }
+ return c, nil
+}
+
+func (e *Endpoint) newConn(now time.Time, config *Config, side connSide, cids newServerConnIDs, peerHostname string, peerAddr netip.AddrPort) (*Conn, error) {
+ e.connsMu.Lock()
+ defer e.connsMu.Unlock()
+ if e.closing {
+ return nil, errors.New("endpoint closed")
+ }
+ c, err := newConn(now, side, cids, peerHostname, peerAddr, config, e)
+ if err != nil {
+ return nil, err
+ }
+ e.conns[c] = struct{}{}
+ return c, nil
+}
+
+// serverConnEstablished is called by a conn when the handshake completes
+// for an inbound (serverSide) connection.
+func (e *Endpoint) serverConnEstablished(c *Conn) {
+ e.acceptQueue.put(c)
+}
+
+// connDrained is called by a conn when it leaves the draining state,
+// either when the peer acknowledges connection closure or the drain timeout expires.
+func (e *Endpoint) connDrained(c *Conn) {
+ var cids [][]byte
+ for i := range c.connIDState.local {
+ cids = append(cids, c.connIDState.local[i].cid)
+ }
+ var tokens []statelessResetToken
+ for i := range c.connIDState.remote {
+ tokens = append(tokens, c.connIDState.remote[i].resetToken)
+ }
+ e.connsMap.updateConnIDs(func(conns *connsMap) {
+ for _, cid := range cids {
+ conns.retireConnID(c, cid)
+ }
+ for _, token := range tokens {
+ conns.retireResetToken(c, token)
+ }
+ })
+ e.connsMu.Lock()
+ defer e.connsMu.Unlock()
+ delete(e.conns, c)
+ if e.closing && len(e.conns) == 0 {
+ e.packetConn.Close()
+ }
+}
+
+func (e *Endpoint) listen() {
+ defer close(e.closec)
+ e.packetConn.Read(func(m *datagram) {
+ if e.connsMap.updateNeeded.Load() {
+ e.connsMap.applyUpdates()
+ }
+ e.handleDatagram(m)
+ })
+}
+
+func (e *Endpoint) handleDatagram(m *datagram) {
+ dstConnID, ok := dstConnIDForDatagram(m.b)
+ if !ok {
+ m.recycle()
+ return
+ }
+ c := e.connsMap.byConnID[string(dstConnID)]
+ if c == nil {
+ // TODO: Move this branch into a separate goroutine to avoid blocking
+ // the endpoint while processing packets.
+ e.handleUnknownDestinationDatagram(m)
+ return
+ }
+
+ // TODO: This can block the endpoint while waiting for the conn to accept the dgram.
+ // Think about buffering between the receive loop and the conn.
+ c.sendMsg(m)
+}
+
+func (e *Endpoint) handleUnknownDestinationDatagram(m *datagram) {
+ defer func() {
+ if m != nil {
+ m.recycle()
+ }
+ }()
+ const minimumValidPacketSize = 21
+ if len(m.b) < minimumValidPacketSize {
+ return
+ }
+ now := time.Now()
+ // Check to see if this is a stateless reset.
+ var token statelessResetToken
+ copy(token[:], m.b[len(m.b)-len(token):])
+ if c := e.connsMap.byResetToken[token]; c != nil {
+ c.sendMsg(func(now time.Time, c *Conn) {
+ c.handleStatelessReset(now, token)
+ })
+ return
+ }
+ // If this is a 1-RTT packet, there's nothing productive we can do with it.
+ // Send a stateless reset if possible.
+ if !isLongHeader(m.b[0]) {
+ e.maybeSendStatelessReset(m.b, m.peerAddr)
+ return
+ }
+ p, ok := parseGenericLongHeaderPacket(m.b)
+ if !ok || len(m.b) < paddedInitialDatagramSize {
+ return
+ }
+ switch p.version {
+ case quicVersion1:
+ case 0:
+ // Version Negotiation for an unknown connection.
+ return
+ default:
+ // Unknown version.
+ e.sendVersionNegotiation(p, m.peerAddr)
+ return
+ }
+ if getPacketType(m.b) != packetTypeInitial {
+ // This packet isn't trying to create a new connection.
+ // It might be associated with some connection we've lost state for.
+ // We are technically permitted to send a stateless reset for
+ // a long-header packet, but this isn't generally useful. See:
+ // https://www.rfc-editor.org/rfc/rfc9000#section-10.3-16
+ return
+ }
+ if e.listenConfig == nil {
+ // We are not configured to accept connections.
+ return
+ }
+ cids := newServerConnIDs{
+ srcConnID: p.srcConnID,
+ dstConnID: p.dstConnID,
+ }
+ if e.listenConfig.RequireAddressValidation {
+ var ok bool
+ cids.retrySrcConnID = p.dstConnID
+ cids.originalDstConnID, ok = e.validateInitialAddress(now, p, m.peerAddr)
+ if !ok {
+ return
+ }
+ } else {
+ cids.originalDstConnID = p.dstConnID
+ }
+ var err error
+ c, err := e.newConn(now, e.listenConfig, serverSide, cids, "", m.peerAddr)
+ if err != nil {
+ // The accept queue is probably full.
+ // We could send a CONNECTION_CLOSE to the peer to reject the connection.
+ // Currently, we just drop the datagram.
+ // https://www.rfc-editor.org/rfc/rfc9000.html#section-5.2.2-5
+ return
+ }
+ c.sendMsg(m)
+ m = nil // don't recycle, sendMsg takes ownership
+}
+
+func (e *Endpoint) maybeSendStatelessReset(b []byte, peerAddr netip.AddrPort) {
+ if !e.resetGen.canReset {
+ // Config.StatelessResetKey isn't set, so we don't send stateless resets.
+ return
+ }
+ // The smallest possible valid packet a peer can send us is:
+ // 1 byte of header
+ // connIDLen bytes of destination connection ID
+ // 1 byte of packet number
+ // 1 byte of payload
+ // 16 bytes AEAD expansion
+ if len(b) < 1+connIDLen+1+1+16 {
+ return
+ }
+ // TODO: Rate limit stateless resets.
+ cid := b[1:][:connIDLen]
+ token := e.resetGen.tokenForConnID(cid)
+ // We want to generate a stateless reset that is as short as possible,
+ // but long enough to be difficult to distinguish from a 1-RTT packet.
+ //
+ // The minimal 1-RTT packet is:
+ // 1 byte of header
+ // 0-20 bytes of destination connection ID
+ // 1-4 bytes of packet number
+ // 1 byte of payload
+ // 16 bytes AEAD expansion
+ //
+ // Assuming the maximum possible connection ID and packet number size,
+ // this gives 1 + 20 + 4 + 1 + 16 = 42 bytes.
+ //
+ // We also must generate a stateless reset that is shorter than the datagram
+ // we are responding to, in order to ensure that reset loops terminate.
+ //
+ // See: https://www.rfc-editor.org/rfc/rfc9000#section-10.3
+ size := min(len(b)-1, 42)
+ // Reuse the input buffer for generating the stateless reset.
+ b = b[:size]
+ rand.Read(b[:len(b)-statelessResetTokenLen])
+ b[0] &^= headerFormLong // clear long header bit
+ b[0] |= fixedBit // set fixed bit
+ copy(b[len(b)-statelessResetTokenLen:], token[:])
+ e.sendDatagram(datagram{
+ b: b,
+ peerAddr: peerAddr,
+ })
+}
+
+func (e *Endpoint) sendVersionNegotiation(p genericLongPacket, peerAddr netip.AddrPort) {
+ m := newDatagram()
+ m.b = appendVersionNegotiation(m.b[:0], p.srcConnID, p.dstConnID, quicVersion1)
+ m.peerAddr = peerAddr
+ e.sendDatagram(*m)
+ m.recycle()
+}
+
+func (e *Endpoint) sendConnectionClose(in genericLongPacket, peerAddr netip.AddrPort, code transportError) {
+ keys := initialKeys(in.dstConnID, serverSide)
+ var w packetWriter
+ p := longPacket{
+ ptype: packetTypeInitial,
+ version: quicVersion1,
+ num: 0,
+ dstConnID: in.srcConnID,
+ srcConnID: in.dstConnID,
+ }
+ const pnumMaxAcked = 0
+ w.reset(paddedInitialDatagramSize)
+ w.startProtectedLongHeaderPacket(pnumMaxAcked, p)
+ w.appendConnectionCloseTransportFrame(code, 0, "")
+ w.finishProtectedLongHeaderPacket(pnumMaxAcked, keys.w, p)
+ buf := w.datagram()
+ if len(buf) == 0 {
+ return
+ }
+ e.sendDatagram(datagram{
+ b: buf,
+ peerAddr: peerAddr,
+ })
+}
+
+func (e *Endpoint) sendDatagram(dgram datagram) error {
+ return e.packetConn.Write(dgram)
+}
+
+// A connsMap is an endpoint's mapping of conn ids and reset tokens to conns.
+type connsMap struct {
+ byConnID map[string]*Conn
+ byResetToken map[statelessResetToken]*Conn
+
+ updateMu sync.Mutex
+ updateNeeded atomic.Bool
+ updates []func(*connsMap)
+}
+
+func (m *connsMap) init() {
+ m.byConnID = map[string]*Conn{}
+ m.byResetToken = map[statelessResetToken]*Conn{}
+}
+
+func (m *connsMap) addConnID(c *Conn, cid []byte) {
+ m.byConnID[string(cid)] = c
+}
+
+func (m *connsMap) retireConnID(c *Conn, cid []byte) {
+ delete(m.byConnID, string(cid))
+}
+
+func (m *connsMap) addResetToken(c *Conn, token statelessResetToken) {
+ m.byResetToken[token] = c
+}
+
+func (m *connsMap) retireResetToken(c *Conn, token statelessResetToken) {
+ delete(m.byResetToken, token)
+}
+
+func (m *connsMap) updateConnIDs(f func(*connsMap)) {
+ m.updateMu.Lock()
+ defer m.updateMu.Unlock()
+ m.updates = append(m.updates, f)
+ m.updateNeeded.Store(true)
+}
+
+// applyUpdates is called by the datagram receive loop to update its connection ID map.
+func (m *connsMap) applyUpdates() {
+ m.updateMu.Lock()
+ defer m.updateMu.Unlock()
+ for _, f := range m.updates {
+ f(m)
+ }
+ clear(m.updates)
+ m.updates = m.updates[:0]
+ m.updateNeeded.Store(false)
+}