diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index 5edfe698f3..0a232be3cb 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -445,7 +445,7 @@ func (t *Tester) testUpdateInvokesValidation(obj runtime.Object, setFn SetFunc, toUpdate := update(copyOrDie(foo)) got, created, err := t.storage.(rest.Updater).Update(t.TestContext(), toUpdate) if got != nil || created { - t.Errorf("expected nil object and no creation") + t.Errorf("expected nil object and no creation for object: %v", toUpdate) } if !errors.IsInvalid(err) && !errors.IsBadRequest(err) { t.Errorf("expected invalid or bad request error, got %v", err) diff --git a/pkg/expapi/v1/conversion.go b/pkg/expapi/v1/conversion.go index 1522625d0c..b3b5888ed3 100644 --- a/pkg/expapi/v1/conversion.go +++ b/pkg/expapi/v1/conversion.go @@ -212,7 +212,9 @@ func convert_v1_DeploymentSpec_To_expapi_DeploymentSpec(in *DeploymentSpec, out if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { defaulting.(func(*DeploymentSpec))(in) } - out.Replicas = *in.Replicas + if in.Replicas != nil { + out.Replicas = *in.Replicas + } if in.Selector != nil { out.Selector = make(map[string]string) for key, val := range in.Selector { diff --git a/pkg/master/master.go b/pkg/master/master.go index 378e2a65f5..f0a40edf02 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/registry/componentstatus" controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd" + deploymentetcd "k8s.io/kubernetes/pkg/registry/deployment/etcd" "k8s.io/kubernetes/pkg/registry/endpoint" endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd" eventetcd "k8s.io/kubernetes/pkg/registry/event/etcd" @@ -770,6 +771,7 @@ func (m *Master) expapi(c *Config) *apiserver.APIGroupVersion { autoscalerStorage := horizontalpodautoscaleretcd.NewREST(c.ExpDatabaseStorage) thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(c.ExpDatabaseStorage) daemonStorage := daemonetcd.NewREST(c.ExpDatabaseStorage) + deploymentStorage := deploymentetcd.NewREST(c.ExpDatabaseStorage) storage := map[string]rest.Storage{ strings.ToLower("replicationControllers"): controllerStorage.ReplicationController, @@ -777,6 +779,7 @@ func (m *Master) expapi(c *Config) *apiserver.APIGroupVersion { strings.ToLower("horizontalpodautoscalers"): autoscalerStorage, strings.ToLower("thirdpartyresources"): thirdPartyResourceStorage, strings.ToLower("daemons"): daemonStorage, + strings.ToLower("deployments"): deploymentStorage, } return &apiserver.APIGroupVersion{ diff --git a/pkg/registry/deployment/doc.go b/pkg/registry/deployment/doc.go new file mode 100644 index 0000000000..184fa30d3a --- /dev/null +++ b/pkg/registry/deployment/doc.go @@ -0,0 +1,17 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment diff --git a/pkg/registry/deployment/etcd/etcd.go b/pkg/registry/deployment/etcd/etcd.go new file mode 100644 index 0000000000..114579d839 --- /dev/null +++ b/pkg/registry/deployment/etcd/etcd.go @@ -0,0 +1,71 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/expapi" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/registry/deployment" + "k8s.io/kubernetes/pkg/registry/generic" + etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" +) + +type REST struct { + *etcdgeneric.Etcd +} + +// NewREST returns a RESTStorage object that will work against deployments. +func NewREST(s storage.Interface) *REST { + prefix := "/deployments" + store := &etcdgeneric.Etcd{ + NewFunc: func() runtime.Object { return &expapi.Deployment{} }, + // NewListFunc returns an object capable of storing results of an etcd list. + NewListFunc: func() runtime.Object { return &expapi.DeploymentList{} }, + // Produces a path that etcd understands, to the root of the resource + // by combining the namespace in the context with the given prefix. + KeyRootFunc: func(ctx api.Context) string { + return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix) + }, + // Produces a path that etcd understands, to the resource by combining + // the namespace in the context with the given prefix. + KeyFunc: func(ctx api.Context, name string) (string, error) { + return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name) + }, + // Retrieve the name field of a deployment. + ObjectNameFunc: func(obj runtime.Object) (string, error) { + return obj.(*expapi.Deployment).Name, nil + }, + // Used to match objects based on labels/fields for list. + PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { + return deployment.MatchDeployment(label, field) + }, + EndpointName: "deployments", + + // Used to validate deployment creation. + CreateStrategy: deployment.Strategy, + + // Used to validate deployment updates. + UpdateStrategy: deployment.Strategy, + + Storage: s, + } + return &REST{store} +} diff --git a/pkg/registry/deployment/etcd/etcd_test.go b/pkg/registry/deployment/etcd/etcd_test.go new file mode 100755 index 0000000000..c9c35e955c --- /dev/null +++ b/pkg/registry/deployment/etcd/etcd_test.go @@ -0,0 +1,408 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/rest/resttest" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/expapi" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" + "k8s.io/kubernetes/pkg/registry/registrytest" + "k8s.io/kubernetes/pkg/runtime" + etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/tools" + "k8s.io/kubernetes/pkg/tools/etcdtest" +) + +const ( + PASS = iota + FAIL +) + +func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { + etcdStorage, fakeClient := registrytest.NewEtcdStorage(t) + return NewREST(etcdStorage), fakeClient +} + +func validNewDeployment() *expapi.Deployment { + return &expapi.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: expapi.DeploymentSpec{ + Selector: map[string]string{"a": "b"}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{"a": "b"}, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "test", + Image: "test_image", + ImagePullPolicy: api.PullIfNotPresent, + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + }, + }, + }, + } +} + +var validDeployment = *validNewDeployment() + +func TestCreate(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + deployment := validNewDeployment() + deployment.ObjectMeta = api.ObjectMeta{} + test.TestCreate( + // valid + deployment, + func(ctx api.Context, obj runtime.Object) error { + return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj) + }, + func(ctx api.Context, obj runtime.Object) (runtime.Object, error) { + return registrytest.GetObject(fakeClient, storage.KeyFunc, storage.NewFunc, ctx, obj) + }, + // invalid (invalid selector) + &expapi.Deployment{ + Spec: expapi.DeploymentSpec{ + Selector: map[string]string{}, + Template: validDeployment.Spec.Template, + }, + }, + ) +} + +func TestUpdate(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestUpdate( + // valid + validNewDeployment(), + func(ctx api.Context, obj runtime.Object) error { + return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj) + }, + func(resourceVersion uint64) { + registrytest.SetResourceVersion(fakeClient, resourceVersion) + }, + func(ctx api.Context, obj runtime.Object) (runtime.Object, error) { + return registrytest.GetObject(fakeClient, storage.KeyFunc, storage.NewFunc, ctx, obj) + }, + // updateFunc + func(obj runtime.Object) runtime.Object { + object := obj.(*expapi.Deployment) + object.Spec.Template.Spec.NodeSelector = map[string]string{"c": "d"} + return object + }, + // invalid updateFunc + func(obj runtime.Object) runtime.Object { + object := obj.(*expapi.Deployment) + object.UID = "newUID" + return object + }, + func(obj runtime.Object) runtime.Object { + object := obj.(*expapi.Deployment) + object.Name = "" + return object + }, + func(obj runtime.Object) runtime.Object { + object := obj.(*expapi.Deployment) + object.Spec.Template.Spec.RestartPolicy = api.RestartPolicyOnFailure + return object + }, + func(obj runtime.Object) runtime.Object { + object := obj.(*expapi.Deployment) + object.Spec.Selector = map[string]string{} + return object + }, + ) +} + +func TestEtcdGet(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + test.TestGet(validNewDeployment()) +} + +func TestEtcdList(t *testing.T) { + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + key := etcdtest.AddPrefix(storage.KeyRootFunc(test.TestContext())) + test.TestList( + validNewDeployment(), + func(objects []runtime.Object) []runtime.Object { + return registrytest.SetObjectsForKey(fakeClient, key, objects) + }, + func(resourceVersion uint64) { + registrytest.SetResourceVersion(fakeClient, resourceVersion) + }) +} + +func TestEtcdDelete(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + key, err := storage.KeyFunc(ctx, validDeployment.Name) + key = etcdtest.AddPrefix(key) + + fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewDeployment()), 0) + obj, err := storage.Delete(ctx, validDeployment.Name, nil) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if status, ok := obj.(*api.Status); !ok { + t.Errorf("Expected status of delete, got %#v", status) + } else if status.Status != api.StatusSuccess { + t.Errorf("Expected success, got %#v", status.Status) + } + if len(fakeClient.DeletedKeys) != 1 { + t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) + } + if fakeClient.DeletedKeys[0] != key { + t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) + } +} + +func TestEtcdWatch(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + watching, err := storage.Watch(ctx, + labels.Everything(), + fields.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +// Tests that we can watch for the creation of Deployment with specified labels. +func TestEtcdWatchWithLabels(t *testing.T) { + ctx := api.WithNamespace(api.NewDefaultContext(), validDeployment.Namespace) + storage, fakeClient := newStorage(t) + fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) + + watching, err := storage.Watch(ctx, + labels.SelectorFromSet(validDeployment.Spec.Selector), + fields.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + // The watcher above is waiting for these Labels, on receiving them it should + // apply the deploymentStatus decorator, which lists pods, causing a query against + // the /registry/pods endpoint of the etcd client. + deployment := &expapi.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: validDeployment.Spec.Selector, + Namespace: "default", + }, + } + deploymentBytes, _ := testapi.Codec().Encode(deployment) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(deploymentBytes), + }, + } + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + t.Error("unexpected timeout from result channel") + } + watching.Stop() +} + +// Tests that we can watch for Deployment with specified fields. +func TestEtcdWatchWithFields(t *testing.T) { + ctx := api.WithNamespace(api.NewDefaultContext(), validDeployment.Namespace) + storage, fakeClient := newStorage(t) + fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) + + testFieldMap := map[int][]fields.Set{ + PASS: { + {"metadata.name": "foo"}, + }, + FAIL: { + {"metadata.name": "bar"}, + {"name": "foo"}, + }, + } + testEtcdActions := []string{ + etcdstorage.EtcdCreate, + etcdstorage.EtcdSet, + etcdstorage.EtcdCAS, + etcdstorage.EtcdDelete} + + deployment := &expapi.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: validDeployment.Spec.Selector, + Namespace: "default", + }, + Status: expapi.DeploymentStatus{ + Replicas: 1, + UpdatedReplicas: 4, + }, + } + deploymentBytes, _ := testapi.Codec().Encode(deployment) + + for expectedResult, fieldSet := range testFieldMap { + for _, field := range fieldSet { + for _, action := range testEtcdActions { + watching, err := storage.Watch(ctx, + labels.Everything(), + field.AsSelector(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + var prevNode *etcd.Node = nil + node := &etcd.Node{ + Value: string(deploymentBytes), + } + if action == etcdstorage.EtcdDelete { + prevNode = node + } + fakeClient.WaitForWatchCompletion() + fakeClient.WatchResponse <- &etcd.Response{ + Action: action, + Node: node, + PrevNode: prevNode, + } + + select { + case r, ok := <-watching.ResultChan(): + if expectedResult == FAIL { + t.Errorf("Unexpected result from channel %#v. Field: %v", r, field) + } + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + if expectedResult == PASS { + t.Error("unexpected timeout from result channel") + } + } + watching.Stop() + } + } + } +} + +func TestEtcdWatchNotMatch(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) + + watching, err := storage.Watch(ctx, + labels.SelectorFromSet(labels.Set{"name": "foo"}), + fields.Everything(), + "1", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + deployment := &expapi.Deployment{ + ObjectMeta: api.ObjectMeta{ + Name: "bar", + Labels: map[string]string{ + "name": "bar", + }, + }, + } + deploymentBytes, _ := testapi.Codec().Encode(deployment) + fakeClient.WatchResponse <- &etcd.Response{ + Action: "create", + Node: &etcd.Node{ + Value: string(deploymentBytes), + }, + } + + select { + case <-watching.ResultChan(): + t.Error("unexpected result from result channel") + case <-time.After(time.Millisecond * 100): + // expected case + } +} + +func TestDelete(t *testing.T) { + ctx := api.NewDefaultContext() + storage, fakeClient := newStorage(t) + test := resttest.New(t, storage, fakeClient.SetError) + key, _ := storage.KeyFunc(ctx, validDeployment.Name) + key = etcdtest.AddPrefix(key) + + createFn := func() runtime.Object { + dc := validNewDeployment() + dc.ResourceVersion = "1" + fakeClient.Data[key] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), dc), + ModifiedIndex: 1, + }, + }, + } + return dc + } + gracefulSetFn := func() bool { + // If the deployment is still around after trying to delete either the delete + // failed, or we're deleting it gracefully. + if fakeClient.Data[key].R.Node != nil { + return true + } + return false + } + test.TestDelete(createFn, gracefulSetFn) +} diff --git a/pkg/registry/deployment/strategy.go b/pkg/registry/deployment/strategy.go new file mode 100644 index 0000000000..d00db2f0f5 --- /dev/null +++ b/pkg/registry/deployment/strategy.go @@ -0,0 +1,97 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/expapi" + "k8s.io/kubernetes/pkg/expapi/validation" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/registry/generic" + "k8s.io/kubernetes/pkg/runtime" + errs "k8s.io/kubernetes/pkg/util/fielderrors" +) + +// deploymentStrategy implements behavior for Deployments. +type deploymentStrategy struct { + runtime.ObjectTyper + api.NameGenerator +} + +// Strategy is the default logic that applies when creating and updating Deployment +// objects via the REST API. +var Strategy = deploymentStrategy{api.Scheme, api.SimpleNameGenerator} + +// NamespaceScoped is true for deployment. +func (deploymentStrategy) NamespaceScoped() bool { + return true +} + +// PrepareForCreate clears fields that are not allowed to be set by end users on creation. +func (deploymentStrategy) PrepareForCreate(obj runtime.Object) { +} + +// Validate validates a new deployment. +func (deploymentStrategy) Validate(ctx api.Context, obj runtime.Object) errs.ValidationErrorList { + deployment := obj.(*expapi.Deployment) + return validation.ValidateDeployment(deployment) +} + +// AllowCreateOnUpdate is false for deployments. +func (deploymentStrategy) AllowCreateOnUpdate() bool { + return false +} + +// PrepareForUpdate clears fields that are not allowed to be set by end users on update. +func (deploymentStrategy) PrepareForUpdate(obj, old runtime.Object) { +} + +// ValidateUpdate is the default update validation for an end user. +func (deploymentStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) errs.ValidationErrorList { + return validation.ValidateDeploymentUpdate(old.(*expapi.Deployment), obj.(*expapi.Deployment)) +} + +func (deploymentStrategy) AllowUnconditionalUpdate() bool { + return true +} + +// DeploymentToSelectableFields returns a field set that represents the object. +func DeploymentToSelectableFields(deployment *expapi.Deployment) fields.Set { + return fields.Set{ + "metadata.name": deployment.Name, + } +} + +// MatchDeployment is the filter used by the generic etcd backend to route +// watch events from etcd to clients of the apiserver only interested in specific +// labels/fields. +func MatchDeployment(label labels.Selector, field fields.Selector) generic.Matcher { + return &generic.SelectionPredicate{ + Label: label, + Field: field, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + deployment, ok := obj.(*expapi.Deployment) + if !ok { + return nil, nil, fmt.Errorf("given object is not a deployment.") + } + return labels.Set(deployment.ObjectMeta.Labels), DeploymentToSelectableFields(deployment), nil + }, + } +}