Browse Source

Debounce kubernetes service endpoint updates

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/7215/head
Brad Davidson 2 years ago committed by Brad Davidson
parent
commit
2992477c4b
  1. 62
      pkg/agent/tunnel/tunnel.go

62
pkg/agent/tunnel/tunnel.go

@ -35,6 +35,10 @@ import (
"k8s.io/kubernetes/pkg/cluster/ports"
)
var (
endpointDebounceDelay = time.Second
)
type agentTunnel struct {
client kubernetes.Interface
cidrs cidranger.Ranger
@ -306,9 +310,14 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
<-done
}()
var cancelUpdate context.CancelFunc
for {
select {
case <-ctx.Done():
if cancelUpdate != nil {
cancelUpdate()
}
return
case ev, ok := <-watch.ResultChan():
endpoint, ok := ev.Object.(*v1.Endpoints)
@ -317,28 +326,49 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
continue
}
newAddresses := util.GetAddresses(endpoint)
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
continue
if cancelUpdate != nil {
cancelUpdate()
}
proxy.Update(newAddresses)
validEndpoint := map[string]bool{}
var debounceCtx context.Context
debounceCtx, cancelUpdate = context.WithCancel(ctx)
// When joining the cluster, the apiserver adds, removes, and then readds itself to
// the endpoint list several times. This causes a bit of thrashing if we react to
// endpoint changes immediately. Instead, perform the endpoint update in a
// goroutine that sleeps for a short period before checking for changes and updating
// the proxy addresses. If another update occurs, the previous update operation
// will be cancelled and a new one queued.
go func() {
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
return
}
for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
newAddresses := util.GetAddresses(endpoint)
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
return
}
}
proxy.Update(newAddresses)
for address, cancel := range disconnect {
if !validEndpoint[address] {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
validEndpoint := map[string]bool{}
for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
}
}
}
for address, cancel := range disconnect {
if !validEndpoint[address] {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}()
}
}
}

Loading…
Cancel
Save