mirror of https://github.com/k3s-io/k3s
Merge pull request #8846 from thockin/session-affinity
Fix session affinity in kube-proxypull/6/head
commit
d88444cace
|
@ -96,6 +96,8 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort ServicePortName, affinityTy
|
||||||
if _, exists := lb.services[svcPort]; !exists {
|
if _, exists := lb.services[svcPort]; !exists {
|
||||||
lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
|
lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
|
||||||
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
|
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
|
||||||
|
} else if affinityType != "" {
|
||||||
|
lb.services[svcPort].affinity.affinityType = affinityType
|
||||||
}
|
}
|
||||||
return lb.services[svcPort]
|
return lb.services[svcPort]
|
||||||
}
|
}
|
||||||
|
@ -261,8 +263,9 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
|
||||||
lb.updateAffinityMap(svcPort, newEndpoints)
|
lb.updateAffinityMap(svcPort, newEndpoints)
|
||||||
// OnUpdate can be called without NewService being called externally.
|
// OnUpdate can be called without NewService being called externally.
|
||||||
// To be safe we will call it here. A new service will only be created
|
// To be safe we will call it here. A new service will only be created
|
||||||
// if one does not already exist.
|
// if one does not already exist. The affinity will be updated
|
||||||
state = lb.newServiceInternal(svcPort, api.ServiceAffinityNone, 0)
|
// later, once NewService is called.
|
||||||
|
state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0)
|
||||||
state.endpoints = slice.ShuffleStrings(newEndpoints)
|
state.endpoints = slice.ShuffleStrings(newEndpoints)
|
||||||
|
|
||||||
// Reset the round-robin index.
|
// Reset the round-robin index.
|
||||||
|
|
|
@ -342,32 +342,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
|
||||||
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
|
expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
|
func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) {
|
||||||
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
|
|
||||||
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
|
|
||||||
loadBalancer := NewLoadBalancerRR()
|
|
||||||
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
|
|
||||||
endpoint, err := loadBalancer.NextEndpoint(service, nil)
|
|
||||||
if err == nil || len(endpoint) != 0 {
|
|
||||||
t.Errorf("Didn't fail with non-existent service")
|
|
||||||
}
|
|
||||||
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
|
|
||||||
endpoints := make([]api.Endpoints, 1)
|
|
||||||
endpoints[0] = api.Endpoints{
|
|
||||||
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
|
||||||
Subsets: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Port: 1}}}},
|
|
||||||
}
|
|
||||||
loadBalancer.OnUpdate(endpoints)
|
|
||||||
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
|
|
||||||
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
|
|
||||||
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
|
|
||||||
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
|
|
||||||
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
|
|
||||||
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
|
|
||||||
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
|
|
||||||
loadBalancer := NewLoadBalancerRR()
|
loadBalancer := NewLoadBalancerRR()
|
||||||
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
|
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
|
||||||
endpoint, err := loadBalancer.NextEndpoint(service, nil)
|
endpoint, err := loadBalancer.NextEndpoint(service, nil)
|
||||||
|
@ -375,33 +350,56 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
|
||||||
t.Errorf("Didn't fail with non-existent service")
|
t.Errorf("Didn't fail with non-existent service")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Call NewService() before OnUpdate()
|
||||||
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
|
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
|
||||||
endpoints := make([]api.Endpoints, 1)
|
endpoints := make([]api.Endpoints, 1)
|
||||||
endpoints[0] = api.Endpoints{
|
endpoints[0] = api.Endpoints{
|
||||||
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||||
Subsets: []api.EndpointSubset{
|
Subsets: []api.EndpointSubset{
|
||||||
{
|
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
|
||||||
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
|
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
|
||||||
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
|
{Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}},
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
loadBalancer.OnUpdate(endpoints)
|
loadBalancer.OnUpdate(endpoints)
|
||||||
shuffledEndpoints := loadBalancer.services[service].endpoints
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
|
|
||||||
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
|
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
|
||||||
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
|
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
|
||||||
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
|
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
|
||||||
|
|
||||||
|
ep1, err := loadBalancer.NextEndpoint(service, client1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Didn't find a service for %s: %v", service, err)
|
||||||
|
}
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
|
||||||
|
ep2, err := loadBalancer.NextEndpoint(service, client2)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Didn't find a service for %s: %v", service, err)
|
||||||
|
}
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
|
||||||
|
ep3, err := loadBalancer.NextEndpoint(service, client3)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Didn't find a service for %s: %v", service, err)
|
||||||
|
}
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) {
|
||||||
loadBalancer := NewLoadBalancerRR()
|
loadBalancer := NewLoadBalancerRR()
|
||||||
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
|
service := ServicePortName{types.NamespacedName{"testnamespace", "foo"}, ""}
|
||||||
endpoint, err := loadBalancer.NextEndpoint(service, nil)
|
endpoint, err := loadBalancer.NextEndpoint(service, nil)
|
||||||
|
@ -409,28 +407,52 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
|
||||||
t.Errorf("Didn't fail with non-existent service")
|
t.Errorf("Didn't fail with non-existent service")
|
||||||
}
|
}
|
||||||
|
|
||||||
loadBalancer.NewService(service, api.ServiceAffinityNone, 0)
|
// Call OnUpdate() before NewService()
|
||||||
endpoints := make([]api.Endpoints, 1)
|
endpoints := make([]api.Endpoints, 1)
|
||||||
endpoints[0] = api.Endpoints{
|
endpoints[0] = api.Endpoints{
|
||||||
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||||
Subsets: []api.EndpointSubset{
|
Subsets: []api.EndpointSubset{
|
||||||
{
|
{Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},
|
||||||
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
|
{Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}},
|
||||||
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
loadBalancer.OnUpdate(endpoints)
|
loadBalancer.OnUpdate(endpoints)
|
||||||
|
loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0)
|
||||||
|
|
||||||
shuffledEndpoints := loadBalancer.services[service].endpoints
|
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
|
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
|
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2)
|
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2)
|
ep1, err := loadBalancer.NextEndpoint(service, client1)
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3)
|
if err != nil {
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
|
t.Errorf("Didn't find a service for %s: %v", service, err)
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
|
}
|
||||||
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
|
||||||
|
ep2, err := loadBalancer.NextEndpoint(service, client2)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Didn't find a service for %s: %v", service, err)
|
||||||
|
}
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
|
||||||
|
ep3, err := loadBalancer.NextEndpoint(service, client3)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Didn't find a service for %s: %v", service, err)
|
||||||
|
}
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep1, client1)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep2, client2)
|
||||||
|
expectEndpoint(t, loadBalancer, service, ep3, client3)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
|
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue