diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 07d4688f20..6fc974fbde 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -77,6 +77,14 @@ func NewLoadBalancerRR() *LoadBalancerRR { } func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error { + lb.lock.Lock() + defer lb.lock.Unlock() + + lb.newServiceInternal(service, affinityType, ttlMinutes) + return nil +} + +func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) { if ttlMinutes == 0 { ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } @@ -84,7 +92,6 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes) glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)]) } - return nil } // return true if this service is using some form of session affinity. @@ -177,8 +184,9 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en } } -//Loop through the valid endpoints and then the endpoints associated with the Load Balancer. -// Then remove any session affinity records that are not in both lists. +// Loop through the valid endpoints and then the endpoints associated with the Load Balancer. +// Then remove any session affinity records that are not in both lists. +// This assumes the lb.lock is held. func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) { allEndpoints := map[string]int{} for _, validEndpoint := range newEndpoints { @@ -213,7 +221,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { // On update can be called without NewService being called externally. // to be safe we will call it here. A new service will only be created // if one does not already exist. - lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0) + lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0) lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints) // Reset the round-robin index. @@ -232,6 +240,9 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { } func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { + lb.lock.Lock() + defer lb.lock.Unlock() + ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap { if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {