From 2992477c4ba6ab241c40eeabd8df335ca848a723 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Mon, 3 Apr 2023 20:51:26 +0000 Subject: [PATCH] Debounce kubernetes service endpoint updates Signed-off-by: Brad Davidson --- pkg/agent/tunnel/tunnel.go | 62 ++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index 5a77ea550a..701fdb5fe1 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -35,6 +35,10 @@ import ( "k8s.io/kubernetes/pkg/cluster/ports" ) +var ( + endpointDebounceDelay = time.Second +) + type agentTunnel struct { client kubernetes.Interface cidrs cidranger.Ranger @@ -306,9 +310,14 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan <-done }() + var cancelUpdate context.CancelFunc + for { select { case <-ctx.Done(): + if cancelUpdate != nil { + cancelUpdate() + } return case ev, ok := <-watch.ResultChan(): endpoint, ok := ev.Object.(*v1.Endpoints) @@ -317,28 +326,49 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan continue } - newAddresses := util.GetAddresses(endpoint) - if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) { - continue + if cancelUpdate != nil { + cancelUpdate() } - proxy.Update(newAddresses) - validEndpoint := map[string]bool{} + var debounceCtx context.Context + debounceCtx, cancelUpdate = context.WithCancel(ctx) + + // When joining the cluster, the apiserver adds, removes, and then readds itself to + // the endpoint list several times. This causes a bit of thrashing if we react to + // endpoint changes immediately. Instead, perform the endpoint update in a + // goroutine that sleeps for a short period before checking for changes and updating + // the proxy addresses. If another update occurs, the previous update operation + // will be cancelled and a new one queued. + go func() { + select { + case <-time.After(endpointDebounceDelay): + case <-debounceCtx.Done(): + return + } - for _, address := range proxy.SupervisorAddresses() { - validEndpoint[address] = true - if _, ok := disconnect[address]; !ok { - disconnect[address] = a.connect(ctx, nil, address, tlsConfig) + newAddresses := util.GetAddresses(endpoint) + if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) { + return } - } + proxy.Update(newAddresses) - for address, cancel := range disconnect { - if !validEndpoint[address] { - cancel() - delete(disconnect, address) - logrus.Infof("Stopped tunnel to %s", address) + validEndpoint := map[string]bool{} + + for _, address := range proxy.SupervisorAddresses() { + validEndpoint[address] = true + if _, ok := disconnect[address]; !ok { + disconnect[address] = a.connect(ctx, nil, address, tlsConfig) + } } - } + + for address, cancel := range disconnect { + if !validEndpoint[address] { + cancel() + delete(disconnect, address) + logrus.Infof("Stopped tunnel to %s", address) + } + } + }() } } }