mirror of https://github.com/k3s-io/k3s
Fix unprotected shared state in load balancer
parent
a2e2ae1526
commit
743888739d
|
@ -77,6 +77,14 @@ func NewLoadBalancerRR() *LoadBalancerRR {
|
|||
}
|
||||
|
||||
func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
|
||||
lb.lock.Lock()
|
||||
defer lb.lock.Unlock()
|
||||
|
||||
lb.newServiceInternal(service, affinityType, ttlMinutes)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) {
|
||||
if ttlMinutes == 0 {
|
||||
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
|
||||
}
|
||||
|
@ -84,7 +92,6 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
|
|||
lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes)
|
||||
glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// return true if this service is using some form of session affinity.
|
||||
|
@ -177,8 +184,9 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en
|
|||
}
|
||||
}
|
||||
|
||||
//Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
|
||||
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
|
||||
// Then remove any session affinity records that are not in both lists.
|
||||
// This assumes the lb.lock is held.
|
||||
func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) {
|
||||
allEndpoints := map[string]int{}
|
||||
for _, validEndpoint := range newEndpoints {
|
||||
|
@ -213,7 +221,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
|
|||
// On update can be called without NewService being called externally.
|
||||
// to be safe we will call it here. A new service will only be created
|
||||
// if one does not already exist.
|
||||
lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0)
|
||||
lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
|
||||
lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints)
|
||||
|
||||
// Reset the round-robin index.
|
||||
|
@ -232,6 +240,9 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
|
|||
}
|
||||
|
||||
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
|
||||
lb.lock.Lock()
|
||||
defer lb.lock.Unlock()
|
||||
|
||||
ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes
|
||||
for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap {
|
||||
if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {
|
||||
|
|
Loading…
Reference in New Issue