Populate endpoints and allow ports with headless service

pull/6/head
xiangpengzhao 2017-06-09 23:22:37 +08:00
parent 11b5956f7a
commit 9e31eb280a
8 changed files with 161 additions and 76 deletions

View File

@ -4012,13 +4012,11 @@ func validateEndpointSubsets(subsets []api.EndpointSubset, oldSubsets []api.Endp
ss := &subsets[i]
idxPath := fldPath.Index(i)
// EndpointSubsets must include endpoint address. For headless service, we allow its endpoints not to have ports.
if len(ss.Addresses) == 0 && len(ss.NotReadyAddresses) == 0 {
//TODO: consider adding a RequiredOneOf() error for this and similar cases
allErrs = append(allErrs, field.Required(idxPath, "must specify `addresses` or `notReadyAddresses`"))
}
if len(ss.Ports) == 0 {
allErrs = append(allErrs, field.Required(idxPath.Child("ports"), ""))
}
for addr := range ss.Addresses {
allErrs = append(allErrs, validateEndpointAddress(&ss.Addresses[addr], idxPath.Child("addresses").Index(addr), ipToNodeName)...)
}

View File

@ -9463,6 +9463,14 @@ func TestValidateEndpoints(t *testing.T) {
},
},
},
"empty ports": {
ObjectMeta: metav1.ObjectMeta{Name: "mysvc", Namespace: "namespace"},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "10.10.3.3"}},
},
},
},
}
for k, v := range successCases {
@ -9505,17 +9513,6 @@ func TestValidateEndpoints(t *testing.T) {
},
errorType: "FieldValueRequired",
},
"empty ports": {
endpoints: api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "mysvc", Namespace: "namespace"},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "10.10.3.3"}},
},
},
},
errorType: "FieldValueRequired",
},
"invalid IP": {
endpoints: api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "mysvc", Namespace: "namespace"},

View File

@ -354,8 +354,6 @@ func (e *EndpointController) syncService(key string) error {
return err
}
subsets := []v1.EndpointSubset{}
var tolerateUnreadyEndpoints bool
if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
b, err := strconv.ParseBool(v)
@ -366,20 +364,11 @@ func (e *EndpointController) syncService(key string) error {
}
}
readyEps := 0
notReadyEps := 0
subsets := []v1.EndpointSubset{}
var totalReadyEps int = 0
var totalNotReadyEps int = 0
for _, pod := range pods {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
if len(pod.Status.PodIP) == 0 {
glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
@ -389,7 +378,6 @@ func (e *EndpointController) syncService(key string) error {
continue
}
epp := v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
epa := v1.EndpointAddress{
IP: pod.Status.PodIP,
NodeName: &pod.Spec.NodeName,
@ -402,25 +390,33 @@ func (e *EndpointController) syncService(key string) error {
}}
hostname := pod.Spec.Hostname
if len(hostname) > 0 &&
pod.Spec.Subdomain == service.Name &&
service.Namespace == pod.Namespace {
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
epa.Hostname = hostname
}
if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
subsets = append(subsets, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{epa},
Ports: []v1.EndpointPort{epp},
})
readyEps++
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
epp := v1.EndpointPort{Port: 0, Protocol: v1.ProtocolTCP}
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
}
} else {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Ports: []v1.EndpointPort{epp},
})
notReadyEps++
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
var readyEps, notReadyEps int
epp := v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
@ -457,7 +453,7 @@ func (e *EndpointController) syncService(key string) error {
newEndpoints.Annotations = make(map[string]string)
}
glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, readyEps, notReadyEps)
glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
if createEndpoints {
// No previous endpoints, create them
@ -508,3 +504,24 @@ func (e *EndpointController) checkLeftoverEndpoints() {
e.queue.Add(key)
}
}
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
epp v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
var readyEps int = 0
var notReadyEps int = 0
if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
subsets = append(subsets, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{epa},
Ports: []v1.EndpointPort{epp},
})
readyEps++
} else {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Ports: []v1.EndpointPort{epp},
})
notReadyEps++
}
return subsets, readyEps, notReadyEps
}

View File

@ -620,3 +620,44 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
}()
}
}
func TestSyncEndpointsHeadlessService(t *testing.T) {
ns := "headless"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
ClusterIP: api.ClusterIPNone,
Ports: []v1.ServicePort{},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 0, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}

View File

@ -179,6 +179,7 @@ func (s ServiceExternalNameGeneratorV1) Generate(params map[string]interface{})
}
// validate validates required fields are set to support structured generation
// TODO(xiangpengzhao): validate ports are identity mapped for headless service when we enforce that in validation.validateServicePort.
func (s ServiceCommonGeneratorV1) validate() error {
if len(s.Name) == 0 {
return fmt.Errorf("name must be specified")
@ -189,9 +190,6 @@ func (s ServiceCommonGeneratorV1) validate() error {
if s.ClusterIP == api.ClusterIPNone && s.Type != api.ServiceTypeClusterIP {
return fmt.Errorf("ClusterIP=None can only be used with ClusterIP service type")
}
if s.ClusterIP == api.ClusterIPNone && len(s.TCP) > 0 {
return fmt.Errorf("can not map ports with clusterip=None")
}
if s.ClusterIP != api.ClusterIPNone && len(s.TCP) == 0 && s.Type != api.ServiceTypeExternalName {
return fmt.Errorf("at least one tcp port specifier must be provided")
}

View File

@ -57,13 +57,6 @@ func TestServiceBasicGenerate(t *testing.T) {
serviceType: api.ServiceTypeClusterIP,
expectErr: true,
},
{
name: "clusterip-none and port mapping",
tcp: []string{"456:9898"},
clusterip: "None",
serviceType: api.ServiceTypeClusterIP,
expectErr: true,
},
{
name: "clusterip-none-wrong-type",
tcp: []string{},
@ -88,6 +81,23 @@ func TestServiceBasicGenerate(t *testing.T) {
},
expectErr: false,
},
{
name: "clusterip-none-and-port-mapping",
tcp: []string{"456:9898"},
clusterip: "None",
serviceType: api.ServiceTypeClusterIP,
expected: &api.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "clusterip-none-and-port-mapping",
Labels: map[string]string{"app": "clusterip-none-and-port-mapping"},
},
Spec: api.ServiceSpec{Type: "ClusterIP",
Ports: []api.ServicePort{{Name: "456-9898", Protocol: "TCP", Port: 456, TargetPort: intstr.IntOrString{Type: 0, IntVal: 9898, StrVal: ""}, NodePort: 0}},
Selector: map[string]string{"app": "clusterip-none-and-port-mapping"},
ClusterIP: "None", ExternalIPs: []string(nil), LoadBalancerIP: ""},
},
expectErr: false,
},
{
name: "loadbalancer-ok",
tcp: []string{"456:9898"},

View File

@ -560,6 +560,26 @@ func TestGenerateService(t *testing.T) {
},
},
},
{
generator: ServiceGeneratorV2{},
params: map[string]interface{}{
"selector": "foo=bar",
"name": "test",
"cluster-ip": "None",
},
expected: api.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: api.ServiceSpec{
Selector: map[string]string{
"foo": "bar",
},
Ports: []api.ServicePort{},
ClusterIP: api.ClusterIPNone,
},
},
},
}
for _, test := range tests {
obj, err := test.generator.Generate(test.params)

View File

@ -1154,6 +1154,10 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
svc.Spec.ClusterIP = api.ClusterIPNone
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
}),
makeTestService("somewhere-else", "headless-without-port", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = api.ClusterIPNone
}),
)
// Headless service should be ignored