diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 8c6c1dde45..7a9eef7d25 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -41,7 +41,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" - replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/replication" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0454c32417..9f2d06b2ac 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -38,7 +38,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/routecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" - replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/replication" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index adbdd565a7..c664fa7c50 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -35,7 +35,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/replication" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" @@ -134,7 +134,7 @@ func runControllerManager(cl *client.Client) { endpoints := service.NewEndpointController(cl) go endpoints.Run(5, util.NeverStop) - controllerManager := controller.NewReplicationManager(cl, controller.BurstReplicas) + controllerManager := replication.NewReplicationManager(cl, replication.BurstReplicas) go controllerManager.Run(5, util.NeverStop) } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 4520aa7f5d..ac12fe98ef 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -32,7 +32,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/routecontroller" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/replication" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" 
"github.com/GoogleCloudPlatform/kubernetes/pkg/namespace" "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" @@ -110,7 +110,7 @@ func (s *CMServer) Run(_ []string) error { endpoints := s.createEndpointController(kubeClient) go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) - controllerManager := controller.NewReplicationManager(kubeClient, controller.BurstReplicas) + controllerManager := replication.NewReplicationManager(kubeClient, replication.BurstReplicas) go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) //TODO(jdef) should eventually support more cloud providers here diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index f5fb686800..cee971b28c 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -18,6 +18,7 @@ package controller import ( "fmt" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" @@ -25,83 +26,99 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/golang/glog" "sync/atomic" ) const ( CreatedByAnnotation = "kubernetes.io/created-by" - updateRetries = 1 + + // If a watch drops a delete event for a pod, it'll take this long + // before a dormant controller waiting for those packets is woken up anyway. It is + // specifically targeted at the case where some problem prevents an update + // of expectations, without it the controller could stay asleep forever. This should + // be set based on the expected latency of watch events. + // + // Currently an controller can service (create *and* observe the watch events for said + // creation) about 10-20 pods a second, so it takes about 1 min to service + // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s + // latency/pod at the scale of 3000 pods over 100 nodes. + ExpectationsTimeout = 3 * time.Minute ) -// Expectations are a way for replication controllers to tell the rc manager what they expect. eg: -// RCExpectations: { -// rc1: expects 2 adds in 2 minutes -// rc2: expects 2 dels in 2 minutes -// rc3: expects -1 adds in 2 minutes => rc3's expectations have already been met +var ( + KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc +) + +// Expectations are a way for controllers to tell the controller manager what they expect. 
eg: +// ControllerExpectations: { +// controller1: expects 2 adds in 2 minutes +// controller2: expects 2 dels in 2 minutes +// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met // } // // Implementation: // PodExpectation = pair of atomic counters to track pod creation/deletion -// RCExpectationsStore = TTLStore + a PodExpectation per rc +// ControllerExpectationsStore = TTLStore + a PodExpectation per controller // // * Once set expectations can only be lowered -// * An RC isn't synced till its expectations are either fulfilled, or expire -// * Rcs that don't set expectations will get woken up for every matching pod +// * A controller isn't synced till its expectations are either fulfilled, or expire +// * Controllers that don't set expectations will get woken up for every matching pod -// expKeyFunc to parse out the key from a PodExpectation -var expKeyFunc = func(obj interface{}) (string, error) { +// ExpKeyFunc to parse out the key from a PodExpectation +var ExpKeyFunc = func(obj interface{}) (string, error) { if e, ok := obj.(*PodExpectations); ok { return e.key, nil } return "", fmt.Errorf("Could not find key for obj %#v", obj) } -// RCExpectationsManager is an interface that allows users to set and wait on expectations. +// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations. // Only abstracted out for testing. -type RCExpectationsManager interface { - GetExpectations(rc *api.ReplicationController) (*PodExpectations, bool, error) - SatisfiedExpectations(rc *api.ReplicationController) bool - DeleteExpectations(rcKey string) - ExpectCreations(rc *api.ReplicationController, adds int) error - ExpectDeletions(rc *api.ReplicationController, dels int) error - CreationObserved(rc *api.ReplicationController) - DeletionObserved(rc *api.ReplicationController) +// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different +// types of controllers, because the keys might conflict across types. +type ControllerExpectationsInterface interface { + GetExpectations(controllerKey string) (*PodExpectations, bool, error) + SatisfiedExpectations(controllerKey string) bool + DeleteExpectations(controllerKey string) + SetExpectations(controllerKey string, add, del int) error + ExpectCreations(controllerKey string, adds int) error + ExpectDeletions(controllerKey string, dels int) error + CreationObserved(controllerKey string) + DeletionObserved(controllerKey string) } -// RCExpectations is a ttl cache mapping rcs to what they expect to see before being woken up for a sync. -type RCExpectations struct { +// ControllerExpectations is a ttl cache mapping controllers to what they expect to see before being woken up for a sync. +type ControllerExpectations struct { cache.Store } -// GetExpectations returns the PodExpectations of the given rc. -func (r *RCExpectations) GetExpectations(rc *api.ReplicationController) (*PodExpectations, bool, error) { - rcKey, err := rcKeyFunc(rc) - if err != nil { - return nil, false, err - } - if podExp, exists, err := r.GetByKey(rcKey); err == nil && exists { +// GetExpectations returns the PodExpectations of the given controller. 
+func (r *ControllerExpectations) GetExpectations(controllerKey string) (*PodExpectations, bool, error) { + if podExp, exists, err := r.GetByKey(controllerKey); err == nil && exists { return podExp.(*PodExpectations), true, nil } else { return nil, false, err } } -// DeleteExpectations deletes the expectations of the given RC from the TTLStore. -func (r *RCExpectations) DeleteExpectations(rcKey string) { - if podExp, exists, err := r.GetByKey(rcKey); err == nil && exists { +// DeleteExpectations deletes the expectations of the given controller from the TTLStore. +func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { + if podExp, exists, err := r.GetByKey(controllerKey); err == nil && exists { if err := r.Delete(podExp); err != nil { - glog.V(2).Infof("Error deleting expectations for rc %v: %v", rcKey, err) + glog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err) } } } -// SatisfiedExpectations returns true if the replication manager has observed the required adds/dels -// for the given rc. Add/del counts are established by the rc at sync time, and updated as pods -// are observed by the replication manager's podController. -func (r *RCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bool { - if podExp, exists, err := r.GetExpectations(rc); exists { +// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. +// Add/del counts are established by the controller at sync time, and updated as pods are observed by the controller +// manager. +func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { + if podExp, exists, err := r.GetExpectations(controllerKey); exists { if podExp.Fulfilled() { return true } else { @@ -111,54 +128,50 @@ func (r *RCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bo } else if err != nil { glog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err) } else { - // When a new rc is created, it doesn't have expectations. + // When a new controller is created, it doesn't have expectations. // When it doesn't see expected watch events for > TTL, the expectations expire. // - In this case it wakes up, creates/deletes pods, and sets expectations again. // When it has satisfied expectations and no pods need to be created/destroyed > TTL, the expectations expire. // - In this case it continues without setting expectations till it needs to create/delete pods. - glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", rc.Name) + glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey) } // Trigger a sync if we either encountered and error (which shouldn't happen since we're - // getting from local store) or this rc hasn't established expectations. + // getting from local store) or this controller hasn't established expectations. return true } -// setExpectations registers new expectations for the given rc. Forgets existing expectations. -func (r *RCExpectations) setExpectations(rc *api.ReplicationController, add, del int) error { - rcKey, err := rcKeyFunc(rc) - if err != nil { - return err - } - podExp := &PodExpectations{add: int64(add), del: int64(del), key: rcKey} +// SetExpectations registers new expectations for the given controller. Forgets existing expectations. 
+func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error { + podExp := &PodExpectations{add: int64(add), del: int64(del), key: controllerKey} + glog.V(4).Infof("Setting expectations %+v", podExp) + return r.Add(podExp) } -func (r *RCExpectations) ExpectCreations(rc *api.ReplicationController, adds int) error { - return r.setExpectations(rc, adds, 0) +func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error { + return r.SetExpectations(controllerKey, adds, 0) } -func (r *RCExpectations) ExpectDeletions(rc *api.ReplicationController, dels int) error { - return r.setExpectations(rc, 0, dels) +func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error { + return r.SetExpectations(controllerKey, 0, dels) } -// Decrements the expectation counts of the given rc. -func (r *RCExpectations) lowerExpectations(rc *api.ReplicationController, add, del int) { - if podExp, exists, err := r.GetExpectations(rc); err == nil && exists { +// Decrements the expectation counts of the given controller. +func (r *ControllerExpectations) lowerExpectations(controllerKey string, add, del int) { + if podExp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { podExp.Seen(int64(add), int64(del)) // The expectations might've been modified since the update on the previous line. glog.V(4).Infof("Lowering expectations %+v", podExp) } } -// CreationObserved atomically decrements the `add` expecation count of the given replication controller. -func (r *RCExpectations) CreationObserved(rc *api.ReplicationController) { - r.lowerExpectations(rc, 1, 0) +// CreationObserved atomically decrements the `add` expectation count of the given controller. +func (r *ControllerExpectations) CreationObserved(controllerKey string) { + r.lowerExpectations(controllerKey, 1, 0) } -// DeletionObserved atomically decrements the `del` expectation count of the given replication controller. -func (r *RCExpectations) DeletionObserved(rc *api.ReplicationController) { - r.lowerExpectations(rc, 0, 1) +// DeletionObserved atomically decrements the `del` expectation count of the given controller. +func (r *ControllerExpectations) DeletionObserved(controllerKey string) { + r.lowerExpectations(controllerKey, 0, 1) } // Expectations are either fulfilled, or expire naturally. @@ -185,59 +198,74 @@ func (e *PodExpectations) Fulfilled() bool { return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0 } -// getExpectations returns the add and del expectations of the pod. -func (e *PodExpectations) getExpectations() (int64, int64) { +// GetExpectations returns the add and del expectations of the pod. +func (e *PodExpectations) GetExpectations() (int64, int64) { return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del) } -// NewRCExpectations returns a store for PodExpectations. -func NewRCExpectations() *RCExpectations { - return &RCExpectations{cache.NewTTLStore(expKeyFunc, ExpectationsTimeout)} +// NewControllerExpectations returns a store for PodExpectations. +func NewControllerExpectations() *ControllerExpectations { + return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)} } // PodControlInterface is an interface that knows how to add or delete pods // created as an interface to allow testing. type PodControlInterface interface { - // createReplica creates new replicated pods according to the spec.
- createReplica(namespace string, controller *api.ReplicationController) error - // deletePod deletes the pod identified by podID. - deletePod(namespace string, podID string) error + // CreateReplica creates new replicated pods according to the spec. + CreateReplica(namespace string, controller *api.ReplicationController) error + // DeletePod deletes the pod identified by podID. + DeletePod(namespace string, podID string) error } // RealPodControl is the default implementation of PodControllerInterface. type RealPodControl struct { - kubeClient client.Interface - recorder record.EventRecorder + KubeClient client.Interface + Recorder record.EventRecorder } -func (r RealPodControl) createReplica(namespace string, controller *api.ReplicationController) error { +func getReplicaLabelSet(template *api.PodTemplateSpec) labels.Set { desiredLabels := make(labels.Set) - for k, v := range controller.Spec.Template.Labels { + for k, v := range template.Labels { desiredLabels[k] = v } + return desiredLabels +} + +func getReplicaAnnotationSet(template *api.PodTemplateSpec, object runtime.Object) (labels.Set, error) { desiredAnnotations := make(labels.Set) - for k, v := range controller.Spec.Template.Annotations { + for k, v := range template.Annotations { desiredAnnotations[k] = v } - - createdByRef, err := api.GetReference(controller) + createdByRef, err := api.GetReference(object) if err != nil { - return fmt.Errorf("unable to get controller reference: %v", err) + return desiredAnnotations, fmt.Errorf("unable to get controller reference: %v", err) } createdByRefJson, err := latest.Codec.Encode(&api.SerializedReference{ Reference: *createdByRef, }) if err != nil { - return fmt.Errorf("unable to serialize controller reference: %v", err) + return desiredAnnotations, fmt.Errorf("unable to serialize controller reference: %v", err) } - desiredAnnotations[CreatedByAnnotation] = string(createdByRefJson) + return desiredAnnotations, nil +} +func getReplicaPrefix(controllerName string) string { // use the dash (if the name isn't too long) to make the pod name a bit prettier - prefix := fmt.Sprintf("%s-", controller.Name) + prefix := fmt.Sprintf("%s-", controllerName) if ok, _ := validation.ValidatePodName(prefix, true); !ok { - prefix = controller.Name + prefix = controllerName } + return prefix +} + +func (r RealPodControl) CreateReplica(namespace string, controller *api.ReplicationController) error { + desiredLabels := getReplicaLabelSet(controller.Spec.Template) + desiredAnnotations, err := getReplicaAnnotationSet(controller.Spec.Template, controller) + if err != nil { + return err + } + prefix := getReplicaPrefix(controller.Name) pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -252,27 +280,27 @@ func (r RealPodControl) createReplica(namespace string, controller *api.Replicat if labels.Set(pod.Labels).AsSelector().Empty() { return fmt.Errorf("unable to create pod replica, no labels") } - if newPod, err := r.kubeClient.Pods(namespace).Create(pod); err != nil { - r.recorder.Eventf(controller, "failedCreate", "Error creating: %v", err) + if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil { + r.Recorder.Eventf(controller, "failedCreate", "Error creating: %v", err) return fmt.Errorf("unable to create pod replica: %v", err) } else { glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name) - r.recorder.Eventf(controller, "successfulCreate", "Created pod: %v", newPod.Name) + r.Recorder.Eventf(controller, "successfulCreate", "Created pod: %v", newPod.Name) } return nil } -func 
(r RealPodControl) deletePod(namespace, podID string) error { - return r.kubeClient.Pods(namespace).Delete(podID, nil) +func (r RealPodControl) DeletePod(namespace, podID string) error { + return r.KubeClient.Pods(namespace).Delete(podID, nil) } -// activePods type allows custom sorting of pods so an rc can pick the best ones to delete. -type activePods []*api.Pod +// ActivePods type allows custom sorting of pods so a controller can pick the best ones to delete. +type ActivePods []*api.Pod -func (s activePods) Len() int { return len(s) } -func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s ActivePods) Len() int { return len(s) } +func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s activePods) Less(i, j int) bool { +func (s ActivePods) Less(i, j int) bool { // Unassigned < assigned if s[i].Spec.NodeName == "" && s[j].Spec.NodeName != "" { return true @@ -289,21 +317,8 @@ func (s activePods) Less(i, j int) bool { return false } -// overlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. -type overlappingControllers []api.ReplicationController - -func (o overlappingControllers) Len() int { return len(o) } -func (o overlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } - -func (o overlappingControllers) Less(i, j int) bool { - if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { - return o[i].Name < o[j].Name - } - return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) -} - -// filterActivePods returns pods that have not terminated. -func filterActivePods(pods []api.Pod) []*api.Pod { +// FilterActivePods returns pods that have not terminated. +func FilterActivePods(pods []api.Pod) []*api.Pod { var result []*api.Pod for i := range pods { if api.PodSucceeded != pods[i].Status.Phase && @@ -313,39 +328,3 @@ func filterActivePods(pods []api.Pod) []*api.Pod { } return result } - -// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry. -func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) { - // This is the steady state. It happens when the rc doesn't have any expectations, since - // we do a periodic relist every 30s. If the generations differ but the replicas are - // the same, a caller might've resized to the same replica count. - if controller.Status.Replicas == numReplicas && - controller.Generation == controller.Status.ObservedGeneration { - return nil - } - // Save the generation number we acted on, otherwise we might wrongfully indicate - // that we've seen a spec update when we retry. - // TODO: This can clobber an update if we allow multiple agents to write to the - // same status. - generation := controller.Generation - - var getErr error - for i, rc := 0, &controller; ; i++ { - glog.V(4).Infof("Updating replica count for rc: %v, %d->%d (need %d), sequence No: %v->%v", - controller.Name, controller.Status.Replicas, numReplicas, controller.Spec.Replicas, controller.Status.ObservedGeneration, generation) - - rc.Status = api.ReplicationControllerStatus{Replicas: numReplicas, ObservedGeneration: generation} - _, updateErr = rcClient.Update(rc) - if updateErr == nil || i >= updateRetries { - return updateErr - } - // Update the controller with the latest resource version for the next poll - if rc, getErr = rcClient.Get(controller.Name); getErr != nil { - // If the GET fails we can't trust status.Replicas anymore. 
This error - // is bound to be more interesting than the update failure. - return getErr - } - } - // Failed 2 updates one of which was with the latest controller, return the update error - return -} diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go new file mode 100644 index 0000000000..f51d23d553 --- /dev/null +++ b/pkg/controller/controller_utils_test.go @@ -0,0 +1,292 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "math/rand" + "net/http/httptest" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/securitycontext" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// NewFakeControllerExpectationsLookup creates a fake store for PodExpectations. +func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *util.FakeClock) { + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + fakeClock := &util.FakeClock{fakeTime} + ttlPolicy := &cache.TTLPolicy{ttl, fakeClock} + ttlStore := cache.NewFakeExpirationStore( + ExpKeyFunc, nil, ttlPolicy, fakeClock) + return &ControllerExpectations{ttlStore}, fakeClock +} + +func newReplicationController(replicas int) *api.ReplicationController { + rc := &api.ReplicationController{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: api.ReplicationControllerSpec{ + Replicas: replicas, + Selector: map[string]string{"foo": "bar"}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "name": "foo", + "type": "production", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSDefault, + NodeSelector: map[string]string{ + "baz": "blah", + }, + }, + }, + }, + } + return rc +} + +// create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store. 
+func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController) *api.PodList { + pods := []api.Pod{} + for i := 0; i < count; i++ { + newPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("pod%d", i), + Labels: rc.Spec.Selector, + Namespace: rc.Namespace, + }, + Status: api.PodStatus{Phase: status}, + } + if store != nil { + store.Add(&newPod) + } + pods = append(pods, newPod) + } + return &api.PodList{ + Items: pods, + } +} + +func TestControllerExpectations(t *testing.T) { + ttl := 30 * time.Second + e, fakeClock := NewFakeControllerExpectationsLookup(ttl) + // In practice we can't really have add and delete expectations since we only either create or + // delete replicas in one rc pass, and the rc goes to sleep soon after until the expectations are + // either fulfilled or timeout. + adds, dels := 10, 30 + rc := newReplicationController(1) + + // RC fires off adds and deletes at apiserver, then sets expectations + rcKey, err := KeyFunc(rc) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", rc, err) + } + e.SetExpectations(rcKey, adds, dels) + var wg sync.WaitGroup + for i := 0; i < adds+1; i++ { + wg.Add(1) + go func() { + // In prod this can happen either because of a failed create by the rc + // or after having observed a create via informer + e.CreationObserved(rcKey) + wg.Done() + }() + } + wg.Wait() + + // There are still delete expectations + if e.SatisfiedExpectations(rcKey) { + t.Errorf("Rc will sync before expectations are met") + } + for i := 0; i < dels+1; i++ { + wg.Add(1) + go func() { + e.DeletionObserved(rcKey) + wg.Done() + }() + } + wg.Wait() + + // Expectations have been surpassed + if podExp, exists, err := e.GetExpectations(rcKey); err == nil && exists { + add, del := podExp.GetExpectations() + if add != -1 || del != -1 { + t.Errorf("Unexpected pod expectations %#v", podExp) + } + } else { + t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err) + } + if !e.SatisfiedExpectations(rcKey) { + t.Errorf("Expectations are met but the rc will not sync") + } + + // Next round of rc sync, old expectations are cleared + e.SetExpectations(rcKey, 1, 2) + if podExp, exists, err := e.GetExpectations(rcKey); err == nil && exists { + add, del := podExp.GetExpectations() + if add != 1 || del != 2 { + t.Errorf("Unexpected pod expectations %#v", podExp) + } + } else { + t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err) + } + + // Expectations have expired because of ttl + fakeClock.Time = fakeClock.Time.Add(ttl + 1) + if !e.SatisfiedExpectations(rcKey) { + t.Errorf("Expectations should have expired but didn't") + } +} + +func TestCreateReplica(t *testing.T) { + ns := api.NamespaceDefault + body := runtime.EncodeOrDie(testapi.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}}) + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + + podControl := RealPodControl{ + KubeClient: client, + Recorder: &record.FakeRecorder{}, + } + + controllerSpec := newReplicationController(1) + + // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template + podControl.CreateReplica(ns, controllerSpec) + + expectedPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Labels: controllerSpec.Spec.Template.Labels, + GenerateName: 
fmt.Sprintf("%s-", controllerSpec.Name), + }, + Spec: controllerSpec.Spec.Template.Spec, + } + fakeHandler.ValidateRequest(t, testapi.ResourcePath("pods", api.NamespaceDefault, ""), "POST", nil) + actualPod, err := client.Codec.Decode([]byte(fakeHandler.RequestBody)) + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if !api.Semantic.DeepDerivative(&expectedPod, actualPod) { + t.Logf("Body: %s", fakeHandler.RequestBody) + t.Errorf("Unexpected mismatch. Expected\n %#v,\n Got:\n %#v", &expectedPod, actualPod) + } +} + +func TestActivePodFiltering(t *testing.T) { + // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. + rc := newReplicationController(0) + podList := newPodList(nil, 5, api.PodRunning, rc) + podList.Items[0].Status.Phase = api.PodSucceeded + podList.Items[1].Status.Phase = api.PodFailed + expectedNames := util.NewStringSet() + for _, pod := range podList.Items[2:] { + expectedNames.Insert(pod.Name) + } + + got := FilterActivePods(podList.Items) + gotNames := util.NewStringSet() + for _, pod := range got { + gotNames.Insert(pod.Name) + } + if expectedNames.Difference(gotNames).Len() != 0 || gotNames.Difference(expectedNames).Len() != 0 { + t.Errorf("expected %v, got %v", expectedNames.List(), gotNames.List()) + } +} + +func TestSortingActivePods(t *testing.T) { + numPods := 5 + // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. + rc := newReplicationController(0) + podList := newPodList(nil, numPods, api.PodRunning, rc) + + pods := make([]*api.Pod, len(podList.Items)) + for i := range podList.Items { + pods[i] = &podList.Items[i] + } + // pods[0] is not scheduled yet. + pods[0].Spec.NodeName = "" + pods[0].Status.Phase = api.PodPending + // pods[1] is scheduled but pending. + pods[1].Spec.NodeName = "bar" + pods[1].Status.Phase = api.PodPending + // pods[2] is unknown. + pods[2].Spec.NodeName = "foo" + pods[2].Status.Phase = api.PodUnknown + // pods[3] is running but not ready. + pods[3].Spec.NodeName = "foo" + pods[3].Status.Phase = api.PodRunning + // pods[4] is running and ready. + pods[4].Spec.NodeName = "foo" + pods[4].Status.Phase = api.PodRunning + pods[4].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}} + + getOrder := func(pods []*api.Pod) []string { + names := make([]string, len(pods)) + for i := range pods { + names[i] = pods[i].Name + } + return names + } + + expected := getOrder(pods) + + for i := 0; i < 20; i++ { + idx := rand.Perm(numPods) + randomizedPods := make([]*api.Pod, numPods) + for j := 0; j < numPods; j++ { + randomizedPods[j] = pods[idx[j]] + } + sort.Sort(ActivePods(randomizedPods)) + actual := getOrder(randomizedPods) + + if !reflect.DeepEqual(actual, expected) { + t.Errorf("expected %v, got %v", expected, actual) + } + } +} diff --git a/pkg/controller/doc.go b/pkg/controller/doc.go index 79e6b47b89..1e310b466f 100644 --- a/pkg/controller/doc.go +++ b/pkg/controller/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2015 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package controller contains logic for watching and synchronizing -// replicationControllers. 
+// Package controller contains code for controllers (like the replication +// controller). package controller diff --git a/pkg/controller/replication/doc.go b/pkg/controller/replication/doc.go new file mode 100644 index 0000000000..b60e1d99c4 --- /dev/null +++ b/pkg/controller/replication/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package replication contains logic for watching and synchronizing +// replication controllers. +package replication diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication/replication_controller.go similarity index 84% rename from pkg/controller/replication_controller.go rename to pkg/controller/replication/replication_controller.go index cbb675897a..e05a4ce35d 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package replication import ( "reflect" @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -36,10 +37,6 @@ import ( "github.com/golang/glog" ) -var ( - rcKeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc -) - const ( // We'll attempt to recompute the required replicas of all replication controllers // the have fulfilled their expectations at least this often. This recomputation @@ -52,18 +49,6 @@ const ( // final resting state of the pod. PodRelistPeriod = 5 * time.Minute - // If a watch drops a delete event for a pod, it'll take this long - // before a dormant rc waiting for those packets is woken up anyway. It is - // specifically targeted at the case where some problem prevents an update - // of expectations, without it the RC could stay asleep forever. This should - // be set based on the expected latency of watch events. - // - // Currently an rc can service (create *and* observe the watch events for said - // creation) about 10-20 pods a second, so it takes about 1 min to service - // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s - // latency/pod at the scale of 3000 pods over 100 nodes. - ExpectationsTimeout = 3 * time.Minute - // Realistic value of the burstReplica field for the replication manager based off // performance requirements for kubernetes 1.0. BurstReplicas = 500 @@ -71,13 +56,16 @@ const ( // We must avoid counting pods until the pod store has synced. If it hasn't synced, to // avoid a hot loop, we'll wait this long between checks. 
PodStoreSyncedPollPeriod = 100 * time.Millisecond + + // The number of times we retry updating a replication controller's status. + statusUpdateRetries = 1 ) // ReplicationManager is responsible for synchronizing ReplicationController objects stored // in the system with actual running pods. type ReplicationManager struct { kubeClient client.Interface - podControl PodControlInterface + podControl controller.PodControlInterface // An rc is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. @@ -90,9 +78,11 @@ type ReplicationManager struct { podStoreSynced func() bool // A TTLCache of pod creates/deletes each rc expects to see - expectations RCExpectationsManager + expectations controller.ControllerExpectationsInterface + // A store of replication controllers, populated by the rcController rcStore cache.StoreToReplicationControllerLister + // A store of pods, populated by the podController podStore cache.StoreToPodLister // Watches changes to all replication controllers @@ -111,12 +101,12 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl rm := &ReplicationManager{ kubeClient: kubeClient, - podControl: RealPodControl{ - kubeClient: kubeClient, - recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}), + podControl: controller.RealPodControl{ + KubeClient: kubeClient, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}), }, burstReplicas: burstReplicas, - expectations: NewRCExpectations(), + expectations: controller.NewControllerExpectations(), queue: workqueue.New(), } @@ -185,7 +175,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { // TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks // need to pass in a fake. - rm.podControl = RealPodControl{rm.kubeClient, recorder} + rm.podControl = controller.RealPodControl{rm.kubeClient, recorder} } // Run begins watching and syncing. 
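The hunks below move the replication manager onto the exported, string-keyed expectations API from pkg/controller. As a rough sketch of how those calls fit together (not part of the patch; rm, rc, and pendingCreates stand in for the manager, a replication controller, and a hypothetical create count, and only calls visible in this diff are used):

	// Derive the "namespace/name" key shared by the sync loop and the pod event handlers.
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return
	}
	// At sync time, record how many creates are about to be issued.
	rm.expectations.ExpectCreations(rcKey, pendingCreates)
	// The pod informer's add handler decrements the count as each creation is observed.
	rm.expectations.CreationObserved(rcKey)
	// The next sync only recounts pods once expectations are fulfilled or the TTL expires.
	if rm.expectations.SatisfiedExpectations(rcKey) {
		// safe to recount pods and call manageReplicas
	}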
@@ -222,7 +212,12 @@ func (rm *ReplicationManager) getPodControllers(pod *api.Pod) *api.ReplicationCo func (rm *ReplicationManager) addPod(obj interface{}) { pod := obj.(*api.Pod) if rc := rm.getPodControllers(pod); rc != nil { - rm.expectations.CreationObserved(rc) + rcKey, err := controller.KeyFunc(rc) + if err != nil { + glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) + return + } + rm.expectations.CreationObserved(rcKey) rm.enqueueController(rc) } } @@ -263,24 +258,29 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, ExpectationsTimeout) + glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout) return } pod, ok = tombstone.Obj.(*api.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, ExpectationsTimeout) + glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout) return } } if rc := rm.getPodControllers(pod); rc != nil { - rm.expectations.DeletionObserved(rc) + rcKey, err := controller.KeyFunc(rc) + if err != nil { + glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) + return + } + rm.expectations.DeletionObserved(rcKey) rm.enqueueController(rc) } } // obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item. func (rm *ReplicationManager) enqueueController(obj interface{}) { - key, err := rcKeyFunc(obj) + key, err := controller.KeyFunc(obj) if err != nil { glog.Errorf("Couldn't get key for object %+v: %v", obj, err) return @@ -314,24 +314,29 @@ func (rm *ReplicationManager) worker() { } // manageReplicas checks and updates replicas for the given replication controller. 
-func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller *api.ReplicationController) { - diff := len(filteredPods) - controller.Spec.Replicas +func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) { + diff := len(filteredPods) - rc.Spec.Replicas + rcKey, err := controller.KeyFunc(rc) + if err != nil { + glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) + return + } if diff < 0 { diff *= -1 if diff > rm.burstReplicas { diff = rm.burstReplicas } - rm.expectations.ExpectCreations(controller, diff) + rm.expectations.ExpectCreations(rcKey, diff) wait := sync.WaitGroup{} wait.Add(diff) - glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff) + glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff) for i := 0; i < diff; i++ { go func() { defer wait.Done() - if err := rm.podControl.createReplica(controller.Namespace, controller); err != nil { + if err := rm.podControl.CreateReplica(rc.Namespace, rc); err != nil { // Decrement the expected number of creates because the informer won't observe this pod - glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", controller.Namespace, controller.Name) - rm.expectations.CreationObserved(controller) + glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) + rm.expectations.CreationObserved(rcKey) util.HandleError(err) } }() @@ -341,14 +346,14 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller if diff > rm.burstReplicas { diff = rm.burstReplicas } - rm.expectations.ExpectDeletions(controller, diff) - glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff) + rm.expectations.ExpectDeletions(rcKey, diff) + glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff) // No need to sort pods if we are about to delete all of them - if controller.Spec.Replicas != 0 { + if rc.Spec.Replicas != 0 { // Sort the pods in the order such that not-ready < ready, unscheduled // < scheduled, and pending < running. This ensures that we delete pods // in the earlier stages whenever possible. 
- sort.Sort(activePods(filteredPods)) + sort.Sort(controller.ActivePods(filteredPods)) } wait := sync.WaitGroup{} @@ -356,10 +361,10 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller for i := 0; i < diff; i++ { go func(ix int) { defer wait.Done() - if err := rm.podControl.deletePod(controller.Namespace, filteredPods[ix].Name); err != nil { + if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion - glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", controller.Namespace, controller.Name) - rm.expectations.DeletionObserved(controller) + glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) + rm.expectations.DeletionObserved(rcKey) } }(i) } @@ -387,20 +392,25 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rm.queue.Add(key) return err } - controller := *obj.(*api.ReplicationController) + rc := *obj.(*api.ReplicationController) if !rm.podStoreSynced() { // Sleep so we give the pod reflector goroutine a chance to run. time.Sleep(PodStoreSyncedPollPeriod) - glog.Infof("Waiting for pods controller to sync, requeuing rc %v", controller.Name) - rm.enqueueController(&controller) + glog.Infof("Waiting for pods controller to sync, requeuing rc %v", rc.Name) + rm.enqueueController(&rc) return nil } // Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the rc sync is just deferred till the next relist. - rcNeedsSync := rm.expectations.SatisfiedExpectations(&controller) - podList, err := rm.podStore.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector()) + rcKey, err := controller.KeyFunc(&rc) + if err != nil { + glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) + return err + } + rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey) + podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector()) if err != nil { glog.Errorf("Error getting pods for rc %q: %v", key, err) rm.queue.Add(key) @@ -408,17 +418,17 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { } // TODO: Do this in a single pass, or use an index. - filteredPods := filterActivePods(podList.Items) + filteredPods := controller.FilterActivePods(podList.Items) if rcNeedsSync { - rm.manageReplicas(filteredPods, &controller) + rm.manageReplicas(filteredPods, &rc) } // Always updates status as pods come up or die. - if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), controller, len(filteredPods)); err != nil { + if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(rc.Namespace), rc, len(filteredPods)); err != nil { // Multiple things could lead to this update failing. Requeuing the controller ensures // we retry with some fairness. 
- glog.V(2).Infof("Failed to update replica count for controller %v, requeuing", controller.Name) - rm.enqueueController(&controller) + glog.V(2).Infof("Failed to update replica count for controller %v, requeuing", rc.Name) + rm.enqueueController(&rc) } return nil } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go similarity index 82% rename from pkg/controller/replication_controller_test.go rename to pkg/controller/replication/replication_controller_test.go index 20cac2b4e7..292c116515 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -14,15 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package replication import ( "fmt" "math/rand" "net/http" "net/http/httptest" - "reflect" - "sort" "sync" "testing" "time" @@ -31,8 +29,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/securitycontext" @@ -60,7 +58,7 @@ func init() { api.ForTesting_ReferencesAllowBlankSelfLinks = true } -func (f *FakePodControl) createReplica(namespace string, spec *api.ReplicationController) error { +func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error { f.lock.Lock() defer f.lock.Unlock() if f.err != nil { @@ -70,7 +68,7 @@ func (f *FakePodControl) createReplica(namespace string, spec *api.ReplicationCo return nil } -func (f *FakePodControl) deletePod(namespace string, podName string) error { +func (f *FakePodControl) DeletePod(namespace string, podName string) error { f.lock.Lock() defer f.lock.Unlock() if f.err != nil { @@ -79,7 +77,6 @@ func (f *FakePodControl) deletePod(namespace string, podName string) error { f.deletePodName = append(f.deletePodName, podName) return nil } - func (f *FakePodControl) clear() { f.lock.Lock() defer f.lock.Unlock() @@ -88,7 +85,7 @@ func (f *FakePodControl) clear() { } func getKey(rc *api.ReplicationController, t *testing.T) string { - if key, err := rcKeyFunc(rc); err != nil { + if key, err := controller.KeyFunc(rc); err != nil { t.Errorf("Unexpected error getting key for rc %v: %v", rc.Name, err) return "" } else { @@ -301,45 +298,6 @@ func TestSyncReplicationControllerCreates(t *testing.T) { validateSyncReplication(t, &fakePodControl, 2, 0) } -func TestCreateReplica(t *testing.T) { - ns := api.NamespaceDefault - body := runtime.EncodeOrDie(testapi.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}}) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - - podControl := RealPodControl{ - kubeClient: client, - recorder: &record.FakeRecorder{}, - } - - controllerSpec := newReplicationController(1) - - // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template - 
podControl.createReplica(ns, controllerSpec) - - expectedPod := api.Pod{ - ObjectMeta: api.ObjectMeta{ - Labels: controllerSpec.Spec.Template.Labels, - GenerateName: fmt.Sprintf("%s-", controllerSpec.Name), - }, - Spec: controllerSpec.Spec.Template.Spec, - } - fakeHandler.ValidateRequest(t, testapi.ResourcePath("pods", api.NamespaceDefault, ""), "POST", nil) - actualPod, err := client.Codec.Decode([]byte(fakeHandler.RequestBody)) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - if !api.Semantic.DeepDerivative(&expectedPod, actualPod) { - t.Logf("Body: %s", fakeHandler.RequestBody) - t.Errorf("Unexpected mismatch. Expected\n %#v,\n Got:\n %#v", &expectedPod, actualPod) - } -} - func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // Setup a fake server to listen for requests, and run the rc manager in steady state fakeHandler := util.FakeHandler{ @@ -420,156 +378,6 @@ func TestControllerUpdateReplicas(t *testing.T) { validateSyncReplication(t, &fakePodControl, 1, 0) } -func TestActivePodFiltering(t *testing.T) { - // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. - rc := newReplicationController(0) - podList := newPodList(nil, 5, api.PodRunning, rc) - podList.Items[0].Status.Phase = api.PodSucceeded - podList.Items[1].Status.Phase = api.PodFailed - expectedNames := util.NewStringSet() - for _, pod := range podList.Items[2:] { - expectedNames.Insert(pod.Name) - } - - got := filterActivePods(podList.Items) - gotNames := util.NewStringSet() - for _, pod := range got { - gotNames.Insert(pod.Name) - } - if expectedNames.Difference(gotNames).Len() != 0 || gotNames.Difference(expectedNames).Len() != 0 { - t.Errorf("expected %v, got %v", expectedNames.List(), gotNames.List()) - } -} - -func TestSortingActivePods(t *testing.T) { - numPods := 5 - // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. - rc := newReplicationController(0) - podList := newPodList(nil, numPods, api.PodRunning, rc) - - pods := make([]*api.Pod, len(podList.Items)) - for i := range podList.Items { - pods[i] = &podList.Items[i] - } - // pods[0] is not scheduled yet. - pods[0].Spec.NodeName = "" - pods[0].Status.Phase = api.PodPending - // pods[1] is scheduled but pending. - pods[1].Spec.NodeName = "bar" - pods[1].Status.Phase = api.PodPending - // pods[2] is unknown. - pods[2].Spec.NodeName = "foo" - pods[2].Status.Phase = api.PodUnknown - // pods[3] is running but not ready. - pods[3].Spec.NodeName = "foo" - pods[3].Status.Phase = api.PodRunning - // pods[4] is running and ready. - pods[4].Spec.NodeName = "foo" - pods[4].Status.Phase = api.PodRunning - pods[4].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}} - - getOrder := func(pods []*api.Pod) []string { - names := make([]string, len(pods)) - for i := range pods { - names[i] = pods[i].Name - } - return names - } - - expected := getOrder(pods) - - for i := 0; i < 20; i++ { - idx := rand.Perm(numPods) - randomizedPods := make([]*api.Pod, numPods) - for j := 0; j < numPods; j++ { - randomizedPods[j] = pods[idx[j]] - } - sort.Sort(activePods(randomizedPods)) - actual := getOrder(randomizedPods) - - if !reflect.DeepEqual(actual, expected) { - t.Errorf("expected %v, got %v", expected, actual) - } - } -} - -// NewFakeRCExpectationsLookup creates a fake store for PodExpectations. 
-func NewFakeRCExpectationsLookup(ttl time.Duration) (*RCExpectations, *util.FakeClock) { - fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - fakeClock := &util.FakeClock{fakeTime} - ttlPolicy := &cache.TTLPolicy{ttl, fakeClock} - ttlStore := cache.NewFakeExpirationStore( - expKeyFunc, nil, ttlPolicy, fakeClock) - return &RCExpectations{ttlStore}, fakeClock -} - -func TestRCExpectations(t *testing.T) { - ttl := 30 * time.Second - e, fakeClock := NewFakeRCExpectationsLookup(ttl) - // In practice we can't really have add and delete expectations since we only either create or - // delete replicas in one rc pass, and the rc goes to sleep soon after until the expectations are - // either fulfilled or timeout. - adds, dels := 10, 30 - rc := newReplicationController(1) - - // RC fires off adds and deletes at apiserver, then sets expectations - e.setExpectations(rc, adds, dels) - var wg sync.WaitGroup - for i := 0; i < adds+1; i++ { - wg.Add(1) - go func() { - // In prod this can happen either because of a failed create by the rc - // or after having observed a create via informer - e.CreationObserved(rc) - wg.Done() - }() - } - wg.Wait() - - // There are still delete expectations - if e.SatisfiedExpectations(rc) { - t.Errorf("Rc will sync before expectations are met") - } - for i := 0; i < dels+1; i++ { - wg.Add(1) - go func() { - e.DeletionObserved(rc) - wg.Done() - }() - } - wg.Wait() - - // Expectations have been surpassed - if podExp, exists, err := e.GetExpectations(rc); err == nil && exists { - add, del := podExp.getExpectations() - if add != -1 || del != -1 { - t.Errorf("Unexpected pod expectations %#v", podExp) - } - } else { - t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err) - } - if !e.SatisfiedExpectations(rc) { - t.Errorf("Expectations are met but the rc will not sync") - } - - // Next round of rc sync, old expectations are cleared - e.setExpectations(rc, 1, 2) - if podExp, exists, err := e.GetExpectations(rc); err == nil && exists { - add, del := podExp.getExpectations() - if add != 1 || del != 2 { - t.Errorf("Unexpected pod expectations %#v", podExp) - } - } else { - t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err) - } - - // Expectations have expired because of ttl - fakeClock.Time = fakeClock.Time.Add(ttl + 1) - if !e.SatisfiedExpectations(rc) { - t.Errorf("Expectations should have expired but didn't") - } -} - func TestSyncReplicationControllerDormancy(t *testing.T) { // Setup a test server so we can lie about the current state of pods fakeHandler := util.FakeHandler{ @@ -600,9 +408,15 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { manager.syncReplicationController(getKey(controllerSpec, t)) validateSyncReplication(t, &fakePodControl, 0, 0) + // Get the key for the controller + rcKey, err := controller.KeyFunc(controllerSpec) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err) + } + // Lowering expectations should lead to a sync that creates a replica, however the // fakePodControl error will prevent this, leaving expectations at 0, 0 - manager.expectations.CreationObserved(controllerSpec) + manager.expectations.CreationObserved(rcKey) controllerSpec.Status.Replicas = 1 fakePodControl.clear() fakePodControl.err = fmt.Errorf("Fake Error") @@ -920,6 +734,11 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) expectedPods := 0 pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec) + rcKey, err := 
controller.KeyFunc(controllerSpec) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err) + } + // Size up the controller, then size it down, and confirm the expected create/delete pattern for _, replicas := range []int{numReplicas, 0} { @@ -949,11 +768,11 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) manager.addPod(&pods.Items[i]) } - podExp, exists, err := manager.expectations.GetExpectations(controllerSpec) + podExp, exists, err := manager.expectations.GetExpectations(rcKey) if !exists || err != nil { t.Fatalf("Did not find expectations for rc.") } - if add, _ := podExp.getExpectations(); add != 1 { + if add, _ := podExp.GetExpectations(); add != 1 { t.Fatalf("Expectations are wrong %v", podExp) } } else { @@ -966,11 +785,11 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) manager.podStore.Store.Delete(&pods.Items[i]) manager.deletePod(&pods.Items[i]) } - podExp, exists, err := manager.expectations.GetExpectations(controllerSpec) + podExp, exists, err := manager.expectations.GetExpectations(rcKey) if !exists || err != nil { t.Fatalf("Did not find expectations for rc.") } - if _, del := podExp.getExpectations(); del != 1 { + if _, del := podExp.GetExpectations(); del != 1 { t.Fatalf("Expectations are wrong %v", podExp) } } @@ -1010,12 +829,12 @@ func TestControllerBurstReplicas(t *testing.T) { } type FakeRCExpectations struct { - *RCExpectations + *controller.ControllerExpectations satisfied bool expSatisfied func() } -func (fe FakeRCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bool { +func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { fe.expSatisfied() return fe.satisfied } @@ -1036,7 +855,7 @@ func TestRCSyncExpectations(t *testing.T) { postExpectationsPod := pods.Items[1] manager.expectations = FakeRCExpectations{ - NewRCExpectations(), true, func() { + controller.NewControllerExpectations(), true, func() { // If we check active pods before checking expectataions, the rc // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. @@ -1063,16 +882,22 @@ func TestDeleteControllerAndExpectations(t *testing.T) { validateSyncReplication(t, &fakePodControl, 1, 0) fakePodControl.clear() + // Get the RC key + rcKey, err := controller.KeyFunc(rc) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", rc, err) + } + // This is to simulate a concurrent addPod, that has a handle on the expectations // as the controller deletes it. - podExp, exists, err := manager.expectations.GetExpectations(rc) + podExp, exists, err := manager.expectations.GetExpectations(rcKey) if !exists || err != nil { t.Errorf("No expectations found for rc") } manager.rcStore.Delete(rc) manager.syncReplicationController(getKey(rc, t)) - if _, exists, err = manager.expectations.GetExpectations(rc); exists { + if _, exists, err = manager.expectations.GetExpectations(rcKey); exists { t.Errorf("Found expectaions, expected none since the rc has been deleted.") } diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go new file mode 100644 index 0000000000..63f4fecf58 --- /dev/null +++ b/pkg/controller/replication/replication_controller_utils.go @@ -0,0 +1,72 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replication + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/golang/glog" +) + +// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry. +func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) { + // This is the steady state. It happens when the rc doesn't have any expectations, since + // we do a periodic relist every 30s. If the generations differ but the replicas are + // the same, a caller might've resized to the same replica count. + if controller.Status.Replicas == numReplicas && + controller.Generation == controller.Status.ObservedGeneration { + return nil + } + // Save the generation number we acted on, otherwise we might wrongfully indicate + // that we've seen a spec update when we retry. + // TODO: This can clobber an update if we allow multiple agents to write to the + // same status. + generation := controller.Generation + + var getErr error + for i, rc := 0, &controller; ; i++ { + glog.V(4).Infof("Updating replica count for rc: %v, %d->%d (need %d), sequence No: %v->%v", + controller.Name, controller.Status.Replicas, numReplicas, controller.Spec.Replicas, controller.Status.ObservedGeneration, generation) + + rc.Status = api.ReplicationControllerStatus{Replicas: numReplicas, ObservedGeneration: generation} + _, updateErr = rcClient.Update(rc) + if updateErr == nil || i >= statusUpdateRetries { + return updateErr + } + // Update the controller with the latest resource version for the next poll + if rc, getErr = rcClient.Get(controller.Name); getErr != nil { + // If the GET fails we can't trust status.Replicas anymore. This error + // is bound to be more interesting than the update failure. + return getErr + } + } + // Failed 2 updates one of which was with the latest controller, return the update error + return +} + +// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. 
+type overlappingControllers []api.ReplicationController + +func (o overlappingControllers) Len() int { return len(o) } +func (o overlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o overlappingControllers) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +} diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 918877783e..719972a126 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -30,7 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/replication" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -66,7 +66,7 @@ type MasterComponents struct { // Restclient used to talk to the kubernetes master RestClient *client.Client // Replication controller manager - ControllerManager *controller.ReplicationManager + ControllerManager *replication.ReplicationManager // Channel for stop signals to rc manager rcStopCh chan struct{} // Used to stop master components individually, and via MasterComponents.Stop @@ -99,7 +99,7 @@ func NewMasterComponents(c *Config) *MasterComponents { } restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version(), QPS: c.QPS, Burst: c.Burst}) rcStopCh := make(chan struct{}) - controllerManager := controller.NewReplicationManager(restClient, c.Burst) + controllerManager := replication.NewReplicationManager(restClient, c.Burst) // TODO: Support events once we can cleanly shutdown an event recorder. controllerManager.SetEventRecorder(&record.FakeRecorder{})
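Reviewer note: the test changes above show the new calling convention for expectations bookkeeping — callers derive a string key with controller.KeyFunc and pass that key to CreationObserved/GetExpectations/SatisfiedExpectations instead of passing the ReplicationController object. The sketch below is a hypothetical helper, not part of this patch; the parameter and return types are inferred from the calls visible in the diff and may not match the exact signatures at this revision.

package replication

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
)

// creationsObserved records `creations` observed pod creations against the given
// rc and reports whether its expectations are now satisfied. Illustrative only.
func creationsObserved(exp *controller.ControllerExpectations, rc *api.ReplicationController, creations int) (bool, error) {
	// Expectations are now tracked per controller key rather than per object.
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return false, fmt.Errorf("couldn't get key for object %+v: %v", rc, err)
	}
	// Each observed create decrements the outstanding add count for this key.
	for i := 0; i < creations; i++ {
		exp.CreationObserved(rcKey)
	}
	// The sync loop proceeds only once the remaining adds/deletes for the key have
	// been observed, or the expectations have expired.
	return exp.SatisfiedExpectations(rcKey), nil
}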
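Reviewer note: updateReplicaCount in the new replication_controller_utils.go writes status with a single GET/PUT retry and pins the generation it acted on, so a retried PUT never claims to have observed a newer spec than it really did. The standalone sketch below mirrors that control flow against hypothetical stand-in types (not the real client.ReplicationControllerInterface or api types), purely to make the retry shape explicit.

package main

import "fmt"

// Hypothetical stand-ins used only to show the GET/PUT retry control flow.
type rcStatus struct {
	Name               string
	Generation         int64
	StatusReplicas     int
	ObservedGeneration int64
}

type rcStatusClient interface {
	Get(name string) (*rcStatus, error)
	Update(rc *rcStatus) (*rcStatus, error)
}

const statusUpdateRetries = 1

func updateStatus(c rcStatusClient, rc rcStatus, numReplicas int) error {
	// Remember the generation we acted on so a retry doesn't report having seen a
	// newer spec than it really did.
	generation := rc.Generation
	cur := &rc
	for i := 0; ; i++ {
		cur.StatusReplicas = numReplicas
		cur.ObservedGeneration = generation
		if _, err := c.Update(cur); err == nil || i >= statusUpdateRetries {
			return err
		}
		// The PUT failed (e.g. a stale resource version): re-GET once so the retry
		// carries the latest copy of the object.
		fresh, getErr := c.Get(rc.Name)
		if getErr != nil {
			// The GET failure is more interesting than the stale PUT error.
			return getErr
		}
		cur = fresh
	}
}

func main() { fmt.Println("sketch only; wire up a real or fake rcStatusClient to exercise updateStatus") }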
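Reviewer note: the new overlappingControllers type orders controllers oldest-first by CreationTimestamp, falling back to name so ties sort deterministically. The self-contained example below reproduces that ordering with only the standard library; the ctrl struct is a hypothetical stand-in for api.ReplicationController with just the fields the comparison uses.

package main

import (
	"fmt"
	"sort"
	"time"
)

// ctrl stands in for api.ReplicationController; only Name and CreationTimestamp matter here.
type ctrl struct {
	Name              string
	CreationTimestamp time.Time
}

type byCreationThenName []ctrl

func (o byCreationThenName) Len() int      { return len(o) }
func (o byCreationThenName) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o byCreationThenName) Less(i, j int) bool {
	// Oldest first; fall back to name so the order is deterministic when two
	// controllers were created at the same instant.
	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

func main() {
	now := time.Now()
	cs := []ctrl{{"rc-b", now}, {"rc-a", now}, {"rc-old", now.Add(-time.Hour)}}
	sort.Sort(byCreationThenName(cs))
	fmt.Println(cs[0].Name, cs[1].Name, cs[2].Name) // rc-old rc-a rc-b
}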