From 90a4155236ef0d12bd6c900ded0d89030212e1b0 Mon Sep 17 00:00:00 2001 From: rPDmYQ <195319688+rPDmYQ@users.noreply.github.com> Date: Sat, 15 Feb 2025 14:50:50 +0000 Subject: [PATCH] Detect QUIC blackhole after handshake complete --- transport/internet/splithttp/race_dialer.go | 150 +++++++++++++++++--- 1 file changed, 129 insertions(+), 21 deletions(-) diff --git a/transport/internet/splithttp/race_dialer.go b/transport/internet/splithttp/race_dialer.go index f8692f33..6fd42653 100644 --- a/transport/internet/splithttp/race_dialer.go +++ b/transport/internet/splithttp/race_dialer.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "os" "sync" "sync/atomic" "time" @@ -36,6 +37,8 @@ const ( // net/third_party/quiche/src/quiche/quic/core/congestion_control/rtt_stats.cc // kAlpha chromeH3SmoothRTTAlpha = 0.125 + + h3MaxRoundTripScale = 3 ) type raceKeyType struct{} @@ -55,6 +58,18 @@ func isRaceInternalError(err error) bool { return goerrors.Is(err, loseRaceError) || goerrors.Is(err, brokenSpanError) } +type h3InitRoundTripTimeoutError struct { + err error + duration time.Duration +} + +func (h *h3InitRoundTripTimeoutError) Error() string { + return fmt.Sprintf("h3 not receiving any data in %s (%dx handshake RTT), QUIC is likely blocked on this network", h.duration, h3MaxRoundTripScale) +} +func (h *h3InitRoundTripTimeoutError) Unwrap() error { + return h.err +} + const ( raceInitialized = 0 raceEstablished = 1 @@ -71,6 +86,12 @@ const ( raceInactive raceResult = -2 ) +const ( + traceInit = 0 + traceInflight = 1 + traceSettled = 2 +) + type endpointInfo struct { lastFail time.Time failCount int @@ -117,12 +138,18 @@ func updateH3Broken(endpoint string, brokenAt time.Time) int { } info, ok := h3EndpointCatalog[endpoint] - if !ok { - h3EndpointCatalog[endpoint] = &endpointInfo{ - lastFail: brokenAt, - failCount: 1, + if brokenAt.IsZero() { + if ok { + info.failCount = 0 + info.lastFail = time.Time{} } - return 1 + + return 0 + } + + if !ok { + info = &endpointInfo{} + h3EndpointCatalog[endpoint] = info } info.failCount++ @@ -141,13 +168,12 @@ func smoothedRtt(oldRtt, newRtt int64) int64 { return int64((1-chromeH3SmoothRTTAlpha)*float64(oldRtt) + chromeH3SmoothRTTAlpha*float64(newRtt)) } -func updateH3RTT(endpoint string, rtt time.Duration) { +func updateH3RTT(endpoint string, rtt time.Duration) time.Duration { h3EndpointCatalogLock.RLock() info, ok := h3EndpointCatalog[endpoint] - if !ok || info.failCount > 0 { + if !ok { h3EndpointCatalogLock.RUnlock() - updateH3RTTSlow(endpoint, rtt) - return + return updateH3RTTSlow(endpoint, rtt) } defer h3EndpointCatalogLock.RUnlock() @@ -155,29 +181,104 @@ func updateH3RTT(endpoint string, rtt time.Duration) { oldRtt := info.rtt.Load() newRtt := smoothedRtt(oldRtt, int64(rtt)) if info.rtt.CompareAndSwap(oldRtt, newRtt) { - return + return time.Duration(newRtt) } } } -func updateH3RTTSlow(endpoint string, rtt time.Duration) { +func updateH3RTTSlow(endpoint string, rtt time.Duration) time.Duration { h3EndpointCatalogLock.Lock() defer h3EndpointCatalogLock.Unlock() + if h3EndpointCatalog == nil { + h3EndpointCatalog = make(map[string]*endpointInfo) + } info, ok := h3EndpointCatalog[endpoint] - switch { - case !ok: + if ok { + newRtt := smoothedRtt(info.rtt.Load(), int64(rtt)) + info.rtt.Store(newRtt) + return time.Duration(newRtt) + } else { info = &endpointInfo{} info.rtt.Store(int64(rtt)) - case info.failCount > 0: - info.failCount = 0 - info.lastFail = time.Time{} - info.rtt.Store(int64(rtt)) - default: - info.rtt.Store(smoothedRtt(info.rtt.Load(), int64(rtt))) + h3EndpointCatalog[endpoint] = info + return rtt } } +type quicStreamTraced struct { + quic.Stream + + conn *quicConnectionTraced + state atomic.Int32 +} + +func (s *quicStreamTraced) signal(success bool) { + if success { + s.conn.confirmedWorking.Store(true) + updateH3Broken(s.conn.endpoint, time.Time{}) + } else { + s.conn.signalTimeout() + s.CancelRead(quic.StreamErrorCode(quic.ApplicationErrorErrorCode)) + s.CancelWrite(quic.StreamErrorCode(quic.ApplicationErrorErrorCode)) + _ = s.Close() + } +} + +func (s *quicStreamTraced) Write(b []byte) (int, error) { + if s.state.CompareAndSwap(traceInit, traceInflight) { + _ = s.SetReadDeadline(time.Now().Add(s.conn.timeoutDuration)) + } + return s.Stream.Write(b) +} + +func (s *quicStreamTraced) Read(b []byte) (int, error) { + n, err := s.Stream.Read(b) + if s.state.CompareAndSwap(traceInflight, traceSettled) { + switch { + case err == nil: + _ = s.SetReadDeadline(time.Time{}) + s.signal(true) + case goerrors.Is(err, os.ErrDeadlineExceeded): + s.signal(false) + err = &h3InitRoundTripTimeoutError{ + err: err, + duration: s.conn.timeoutDuration, + } + } + } + + return n, err +} + +type quicConnectionTraced struct { + quic.EarlyConnection + + endpoint string + timeoutDuration time.Duration + confirmedWorking atomic.Bool +} + +func (conn *quicConnectionTraced) signalTimeout() { + _ = conn.CloseWithError(quic.ApplicationErrorCode(quic.ApplicationErrorErrorCode), "round trip timeout") + updateH3Broken(conn.endpoint, time.Now()) +} + +func (conn *quicConnectionTraced) OpenStreamSync(ctx context.Context) (quic.Stream, error) { + stream, err := conn.EarlyConnection.OpenStreamSync(ctx) + if err != nil { + return nil, err + } + if conn.confirmedWorking.Load() { + return stream, nil + } + + return &quicStreamTraced{ + Stream: stream, + conn: conn, + }, nil +} + type raceNotify struct { c chan struct{} result raceResult @@ -214,12 +315,19 @@ func (t *raceTransport) setup() *raceTransport { defer func() { notify := t.notify.Load() if err == nil { - updateH3RTT(t.dest, time.Since(dialStart)) + currRTT := time.Since(dialStart) + smoothRTT := updateH3RTT(t.dest, currRTT) notify.result = raceH3 close(notify.c) + + conn = &quicConnectionTraced{ + EarlyConnection: conn, + endpoint: t.dest, + timeoutDuration: max(currRTT, smoothRTT) * h3MaxRoundTripScale, + } } else if !isRaceInternalError(err) { failed := updateH3Broken(t.dest, time.Now()) - errors.LogDebug(ctx, "Race Dialer: h3 connection to ", t.dest, " failed ", failed, "times") + errors.LogDebug(ctx, "Race Dialer: h3 connection to ", t.dest, " failed ", failed, " time(s)") } // We can safely remove the raceNotify here, since both h2 and h3 Transport