kube-proxy: OnServiceUpdate takes pointers

This signature is more consistent with OnEndpointsUpdate and removes a
copy loop.  This is part on ongoing cleanup to rate-limit iptables
calls.
pull/6/head
Tim Hockin 2017-03-31 21:48:39 -07:00
parent a8e552832d
commit adf30aa2e1
13 changed files with 97 additions and 98 deletions

View File

@ -218,6 +218,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname}) recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
var proxier proxy.ProxyProvider var proxier proxy.ProxyProvider
var servicesHandler proxyconfig.ServiceConfigHandler
var endpointsHandler proxyconfig.EndpointsConfigHandler var endpointsHandler proxyconfig.EndpointsConfigHandler
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
@ -244,22 +245,20 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
proxier = proxierIPTables proxier = proxierIPTables
servicesHandler = proxierIPTables
endpointsHandler = proxierIPTables endpointsHandler = proxierIPTables
// No turning back. Remove artifacts that might still exist from the userspace Proxier. // No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.") glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface) userspace.CleanupLeftovers(iptInterface)
} else { } else {
glog.V(0).Info("Using userspace Proxier.") glog.V(0).Info("Using userspace Proxier.")
var proxierUserspace proxy.ProxyProvider
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler. // our config.EndpointsConfigHandler.
loadBalancer := winuserspace.NewLoadBalancerRR() loadBalancer := winuserspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer // set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer endpointsHandler = loadBalancer
proxierUserspace, err = winuserspace.NewProxier( proxierUserspace, err := winuserspace.NewProxier(
loadBalancer, loadBalancer,
net.ParseIP(config.BindAddress), net.ParseIP(config.BindAddress),
netshInterface, netshInterface,
@ -268,13 +267,18 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
config.IPTablesSyncPeriod.Duration, config.IPTablesSyncPeriod.Duration,
config.UDPIdleTimeout.Duration, config.UDPIdleTimeout.Duration,
) )
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
servicesHandler = proxierUserspace
proxier = proxierUserspace
} else { } else {
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler. // our config.EndpointsConfigHandler.
loadBalancer := userspace.NewLoadBalancerRR() loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer // set EndpointsConfigHandler to our loadBalancer
endpointsHandler = loadBalancer endpointsHandler = loadBalancer
proxierUserspace, err = userspace.NewProxier( proxierUserspace, err := userspace.NewProxier(
loadBalancer, loadBalancer,
net.ParseIP(config.BindAddress), net.ParseIP(config.BindAddress),
iptInterface, iptInterface,
@ -284,11 +288,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
config.IPTablesMinSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration,
config.UDPIdleTimeout.Duration, config.UDPIdleTimeout.Duration,
) )
}
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
servicesHandler = proxierUserspace
proxier = proxierUserspace proxier = proxierUserspace
}
// Remove artifacts from the pure-iptables Proxier, if not on Windows. // Remove artifacts from the pure-iptables Proxier, if not on Windows.
if runtime.GOOS != "windows" { if runtime.GOOS != "windows" {
glog.V(0).Info("Tearing down pure-iptables proxy rules.") glog.V(0).Info("Tearing down pure-iptables proxy rules.")
@ -306,7 +311,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// only notify on changes, and the initial update (on process start) may be lost if no handlers // only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet. // are registered yet.
serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
serviceConfig.RegisterHandler(proxier) serviceConfig.RegisterHandler(servicesHandler)
go serviceConfig.Run(wait.NeverStop) go serviceConfig.Run(wait.NeverStop)
endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)

View File

@ -39,12 +39,12 @@ type HollowProxy struct {
type FakeProxyHandler struct{} type FakeProxyHandler struct{}
func (*FakeProxyHandler) OnServiceUpdate(services []api.Service) {} func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {}
func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {} func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {}
type FakeProxier struct{} type FakeProxier struct{}
func (*FakeProxier) OnServiceUpdate(services []api.Service) {} func (*FakeProxier) OnServiceUpdate(services []*api.Service) {}
func (*FakeProxier) Sync() {} func (*FakeProxier) Sync() {}
func (*FakeProxier) SyncLoop() { func (*FakeProxier) SyncLoop() {
select {} select {}

View File

@ -14,10 +14,7 @@ go_library(
"types.go", "types.go",
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = ["//vendor:k8s.io/apimachinery/pkg/types"],
"//pkg/api:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/types",
],
) )
filegroup( filegroup(

View File

@ -74,27 +74,27 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
go serviceConfig.Run(stopCh) go serviceConfig.Run(stopCh)
// Add the first service // Add the first service
handler.expected = []api.Service{*service1v1} handler.expected = []*api.Service{service1v1}
fakeWatch.Add(service1v1) fakeWatch.Add(service1v1)
<-ch <-ch
// Add another service // Add another service
handler.expected = []api.Service{*service1v1, *service2} handler.expected = []*api.Service{service1v1, service2}
fakeWatch.Add(service2) fakeWatch.Add(service2)
<-ch <-ch
// Modify service1 // Modify service1
handler.expected = []api.Service{*service1v2, *service2} handler.expected = []*api.Service{service1v2, service2}
fakeWatch.Modify(service1v2) fakeWatch.Modify(service1v2)
<-ch <-ch
// Delete service1 // Delete service1
handler.expected = []api.Service{*service2} handler.expected = []*api.Service{service2}
fakeWatch.Delete(service1v2) fakeWatch.Delete(service1v2)
<-ch <-ch
// Delete service2 // Delete service2
handler.expected = []api.Service{} handler.expected = []*api.Service{}
fakeWatch.Delete(service2) fakeWatch.Delete(service2)
<-ch <-ch
} }
@ -174,15 +174,15 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
type svcHandler struct { type svcHandler struct {
t *testing.T t *testing.T
expected []api.Service expected []*api.Service
done func() done func()
} }
func newSvcHandler(t *testing.T, svcs []api.Service, done func()) *svcHandler { func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler {
return &svcHandler{t: t, expected: svcs, done: done} return &svcHandler{t: t, expected: svcs, done: done}
} }
func (s *svcHandler) OnServiceUpdate(services []api.Service) { func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
defer s.done() defer s.done()
sort.Sort(sortedServices(services)) sort.Sort(sortedServices(services))
if !reflect.DeepEqual(s.expected, services) { if !reflect.DeepEqual(s.expected, services) {
@ -242,7 +242,7 @@ func TestInitialSync(t *testing.T) {
svcConfig := newServiceConfig(svcLW, time.Minute) svcConfig := newServiceConfig(svcLW, time.Minute)
epsConfig := newEndpointsConfig(epsLW, time.Minute) epsConfig := newEndpointsConfig(epsLW, time.Minute)
svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler) svcConfig.RegisterHandler(svcHandler)
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
epsConfig.RegisterHandler(epsHandler) epsConfig.RegisterHandler(epsHandler)

View File

@ -33,9 +33,17 @@ import (
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
type ServiceConfigHandler interface { type ServiceConfigHandler interface {
// OnServiceUpdate gets called when a configuration has been changed by one of the sources. // OnServiceUpdate gets called when a service is created, removed or changed
// This is the union of all the configuration sources. // on any of the configuration sources. An example is when a new service
OnServiceUpdate(services []api.Service) // comes up.
//
// NOTE: For efficiency, services are being passed by reference, thus,
// OnServiceUpdate should NOT modify pointers of a given slice.
// Those service objects are shared with other layers of the system and
// are guaranteed to be immutable with the assumption that are also
// not mutated by those handlers. Make a deep copy if you need to modify
// them in your code.
OnServiceUpdate(services []*api.Service)
} }
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. // EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
@ -208,24 +216,23 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
return return
} }
// We hanve synced informers. Now we can start delivering updates // We have synced informers. Now we can start delivering updates
// to the registered handler. // to the registered handler.
go func() { go func() {
for range c.updates { for range c.updates {
services, err := c.lister.List(labels.Everything()) services, err := c.lister.List(labels.Everything())
if err != nil { if err != nil {
glog.Errorf("Error while listing services from cache: %v", err) glog.Errorf("Error while listing services from cache: %v", err)
// This will cause a retry (if there isnt' any other trigger in-flight). // This will cause a retry (if there isn't any other trigger in-flight).
c.dispatchUpdate() c.dispatchUpdate()
continue continue
} }
svcs := make([]api.Service, 0, len(services)) if services == nil {
for i := range services { services = []*api.Service{}
svcs = append(svcs, *services[i])
} }
for i := range c.handlers { for i := range c.handlers {
glog.V(3).Infof("Calling handler.OnServiceUpdate()") glog.V(3).Infof("Calling handler.OnServiceUpdate()")
c.handlers[i].OnServiceUpdate(svcs) c.handlers[i].OnServiceUpdate(services)
} }
} }
}() }()

View File

@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
type sortedServices []api.Service type sortedServices []*api.Service
func (s sortedServices) Len() int { func (s sortedServices) Len() int {
return len(s) return len(s)
@ -41,24 +41,24 @@ func (s sortedServices) Less(i, j int) bool {
} }
type ServiceHandlerMock struct { type ServiceHandlerMock struct {
updated chan []api.Service updated chan []*api.Service
waits int waits int
} }
func NewServiceHandlerMock() *ServiceHandlerMock { func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{updated: make(chan []api.Service, 5)} return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)}
} }
func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) { func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) {
sort.Sort(sortedServices(services)) sort.Sort(sortedServices(services))
h.updated <- services h.updated <- services
} }
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) { func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) {
// We might get 1 or more updates for N service updates, because we // We might get 1 or more updates for N service updates, because we
// over write older snapshots of services from the producer go-routine // over write older snapshots of services from the producer go-routine
// if the consumer falls behind. // if the consumer falls behind.
var services []api.Service var services []*api.Service
for { for {
select { select {
case services = <-h.updated: case services = <-h.updated:
@ -139,7 +139,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
} }
fakeWatch.Add(service) fakeWatch.Add(service)
handler.ValidateServices(t, []api.Service{*service}) handler.ValidateServices(t, []*api.Service{service})
} }
func TestServiceAddedRemovedSetAndNotified(t *testing.T) { func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
@ -161,18 +161,18 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
} }
fakeWatch.Add(service1) fakeWatch.Add(service1)
handler.ValidateServices(t, []api.Service{*service1}) handler.ValidateServices(t, []*api.Service{service1})
service2 := &api.Service{ service2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
} }
fakeWatch.Add(service2) fakeWatch.Add(service2)
services := []api.Service{*service2, *service1} services := []*api.Service{service2, service1}
handler.ValidateServices(t, services) handler.ValidateServices(t, services)
fakeWatch.Delete(service1) fakeWatch.Delete(service1)
services = []api.Service{*service2} services = []*api.Service{service2}
handler.ValidateServices(t, services) handler.ValidateServices(t, services)
} }
@ -203,7 +203,7 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
fakeWatch.Add(service1) fakeWatch.Add(service1)
fakeWatch.Add(service2) fakeWatch.Add(service2)
services := []api.Service{*service2, *service1} services := []*api.Service{service2, service1}
handler.ValidateServices(t, services) handler.ValidateServices(t, services)
handler2.ValidateServices(t, services) handler2.ValidateServices(t, services)
} }

View File

@ -457,13 +457,12 @@ type healthCheckPort struct {
// Accepts a list of Services and the existing service map. Returns the new // Accepts a list of Services and the existing service map. Returns the new
// service map, a list of healthcheck ports to add to or remove from the health // service map, a list of healthcheck ports to add to or remove from the health
// checking listener service, and a set of stale UDP services. // checking listener service, and a set of stale UDP services.
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { func buildServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
newServiceMap := make(proxyServiceMap) newServiceMap := make(proxyServiceMap)
healthCheckAdd := make([]healthCheckPort, 0) healthCheckAdd := make([]healthCheckPort, 0)
healthCheckDel := make([]healthCheckPort, 0) healthCheckDel := make([]healthCheckPort, 0)
for i := range allServices { for _, service := range allServices {
service := &allServices[i]
svcName := types.NamespacedName{ svcName := types.NamespacedName{
Namespace: service.Namespace, Namespace: service.Namespace,
Name: service.Name, Name: service.Name,
@ -529,7 +528,7 @@ func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (
// OnServiceUpdate tracks the active set of service proxies. // OnServiceUpdate tracks the active set of service proxies.
// They will be synchronized using syncProxyRules() // They will be synchronized using syncProxyRules()
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
start := time.Now() start := time.Now()
defer func() { defer func() {
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices)) glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))

View File

@ -858,8 +858,8 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
} }
} }
func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Service { func makeTestService(namespace, name string, svcFunc func(*api.Service)) *api.Service {
svc := api.Service{ svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
Namespace: namespace, Namespace: namespace,
@ -867,7 +867,7 @@ func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Ser
Spec: api.ServiceSpec{}, Spec: api.ServiceSpec{},
Status: api.ServiceStatus{}, Status: api.ServiceStatus{},
} }
svcFunc(&svc) svcFunc(svc)
return svc return svc
} }
@ -883,7 +883,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
} }
func TestBuildServiceMapAddRemove(t *testing.T) { func TestBuildServiceMapAddRemove(t *testing.T) {
services := []api.Service{ services := []*api.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.ClusterIP = "172.16.55.4"
@ -959,7 +959,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
} }
// Remove some stuff // Remove some stuff
services = []api.Service{services[0]} services = []*api.Service{services[0]}
services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap) serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap)
if len(serviceMap) != 1 { if len(serviceMap) != 1 {
@ -999,7 +999,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
} }
func TestBuildServiceMapServiceHeadless(t *testing.T) { func TestBuildServiceMapServiceHeadless(t *testing.T) {
services := []api.Service{ services := []*api.Service{
makeTestService("somewhere-else", "headless", func(svc *api.Service) { makeTestService("somewhere-else", "headless", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.ClusterIP = api.ClusterIPNone
@ -1027,7 +1027,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
} }
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
services := []api.Service{ services := []*api.Service{
makeTestService("somewhere-else", "external-name", func(svc *api.Service) { makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeExternalName svc.Spec.Type = api.ServiceTypeExternalName
svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
@ -1053,7 +1053,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
} }
func TestBuildServiceMapServiceUpdate(t *testing.T) { func TestBuildServiceMapServiceUpdate(t *testing.T) {
first := []api.Service{ first := []*api.Service{
makeTestService("somewhere", "some-service", func(svc *api.Service) { makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.ClusterIP = "172.16.55.4"
@ -1062,7 +1062,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
}), }),
} }
second := []api.Service{ second := []*api.Service{
makeTestService("somewhere", "some-service", func(svc *api.Service) { makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.ObjectMeta.Annotations = map[string]string{ svc.ObjectMeta.Annotations = map[string]string{
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,

View File

@ -20,15 +20,10 @@ import (
"fmt" "fmt"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api"
) )
// ProxyProvider is the interface provided by proxier implementations. // ProxyProvider is the interface provided by proxier implementations.
type ProxyProvider interface { type ProxyProvider interface {
// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// removed if missing from the update set.
OnServiceUpdate(services []api.Service)
// Sync immediately synchronizes the ProxyProvider's current state to iptables. // Sync immediately synchronizes the ProxyProvider's current state to iptables.
Sync() Sync()
// SyncLoop runs periodic work. // SyncLoop runs periodic work.

View File

@ -400,12 +400,10 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceR
// OnServiceUpdate manages the active set of service proxies. // OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or // Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set. // shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []api.Service) { func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
glog.V(4).Infof("Received update notice: %+v", services) glog.V(4).Infof("Received update notice: %+v", services)
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for i := range services { for _, service := range services {
service := &services[i]
// if ClusterIP is "None" or empty, skip proxying // if ClusterIP is "None" or empty, skip proxying
if !api.IsServiceIPSet(service) { if !api.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)

View File

@ -350,7 +350,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -515,7 +515,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -555,7 +555,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -594,7 +594,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -602,7 +602,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -650,7 +650,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -658,7 +658,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -703,7 +703,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -753,7 +753,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -802,7 +802,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Ports: []api.ServicePort{{ Ports: []api.ServicePort{{
@ -856,7 +856,7 @@ func TestProxyUpdatePortal(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -869,7 +869,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with empty ClusterIP should not be included in the proxy") t.Fatalf("service with empty ClusterIP should not be included in the proxy")
} }
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -882,7 +882,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
} }
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p", Name: "p",

View File

@ -317,12 +317,10 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[
// OnServiceUpdate manages the active set of service proxies. // OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or // Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set. // shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []api.Service) { func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
glog.V(4).Infof("Received update notice: %+v", services) glog.V(4).Infof("Received update notice: %+v", services)
activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set
for i := range services { for _, service := range services {
service := &services[i]
// if ClusterIP is "None" or empty, skip proxying // if ClusterIP is "None" or empty, skip proxying
if !api.IsServiceIPSet(service) { if !api.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)

View File

@ -359,7 +359,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -526,7 +526,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -565,7 +565,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -603,7 +603,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -611,7 +611,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -658,7 +658,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{}) p.OnServiceUpdate([]*api.Service{})
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -666,7 +666,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) lb.OnEndpointsUpdate([]*api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -710,7 +710,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -759,7 +759,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -807,7 +807,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Ports: []api.ServicePort{{ Ports: []api.ServicePort{{
@ -860,7 +860,7 @@ func TestProxyUpdatePortal(t *testing.T) {
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -873,7 +873,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with empty ClusterIP should not be included in the proxy") t.Fatalf("service with empty ClusterIP should not be included in the proxy")
} }
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -886,7 +886,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
} }
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",