proxy/userspace: track initial service/endpoints sync

We'll use this shortly to prevent premature syncing before all
initial endpoints and services have been received from the
apiserver.
k3s-v1.15.3
Dan Williams 2019-03-31 15:09:12 -05:00
parent ddab79a233
commit 04b943ce38
1 changed files with 26 additions and 0 deletions

View File

@ -109,6 +109,12 @@ type Proxier struct {
proxyPorts PortAllocator
makeProxySocket ProxySocketFunc
exec utilexec.Interface
// endpointsSynced and servicesSynced are set to 1 when the corresponding
// objects are synced after startup. This is used to avoid updating iptables
// with some partial data after kube-proxy restart.
endpointsSynced int32
servicesSynced int32
initialized int32
stopChan chan struct{}
}
@ -304,6 +310,10 @@ func (proxier *Proxier) shutdown() {
close(proxier.stopChan)
}
func (proxier *Proxier) isInitialized() bool {
return atomic.LoadInt32(&proxier.initialized) > 0
}
// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
if err := iptablesInit(proxier.iptables); err != nil {
@ -545,6 +555,14 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
}
func (proxier *Proxier) OnServiceSynced() {
klog.V(2).Infof("userspace OnServiceSynced")
// Mark services as initialized and (if endpoints are already
// initialized) the entire proxy as initialized
atomic.StoreInt32(&proxier.servicesSynced, 1)
if atomic.LoadInt32(&proxier.endpointsSynced) > 0 {
atomic.StoreInt32(&proxier.initialized, 1)
}
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
@ -560,7 +578,15 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
}
func (proxier *Proxier) OnEndpointsSynced() {
klog.V(2).Infof("userspace OnEndpointsSynced")
proxier.loadBalancer.OnEndpointsSynced()
// Mark endpoints as initialized and (if services are already
// initialized) the entire proxy as initialized
atomic.StoreInt32(&proxier.endpointsSynced, 1)
if atomic.LoadInt32(&proxier.servicesSynced) > 0 {
atomic.StoreInt32(&proxier.initialized, 1)
}
}
func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {