diff --git a/pkg/master/controller.go b/pkg/master/controller.go index a6a5ae814a..328ba01a03 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -86,7 +86,8 @@ func (c *Controller) Start() { // If we fail to repair node ports apiserver is useless. We should restart and retry. glog.Fatalf("Unable to perform initial service nodePort check: %v", err) } - if err := c.UpdateKubernetesService(); err != nil { + // Service definition is reconciled during first run to correct port and type per expectations. + if err := c.UpdateKubernetesService(true); err != nil { glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err) } @@ -97,14 +98,17 @@ func (c *Controller) Start() { // RunKubernetesService periodically updates the kubernetes service func (c *Controller) RunKubernetesService(ch chan struct{}) { util.Until(func() { - if err := c.UpdateKubernetesService(); err != nil { + // Service definition is not reconciled after first + // run, ports and type will be corrected only during + // start. + if err := c.UpdateKubernetesService(false); err != nil { util.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err)) } }, c.EndpointInterval, ch) } // UpdateKubernetesService attempts to update the default Kube service. -func (c *Controller) UpdateKubernetesService() error { +func (c *Controller) UpdateKubernetesService(reconcile bool) error { // Update service & endpoint records. // TODO: when it becomes possible to change this stuff, // stop polling and start watching. @@ -114,11 +118,11 @@ func (c *Controller) UpdateKubernetesService() error { } if c.ServiceIP != nil { servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts) - if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType); err != nil { + if err := c.CreateOrUpdateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType, reconcile); err != nil { return err } endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) - if err := c.SetEndpoints("kubernetes", c.PublicIP, endpointPorts); err != nil { + if err := c.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil { return err } } @@ -179,10 +183,17 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp // CreateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. -func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType) error { +func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error { ctx := api.NewDefaultContext() - if _, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil { + if s, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil { // The service already exists. + if reconcile { + if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { + glog.Warningf("Resetting master service %q to %#v", serviceName, svc) + _, err := c.ServiceRegistry.UpdateService(ctx, svc) + return err + } + } return nil } svc := &api.Service{ @@ -211,20 +222,20 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n return err } -// SetEndpoints sets the endpoints for the given apiserver service (ro or rw). -// SetEndpoints expects that the endpoints objects it manages will all be -// managed only by SetEndpoints; therefore, to understand this, you need only +// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw). +// ReconcileEndpoints expects that the endpoints objects it manages will all be +// managed only by ReconcileEndpoints; therefore, to understand this, you need only // understand the requirements and the body of this function. // // Requirements: // * All apiservers MUST use the same ports for their {rw, ro} services. -// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the +// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the // endpoints for their {rw, ro} services. // * All apiservers MUST know and agree on the number of apiservers expected // to be running (c.masterCount). -// * SetEndpoints is called periodically from all apiservers. +// * ReconcileEndpoints is called periodically from all apiservers. // -func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { +func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { ctx := api.NewDefaultContext() e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { @@ -238,7 +249,7 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [ // First, determine if the endpoint is in the format we expect (one // subset, ports matching endpointPorts, N IP addresses). - formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount) + formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount, reconcilePorts) if !formatCorrect { // Something is egregiously wrong, just re-make the endpoints record. e.Subsets = []api.EndpointSubset{{ @@ -247,7 +258,11 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [ }} glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) return c.EndpointRegistry.UpdateEndpoints(ctx, e) - } else if !ipCorrect { + } + if ipCorrect && portsCorrect { + return nil + } + if !ipCorrect { // We *always* add our own IP address. e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()}) @@ -271,38 +286,83 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [ } } } - return c.EndpointRegistry.UpdateEndpoints(ctx, e) } - // We didn't make any changes, no need to actually call update. - return nil + if !portsCorrect { + // Reset ports. + e.Subsets[0].Ports = endpointPorts + } + glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) + return c.EndpointRegistry.UpdateEndpoints(ctx, e) } -// Determine if the endpoint is in the format SetEndpoints expect (one subset, +// Determine if the endpoint is in the format ReconcileEndpoints expect (one subset, // correct ports, N IP addresses); and if the specified IP address is present and // the correct number of ip addresses are found. -func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int) (formatCorrect, ipCorrect bool) { +func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) { if len(e.Subsets) != 1 { - return false, false + return false, false, false } sub := &e.Subsets[0] - if len(sub.Ports) != len(ports) { - return false, false - } - for _, port := range ports { - contains := false - for _, subPort := range sub.Ports { - if port == subPort { - contains = true - } + portsCorrect = true + if reconcilePorts { + if len(sub.Ports) != len(ports) { + portsCorrect = false } - if !contains { - return false, false + for i, port := range ports { + if len(sub.Ports) <= i || port != sub.Ports[i] { + portsCorrect = false + break + } } } for _, addr := range sub.Addresses { if addr.IP == ip { - return true, len(sub.Addresses) == count + ipCorrect = len(sub.Addresses) == count + break } } - return true, false + return true, ipCorrect, portsCorrect +} + +// * getMasterServiceUpdateIfNeeded sets service attributes for the +// given apiserver service. +// * getMasterServiceUpdateIfNeeded expects that the service object it +// manages will be managed only by getMasterServiceUpdateIfNeeded; +// therefore, to understand this, you need only understand the +// requirements and the body of this function. +// * getMasterServiceUpdateIfNeeded ensures that the correct ports are +// are set. +// +// Requirements: +// * All apiservers MUST use getMasterServiceUpdateIfNeeded and only +// getMasterServiceUpdateIfNeeded to manage service attributes +// * updateMasterService is called periodically from all apiservers. +func getMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) { + // Determine if the service is in the format we expect + // (servicePorts are present and service type matches) + formatCorrect := checkServiceFormat(svc, servicePorts, serviceType) + if formatCorrect { + return svc, false + } + svc.Spec.Ports = servicePorts + svc.Spec.Type = serviceType + return svc, true +} + +// Determine if the service is in the correct format +// getMasterServiceUpdateIfNeeded expects (servicePorts are correct +// and service type matches). +func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) { + if s.Spec.Type != serviceType { + return false + } + if len(ports) != len(s.Spec.Ports) { + return false + } + for i, port := range ports { + if port != s.Spec.Ports[i] { + return false + } + } + return true } diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index 34db2eabfa..713d41214a 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -17,20 +17,22 @@ limitations under the License. package master import ( + "errors" "net" "reflect" "testing" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/registry/registrytest" + "k8s.io/kubernetes/pkg/util" ) -func TestSetEndpoints(t *testing.T) { +func TestReconcileEndpoints(t *testing.T) { ns := api.NamespaceDefault om := func(name string) api.ObjectMeta { return api.ObjectMeta{Namespace: ns, Name: name} } - tests := []struct { + reconcile_tests := []struct { testName string serviceName string ip string @@ -291,29 +293,6 @@ func TestSetEndpoints(t *testing.T) { }}, }, }, - { - testName: "existing endpoints extra un-ordered service ports satisfy", - serviceName: "foo", - ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ - {Name: "baz", Port: 1010, Protocol: "TCP"}, - {Name: "foo", Port: 8080, Protocol: "TCP"}, - {Name: "bar", Port: 1000, Protocol: "TCP"}, - }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ - ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{ - {Name: "bar", Port: 1000, Protocol: "TCP"}, - {Name: "foo", Port: 8080, Protocol: "TCP"}, - {Name: "baz", Port: 1010, Protocol: "TCP"}, - }, - }}, - }}, - }, - }, { testName: "existing endpoints extra service ports missing port", serviceName: "foo", @@ -343,13 +322,488 @@ func TestSetEndpoints(t *testing.T) { }, }, } - for _, test := range tests { + for _, test := range reconcile_tests { master := Controller{MasterCount: test.additionalMasters + 1} registry := ®istrytest.EndpointRegistry{ Endpoints: test.endpoints, } master.EndpointRegistry = registry - err := master.SetEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) + err := master.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + if test.expectUpdate != nil { + if len(registry.Updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) + } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectUpdate == nil && len(registry.Updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + } + } + + non_reconcile_tests := []struct { + testName string + serviceName string + ip string + endpointPorts []api.EndpointPort + additionalMasters int + endpoints *api.EndpointsList + expectUpdate *api.Endpoints // nil means none expected + }{ + { + testName: "existing endpoints extra service ports missing port no update", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{ + {Name: "foo", Port: 8080, Protocol: "TCP"}, + {Name: "bar", Port: 1000, Protocol: "TCP"}, + }, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: nil, + }, + { + testName: "existing endpoints extra service ports, wrong ports, wrong IP", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{ + {Name: "foo", Port: 8080, Protocol: "TCP"}, + {Name: "bar", Port: 1000, Protocol: "TCP"}, + }, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "no existing endpoints", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: nil, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + } + for _, test := range non_reconcile_tests { + master := Controller{MasterCount: test.additionalMasters + 1} + registry := ®istrytest.EndpointRegistry{ + Endpoints: test.endpoints, + } + master.EndpointRegistry = registry + err := master.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + if test.expectUpdate != nil { + if len(registry.Updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) + } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectUpdate == nil && len(registry.Updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + } + } + +} + +func TestCreateOrUpdateMasterService(t *testing.T) { + ns := api.NamespaceDefault + om := func(name string) api.ObjectMeta { + return api.ObjectMeta{Namespace: ns, Name: name} + } + + create_tests := []struct { + testName string + serviceName string + servicePorts []api.ServicePort + serviceType api.ServiceType + expectCreate *api.Service // nil means none expected + }{ + { + testName: "service does not exist", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + expectCreate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + } + for _, test := range create_tests { + master := Controller{MasterCount: 1} + registry := ®istrytest.ServiceRegistry{ + Err: errors.New("unable to get svc"), + } + master.ServiceRegistry = registry + master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, net.ParseIP("1.2.3.4"), test.servicePorts, test.serviceType, false) + if test.expectCreate != nil { + if len(registry.List.Items) != 1 { + t.Errorf("case %q: unexpected creations: %v", test.testName, registry.List.Items) + } else if e, a := test.expectCreate.Spec, registry.List.Items[0].Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectCreate == nil && len(registry.List.Items) > 1 { + t.Errorf("case %q: no create expected, yet saw: %v", test.testName, registry.List.Items) + } + } + + reconcile_tests := []struct { + testName string + serviceName string + servicePorts []api.ServicePort + serviceType api.ServiceType + service *api.Service + expectUpdate *api.Service // nil means none expected + }{ + { + testName: "service definition wrong port", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8000, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition missing port", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + {Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(1000)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + {Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(1000)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition incorrect port", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "bar", Port: 1000, Protocol: "UDP", TargetPort: util.NewIntOrStringFromInt(1000)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition incorrect port name", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 1000, Protocol: "UDP", TargetPort: util.NewIntOrStringFromInt(1000)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition incorrect target port", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(1000)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition incorrect protocol", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "UDP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition has incorrect type", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeNodePort, + }, + }, + expectUpdate: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + }, + { + testName: "service definition satisfies", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: nil, + }, + } + for _, test := range reconcile_tests { + master := Controller{MasterCount: 1} + registry := ®istrytest.ServiceRegistry{ + Service: test.service, + } + master.ServiceRegistry = registry + err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, net.ParseIP("1.2.3.4"), test.servicePorts, test.serviceType, true) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + if test.expectUpdate != nil { + if len(registry.Updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) + } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectUpdate == nil && len(registry.Updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + } + } + + non_reconcile_tests := []struct { + testName string + serviceName string + servicePorts []api.ServicePort + serviceType api.ServiceType + service *api.Service + expectUpdate *api.Service // nil means none expected + }{ + { + testName: "service definition wrong port, no expected update", + serviceName: "foo", + servicePorts: []api.ServicePort{ + {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + }, + serviceType: api.ServiceTypeClusterIP, + service: &api.Service{ + ObjectMeta: om("foo"), + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "foo", Port: 1000, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(1000)}, + }, + Selector: nil, + ClusterIP: "1.2.3.4", + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + }, + }, + expectUpdate: nil, + }, + } + for _, test := range non_reconcile_tests { + master := Controller{MasterCount: 1} + registry := ®istrytest.ServiceRegistry{ + Service: test.service, + } + master.ServiceRegistry = registry + err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, net.ParseIP("1.2.3.4"), test.servicePorts, test.serviceType, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 751ef88998..643c306cb7 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -31,6 +31,7 @@ type ServiceRegistry struct { mu sync.Mutex List api.ServiceList Service *api.Service + Updates []api.Service Err error DeletedID string @@ -101,6 +102,7 @@ func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) (*api r.UpdatedID = svc.Name *r.Service = *svc + r.Updates = append(r.Updates, *svc) return svc, r.Err }