Merge pull request #30253 from AdoHe/fix_nodeport

Automatic merge from submit-queue

Allow services which use same port, different protocol to use the same nodePort for both

fix #20092

@thockin @smarterclayton ptal.
pull/6/head
Kubernetes Submit Queue 2016-08-25 23:16:03 -07:00 committed by GitHub
commit 8705a41c56
3 changed files with 215 additions and 14 deletions

View File

@ -114,24 +114,46 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
}
assignNodePorts := shouldAssignNodePorts(service)
svcPortToNodePort := map[int]int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if servicePort.NodePort != 0 {
err := nodePortOp.Allocate(int(servicePort.NodePort))
if err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())}
return nil, errors.NewInvalid(api.Kind("Service"), service.Name, el)
allocatedNodePort := svcPortToNodePort[int(servicePort.Port)]
if allocatedNodePort == 0 {
// This will only scan forward in the service.Spec.Ports list because any matches
// before the current port would have been found in svcPortToNodePort. This is really
// looking for any user provided values.
np := findRequestedNodePort(int(servicePort.Port), service.Spec.Ports)
if np != 0 {
err := nodePortOp.Allocate(np)
if err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), np, err.Error())}
return nil, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
servicePort.NodePort = int32(np)
svcPortToNodePort[int(servicePort.Port)] = np
} else if assignNodePorts {
nodePort, err := nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePort)
svcPortToNodePort[int(servicePort.Port)] = nodePort
}
} else if assignNodePorts {
nodePort, err := nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
} else if int(servicePort.NodePort) != allocatedNodePort {
if servicePort.NodePort == 0 {
servicePort.NodePort = int32(allocatedNodePort)
} else {
err := nodePortOp.Allocate(int(servicePort.NodePort))
if err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())}
return nil, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
servicePort.NodePort = int32(nodePort)
}
}
@ -438,3 +460,15 @@ func shouldCheckOrAssignHealthCheckNodePort(service *api.Service) bool {
glog.V(4).Infof("Service type: %v does not need health check node port", service.Spec.Type)
return false
}
// Loop through the service ports list, find one with the same port number and
// NodePort specified, return this NodePort otherwise return 0.
func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
for i := range servicePorts {
servicePort := servicePorts[i]
if port == int(servicePort.Port) && servicePort.NodePort != 0 {
return int(servicePort.NodePort)
}
}
return 0
}

View File

@ -18,6 +18,7 @@ package service
import (
"net"
"reflect"
"strings"
"testing"
@ -115,6 +116,125 @@ func TestServiceRegistryCreate(t *testing.T) {
}
}
func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
storage, registry := NewTestREST(t, nil)
testCases := []struct {
svc *api.Service
name string
expectNodePorts []int
}{
{
svc: &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo1"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-tcp",
Port: 53,
NodePort: 30053,
TargetPort: intstr.FromInt(6503),
Protocol: api.ProtocolTCP,
},
{
Name: "port-udp",
Port: 53,
NodePort: 30053,
TargetPort: intstr.FromInt(6503),
Protocol: api.ProtocolUDP,
},
},
},
},
name: "foo1",
expectNodePorts: []int{30053, 30053},
},
{
svc: &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo2"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-tcp",
Port: 54,
TargetPort: intstr.FromInt(6504),
Protocol: api.ProtocolTCP,
},
{
Name: "port-udp",
Port: 54,
NodePort: 30054,
TargetPort: intstr.FromInt(6504),
Protocol: api.ProtocolUDP,
},
},
},
},
name: "foo2",
expectNodePorts: []int{30054, 30054},
},
{
svc: &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo3"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-tcp",
Port: 55,
NodePort: 30055,
TargetPort: intstr.FromInt(6505),
Protocol: api.ProtocolTCP,
},
{
Name: "port-udp",
Port: 55,
NodePort: 30056,
TargetPort: intstr.FromInt(6506),
Protocol: api.ProtocolUDP,
},
},
},
},
name: "foo3",
expectNodePorts: []int{30055, 30056},
},
}
ctx := api.NewDefaultContext()
for _, test := range testCases {
created_svc, err := storage.Create(ctx, test.svc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
created_service := created_svc.(*api.Service)
if !api.HasObjectMetaSystemFieldValues(&created_service.ObjectMeta) {
t.Errorf("storage did not populate object meta field values")
}
if created_service.Name != test.name {
t.Errorf("Expected %s, but got %s", test.name, created_service.Name)
}
serviceNodePorts := CollectServiceNodePorts(created_service)
if !reflect.DeepEqual(serviceNodePorts, test.expectNodePorts) {
t.Errorf("Expected %v, but got %v", test.expectNodePorts, serviceNodePorts)
}
srv, err := registry.GetService(ctx, test.name)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv == nil {
t.Errorf("Failed to find service: %s", test.name)
}
}
}
func TestServiceStorageValidatesCreate(t *testing.T) {
storage, _ := NewTestREST(t, nil)
failureCases := map[string]api.Service{

View File

@ -724,6 +724,53 @@ var _ = framework.KubeDescribe("Services", func() {
}
})
It("should use same NodePort with same port but different protocols", func() {
serviceName := "nodeports"
ns := f.Namespace.Name
t := NewServerTest(c, ns, serviceName)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
if len(errs) != 0 {
framework.Failf("errors in cleanup: %v", errs)
}
}()
By("creating service " + serviceName + " with same NodePort but different protocols in namespace " + ns)
service := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: t.ServiceName,
Namespace: t.Namespace,
},
Spec: api.ServiceSpec{
Selector: t.Labels,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "tcp-port",
Port: 53,
Protocol: api.ProtocolTCP,
},
{
Name: "udp-port",
Port: 53,
Protocol: api.ProtocolUDP,
},
},
},
}
result, err := t.CreateService(service)
Expect(err).NotTo(HaveOccurred())
if len(result.Spec.Ports) != 2 {
framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
}
if result.Spec.Ports[0].NodePort != result.Spec.Ports[1].NodePort {
framework.Failf("should use same NodePort for new service: %v", result)
}
})
It("should prevent NodePort collisions", func() {
// TODO: use the ServiceTestJig here
baseName := "nodeport-collision-"