kube-proxy applies latest snapshot of endpoints and services.

pull/6/head
Prashanth Balasubramanian 2016-02-01 19:02:23 -08:00
parent 210bac10c9
commit e1fa6e9fb8
3 changed files with 63 additions and 28 deletions

View File

@ -128,19 +128,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
update := change.(EndpointsUpdate) update := change.(EndpointsUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints)) glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
endpoints[name] = value endpoints[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update)) glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update))
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(endpoints, name) delete(endpoints, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update)) glog.V(5).Infof("Setting endpoints %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
endpoints = make(map[types.NamespacedName]api.Endpoints) endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
@ -153,7 +153,13 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpoints[source] = endpoints s.endpoints[source] = endpoints
s.endpointLock.Unlock() s.endpointLock.Unlock()
if s.updates != nil { if s.updates != nil {
s.updates <- struct{}{} // Since we record the snapshot before sending this signal, it's
// possible that the consumer ends up performing an extra update.
select {
case s.updates <- struct{}{}:
default:
glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
}
} }
return nil return nil
} }
@ -227,19 +233,19 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
update := change.(ServiceUpdate) update := change.(ServiceUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services)) glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
for _, value := range update.Services { for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
services[name] = value services[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing a service %s", spew.Sdump(update)) glog.V(5).Infof("Removing a service %s", spew.Sdump(update))
for _, value := range update.Services { for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(services, name) delete(services, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting services %s", spew.Sdump(update)) glog.V(5).Infof("Setting services %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
services = make(map[types.NamespacedName]api.Service) services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services { for _, value := range update.Services {
@ -252,7 +258,13 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
s.services[source] = services s.services[source] = services
s.serviceLock.Unlock() s.serviceLock.Unlock()
if s.updates != nil { if s.updates != nil {
s.updates <- struct{}{} // Since we record the snapshot before sending this signal, it's
// possible that the consumer ends up performing an extra update.
select {
case s.updates <- struct{}{}:
default:
glog.V(4).Infof("Service handler already has a pending interrupt.")
}
} }
return nil return nil
} }

View File

@ -19,7 +19,6 @@ package config_test
import ( import (
"reflect" "reflect"
"sort" "sort"
"sync"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -49,29 +48,35 @@ func (s sortedServices) Less(i, j int) bool {
} }
type ServiceHandlerMock struct { type ServiceHandlerMock struct {
services []api.Service updated chan []api.Service
updated sync.WaitGroup waits int
} }
func NewServiceHandlerMock() *ServiceHandlerMock { func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{services: make([]api.Service, 0)} return &ServiceHandlerMock{updated: make(chan []api.Service, 5)}
} }
func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) { func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) {
sort.Sort(sortedServices(services)) sort.Sort(sortedServices(services))
h.services = services h.updated <- services
h.updated.Done()
} }
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) { func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
h.updated.Wait() // We might get 1 or more updates for N service updates, because we
if !reflect.DeepEqual(h.services, expectedServices) { // over write older snapshots of services from the producer go-routine
t.Errorf("Expected %#v, Got %#v", expectedServices, h.services) // if the consumer falls behind. Unittests will hard timeout in 5m.
var services []api.Service
for ; h.waits > 0; h.waits = h.waits - 1 {
services = <-h.updated
if reflect.DeepEqual(services, expectedServices) {
return
}
} }
t.Errorf("Expected %#v, Got %#v", expectedServices, services)
} }
func (h *ServiceHandlerMock) Wait(waits int) { func (h *ServiceHandlerMock) Wait(waits int) {
h.updated.Add(waits) h.waits = waits
} }
type sortedEndpoints []api.Endpoints type sortedEndpoints []api.Endpoints
@ -87,29 +92,35 @@ func (s sortedEndpoints) Less(i, j int) bool {
} }
type EndpointsHandlerMock struct { type EndpointsHandlerMock struct {
endpoints []api.Endpoints updated chan []api.Endpoints
updated sync.WaitGroup waits int
} }
func NewEndpointsHandlerMock() *EndpointsHandlerMock { func NewEndpointsHandlerMock() *EndpointsHandlerMock {
return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)} return &EndpointsHandlerMock{updated: make(chan []api.Endpoints, 5)}
} }
func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) { func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) {
sort.Sort(sortedEndpoints(endpoints)) sort.Sort(sortedEndpoints(endpoints))
h.endpoints = endpoints h.updated <- endpoints
h.updated.Done()
} }
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) { func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) {
h.updated.Wait() // We might get 1 or more updates for N endpoint updates, because we
if !reflect.DeepEqual(h.endpoints, expectedEndpoints) { // over write older snapshots of endpoints from the producer go-routine
t.Errorf("Expected %#v, Got %#v", expectedEndpoints, h.endpoints) // if the consumer falls behind. Unittests will hard timeout in 5m.
var endpoints []api.Endpoints
for ; h.waits > 0; h.waits = h.waits - 1 {
endpoints := <-h.updated
if reflect.DeepEqual(endpoints, expectedEndpoints) {
return
}
} }
t.Errorf("Expected %#v, Got %#v", expectedEndpoints, endpoints)
} }
func (h *EndpointsHandlerMock) Wait(waits int) { func (h *EndpointsHandlerMock) Wait(waits int) {
h.updated.Add(waits) h.waits = waits
} }
func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate { func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate {

View File

@ -279,6 +279,10 @@ func (proxier *Proxier) SyncLoop() {
// OnServiceUpdate tracks the active set of service proxies. // OnServiceUpdate tracks the active set of service proxies.
// They will be synchronized using syncProxyRules() // They will be synchronized using syncProxyRules()
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
}()
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true proxier.haveReceivedServiceUpdate = true
@ -316,7 +320,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
glog.V(3).Infof("Something changed for service %q: removing it", serviceName) glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
delete(proxier.serviceMap, serviceName) delete(proxier.serviceMap, serviceName)
} }
serviceIP := net.ParseIP(service.Spec.ClusterIP) serviceIP := net.ParseIP(service.Spec.ClusterIP)
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info = newServiceInfo(serviceName) info = newServiceInfo(serviceName)
@ -347,6 +350,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
// OnEndpointsUpdate takes in a slice of updated endpoints. // OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnEndpointsUpdate took %v for %d endpoints", time.Since(start), len(allEndpoints))
}()
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true proxier.haveReceivedEndpointsUpdate = true
@ -451,6 +459,10 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
// The only other iptables rules are those that are setup in iptablesInit() // The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held // assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() { func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints // don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")