Merge pull request #67038 from jennybuckley/dry-run-services

Automatic merge from submit-queue (batch tested with PRs 67323, 66717, 67038). If you want to cherry-pick this change to another branch, please follow the instructions at https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Prevent side effects on dryrun in service registry

```release-note
NONE
```
Kubernetes Submit Queue 2018-08-27 16:27:06 -07:00 committed by GitHub
commit 3da79f5cab
6 changed files with 611 additions and 39 deletions
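
Server-side dry run is driven entirely by the request options: when DryRun contains "All", the service registry must validate and admit the object but skip every side effect, so no ClusterIP or NodePort allocation, no endpoints deletion, and no persistence. Below is a minimal sketch of issuing such a request against a rest.Creater-compatible storage; the package and helper names are hypothetical, while the option values and rest.ValidateAllObjectFunc mirror what the new tests in this PR use.

```go
package dryrunexample // hypothetical package, for illustration only

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apiserver/pkg/registry/rest"
)

// createDryRun submits obj through any rest.Creater with the all-or-nothing
// dry-run directive set. With this PR, the service REST storage validates and
// returns the object but skips IP/port allocation and persistence.
func createDryRun(ctx context.Context, storage rest.Creater, obj runtime.Object) (runtime.Object, error) {
	return storage.Create(ctx, obj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{
		DryRun: []string{metav1.DryRunAll},
	})
}
```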

BUILD (portallocator package):

@@ -23,7 +23,10 @@ go_library(
 go_test(
     name = "go_default_test",
-    srcs = ["allocator_test.go"],
+    srcs = [
+        "allocator_test.go",
+        "operation_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/apis/core:go_default_library",

Port allocation operation (portallocator package):

@@ -33,15 +33,18 @@ type PortAllocationOperation struct {
     allocated       []int
     releaseDeferred []int
     shouldRollback  bool
+    dryRun          bool
 }

 // Creates a portAllocationOperation, tracking a set of allocations & releases
-func StartOperation(pa Interface) *PortAllocationOperation {
+// If dryRun is specified, never actually allocate or release anything
+func StartOperation(pa Interface, dryRun bool) *PortAllocationOperation {
     op := &PortAllocationOperation{}
     op.pa = pa
     op.allocated = []int{}
     op.releaseDeferred = []int{}
     op.shouldRollback = true
+    op.dryRun = dryRun

     return op
 }
@@ -54,6 +57,10 @@ func (op *PortAllocationOperation) Finish() {
 // (Try to) undo any operations we did
 func (op *PortAllocationOperation) Rollback() []error {
+    if op.dryRun {
+        return nil
+    }
+
     errors := []error{}

     for _, allocated := range op.allocated {
@@ -73,6 +80,10 @@ func (op *PortAllocationOperation) Rollback() []error {
 // Note that even if this fails, we don't rollback; we always want to err on the side of over-allocation,
 // and Commit should be called _after_ the owner is written
 func (op *PortAllocationOperation) Commit() []error {
+    if op.dryRun {
+        return nil
+    }
+
     errors := []error{}

     for _, release := range op.releaseDeferred {
@@ -95,6 +106,19 @@ func (op *PortAllocationOperation) Commit() []error {
 // Allocates a port, and record it for future rollback
 func (op *PortAllocationOperation) Allocate(port int) error {
+    if op.dryRun {
+        if op.pa.Has(port) {
+            return ErrAllocated
+        }
+        for _, a := range op.allocated {
+            if port == a {
+                return ErrAllocated
+            }
+        }
+        op.allocated = append(op.allocated, port)
+        return nil
+    }
+
     err := op.pa.Allocate(port)
     if err == nil {
         op.allocated = append(op.allocated, port)
@@ -104,6 +128,33 @@ func (op *PortAllocationOperation) Allocate(port int) error {
 // Allocates a port, and record it for future rollback
 func (op *PortAllocationOperation) AllocateNext() (int, error) {
+    if op.dryRun {
+        // Find the max element of the allocated ports array.
+        // If no ports are already being allocated by this operation,
+        // then choose a sensible guess for a dummy port number
+        var lastPort int
+        for _, allocatedPort := range op.allocated {
+            if allocatedPort > lastPort {
+                lastPort = allocatedPort
+            }
+        }
+        if len(op.allocated) == 0 {
+            lastPort = 32768
+        }
+        // Try to find the next non allocated port.
+        // If too many ports are full, just reuse one,
+        // since this is just a dummy value.
+        for port := lastPort + 1; port < 100; port++ {
+            err := op.Allocate(port)
+            if err == nil {
+                return port, nil
+            }
+        }
+        op.allocated = append(op.allocated, lastPort+1)
+        return lastPort + 1, nil
+    }
+
     port, err := op.pa.AllocateNext()
     if err == nil {
         op.allocated = append(op.allocated, port)
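
A sketch of how a caller might drive the dry-run mode added above: with dryRun set, the operation only records ports in its own slice, so the backing allocator is never modified and Commit/Rollback become no-ops. The import path and the 30000-32767 range are assumptions for illustration; ParsePortRange, NewPortAllocator, StartOperation, AllocateNext, Finish and Free are the identifiers used in this diff and its tests.

```go
package main

import (
	"fmt"

	utilnet "k8s.io/apimachinery/pkg/util/net"
	// Assumed import path for the portallocator package changed above.
	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)

func main() {
	pr, err := utilnet.ParsePortRange("30000-32767") // hypothetical node-port range
	if err != nil {
		panic(err)
	}
	alloc := portallocator.NewPortAllocator(*pr)

	// dryRun=true: allocations are recorded only inside the operation,
	// so the real allocator is never touched.
	op := portallocator.StartOperation(alloc, true)
	defer op.Finish()

	port, _ := op.AllocateNext()
	fmt.Println("dry-run placeholder port:", port)
	fmt.Println("free ports in the real allocator (unchanged):", alloc.Free())
}
```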

operation_test.go, new test file (portallocator package):

@@ -0,0 +1,117 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portallocator
+
+import (
+    "testing"
+
+    "k8s.io/apimachinery/pkg/util/net"
+)
+
+// TestDryRunAllocate tests the Allocate function in dry run mode
+func TestDryRunAllocate(t *testing.T) {
+    pr, err := net.ParsePortRange("10000-10200")
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Allocate some ports before calling
+    previouslyAllocated := []int{10000, 10010, 10020}
+    r := NewPortAllocator(*pr)
+    for _, port := range previouslyAllocated {
+        _ = r.Allocate(port)
+    }
+    freeAtStart := r.Free()
+
+    // Do some allocations with a dry run operation
+    toAllocate := []int{
+        10000,
+        10030,
+        10030,
+        10040,
+    }
+    expectedErrors := []error{
+        ErrAllocated,
+        nil,
+        ErrAllocated,
+        nil,
+    }
+    op := StartOperation(r, true)
+    for i, port := range toAllocate {
+        err := op.Allocate(port)
+        if err != expectedErrors[i] {
+            t.Errorf("%v: expected error %v but got %v", i, expectedErrors[i], err)
+        }
+    }
+
+    // Make sure no port allocations were actually made by the dry run
+    freeAtEnd := r.Free()
+    if freeAtStart != freeAtEnd {
+        t.Errorf("expected %v free ports but got %v", freeAtStart, freeAtEnd)
+    }
+}
+
+// TestDryRunAllocateNext tests the AllocateNext function in dry run mode
+func TestDryRunAllocateNext(t *testing.T) {
+    pr, err := net.ParsePortRange("10000-10200")
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Allocate some ports before calling
+    previouslyAllocated := []int{10000, 10010, 10020}
+    r := NewPortAllocator(*pr)
+    for _, port := range previouslyAllocated {
+        _ = r.Allocate(port)
+    }
+    freeAtStart := r.Free()
+
+    // AllocateNext with a previously unused dry run operation
+    op := StartOperation(r, true)
+    port, err := op.AllocateNext()
+    if port == 0 {
+        t.Errorf("expected non zero port but got: %v", port)
+    }
+    if err != nil {
+        t.Errorf("expected no error but got: %v", err)
+    }
+
+    // Try to allocate the returned port using the same operation
+    if e, a := ErrAllocated, op.Allocate(port); e != a {
+        t.Errorf("expected %v but got: %v", e, a)
+    }
+
+    // AllocateNext with a previously used dry run operation
+    op = StartOperation(r, true)
+    _ = op.Allocate(12345)
+    port, err = op.AllocateNext()
+    if port == 0 {
+        t.Errorf("expected non zero port but got: %v", port)
+    }
+    if port == 12345 {
+        t.Errorf("expected port not to be 12345 but got %v", port)
+    }
+    if err != nil {
+        t.Errorf("expected no error but got: %v", err)
+    }
+
+    // Make sure no port allocations were actually made by the dry run
+    freeAtEnd := r.Free()
+    if freeAtStart != freeAtEnd {
+        t.Errorf("expected %v free ports but got %v", freeAtStart, freeAtEnd)
+    }
+}

BUILD (service registry package):

@@ -39,6 +39,7 @@ go_test(
         "//staging/src/k8s.io/apiserver/pkg/registry/generic/testing:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/dryrun:go_default_library",
     ],
 )
@@ -74,6 +75,7 @@ go_library(
         "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/dryrun:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
     ],

Service registry REST storage (Create, Delete, Update):

@@ -37,6 +37,7 @@ import (
     "k8s.io/apimachinery/pkg/watch"
     genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
     "k8s.io/apiserver/pkg/registry/rest"
+    "k8s.io/apiserver/pkg/util/dryrun"
     utilfeature "k8s.io/apiserver/pkg/util/feature"

     apiservice "k8s.io/kubernetes/pkg/api/service"
@@ -170,13 +171,15 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
     }()

     var err error
-    if service.Spec.Type != api.ServiceTypeExternalName {
-        if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
-            return nil, err
+    if !dryrun.IsDryRun(options.DryRun) {
+        if service.Spec.Type != api.ServiceTypeExternalName {
+            if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
+                return nil, err
+            }
         }
     }

-    nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
+    nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
     defer nodePortOp.Finish()

     if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
@@ -222,13 +225,32 @@ func (rs *REST) Delete(ctx context.Context, id string, options *metav1.DeleteOptions) {
     svc := obj.(*api.Service)

-    // TODO: can leave dangling endpoints, and potentially return incorrect
-    // endpoints if a new service is created with the same name
-    _, _, err = rs.endpoints.Delete(ctx, id, &metav1.DeleteOptions{})
-    if err != nil && !errors.IsNotFound(err) {
-        return nil, false, err
+    // Only perform the cleanup if this is a non-dryrun deletion
+    if !dryrun.IsDryRun(options.DryRun) {
+        // TODO: can leave dangling endpoints, and potentially return incorrect
+        // endpoints if a new service is created with the same name
+        _, _, err = rs.endpoints.Delete(ctx, id, &metav1.DeleteOptions{})
+        if err != nil && !errors.IsNotFound(err) {
+            return nil, false, err
+        }
+        rs.releaseAllocatedResources(svc)
     }

+    // TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this
+    details := &metav1.StatusDetails{
+        Name: svc.Name,
+        UID:  svc.UID,
+    }
+    if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
+        details.Group = info.APIGroup
+        details.Kind = info.Resource // legacy behavior
+    }
+    status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}
+    return status, true, nil
+}
+
+func (rs *REST) releaseAllocatedResources(svc *api.Service) {
     if helper.IsServiceIPSet(svc) {
         rs.serviceIPs.Release(net.ParseIP(svc.Spec.ClusterIP))
     }
@@ -251,18 +273,6 @@ func (rs *REST) Delete(ctx context.Context, id string, options *metav1.DeleteOptions) {
             }
         }
     }
-
-    // TODO: this is duplicated from the generic storage, when this wrapper is fully removed we can drop this
-    details := &metav1.StatusDetails{
-        Name: svc.Name,
-        UID:  svc.UID,
-    }
-    if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
-        details.Group = info.APIGroup
-        details.Kind = info.Resource // legacy behavior
-    }
-    status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}
-    return status, true, nil
 }

 // externalTrafficPolicyUpdate adjusts ExternalTrafficPolicy during service update if needed.
@@ -358,19 +368,21 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo
         }
     }()

-    nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
+    nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
     defer nodePortOp.Finish()

-    // Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
-    if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
-        if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
-            return nil, false, err
+    if !dryrun.IsDryRun(options.DryRun) {
+        // Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
+        if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
+            if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil {
+                return nil, false, err
+            }
         }
-    }
-    // Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
-    if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
-        if helper.IsServiceIPSet(oldService) {
-            rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP))
+        // Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
+        if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
+            if helper.IsServiceIPSet(oldService) {
+                rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP))
+            }
         }
     }

     // Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
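
The guard used throughout these hunks is dryrun.IsDryRun from k8s.io/apiserver/pkg/util/dryrun, which simply reports whether the request options carry any dry-run directive. The stand-in below is illustrative only (hypothetical package and function names), not the library source.

```go
package registryutil // hypothetical package, for illustration only

// isDryRun mirrors how the apiserver helper is used in the hunks above:
// a request counts as a dry run when its options carry at least one
// DryRun directive (the tests use metav1.DryRunAll, i.e. "All").
func isDryRun(dryRun []string) bool {
	return len(dryRun) != 0
}

// Inside the registry methods this gates every side effect, e.g.:
//
//	if !isDryRun(options.DryRun) {
//	    // allocate ClusterIP / NodePorts, delete endpoints, persist changes
//	}
```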

Service registry REST storage tests:

@@ -37,6 +37,7 @@ import (
     "k8s.io/apiserver/pkg/registry/generic"
     "k8s.io/apiserver/pkg/registry/rest"
     etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
+    "k8s.io/apiserver/pkg/util/dryrun"

     "k8s.io/kubernetes/pkg/api/service"
     api "k8s.io/kubernetes/pkg/apis/core"
@@ -108,6 +109,9 @@ func (s *serviceStorage) New() runtime.Object {
 }

 func (s *serviceStorage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
+    if dryrun.IsDryRun(options.DryRun) {
+        return obj, s.Err
+    }
     svc := obj.(*api.Service)
     s.CreatedID = obj.(metav1.Object).GetName()
     s.Service = svc.DeepCopy()
@@ -121,17 +125,21 @@
 }

 func (s *serviceStorage) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
-    s.UpdatedID = name
     obj, err := objInfo.UpdatedObject(ctx, s.OldService)
     if err != nil {
         return nil, false, err
     }
-    s.Service = obj.(*api.Service)
-    return s.Service, s.Created, s.Err
+    if !dryrun.IsDryRun(options.DryRun) {
+        s.UpdatedID = name
+        s.Service = obj.(*api.Service)
+    }
+    return obj, s.Created, s.Err
 }

 func (s *serviceStorage) Delete(ctx context.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
-    s.DeletedID = name
+    if !dryrun.IsDryRun(options.DryRun) {
+        s.DeletedID = name
+    }
     return s.Service, s.DeletedImmediately, s.Err
 }
@@ -279,6 +287,154 @@ func TestServiceRegistryCreate(t *testing.T) {
     }
 }

+func TestServiceRegistryCreateDryRun(t *testing.T) {
+    storage, registry, server := NewTestREST(t, nil)
+    defer server.Terminate(t)
+
+    // Test dry run create request with cluster ip
+    svc := &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeClusterIP,
+            ClusterIP:       "1.2.3.4",
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }
+    ctx := genericapirequest.NewDefaultContext()
+    _, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+    if storage.serviceIPs.Has(net.ParseIP("1.2.3.4")) {
+        t.Errorf("unexpected side effect: ip allocated")
+    }
+    srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
+    if err != nil {
+        t.Errorf("unexpected error: %v", err)
+    }
+    if srv != nil {
+        t.Errorf("unexpected service found: %v", srv)
+    }
+
+    // Test dry run create request with a node port
+    svc = &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeNodePort,
+            Ports: []api.ServicePort{{
+                NodePort:   30010,
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }
+    _, err = storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+    if storage.serviceNodePorts.Has(30010) {
+        t.Errorf("unexpected side effect: NodePort allocated")
+    }
+    srv, err = registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
+    if err != nil {
+        t.Errorf("unexpected error: %v", err)
+    }
+    if srv != nil {
+        t.Errorf("unexpected service found: %v", srv)
+    }
+
+    // Test dry run create request with multi node port
+    svc = &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeNodePort,
+            Ports: []api.ServicePort{
+                {
+                    Name:       "port-tcp",
+                    Port:       53,
+                    NodePort:   30053,
+                    TargetPort: intstr.FromInt(6503),
+                    Protocol:   api.ProtocolTCP,
+                },
+                {
+                    Name:       "port-udp",
+                    Port:       53,
+                    NodePort:   30053,
+                    TargetPort: intstr.FromInt(6503),
+                    Protocol:   api.ProtocolUDP,
+                },
+            },
+        },
+    }
+    expectNodePorts := collectServiceNodePorts(svc)
+    created_svc, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+    created_service := created_svc.(*api.Service)
+    serviceNodePorts := collectServiceNodePorts(created_service)
+    if !reflect.DeepEqual(serviceNodePorts, expectNodePorts) {
+        t.Errorf("Expected %v, but got %v", expectNodePorts, serviceNodePorts)
+    }
+    if storage.serviceNodePorts.Has(30053) {
+        t.Errorf("unexpected side effect: NodePort allocated")
+    }
+    srv, err = registry.GetService(ctx, svc.Name, &metav1.GetOptions{})
+    if err != nil {
+        t.Errorf("unexpected error: %v", err)
+    }
+    if srv != nil {
+        t.Errorf("unexpected service found: %v", srv)
+    }
+
+    // Test dry run create request with multiple unspecified node ports,
+    // so PortAllocationOperation.AllocateNext() will be called multiple times.
+    svc = &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeNodePort,
+            Ports: []api.ServicePort{
+                {
+                    Name:       "port-a",
+                    Port:       53,
+                    Protocol:   api.ProtocolTCP,
+                    TargetPort: intstr.FromInt(6503),
+                },
+                {
+                    Name:       "port-b",
+                    Port:       54,
+                    Protocol:   api.ProtocolTCP,
+                    TargetPort: intstr.FromInt(6504),
+                },
+            },
+        },
+    }
+    created_svc, err = storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Unexpected error: %v", err)
+    }
+    created_service = created_svc.(*api.Service)
+    serviceNodePorts = collectServiceNodePorts(created_service)
+    if len(serviceNodePorts) != 2 {
+        t.Errorf("Expected service to have 2 ports, but got %v", serviceNodePorts)
+    } else if serviceNodePorts[0] == serviceNodePorts[1] {
+        t.Errorf("Expected unique port numbers, but got %v", serviceNodePorts)
+    }
+}
+
 func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) {
     storage, registry, server := NewTestREST(t, nil)
     defer server.Terminate(t)
@@ -515,6 +671,171 @@ func TestServiceRegistryUpdate(t *testing.T) {
     }
 }

+func TestServiceRegistryUpdateDryRun(t *testing.T) {
+    ctx := genericapirequest.NewDefaultContext()
+    storage, registry, server := NewTestREST(t, nil)
+    defer server.Terminate(t)
+
+    obj, err := registry.Create(ctx, &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", Namespace: metav1.NamespaceDefault},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeExternalName,
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
+    svc := obj.(*api.Service)
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+
+    // Test dry run update request external name to node port
+    updated_svc, created, err := storage.Update(ctx, svc.Name, rest.DefaultUpdatedObjectInfo(&api.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:            svc.Name,
+            ResourceVersion: svc.ResourceVersion},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeNodePort,
+            Ports: []api.ServicePort{{
+                NodePort:   30020,
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    if updated_svc == nil {
+        t.Errorf("Expected non-nil object")
+    }
+    if created {
+        t.Errorf("expected not created")
+    }
+    if storage.serviceNodePorts.Has(30020) {
+        t.Errorf("unexpected side effect: NodePort allocated")
+    }
+    if e, a := "", registry.UpdatedID; e != a {
+        t.Errorf("Expected %q, but got %q", e, a)
+    }
+
+    // Test dry run update request external name to cluster ip
+    _, _, err = storage.Update(ctx, svc.Name, rest.DefaultUpdatedObjectInfo(&api.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:            svc.Name,
+            ResourceVersion: svc.ResourceVersion},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeClusterIP,
+            ClusterIP:       "1.2.3.4",
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    if storage.serviceIPs.Has(net.ParseIP("1.2.3.4")) {
+        t.Errorf("unexpected side effect: ip allocated")
+    }
+
+    // Test dry run update request remove node port
+    obj, err = storage.Create(ctx, &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", Namespace: metav1.NamespaceDefault},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeNodePort,
+            Ports: []api.ServicePort{{
+                NodePort:   30020,
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
+    svc = obj.(*api.Service)
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    _, _, err = storage.Update(ctx, svc.Name, rest.DefaultUpdatedObjectInfo(&api.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:            svc.Name,
+            ResourceVersion: svc.ResourceVersion},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeExternalName,
+            ExternalName:    "foo-svc",
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    if !storage.serviceNodePorts.Has(30020) {
+        t.Errorf("unexpected side effect: NodePort unallocated")
+    }
+
+    // Test dry run update request remove cluster ip
+    obj, err = storage.Create(ctx, &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", Namespace: metav1.NamespaceDefault},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeClusterIP,
+            ClusterIP:       "1.2.3.4",
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
+    svc = obj.(*api.Service)
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    _, _, err = storage.Update(ctx, svc.Name, rest.DefaultUpdatedObjectInfo(&api.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:            svc.Name,
+            ResourceVersion: svc.ResourceVersion},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeExternalName,
+            ExternalName:    "foo-svc",
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    if !storage.serviceIPs.Has(net.ParseIP("1.2.3.4")) {
+        t.Errorf("unexpected side effect: ip unallocated")
+    }
+}
+
 func TestServiceStorageValidatesUpdate(t *testing.T) {
     ctx := genericapirequest.NewDefaultContext()
     storage, registry, server := NewTestREST(t, nil)
@@ -630,6 +951,72 @@ func TestServiceRegistryDelete(t *testing.T) {
     }
 }

+func TestServiceRegistryDeleteDryRun(t *testing.T) {
+    ctx := genericapirequest.NewDefaultContext()
+    storage, registry, server := NewTestREST(t, nil)
+    defer server.Terminate(t)
+
+    // Test dry run delete request with cluster ip
+    svc := &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeClusterIP,
+            ClusterIP:       "1.2.3.4",
+            Ports: []api.ServicePort{{
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }
+    _, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    _, _, err = storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    if e, a := "", registry.DeletedID; e != a {
+        t.Errorf("Expected %v, but got %v", e, a)
+    }
+    if !storage.serviceIPs.Has(net.ParseIP("1.2.3.4")) {
+        t.Errorf("unexpected side effect: ip unallocated")
+    }
+
+    // Test dry run delete request with node port
+    svc = &api.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: "foo2"},
+        Spec: api.ServiceSpec{
+            Selector:        map[string]string{"bar": "baz"},
+            SessionAffinity: api.ServiceAffinityNone,
+            Type:            api.ServiceTypeNodePort,
+            Ports: []api.ServicePort{{
+                NodePort:   30030,
+                Port:       6502,
+                Protocol:   api.ProtocolTCP,
+                TargetPort: intstr.FromInt(6502),
+            }},
+        },
+    }
+    _, err = storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    _, _, err = storage.Delete(ctx, svc.Name, &metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}})
+    if err != nil {
+        t.Fatalf("Expected no error: %v", err)
+    }
+    if e, a := "", registry.DeletedID; e != a {
+        t.Errorf("Expected %v, but got %v", e, a)
+    }
+    if !storage.serviceNodePorts.Has(30030) {
+        t.Errorf("unexpected side effect: NodePort unallocated")
+    }
+}
+
 func TestServiceRegistryDeleteExternal(t *testing.T) {
     ctx := genericapirequest.NewDefaultContext()
     storage, registry, server := NewTestREST(t, nil)
@@ -1436,7 +1823,7 @@ func TestInitClusterIP(t *testing.T) {
 func TestInitNodePorts(t *testing.T) {
     storage, _, server := NewTestREST(t, nil)
     defer server.Terminate(t)
-    nodePortOp := portallocator.StartOperation(storage.serviceNodePorts)
+    nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false)
     defer nodePortOp.Finish()

     testCases := []struct {
@@ -1618,7 +2005,7 @@ func TestInitNodePorts(t *testing.T) {
 func TestUpdateNodePorts(t *testing.T) {
     storage, _, server := NewTestREST(t, nil)
     defer server.Terminate(t)
-    nodePortOp := portallocator.StartOperation(storage.serviceNodePorts)
+    nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false)
     defer nodePortOp.Finish()

     testCases := []struct {