Detect QUIC blackhole after handshake complete

pull/4320/head
rPDmYQ 2025-02-15 14:50:50 +00:00 committed by GitHub
parent 427101e19f
commit 90a4155236
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 129 additions and 21 deletions

View File

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
@ -36,6 +37,8 @@ const (
// net/third_party/quiche/src/quiche/quic/core/congestion_control/rtt_stats.cc
// kAlpha
chromeH3SmoothRTTAlpha = 0.125
h3MaxRoundTripScale = 3
)
type raceKeyType struct{}
@ -55,6 +58,18 @@ func isRaceInternalError(err error) bool {
return goerrors.Is(err, loseRaceError) || goerrors.Is(err, brokenSpanError)
}
type h3InitRoundTripTimeoutError struct {
err error
duration time.Duration
}
func (h *h3InitRoundTripTimeoutError) Error() string {
return fmt.Sprintf("h3 not receiving any data in %s (%dx handshake RTT), QUIC is likely blocked on this network", h.duration, h3MaxRoundTripScale)
}
func (h *h3InitRoundTripTimeoutError) Unwrap() error {
return h.err
}
const (
raceInitialized = 0
raceEstablished = 1
@ -71,6 +86,12 @@ const (
raceInactive raceResult = -2
)
const (
traceInit = 0
traceInflight = 1
traceSettled = 2
)
type endpointInfo struct {
lastFail time.Time
failCount int
@ -117,12 +138,18 @@ func updateH3Broken(endpoint string, brokenAt time.Time) int {
}
info, ok := h3EndpointCatalog[endpoint]
if !ok {
h3EndpointCatalog[endpoint] = &endpointInfo{
lastFail: brokenAt,
failCount: 1,
if brokenAt.IsZero() {
if ok {
info.failCount = 0
info.lastFail = time.Time{}
}
return 1
return 0
}
if !ok {
info = &endpointInfo{}
h3EndpointCatalog[endpoint] = info
}
info.failCount++
@ -141,13 +168,12 @@ func smoothedRtt(oldRtt, newRtt int64) int64 {
return int64((1-chromeH3SmoothRTTAlpha)*float64(oldRtt) + chromeH3SmoothRTTAlpha*float64(newRtt))
}
func updateH3RTT(endpoint string, rtt time.Duration) {
func updateH3RTT(endpoint string, rtt time.Duration) time.Duration {
h3EndpointCatalogLock.RLock()
info, ok := h3EndpointCatalog[endpoint]
if !ok || info.failCount > 0 {
if !ok {
h3EndpointCatalogLock.RUnlock()
updateH3RTTSlow(endpoint, rtt)
return
return updateH3RTTSlow(endpoint, rtt)
}
defer h3EndpointCatalogLock.RUnlock()
@ -155,29 +181,104 @@ func updateH3RTT(endpoint string, rtt time.Duration) {
oldRtt := info.rtt.Load()
newRtt := smoothedRtt(oldRtt, int64(rtt))
if info.rtt.CompareAndSwap(oldRtt, newRtt) {
return
return time.Duration(newRtt)
}
}
}
func updateH3RTTSlow(endpoint string, rtt time.Duration) {
func updateH3RTTSlow(endpoint string, rtt time.Duration) time.Duration {
h3EndpointCatalogLock.Lock()
defer h3EndpointCatalogLock.Unlock()
if h3EndpointCatalog == nil {
h3EndpointCatalog = make(map[string]*endpointInfo)
}
info, ok := h3EndpointCatalog[endpoint]
switch {
case !ok:
if ok {
newRtt := smoothedRtt(info.rtt.Load(), int64(rtt))
info.rtt.Store(newRtt)
return time.Duration(newRtt)
} else {
info = &endpointInfo{}
info.rtt.Store(int64(rtt))
case info.failCount > 0:
info.failCount = 0
info.lastFail = time.Time{}
info.rtt.Store(int64(rtt))
default:
info.rtt.Store(smoothedRtt(info.rtt.Load(), int64(rtt)))
h3EndpointCatalog[endpoint] = info
return rtt
}
}
type quicStreamTraced struct {
quic.Stream
conn *quicConnectionTraced
state atomic.Int32
}
func (s *quicStreamTraced) signal(success bool) {
if success {
s.conn.confirmedWorking.Store(true)
updateH3Broken(s.conn.endpoint, time.Time{})
} else {
s.conn.signalTimeout()
s.CancelRead(quic.StreamErrorCode(quic.ApplicationErrorErrorCode))
s.CancelWrite(quic.StreamErrorCode(quic.ApplicationErrorErrorCode))
_ = s.Close()
}
}
func (s *quicStreamTraced) Write(b []byte) (int, error) {
if s.state.CompareAndSwap(traceInit, traceInflight) {
_ = s.SetReadDeadline(time.Now().Add(s.conn.timeoutDuration))
}
return s.Stream.Write(b)
}
func (s *quicStreamTraced) Read(b []byte) (int, error) {
n, err := s.Stream.Read(b)
if s.state.CompareAndSwap(traceInflight, traceSettled) {
switch {
case err == nil:
_ = s.SetReadDeadline(time.Time{})
s.signal(true)
case goerrors.Is(err, os.ErrDeadlineExceeded):
s.signal(false)
err = &h3InitRoundTripTimeoutError{
err: err,
duration: s.conn.timeoutDuration,
}
}
}
return n, err
}
type quicConnectionTraced struct {
quic.EarlyConnection
endpoint string
timeoutDuration time.Duration
confirmedWorking atomic.Bool
}
func (conn *quicConnectionTraced) signalTimeout() {
_ = conn.CloseWithError(quic.ApplicationErrorCode(quic.ApplicationErrorErrorCode), "round trip timeout")
updateH3Broken(conn.endpoint, time.Now())
}
func (conn *quicConnectionTraced) OpenStreamSync(ctx context.Context) (quic.Stream, error) {
stream, err := conn.EarlyConnection.OpenStreamSync(ctx)
if err != nil {
return nil, err
}
if conn.confirmedWorking.Load() {
return stream, nil
}
return &quicStreamTraced{
Stream: stream,
conn: conn,
}, nil
}
type raceNotify struct {
c chan struct{}
result raceResult
@ -214,12 +315,19 @@ func (t *raceTransport) setup() *raceTransport {
defer func() {
notify := t.notify.Load()
if err == nil {
updateH3RTT(t.dest, time.Since(dialStart))
currRTT := time.Since(dialStart)
smoothRTT := updateH3RTT(t.dest, currRTT)
notify.result = raceH3
close(notify.c)
conn = &quicConnectionTraced{
EarlyConnection: conn,
endpoint: t.dest,
timeoutDuration: max(currRTT, smoothRTT) * h3MaxRoundTripScale,
}
} else if !isRaceInternalError(err) {
failed := updateH3Broken(t.dest, time.Now())
errors.LogDebug(ctx, "Race Dialer: h3 connection to ", t.dest, " failed ", failed, "times")
errors.LogDebug(ctx, "Race Dialer: h3 connection to ", t.dest, " failed ", failed, " time(s)")
}
// We can safely remove the raceNotify here, since both h2 and h3 Transport