From 04b943ce382531d572f6f70da7b1973aec1d4b09 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sun, 31 Mar 2019 15:09:12 -0500 Subject: [PATCH] proxy/userspace: track initial service/endpoints sync We'll use this shortly to prevent premature syncing before all initial endpoints and services have been received from the apiserver. --- pkg/proxy/userspace/proxier.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 0f57ce81bd..d50cd31a6c 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -109,6 +109,12 @@ type Proxier struct { proxyPorts PortAllocator makeProxySocket ProxySocketFunc exec utilexec.Interface + // endpointsSynced and servicesSynced are set to 1 when the corresponding + // objects are synced after startup. This is used to avoid updating iptables + // with some partial data after kube-proxy restart. + endpointsSynced int32 + servicesSynced int32 + initialized int32 stopChan chan struct{} } @@ -304,6 +310,10 @@ func (proxier *Proxier) shutdown() { close(proxier.stopChan) } +func (proxier *Proxier) isInitialized() bool { + return atomic.LoadInt32(&proxier.initialized) > 0 +} + // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { if err := iptablesInit(proxier.iptables); err != nil { @@ -545,6 +555,14 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { } func (proxier *Proxier) OnServiceSynced() { + klog.V(2).Infof("userspace OnServiceSynced") + + // Mark services as initialized and (if endpoints are already + // initialized) the entire proxy as initialized + atomic.StoreInt32(&proxier.servicesSynced, 1) + if atomic.LoadInt32(&proxier.endpointsSynced) > 0 { + atomic.StoreInt32(&proxier.initialized, 1) + } } func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { @@ -560,7 +578,15 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { } func (proxier *Proxier) OnEndpointsSynced() { + klog.V(2).Infof("userspace OnEndpointsSynced") proxier.loadBalancer.OnEndpointsSynced() + + // Mark endpoints as initialized and (if services are already + // initialized) the entire proxy as initialized + atomic.StoreInt32(&proxier.endpointsSynced, 1) + if atomic.LoadInt32(&proxier.servicesSynced) > 0 { + atomic.StoreInt32(&proxier.initialized, 1) + } } func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {