From 9e31eb280a70db59cce6434417b16c57972ed1a3 Mon Sep 17 00:00:00 2001 From: xiangpengzhao Date: Fri, 9 Jun 2017 23:22:37 +0800 Subject: [PATCH] Populate endpoints and allow ports with headless service --- pkg/api/validation/validation.go | 4 +- pkg/api/validation/validation_test.go | 19 ++- .../endpoint/endpoints_controller.go | 121 ++++++++++-------- .../endpoint/endpoints_controller_test.go | 41 ++++++ pkg/kubectl/service_basic.go | 4 +- pkg/kubectl/service_basic_test.go | 24 +++- pkg/kubectl/service_test.go | 20 +++ pkg/proxy/iptables/proxier_test.go | 4 + 8 files changed, 161 insertions(+), 76 deletions(-) diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 99c63f9a38..fc11919249 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -4012,13 +4012,11 @@ func validateEndpointSubsets(subsets []api.EndpointSubset, oldSubsets []api.Endp ss := &subsets[i] idxPath := fldPath.Index(i) + // EndpointSubsets must include endpoint address. For headless service, we allow its endpoints not to have ports. if len(ss.Addresses) == 0 && len(ss.NotReadyAddresses) == 0 { //TODO: consider adding a RequiredOneOf() error for this and similar cases allErrs = append(allErrs, field.Required(idxPath, "must specify `addresses` or `notReadyAddresses`")) } - if len(ss.Ports) == 0 { - allErrs = append(allErrs, field.Required(idxPath.Child("ports"), "")) - } for addr := range ss.Addresses { allErrs = append(allErrs, validateEndpointAddress(&ss.Addresses[addr], idxPath.Child("addresses").Index(addr), ipToNodeName)...) } diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index e29e97aaad..788b2ee59e 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -9463,6 +9463,14 @@ func TestValidateEndpoints(t *testing.T) { }, }, }, + "empty ports": { + ObjectMeta: metav1.ObjectMeta{Name: "mysvc", Namespace: "namespace"}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "10.10.3.3"}}, + }, + }, + }, } for k, v := range successCases { @@ -9505,17 +9513,6 @@ func TestValidateEndpoints(t *testing.T) { }, errorType: "FieldValueRequired", }, - "empty ports": { - endpoints: api.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Name: "mysvc", Namespace: "namespace"}, - Subsets: []api.EndpointSubset{ - { - Addresses: []api.EndpointAddress{{IP: "10.10.3.3"}}, - }, - }, - }, - errorType: "FieldValueRequired", - }, "invalid IP": { endpoints: api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: "mysvc", Namespace: "namespace"}, diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index f13b80f8de..78cf37a95a 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -354,8 +354,6 @@ func (e *EndpointController) syncService(key string) error { return err } - subsets := []v1.EndpointSubset{} - var tolerateUnreadyEndpoints bool if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok { b, err := strconv.ParseBool(v) @@ -366,61 +364,59 @@ func (e *EndpointController) syncService(key string) error { } } - readyEps := 0 - notReadyEps := 0 + subsets := []v1.EndpointSubset{} + var totalReadyEps int = 0 + var totalNotReadyEps int = 0 + for _, pod := range pods { + if len(pod.Status.PodIP) == 0 { + glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) + continue + } + if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil { + glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name) + continue + } - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] + epa := v1.EndpointAddress{ + IP: pod.Status.PodIP, + NodeName: &pod.Spec.NodeName, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }} - portName := servicePort.Name - portProto := servicePort.Protocol - portNum, err := podutil.FindPort(pod, servicePort) - if err != nil { - glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) - continue - } - if len(pod.Status.PodIP) == 0 { - glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) - continue - } - if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil { - glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name) - continue + hostname := pod.Spec.Hostname + if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace { + epa.Hostname = hostname + } + + // Allow headless service not to have ports. + if len(service.Spec.Ports) == 0 { + if service.Spec.ClusterIP == api.ClusterIPNone { + epp := v1.EndpointPort{Port: 0, Protocol: v1.ProtocolTCP} + subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints) } + } else { + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] - epp := v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto} - epa := v1.EndpointAddress{ - IP: pod.Status.PodIP, - NodeName: &pod.Spec.NodeName, - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Namespace: pod.ObjectMeta.Namespace, - Name: pod.ObjectMeta.Name, - UID: pod.ObjectMeta.UID, - ResourceVersion: pod.ObjectMeta.ResourceVersion, - }} + portName := servicePort.Name + portProto := servicePort.Protocol + portNum, err := podutil.FindPort(pod, servicePort) + if err != nil { + glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) + continue + } - hostname := pod.Spec.Hostname - if len(hostname) > 0 && - pod.Spec.Subdomain == service.Name && - service.Namespace == pod.Namespace { - epa.Hostname = hostname - } - - if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) { - subsets = append(subsets, v1.EndpointSubset{ - Addresses: []v1.EndpointAddress{epa}, - Ports: []v1.EndpointPort{epp}, - }) - readyEps++ - } else { - glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) - subsets = append(subsets, v1.EndpointSubset{ - NotReadyAddresses: []v1.EndpointAddress{epa}, - Ports: []v1.EndpointPort{epp}, - }) - notReadyEps++ + var readyEps, notReadyEps int + epp := v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto} + subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints) + totalReadyEps = totalReadyEps + readyEps + totalNotReadyEps = totalNotReadyEps + notReadyEps } } } @@ -457,7 +453,7 @@ func (e *EndpointController) syncService(key string) error { newEndpoints.Annotations = make(map[string]string) } - glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, readyEps, notReadyEps) + glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps) createEndpoints := len(currentEndpoints.ResourceVersion) == 0 if createEndpoints { // No previous endpoints, create them @@ -508,3 +504,24 @@ func (e *EndpointController) checkLeftoverEndpoints() { e.queue.Add(key) } } + +func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, + epp v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { + var readyEps int = 0 + var notReadyEps int = 0 + if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) { + subsets = append(subsets, v1.EndpointSubset{ + Addresses: []v1.EndpointAddress{epa}, + Ports: []v1.EndpointPort{epp}, + }) + readyEps++ + } else { + glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) + subsets = append(subsets, v1.EndpointSubset{ + NotReadyAddresses: []v1.EndpointAddress{epa}, + Ports: []v1.EndpointPort{epp}, + }) + notReadyEps++ + } + return subsets, readyEps, notReadyEps +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index bd3a81a60b..9a45cd84e2 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -620,3 +620,44 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) { }() } } + +func TestSyncEndpointsHeadlessService(t *testing.T) { + ns := "headless" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, + }}, + }) + addPods(endpoints.podStore, ns, 1, 1, 0) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{}, + ClusterIP: api.ClusterIPNone, + Ports: []v1.ServicePort{}, + }, + }) + endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: []v1.EndpointPort{{Port: 0, Protocol: "TCP"}}, + }}, + }) + endpointsHandler.ValidateRequestCount(t, 1) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} diff --git a/pkg/kubectl/service_basic.go b/pkg/kubectl/service_basic.go index a28a76f3cc..d53cc60fcd 100644 --- a/pkg/kubectl/service_basic.go +++ b/pkg/kubectl/service_basic.go @@ -179,6 +179,7 @@ func (s ServiceExternalNameGeneratorV1) Generate(params map[string]interface{}) } // validate validates required fields are set to support structured generation +// TODO(xiangpengzhao): validate ports are identity mapped for headless service when we enforce that in validation.validateServicePort. func (s ServiceCommonGeneratorV1) validate() error { if len(s.Name) == 0 { return fmt.Errorf("name must be specified") @@ -189,9 +190,6 @@ func (s ServiceCommonGeneratorV1) validate() error { if s.ClusterIP == api.ClusterIPNone && s.Type != api.ServiceTypeClusterIP { return fmt.Errorf("ClusterIP=None can only be used with ClusterIP service type") } - if s.ClusterIP == api.ClusterIPNone && len(s.TCP) > 0 { - return fmt.Errorf("can not map ports with clusterip=None") - } if s.ClusterIP != api.ClusterIPNone && len(s.TCP) == 0 && s.Type != api.ServiceTypeExternalName { return fmt.Errorf("at least one tcp port specifier must be provided") } diff --git a/pkg/kubectl/service_basic_test.go b/pkg/kubectl/service_basic_test.go index b0363e7dfc..a7fea48608 100644 --- a/pkg/kubectl/service_basic_test.go +++ b/pkg/kubectl/service_basic_test.go @@ -57,13 +57,6 @@ func TestServiceBasicGenerate(t *testing.T) { serviceType: api.ServiceTypeClusterIP, expectErr: true, }, - { - name: "clusterip-none and port mapping", - tcp: []string{"456:9898"}, - clusterip: "None", - serviceType: api.ServiceTypeClusterIP, - expectErr: true, - }, { name: "clusterip-none-wrong-type", tcp: []string{}, @@ -88,6 +81,23 @@ func TestServiceBasicGenerate(t *testing.T) { }, expectErr: false, }, + { + name: "clusterip-none-and-port-mapping", + tcp: []string{"456:9898"}, + clusterip: "None", + serviceType: api.ServiceTypeClusterIP, + expected: &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clusterip-none-and-port-mapping", + Labels: map[string]string{"app": "clusterip-none-and-port-mapping"}, + }, + Spec: api.ServiceSpec{Type: "ClusterIP", + Ports: []api.ServicePort{{Name: "456-9898", Protocol: "TCP", Port: 456, TargetPort: intstr.IntOrString{Type: 0, IntVal: 9898, StrVal: ""}, NodePort: 0}}, + Selector: map[string]string{"app": "clusterip-none-and-port-mapping"}, + ClusterIP: "None", ExternalIPs: []string(nil), LoadBalancerIP: ""}, + }, + expectErr: false, + }, { name: "loadbalancer-ok", tcp: []string{"456:9898"}, diff --git a/pkg/kubectl/service_test.go b/pkg/kubectl/service_test.go index 6266779c52..bd80422ea3 100644 --- a/pkg/kubectl/service_test.go +++ b/pkg/kubectl/service_test.go @@ -560,6 +560,26 @@ func TestGenerateService(t *testing.T) { }, }, }, + { + generator: ServiceGeneratorV2{}, + params: map[string]interface{}{ + "selector": "foo=bar", + "name": "test", + "cluster-ip": "None", + }, + expected: api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{ + "foo": "bar", + }, + Ports: []api.ServicePort{}, + ClusterIP: api.ClusterIPNone, + }, + }, + }, } for _, test := range tests { obj, err := test.generator.Generate(test.params) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 989c244776..31e7b1f9d9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1154,6 +1154,10 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) }), + makeTestService("somewhere-else", "headless-without-port", func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeClusterIP + svc.Spec.ClusterIP = api.ClusterIPNone + }), ) // Headless service should be ignored