mirror of https://github.com/k3s-io/k3s
Debounce kubernetes service endpoint updates
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 2992477c4b
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/7240/head
parent
7036323cd7
commit
64709f401d
|
@ -35,6 +35,10 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/cluster/ports"
|
"k8s.io/kubernetes/pkg/cluster/ports"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
endpointDebounceDelay = time.Second
|
||||||
|
)
|
||||||
|
|
||||||
type agentTunnel struct {
|
type agentTunnel struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
cidrs cidranger.Ranger
|
cidrs cidranger.Ranger
|
||||||
|
@ -306,9 +310,14 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
|
||||||
<-done
|
<-done
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var cancelUpdate context.CancelFunc
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
if cancelUpdate != nil {
|
||||||
|
cancelUpdate()
|
||||||
|
}
|
||||||
return
|
return
|
||||||
case ev, ok := <-watch.ResultChan():
|
case ev, ok := <-watch.ResultChan():
|
||||||
endpoint, ok := ev.Object.(*v1.Endpoints)
|
endpoint, ok := ev.Object.(*v1.Endpoints)
|
||||||
|
@ -317,28 +326,49 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
newAddresses := util.GetAddresses(endpoint)
|
if cancelUpdate != nil {
|
||||||
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
|
cancelUpdate()
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
proxy.Update(newAddresses)
|
|
||||||
|
|
||||||
validEndpoint := map[string]bool{}
|
var debounceCtx context.Context
|
||||||
|
debounceCtx, cancelUpdate = context.WithCancel(ctx)
|
||||||
|
|
||||||
for _, address := range proxy.SupervisorAddresses() {
|
// When joining the cluster, the apiserver adds, removes, and then readds itself to
|
||||||
validEndpoint[address] = true
|
// the endpoint list several times. This causes a bit of thrashing if we react to
|
||||||
if _, ok := disconnect[address]; !ok {
|
// endpoint changes immediately. Instead, perform the endpoint update in a
|
||||||
disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
|
// goroutine that sleeps for a short period before checking for changes and updating
|
||||||
|
// the proxy addresses. If another update occurs, the previous update operation
|
||||||
|
// will be cancelled and a new one queued.
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-time.After(endpointDebounceDelay):
|
||||||
|
case <-debounceCtx.Done():
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for address, cancel := range disconnect {
|
newAddresses := util.GetAddresses(endpoint)
|
||||||
if !validEndpoint[address] {
|
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
|
||||||
cancel()
|
return
|
||||||
delete(disconnect, address)
|
|
||||||
logrus.Infof("Stopped tunnel to %s", address)
|
|
||||||
}
|
}
|
||||||
}
|
proxy.Update(newAddresses)
|
||||||
|
|
||||||
|
validEndpoint := map[string]bool{}
|
||||||
|
|
||||||
|
for _, address := range proxy.SupervisorAddresses() {
|
||||||
|
validEndpoint[address] = true
|
||||||
|
if _, ok := disconnect[address]; !ok {
|
||||||
|
disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for address, cancel := range disconnect {
|
||||||
|
if !validEndpoint[address] {
|
||||||
|
cancel()
|
||||||
|
delete(disconnect, address)
|
||||||
|
logrus.Infof("Stopped tunnel to %s", address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue