From 57d35d5acbc2af278fb81e9941cf9c1cd339df8b Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 5 May 2017 10:56:03 +0200 Subject: [PATCH 1/2] Switch winuserspace proxy to be event based for services --- cmd/kube-proxy/app/server.go | 2 +- pkg/proxy/winuserspace/proxier.go | 187 +++++++++++++++---------- pkg/proxy/winuserspace/proxier_test.go | 131 +++++++++++------ 3 files changed, 198 insertions(+), 122 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 7f30878360..a5c1439077 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -455,7 +455,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceHandler = proxierUserspace + serviceEventHandler = proxierUserspace proxier = proxierUserspace } else { // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index b331530e76..8bce3d639f 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -315,86 +315,78 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[ return listenIPPortMap } -// OnServiceUpdate manages the active set of service proxies. -// Active service proxies are reinitialized if found in the update set or -// shutdown if missing from the update set. -func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { - glog.V(4).Infof("Received update notice: %+v", services) - activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set - for _, service := range services { - // if ClusterIP is "None" or empty, skip proxying - if !helper.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) - continue +func (proxier *Proxier) mergeService(service *api.Service) map[ServicePortPortalName]bool { + if service == nil { + return nil + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return nil + } + existingPortPortals := make(map[ServicePortPortalName]bool) + + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + // create a slice of all the source IPs to use for service port portals + listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) + protocol := servicePort.Protocol + + for listenIP, listenPort := range listenIPPortMap { + servicePortPortalName := ServicePortPortalName{ + NamespacedName: svcName, + Port: servicePort.Name, + PortalIPName: listenIP, + } + existingPortPortals[servicePortPortalName] = true + info, exists := proxier.getServiceInfo(servicePortPortalName) + if exists && sameConfig(info, service, protocol, listenPort) { + // Nothing changed. + continue + } + if exists { + glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName) + if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { + glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) + } + } + glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol) + info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout) + if err != nil { + glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err) + continue + } + info.sessionAffinityType = service.Spec.SessionAffinity + glog.V(10).Infof("info: %#v", info) } - - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - // create a slice of all the source IPs to use for service port portals - listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) - protocol := servicePort.Protocol - - for listenIP, listenPort := range listenIPPortMap { - servicePortPortalName := ServicePortPortalName{ - NamespacedName: types.NamespacedName{ - Namespace: service.Namespace, - Name: service.Name, - }, - Port: servicePort.Name, - PortalIPName: listenIP, - } - activeServicePortPortals[servicePortPortalName] = true - info, exists := proxier.getServiceInfo(servicePortPortalName) - if exists && sameConfig(info, service, protocol, listenPort) { - // Nothing changed. - continue - } - if exists { - glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName) - if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { - glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) - } - } - glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol) - info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout) - if err != nil { - glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err) - continue - } - info.sessionAffinityType = service.Spec.SessionAffinity - glog.V(10).Infof("info: %#v", info) - } - if len(listenIPPortMap) > 0 { - // only one loadbalancer per service port portal - servicePortName := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{ - Namespace: service.Namespace, - Name: service.Name, - }, - Port: servicePort.Name, - } - proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes) + if len(listenIPPortMap) > 0 { + // only one loadbalancer per service port portal + servicePortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: service.Namespace, + Name: service.Name, + }, + Port: servicePort.Name, } + proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes) } } - for name, info := range proxier.serviceMap { - if !activeServicePortPortals[name] { - glog.V(1).Infof("Stopping service %q", name) + return existingPortPortals +} - if err := proxier.closeServicePortPortal(name, info); err != nil { - glog.Errorf("Failed to close service port portal %q: %v", name, err) - } - } +func (proxier *Proxier) unmergeService(service *api.Service, existingPortPortals map[ServicePortPortalName]bool) { + if service == nil { + return + } + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if !helper.IsServiceIPSet(service) { + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + return } - proxier.mu.Lock() - defer proxier.mu.Unlock() - - // servicePortNameMap tracks all service port portals with the same name/port. - // A value of true means there is one or more service port portals with name/port pair. servicePortNameMap := make(map[proxy.ServicePortName]bool) - for name := range proxier.serviceMap { + for name := range existingPortPortals { servicePortName := proxy.ServicePortName{ NamespacedName: types.NamespacedName{ Namespace: name.Namespace, @@ -402,17 +394,60 @@ func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { }, Port: name.Port, } - servicePortNameMap[servicePortName] = servicePortNameMap[servicePortName] || activeServicePortPortals[name] + servicePortNameMap[servicePortName] = true } - // Only delete load balancer if all listen ips per name/port show inactive. - for name := range servicePortNameMap { - if !servicePortNameMap[name] { - proxier.loadBalancer.DeleteService(name) + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} + // create a slice of all the source IPs to use for service port portals + listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) + + for listenIP := range listenIPPortMap { + servicePortPortalName := ServicePortPortalName{ + NamespacedName: svcName, + Port: servicePort.Name, + PortalIPName: listenIP, + } + if existingPortPortals[servicePortPortalName] { + continue + } + + glog.V(1).Infof("Stopping service %q", servicePortPortalName) + info, exists := proxier.getServiceInfo(servicePortPortalName) + if !exists { + glog.Errorf("Service %q is being removed but doesn't exist", servicePortPortalName) + continue + } + + if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil { + glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err) + } + } + + // Only delete load balancer if all listen ips per name/port show inactive. + if !servicePortNameMap[serviceName] { + proxier.loadBalancer.DeleteService(serviceName) } } } +func (proxier *Proxier) OnServiceAdd(service *api.Service) { + _ = proxier.mergeService(service) +} + +func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) { + existingPortPortals := proxier.mergeService(service) + proxier.unmergeService(oldService, existingPortPortals) +} + +func (proxier *Proxier) OnServiceDelete(service *api.Service) { + proxier.unmergeService(service, map[ServicePortPortalName]bool{}) +} + +func (proxier *Proxier) OnServiceSynced() { +} + func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool { return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity } diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 9334880a1d..fadc5e38bc 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -341,7 +341,7 @@ func TestMultiPortProxy(t *testing.T) { waitForNumProxyLoops(t, p, 2) } -func TestMultiPortOnServiceUpdate(t *testing.T) { +func TestMultiPortOnServiceAdd(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} @@ -354,7 +354,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { } waitForNumProxyLoops(t, p, 0) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{ Name: "p", @@ -365,7 +365,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { Port: 0, Protocol: "UDP", }}}, - }}) + }) waitForNumProxyLoops(t, p, 2) servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP} @@ -515,7 +515,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -552,7 +559,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -590,7 +604,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "TCP", + }}}, + }) if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -598,14 +619,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "TCP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) @@ -645,7 +666,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { conn.Close() waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{}) + p.OnServiceDelete(&api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "UDP", + }}}, + }) if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) } @@ -653,14 +681,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { // need to add endpoint here because it got clean up during service delete lb.OnEndpointsAdd(endpoint) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Protocol: "UDP", }}}, - }}) + }) svcInfo, exists := p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) @@ -695,14 +723,14 @@ func TestTCPProxyUpdatePort(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: 0, Protocol: "TCP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) @@ -742,14 +770,14 @@ func TestUDPProxyUpdatePort(t *testing.T) { } waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: 0, Protocol: "UDP", }}}, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) @@ -788,7 +816,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ + p.OnServiceAdd(&api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -799,7 +827,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { ClusterIP: svcInfo.portal.ip, ExternalIPs: []string{"0.0.0.0"}, }, - }}) + }) // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { t.Fatalf(err.Error()) @@ -841,40 +869,53 @@ func TestProxyUpdatePortal(t *testing.T) { testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) waitForNumProxyLoops(t, p, 1) - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(svcInfo.portal.port), - Protocol: "TCP", - }}}, - }}) - _, exists := p.getServiceInfo(servicePortPortalName) - if exists { - t.Fatalf("service with empty ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ - Name: "p", - Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), - Protocol: "TCP", - }}}, - }}) - _, exists = p.getServiceInfo(servicePortPortalName) - if exists { - t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") - } - - p.OnServiceUpdate([]*api.Service{{ + svcv0 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Name: "p", Port: int32(svcInfo.portal.port), Protocol: "TCP", }}}, - }}) + } + + svcv1 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.portal.port), + Protocol: "TCP", + }}}, + } + + p.OnServiceUpdate(svcv0, svcv1) + _, exists := p.getServiceInfo(servicePortPortalName) + if exists { + t.Fatalf("service with empty ClusterIP should not be included in the proxy") + } + + svcv2 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv1, svcv2) + _, exists = p.getServiceInfo(servicePortPortalName) + if exists { + t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") + } + + svcv3 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.portal.port), + Protocol: "TCP", + }}}, + } + p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(servicePortPortalName) if !exists { From ce752e3fc99928460f76c360e351b242d7ab4cb4 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 5 May 2017 11:45:32 +0200 Subject: [PATCH 2/2] Remove no-longer used code in proxy/config --- cmd/kube-proxy/app/server.go | 18 ++------- pkg/proxy/config/BUILD | 1 - pkg/proxy/config/config.go | 76 ------------------------------------ 3 files changed, 3 insertions(+), 92 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index a5c1439077..e7af2601b3 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -302,11 +302,8 @@ type ProxyServer struct { ResourceContainer string ConfigSyncPeriod time.Duration ServiceEventHandler proxyconfig.ServiceHandler - // TODO: Migrate all handlers to ServiceHandler types and - // get rid of this one. - ServiceHandler proxyconfig.ServiceConfigHandler - EndpointsEventHandler proxyconfig.EndpointsHandler - HealthzServer *healthcheck.HealthzServer + EndpointsEventHandler proxyconfig.EndpointsHandler + HealthzServer *healthcheck.HealthzServer } // createClients creates a kube client and an event client from the given config and masterOverride. @@ -397,9 +394,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx var proxier proxy.ProxyProvider var serviceEventHandler proxyconfig.ServiceHandler - // TODO: Migrate all handlers to ServiceHandler types and - // get rid of this one. - var serviceHandler proxyconfig.ServiceConfigHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{}) @@ -517,7 +511,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, ServiceEventHandler: serviceEventHandler, - ServiceHandler: serviceHandler, EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil @@ -621,12 +614,7 @@ func (s *ProxyServer) Run() error { // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod) - if s.ServiceHandler != nil { - serviceConfig.RegisterHandler(s.ServiceHandler) - } - if s.ServiceEventHandler != nil { - serviceConfig.RegisterEventHandler(s.ServiceEventHandler) - } + serviceConfig.RegisterEventHandler(s.ServiceEventHandler) go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index b68c78286d..8048f6e9d6 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -21,7 +21,6 @@ go_library( "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/controller:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 3e99d1f9fa..1c79fdf6f2 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -21,7 +21,6 @@ import ( "time" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -30,22 +29,6 @@ import ( "k8s.io/kubernetes/pkg/controller" ) -// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. -// DEPRECATED: Use ServiceHandler instead - this will be removed soon. -type ServiceConfigHandler interface { - // OnServiceUpdate gets called when a service is created, removed or changed - // on any of the configuration sources. An example is when a new service - // comes up. - // - // NOTE: For efficiency, services are being passed by reference, thus, - // OnServiceUpdate should NOT modify pointers of a given slice. - // Those service objects are shared with other layers of the system and - // are guaranteed to be immutable with the assumption that are also - // not mutated by those handlers. Make a deep copy if you need to modify - // them in your code. - OnServiceUpdate(services []*api.Service) -} - // ServiceHandler is an abstract interface of objects which receive // notifications about service object changes. type ServiceHandler interface { @@ -185,11 +168,6 @@ type ServiceConfig struct { lister listers.ServiceLister listerSynced cache.InformerSynced eventHandlers []ServiceHandler - // TODO: Remove as soon as we migrate everything to event handlers. - handlers []ServiceConfigHandler - // updates channel is used to trigger registered handlers - updates chan struct{} - stop chan struct{} } // NewServiceConfig creates a new ServiceConfig. @@ -197,12 +175,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio result := &ServiceConfig{ lister: serviceInformer.Lister(), listerSynced: serviceInformer.Informer().HasSynced, - // The updates channel is used to send interrupts to the Services handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates: make(chan struct{}, 1), - stop: make(chan struct{}), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -217,12 +189,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio return result } -// RegisterHandler registers a handler which is called on every services change. -// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon. -func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { - c.handlers = append(c.handlers, handler) -} - // RegisterEventHandler registers a handler which is called on every service change. func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) { c.eventHandlers = append(c.eventHandlers, handler) @@ -240,40 +206,12 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { return } - // We have synced informers. Now we can start delivering updates - // to the registered handler. for i := range c.eventHandlers { glog.V(3).Infof("Calling handler.OnServiceSynced()") c.eventHandlers[i].OnServiceSynced() } - go func() { - defer utilruntime.HandleCrash() - for { - select { - case <-c.updates: - services, err := c.lister.List(labels.Everything()) - if err != nil { - glog.Errorf("Error while listing services from cache: %v", err) - // This will cause a retry (if there isnt' any other trigger in-flight). - c.dispatchUpdate() - continue - } - if services == nil { - services = []*api.Service{} - } - for i := range c.handlers { - glog.V(3).Infof("Calling handler.OnServiceUpdate()") - c.handlers[i].OnServiceUpdate(services) - } - case <-c.stop: - return - } - } - }() - // Close updates channel when stopCh is closed. <-stopCh - close(c.stop) } func (c *ServiceConfig) handleAddService(obj interface{}) { @@ -286,7 +224,6 @@ func (c *ServiceConfig) handleAddService(obj interface{}) { glog.V(4).Infof("Calling handler.OnServiceAdd") c.eventHandlers[i].OnServiceAdd(service) } - c.dispatchUpdate() } func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) { @@ -304,7 +241,6 @@ func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) { glog.V(4).Infof("Calling handler.OnServiceUpdate") c.eventHandlers[i].OnServiceUpdate(oldService, service) } - c.dispatchUpdate() } func (c *ServiceConfig) handleDeleteService(obj interface{}) { @@ -324,16 +260,4 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) { glog.V(4).Infof("Calling handler.OnServiceDelete") c.eventHandlers[i].OnServiceDelete(service) } - c.dispatchUpdate() -} - -func (c *ServiceConfig) dispatchUpdate() { - select { - case c.updates <- struct{}{}: - // Work enqueued successfully - case <-c.stop: - // We're shut down / avoid logging the message below - default: - glog.V(4).Infof("Service handler already has a pending interrupt.") - } }