Merge pull request #13347 from nikhiljindal/expDeployment

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-08-31 14:31:23 -07:00
commit 044e4854e1
7 changed files with 600 additions and 2 deletions

View File

@ -445,7 +445,7 @@ func (t *Tester) testUpdateInvokesValidation(obj runtime.Object, setFn SetFunc,
toUpdate := update(copyOrDie(foo))
got, created, err := t.storage.(rest.Updater).Update(t.TestContext(), toUpdate)
if got != nil || created {
t.Errorf("expected nil object and no creation")
t.Errorf("expected nil object and no creation for object: %v", toUpdate)
}
if !errors.IsInvalid(err) && !errors.IsBadRequest(err) {
t.Errorf("expected invalid or bad request error, got %v", err)

View File

@ -212,7 +212,9 @@ func convert_v1_DeploymentSpec_To_expapi_DeploymentSpec(in *DeploymentSpec, out
if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found {
defaulting.(func(*DeploymentSpec))(in)
}
out.Replicas = *in.Replicas
if in.Replicas != nil {
out.Replicas = *in.Replicas
}
if in.Selector != nil {
out.Selector = make(map[string]string)
for key, val := range in.Selector {

View File

@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/registry/componentstatus"
controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd"
deploymentetcd "k8s.io/kubernetes/pkg/registry/deployment/etcd"
"k8s.io/kubernetes/pkg/registry/endpoint"
endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/event/etcd"
@ -770,6 +771,7 @@ func (m *Master) expapi(c *Config) *apiserver.APIGroupVersion {
autoscalerStorage := horizontalpodautoscaleretcd.NewREST(c.ExpDatabaseStorage)
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(c.ExpDatabaseStorage)
daemonStorage := daemonetcd.NewREST(c.ExpDatabaseStorage)
deploymentStorage := deploymentetcd.NewREST(c.ExpDatabaseStorage)
storage := map[string]rest.Storage{
strings.ToLower("replicationControllers"): controllerStorage.ReplicationController,
@ -777,6 +779,7 @@ func (m *Master) expapi(c *Config) *apiserver.APIGroupVersion {
strings.ToLower("horizontalpodautoscalers"): autoscalerStorage,
strings.ToLower("thirdpartyresources"): thirdPartyResourceStorage,
strings.ToLower("daemons"): daemonStorage,
strings.ToLower("deployments"): deploymentStorage,
}
return &apiserver.APIGroupVersion{

View File

@ -0,0 +1,17 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment

View File

@ -0,0 +1,71 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/deployment"
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
*etcdgeneric.Etcd
}
// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(s storage.Interface) *REST {
prefix := "/deployments"
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &expapi.Deployment{} },
// NewListFunc returns an object capable of storing results of an etcd list.
NewListFunc: func() runtime.Object { return &expapi.DeploymentList{} },
// Produces a path that etcd understands, to the root of the resource
// by combining the namespace in the context with the given prefix.
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
},
// Produces a path that etcd understands, to the resource by combining
// the namespace in the context with the given prefix.
KeyFunc: func(ctx api.Context, name string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
},
// Retrieve the name field of a deployment.
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*expapi.Deployment).Name, nil
},
// Used to match objects based on labels/fields for list.
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return deployment.MatchDeployment(label, field)
},
EndpointName: "deployments",
// Used to validate deployment creation.
CreateStrategy: deployment.Strategy,
// Used to validate deployment updates.
UpdateStrategy: deployment.Strategy,
Storage: s,
}
return &REST{store}
}

View File

@ -0,0 +1,408 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"testing"
"time"
"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest/resttest"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/tools/etcdtest"
)
const (
PASS = iota
FAIL
)
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t)
return NewREST(etcdStorage), fakeClient
}
func validNewDeployment() *expapi.Deployment {
return &expapi.Deployment{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: expapi.DeploymentSpec{
Selector: map[string]string{"a": "b"},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"a": "b"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "test",
Image: "test_image",
ImagePullPolicy: api.PullIfNotPresent,
},
},
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
},
},
},
}
}
var validDeployment = *validNewDeployment()
func TestCreate(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
deployment := validNewDeployment()
deployment.ObjectMeta = api.ObjectMeta{}
test.TestCreate(
// valid
deployment,
func(ctx api.Context, obj runtime.Object) error {
return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj)
},
func(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
return registrytest.GetObject(fakeClient, storage.KeyFunc, storage.NewFunc, ctx, obj)
},
// invalid (invalid selector)
&expapi.Deployment{
Spec: expapi.DeploymentSpec{
Selector: map[string]string{},
Template: validDeployment.Spec.Template,
},
},
)
}
func TestUpdate(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestUpdate(
// valid
validNewDeployment(),
func(ctx api.Context, obj runtime.Object) error {
return registrytest.SetObject(fakeClient, storage.KeyFunc, ctx, obj)
},
func(resourceVersion uint64) {
registrytest.SetResourceVersion(fakeClient, resourceVersion)
},
func(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
return registrytest.GetObject(fakeClient, storage.KeyFunc, storage.NewFunc, ctx, obj)
},
// updateFunc
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Deployment)
object.Spec.Template.Spec.NodeSelector = map[string]string{"c": "d"}
return object
},
// invalid updateFunc
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Deployment)
object.UID = "newUID"
return object
},
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Deployment)
object.Name = ""
return object
},
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Deployment)
object.Spec.Template.Spec.RestartPolicy = api.RestartPolicyOnFailure
return object
},
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Deployment)
object.Spec.Selector = map[string]string{}
return object
},
)
}
func TestEtcdGet(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
test.TestGet(validNewDeployment())
}
func TestEtcdList(t *testing.T) {
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
key := etcdtest.AddPrefix(storage.KeyRootFunc(test.TestContext()))
test.TestList(
validNewDeployment(),
func(objects []runtime.Object) []runtime.Object {
return registrytest.SetObjectsForKey(fakeClient, key, objects)
},
func(resourceVersion uint64) {
registrytest.SetResourceVersion(fakeClient, resourceVersion)
})
}
func TestEtcdDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
key, err := storage.KeyFunc(ctx, validDeployment.Name)
key = etcdtest.AddPrefix(key)
fakeClient.Set(key, runtime.EncodeOrDie(testapi.Codec(), validNewDeployment()), 0)
obj, err := storage.Delete(ctx, validDeployment.Name, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if status, ok := obj.(*api.Status); !ok {
t.Errorf("Expected status of delete, got %#v", status)
} else if status.Status != api.StatusSuccess {
t.Errorf("Expected success, got %#v", status.Status)
}
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}
func TestEtcdWatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
watching, err := storage.Watch(ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
// Tests that we can watch for the creation of Deployment with specified labels.
func TestEtcdWatchWithLabels(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validDeployment.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(validDeployment.Spec.Selector),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
// The watcher above is waiting for these Labels, on receiving them it should
// apply the deploymentStatus decorator, which lists pods, causing a query against
// the /registry/pods endpoint of the etcd client.
deployment := &expapi.Deployment{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validDeployment.Spec.Selector,
Namespace: "default",
},
}
deploymentBytes, _ := testapi.Codec().Encode(deployment)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(deploymentBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}
// Tests that we can watch for Deployment with specified fields.
func TestEtcdWatchWithFields(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validDeployment.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
testFieldMap := map[int][]fields.Set{
PASS: {
{"metadata.name": "foo"},
},
FAIL: {
{"metadata.name": "bar"},
{"name": "foo"},
},
}
testEtcdActions := []string{
etcdstorage.EtcdCreate,
etcdstorage.EtcdSet,
etcdstorage.EtcdCAS,
etcdstorage.EtcdDelete}
deployment := &expapi.Deployment{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validDeployment.Spec.Selector,
Namespace: "default",
},
Status: expapi.DeploymentStatus{
Replicas: 1,
UpdatedReplicas: 4,
},
}
deploymentBytes, _ := testapi.Codec().Encode(deployment)
for expectedResult, fieldSet := range testFieldMap {
for _, field := range fieldSet {
for _, action := range testEtcdActions {
watching, err := storage.Watch(ctx,
labels.Everything(),
field.AsSelector(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var prevNode *etcd.Node = nil
node := &etcd.Node{
Value: string(deploymentBytes),
}
if action == etcdstorage.EtcdDelete {
prevNode = node
}
fakeClient.WaitForWatchCompletion()
fakeClient.WatchResponse <- &etcd.Response{
Action: action,
Node: node,
PrevNode: prevNode,
}
select {
case r, ok := <-watching.ResultChan():
if expectedResult == FAIL {
t.Errorf("Unexpected result from channel %#v. Field: %v", r, field)
}
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
if expectedResult == PASS {
t.Error("unexpected timeout from result channel")
}
}
watching.Stop()
}
}
}
}
func TestEtcdWatchNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
watching, err := storage.Watch(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
deployment := &expapi.Deployment{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
deploymentBytes, _ := testapi.Codec().Encode(deployment)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(deploymentBytes),
},
}
select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}
func TestDelete(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
test := resttest.New(t, storage, fakeClient.SetError)
key, _ := storage.KeyFunc(ctx, validDeployment.Name)
key = etcdtest.AddPrefix(key)
createFn := func() runtime.Object {
dc := validNewDeployment()
dc.ResourceVersion = "1"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), dc),
ModifiedIndex: 1,
},
},
}
return dc
}
gracefulSetFn := func() bool {
// If the deployment is still around after trying to delete either the delete
// failed, or we're deleting it gracefully.
if fakeClient.Data[key].R.Node != nil {
return true
}
return false
}
test.TestDelete(createFn, gracefulSetFn)
}

View File

@ -0,0 +1,97 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/expapi/validation"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
errs "k8s.io/kubernetes/pkg/util/fielderrors"
)
// deploymentStrategy implements behavior for Deployments.
type deploymentStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// Strategy is the default logic that applies when creating and updating Deployment
// objects via the REST API.
var Strategy = deploymentStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped is true for deployment.
func (deploymentStrategy) NamespaceScoped() bool {
return true
}
// PrepareForCreate clears fields that are not allowed to be set by end users on creation.
func (deploymentStrategy) PrepareForCreate(obj runtime.Object) {
}
// Validate validates a new deployment.
func (deploymentStrategy) Validate(ctx api.Context, obj runtime.Object) errs.ValidationErrorList {
deployment := obj.(*expapi.Deployment)
return validation.ValidateDeployment(deployment)
}
// AllowCreateOnUpdate is false for deployments.
func (deploymentStrategy) AllowCreateOnUpdate() bool {
return false
}
// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
func (deploymentStrategy) PrepareForUpdate(obj, old runtime.Object) {
}
// ValidateUpdate is the default update validation for an end user.
func (deploymentStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) errs.ValidationErrorList {
return validation.ValidateDeploymentUpdate(old.(*expapi.Deployment), obj.(*expapi.Deployment))
}
func (deploymentStrategy) AllowUnconditionalUpdate() bool {
return true
}
// DeploymentToSelectableFields returns a field set that represents the object.
func DeploymentToSelectableFields(deployment *expapi.Deployment) fields.Set {
return fields.Set{
"metadata.name": deployment.Name,
}
}
// MatchDeployment is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchDeployment(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
deployment, ok := obj.(*expapi.Deployment)
if !ok {
return nil, nil, fmt.Errorf("given object is not a deployment.")
}
return labels.Set(deployment.ObjectMeta.Labels), DeploymentToSelectableFields(deployment), nil
},
}
}