Fix deployment tests failures; change ResyncPeriod

pull/6/head
Janet Kuo 2015-11-18 15:12:11 -08:00
parent b838d8ce18
commit 32d153093e
10 changed files with 193 additions and 153 deletions

View File

@ -386,7 +386,7 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller")).
go deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller"), s.ResyncPeriod).
Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
}
}

View File

@ -60,6 +60,7 @@ kube-controller-manager
--cloud-provider="": The provider for cloud services. Empty string for no provider.
--cluster-cidr=<nil>: CIDR Range for Pods in cluster.
--cluster-name="kubernetes": The instance prefix for the cluster
--concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load
--concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
--concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
--concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load

View File

@ -235,7 +235,6 @@ function start_apiserver {
APISERVER_LOG=/tmp/kube-apiserver.log
sudo -E "${GO_OUT}/kube-apiserver" ${priv_arg} ${runtime_config}\
--v=${LOG_LEVEL} \
--runtime-config=experimental/v1=true \
--cert-dir="${CERT_DIR}" \
--service-account-key-file="${SERVICE_ACCOUNT_KEY}" \
--service-account-lookup="${SERVICE_ACCOUNT_LOOKUP}" \
@ -256,7 +255,6 @@ function start_controller_manager {
CTLRMGR_LOG=/tmp/kube-controller-manager.log
sudo -E "${GO_OUT}/kube-controller-manager" \
--v=${LOG_LEVEL} \
--enable-deployment-controller \
--service-account-private-key-file="${SERVICE_ACCOUNT_KEY}" \
--root-ca-file="${ROOT_CA_FILE}" \
--enable-hostpath-provisioner="${ENABLE_HOSTPATH_PROVISIONER}" \

View File

@ -640,7 +640,7 @@ runTests() {
# Post-Condition: hpa "frontend" has configuration annotation
[[ "$(kubectl get hpa frontend -o yaml "${kube_flags[@]}" | grep kubectl.kubernetes.io/last-applied-configuration)" ]]
# Clean up
kubectl delete rc,hpa frontend
kubectl delete rc,hpa frontend "${kube_flags[@]}"
## kubectl apply should create the resource that doesn't exist yet
# Pre-Condition: no POD is running
@ -658,19 +658,20 @@ runTests() {
# Pre-Condition: no Job is running
kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" ''
# Command
kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)'
kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)' "${kube_flags[@]}"
# Post-Condition: Job "pi" is created
kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" 'pi:'
# Clean up
kubectl delete jobs pi
kubectl delete jobs pi "${kube_flags[@]}"
# Pre-Condition: no Deployment is running
kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" ''
# Command
kubectl run nginx --image=nginx --generator=deployment/v1beta1
kubectl run nginx --image=nginx --generator=deployment/v1beta1 "${kube_flags[@]}"
# Post-Condition: Deployment "nginx" is created
kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx:'
# Clean up
kubectl delete deployment nginx
kubectl delete deployment nginx "${kube_flags[@]}"
kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}"
##############
# Namespaces #
@ -1050,11 +1051,12 @@ __EOF__
kubectl create -f examples/extensions/deployment.yaml "${kube_flags[@]}"
kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx-deployment:'
# autoscale 2~3 pods, default CPU utilization (80%)
kubectl autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3
kubectl-with-retry autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3
kube::test::get_object_assert 'hpa nginx-deployment' "{{$hpa_min_field}} {{$hpa_max_field}} {{$hpa_cpu_field}}" '2 3 80'
# Clean up
kubectl delete hpa nginx-deployment "${kube_flags[@]}"
kubectl delete deployment nginx-deployment "${kube_flags[@]}"
kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}"
######################
# Multiple Resources #

View File

@ -167,7 +167,7 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
var rc api.ReplicationController
if len(pod.Labels) == 0 {
err = fmt.Errorf("No controllers found for pod %v because it has no labels", pod.Name)
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
return
}
@ -186,7 +186,7 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
controllers = append(controllers, rc)
}
if len(controllers) == 0 {
err = fmt.Errorf("Could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
@ -220,7 +220,7 @@ func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationControl
var d extensions.Deployment
if len(rc.Labels) == 0 {
err = fmt.Errorf("No controllers found for replication controller %v because it has no labels", rc.Name)
err = fmt.Errorf("no deployments found for replication controller %v because it has no labels", rc.Name)
return
}
@ -233,14 +233,14 @@ func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationControl
labelSet := labels.Set(d.Spec.Selector)
selector = labels.Set(d.Spec.Selector).AsSelector()
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(rc.Labels)) {
continue
}
deployments = append(deployments, d)
}
if len(deployments) == 0 {
err = fmt.Errorf("Could not find deployments set for replication controller %s in namespace %s with labels: %v", rc.Name, rc.Namespace, rc.Labels)
err = fmt.Errorf("could not find deployments set for replication controller %s in namespace %s with labels: %v", rc.Name, rc.Namespace, rc.Labels)
}
return
}
@ -275,7 +275,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
var daemonSet extensions.DaemonSet
if len(pod.Labels) == 0 {
err = fmt.Errorf("No daemon sets found for pod %v because it has no labels", pod.Name)
err = fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name)
return
}
@ -297,7 +297,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
daemonSets = append(daemonSets, daemonSet)
}
if len(daemonSets) == 0 {
err = fmt.Errorf("Could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
@ -337,7 +337,7 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Serv
}
}
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
@ -364,7 +364,7 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E
return ep, nil
}
}
err = fmt.Errorf("Could not find endpoints for service: %v", svc.Name)
err = fmt.Errorf("could not find endpoints for service: %v", svc.Name)
return
}
@ -396,7 +396,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err
var job extensions.Job
if len(pod.Labels) == 0 {
err = fmt.Errorf("No jobs found for pod %v because it has no labels", pod.Name)
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
return
}
@ -413,7 +413,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err
jobs = append(jobs, job)
}
if len(jobs) == 0 {
err = fmt.Errorf("Could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}

View File

@ -223,34 +223,6 @@ func NewControllerExpectations() *ControllerExpectations {
return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
}
// RCControlInterface is an interface that knows how to add or delete
// replication controllers, as well as increment or decrement them. It is used
// by the deployment controller to ease testing of actions that it takes.
// @TODO Decide if we want to use this or use the kube client directly.
type RCControlInterface interface {
// CreateRC creates a new replication controller.
CreateRC(controller *api.ReplicationController) (*api.ReplicationController, error)
// ChangeReplicaCount increments or decrements a replication controller by
// a given amount. For decrementing, use negative numbers.
AdjustReplicaCount(controller *api.ReplicationController, count int) error
}
// RealPodControl is the default implementation of PodControllerInterface.
type RealRCControl struct {
KubeClient client.Interface
Recorder record.EventRecorder
}
func (r RealRCControl) CreateRC(controller *api.ReplicationController) (*api.ReplicationController, error) {
return r.KubeClient.ReplicationControllers(controller.Namespace).Create(controller)
}
func (r RealRCControl) AdjustReplicaCount(controller *api.ReplicationController, count int) error {
controller.Spec.Replicas += count
_, err := r.KubeClient.ReplicationControllers(controller.Namespace).Update(controller)
return err
}
// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
@ -262,7 +234,7 @@ type PodControlInterface interface {
DeletePod(namespace string, podID string) error
}
// RealPodControl is the default implementation of PodControllerInterface.
// RealPodControl is the default implementation of PodControlInterface.
type RealPodControl struct {
KubeClient client.Interface
Recorder record.EventRecorder

View File

@ -24,13 +24,13 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
@ -40,27 +40,18 @@ import (
)
const (
// We'll attempt to recompute the required replicas of all deployments
// that have fulfilled their expectations at least this often. This recomputation
// happens based on contents in the local caches.
// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
// of all deployments that have fulfilled their expectations at least this often.
// This recomputation happens based on contents in the local caches.
FullDeploymentResyncPeriod = 30 * time.Second
// We'll keep replication controller watches open up to this long. In the unlikely case
// that a watch misdelivers info about an RC, it'll take this long for
// that mistake to be rectified.
ControllerRelistPeriod = 5 * time.Minute
// We'll keep pod watches open up to this long. In the unlikely case
// that a watch misdelivers info about a pod, it'll take this long for
// that mistake to be rectified.
PodRelistPeriod = 5 * time.Minute
)
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running rcs and pods.
type DeploymentController struct {
client client.Interface
expClient client.ExtensionsInterface
eventRecorder record.EventRecorder
rcControl controller.RCControlInterface
// To allow injection of syncDeployment for testing.
syncHandler func(dKey string) error
@ -88,7 +79,8 @@ type DeploymentController struct {
queue *workqueue.Type
}
func NewDeploymentController(client client.Interface) *DeploymentController {
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(client client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(client.Events(""))
@ -103,10 +95,10 @@ func NewDeploymentController(client client.Interface) *DeploymentController {
dc.dStore.Store, dc.dController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return dc.expClient.Deployments(api.NamespaceAll).List(labels.Everything(), fields.Everything())
return dc.expClient.Deployments(api.NamespaceAll).List(unversioned.ListOptions{})
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.expClient.Deployments(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
return dc.expClient.Deployments(api.NamespaceAll).Watch(options)
},
},
&extensions.Deployment{},
@ -118,8 +110,6 @@ func NewDeploymentController(client client.Interface) *DeploymentController {
dc.enqueueDeployment(cur)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the deployment.
DeleteFunc: dc.enqueueDeployment,
},
)
@ -127,14 +117,14 @@ func NewDeploymentController(client client.Interface) *DeploymentController {
dc.rcStore.Store, dc.rcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).List(labels.Everything(), fields.Everything())
return dc.client.ReplicationControllers(api.NamespaceAll).List(unversioned.ListOptions{})
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
return dc.client.ReplicationControllers(api.NamespaceAll).Watch(options)
},
},
&api.ReplicationController{},
ControllerRelistPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: dc.addRC,
UpdateFunc: dc.updateRC,
@ -148,24 +138,44 @@ func NewDeploymentController(client client.Interface) *DeploymentController {
dc.podStore.Store, dc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return dc.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
return dc.client.Pods(api.NamespaceAll).List(unversioned.ListOptions{})
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options)
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
return dc.client.Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
PodRelistPeriod,
framework.ResourceEventHandlerFuncs{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
// When pod updates (becomes ready), we need to enqueue deployment
UpdateFunc: dc.updatePod,
},
)
dc.syncHandler = dc.syncDeployment
dc.rcStoreSynced = dc.rcController.HasSynced
dc.podStoreSynced = dc.podController.HasSynced
return dc
}
// When an RC is created, enqueue the deployment that manages it.
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go dc.dController.Run(stopCh)
go dc.rcController.Run(stopCh)
go dc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployment controller")
dc.queue.ShutDown()
}
// addRC enqueues the deployment that manages an RC when the RC is created.
func (dc *DeploymentController) addRC(obj interface{}) {
rc := obj.(*api.ReplicationController)
glog.V(4).Infof("Replication controller %s added.", rc.Name)
if d := dc.getDeploymentForRC(rc); rc != nil {
dc.enqueueDeployment(d)
}
@ -175,8 +185,8 @@ func (dc *DeploymentController) addRC(obj interface{}) {
// TODO: Surface that we are ignoring multiple deployments for a given controller.
func (dc *DeploymentController) getDeploymentForRC(rc *api.ReplicationController) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForRC(rc)
if err != nil {
glog.V(4).Infof("No deployments found for replication controller %v, deployment controller will avoid syncing", rc.Name)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for replication controller %v, deployment controller will avoid syncing.", err, rc.Name)
return nil
}
// Because all RC's belonging to a deployment should have a unique label key,
@ -187,9 +197,9 @@ func (dc *DeploymentController) getDeploymentForRC(rc *api.ReplicationController
return &deployments[0]
}
// When a controller is updated, figure out what deployment/s manage it and wake them
// up. If the labels of the controller have changed we need to awaken both the old
// and new deployments. old and cur must be *api.ReplicationController types.
// updateRC figures out what deployment(s) manage an RC when the RC is updated and
// wake them up. If the anything of the RCs have changed, we need to awaken both
// the old and new deployments. old and cur must be *api.ReplicationController types.
func (dc *DeploymentController) updateRC(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
// A periodic relist will send update events for all known controllers.
@ -197,13 +207,13 @@ func (dc *DeploymentController) updateRC(old, cur interface{}) {
}
// TODO: Write a unittest for this case
curRC := cur.(*api.ReplicationController)
glog.V(4).Infof("Replication controller %s updated.", curRC.Name)
if d := dc.getDeploymentForRC(curRC); d != nil {
dc.enqueueDeployment(d)
}
// A number of things could affect the old deployment: labels changing,
// pod template changing, etc.
oldRC := old.(*api.ReplicationController)
// TODO: Is this the right way to check this, or is checking names sufficient?
if !api.Semantic.DeepEqual(oldRC, curRC) {
if oldD := dc.getDeploymentForRC(oldRC); oldD != nil {
dc.enqueueDeployment(oldD)
@ -211,11 +221,12 @@ func (dc *DeploymentController) updateRC(old, cur interface{}) {
}
}
// When a controller is deleted, enqueue the deployment that manages it.
// deleteRC enqueues the deployment that manages an RC when the RC is deleted.
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown
// marker item.
func (dc *DeploymentController) deleteRC(obj interface{}) {
rc, ok := obj.(*api.ReplicationController)
glog.V(4).Infof("Replication controller %s deleted.", rc.Name)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
@ -238,6 +249,44 @@ func (dc *DeploymentController) deleteRC(obj interface{}) {
}
}
// getDeploymentForPod returns the deployment managing the RC that manages the given Pod.
// TODO: Surface that we are ignoring multiple deployments for a given Pod.
func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
rcs, err := dc.rcStore.GetPodControllers(pod)
if err != nil {
glog.V(4).Infof("Error: %v. No replication controllers found for pod %v, deployment controller will avoid syncing.", err, pod.Name)
return nil
}
for _, rc := range rcs {
deployments, err := dc.dStore.GetDeploymentsForRC(&rc)
if err == nil && len(deployments) > 0 {
return &deployments[0]
}
}
glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name)
return nil
}
// updatePod figures out what deployment(s) manage the RC that manages the Pod when the Pod
// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
// the old and new deployments. old and cur must be *api.Pod types.
func (dc *DeploymentController) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
return
}
curPod := cur.(*api.Pod)
glog.V(4).Infof("Pod %s updated.", curPod.Name)
if d := dc.getDeploymentForPod(curPod); d != nil {
dc.enqueueDeployment(d)
}
oldPod := old.(*api.Pod)
if !api.Semantic.DeepEqual(oldPod, curPod) {
if oldD := dc.getDeploymentForPod(oldPod); oldD != nil {
dc.enqueueDeployment(oldD)
}
}
}
// obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item.
func (dc *DeploymentController) enqueueDeployment(obj interface{}) {
key, err := controller.KeyFunc(obj)
@ -255,19 +304,6 @@ func (dc *DeploymentController) enqueueDeployment(obj interface{}) {
dc.queue.Add(key)
}
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go dc.dController.Run(stopCh)
go dc.rcController.Run(stopCh)
go dc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployment controller")
dc.queue.ShutDown()
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
@ -286,6 +322,8 @@ func (dc *DeploymentController) worker() {
}
}
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
startTime := time.Now()
defer func() {
@ -293,15 +331,15 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}()
obj, exists, err := dc.dStore.Store.GetByKey(key)
if !exists {
glog.Infof("Deployment has been deleted %v", key)
return nil
}
if err != nil {
glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
dc.queue.Add(key)
return err
}
if !exists {
glog.Infof("Deployment has been deleted %v", key)
return nil
}
d := *obj.(*extensions.Deployment)
switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
@ -309,7 +347,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
case extensions.RollingUpdateDeploymentStrategyType:
return dc.syncRollingUpdateDeployment(d)
}
return fmt.Errorf("Unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
@ -349,14 +387,36 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension
// Update DeploymentStatus
return dc.updateDeploymentStatus(allRCs, newRC, deployment)
}
// Sync deployment status
totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs)
updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
if deployment.Status.Replicas != totalReplicas || deployment.Status.UpdatedReplicas != updatedReplicas {
return dc.updateDeploymentStatus(allRCs, newRC, deployment)
}
// TODO: raise an event, neither scaled up nor down.
return nil
}
func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
return deploymentutil.GetOldRCsFromLists(deployment, dc.client,
func(namespace string, options unversioned.ListOptions) (*api.PodList, error) {
podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector))
return &podList, err
},
func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
return dc.rcStore.List()
})
}
// Returns an RC that matches the intent of the given deployment.
// It creates a new RC if required.
func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
existingNewRC, err := deploymentutil.GetNewRC(deployment, dc.client)
existingNewRC, err := deploymentutil.GetNewRCFromList(deployment, dc.client,
func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
return dc.rcStore.List()
})
if err != nil || existingNewRC != nil {
return existingNewRC, err
}
@ -385,40 +445,6 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
return createdRC, nil
}
func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
// TODO: (janet) HEAD >>> return deploymentutil.GetOldRCs(deployment, d.client)
namespace := deployment.ObjectMeta.Namespace
// 1. Find all pods whose labels match deployment.Spec.Selector
podList, err := dc.podStore.Pods(api.NamespaceAll).List(labels.SelectorFromSet(deployment.Spec.Selector))
if err != nil {
return nil, fmt.Errorf("error listing pods: %v", err)
}
// 2. Find the corresponding RCs for pods in podList.
oldRCs := map[string]api.ReplicationController{}
rcList, err := dc.rcStore.List()
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
}
for _, pod := range podList.Items {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
for _, rc := range rcList {
rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
if rcLabelsSelector.Matches(podLabelsSelector) {
// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
if api.Semantic.DeepEqual(rc.Spec.Template, deploymentutil.GetNewRCTemplate(deployment)) {
continue
}
oldRCs[rc.ObjectMeta.Name] = rc
}
}
}
rcSlice := []*api.ReplicationController{}
for _, value := range oldRCs {
rcSlice = append(rcSlice, &value)
}
return rcSlice, nil
}
func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
if newRC.Spec.Replicas == deployment.Spec.Replicas {
// Scaling not required.
@ -510,7 +536,7 @@ func (dc *DeploymentController) updateDeploymentStatus(allRCs []*api.Replication
Replicas: totalReplicas,
UpdatedReplicas: updatedReplicas,
}
_, err := dc.client.Extensions().Deployments(api.NamespaceAll).UpdateStatus(&newDeployment)
_, err := dc.expClient.Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
return err
}
@ -521,7 +547,7 @@ func (dc *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationControl
}
newRC, err := dc.scaleRC(rc, newScale)
if err == nil {
d.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
}
return newRC, err
}
@ -534,5 +560,5 @@ func (dc *DeploymentController) scaleRC(rc *api.ReplicationController, newScale
func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
return dc.client.Extensions().Deployments(api.NamespaceAll).Update(deployment)
return dc.expClient.Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}

View File

@ -364,7 +364,7 @@ func newFixture(t *testing.T) *fixture {
func (f *fixture) run(deploymentName string) {
f.client = testclient.NewSimpleFake(f.objects)
c := NewDeploymentController(f.client)
c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
c.rcStoreSynced = alwaysReady
c.podStoreSynced = alwaysReady
for _, d := range f.dStore {

View File

@ -22,33 +22,46 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
)
// Returns the old RCs targetted by the given Deployment.
// GetOldRCs returns the old RCs targeted by the given Deployment; get PodList and RCList from client interface.
func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, error) {
return GetOldRCsFromLists(deployment, c,
func(namespace string, options unversioned.ListOptions) (*api.PodList, error) {
return c.Pods(namespace).List(options)
},
func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
rcList, err := c.ReplicationControllers(namespace).List(options)
return rcList.Items, err
})
}
// GetOldRCsFromLists returns the old RCs targeted by the given Deployment; get PodList and RCList with input functions.
func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, unversioned.ListOptions) (*api.PodList, error), getRcList func(string, unversioned.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
// 1. Find all pods whose labels match deployment.Spec.Selector
selector := labels.SelectorFromSet(deployment.Spec.Selector)
options := api.ListOptions{LabelSelector: selector}
podList, err := c.Pods(namespace).List(options)
podList, err := getPodList(namespace, options)
if err != nil {
return nil, fmt.Errorf("error listing pods: %v", err)
}
// 2. Find the corresponding RCs for pods in podList.
// TODO: Right now we list all RCs and then filter. We should add an API for this.
oldRCs := map[string]api.ReplicationController{}
rcList, err := c.ReplicationControllers(namespace).List(api.ListOptions{})
rcList, err := getRcList(namespace, api.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
}
newRCTemplate := GetNewRCTemplate(deployment)
for _, pod := range podList.Items {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
for _, rc := range rcList.Items {
for _, rc := range rcList {
rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
if rcLabelsSelector.Matches(podLabelsSelector) {
// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
@ -70,20 +83,30 @@ func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.Rep
return requiredRCs, nil
}
// Returns an RC that matches the intent of the given deployment.
// GetNewRC returns an RC that matches the intent of the given deployment; get RCList from client interface.
// Returns nil if the new RC doesnt exist yet.
func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.ReplicationController, error) {
return GetNewRCFromList(deployment, c,
func(namespace string, options unversioned.ListOptions) ([]api.ReplicationController, error) {
rcList, err := c.ReplicationControllers(namespace).List(options)
return rcList.Items, err
})
}
// GetNewRCFromList returns an RC that matches the intent of the given deployment; get RCList with the input function.
// Returns nil if the new RC doesnt exist yet.
func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, unversioned.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
rcList, err := c.ReplicationControllers(namespace).List(api.ListOptions{})
rcList, err := getRcList(namespace, api.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
}
newRCTemplate := GetNewRCTemplate(deployment)
for i := range rcList.Items {
if api.Semantic.DeepEqual(rcList.Items[i].Spec.Template, &newRCTemplate) {
for i := range rcList {
if api.Semantic.DeepEqual(rcList[i].Spec.Template, &newRCTemplate) {
// This is the new RC.
return &rcList.Items[i], nil
return &rcList[i], nil
}
}
// new RC does not exist.

View File

@ -72,8 +72,14 @@ func testNewDeployment(f *Framework) {
})
Expect(err).NotTo(HaveOccurred())
defer func() {
deployment, err := c.Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
Logf("deleting deployment %s", deploymentName)
Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
// TODO: remove this once we can delete rcs with deployment
newRC, err := deploymentutil.GetNewRC(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
}()
// Check that deployment is created fine.
deployment, err := c.Deployments(ns).Get(deploymentName)
@ -166,8 +172,14 @@ func testRollingUpdateDeployment(f *Framework) {
_, err = c.Deployments(ns).Create(&newDeployment)
Expect(err).NotTo(HaveOccurred())
defer func() {
deployment, err := c.Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
Logf("deleting deployment %s", deploymentName)
Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
// TODO: remove this once we can delete rcs with deployment
newRC, err := deploymentutil.GetNewRC(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
}()
err = waitForDeploymentStatus(c, ns, deploymentName, 3, 2, 4, 0)
@ -247,8 +259,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
_, err = c.Deployments(ns).Create(&newDeployment)
Expect(err).NotTo(HaveOccurred())
defer func() {
deployment, err := c.Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
Logf("deleting deployment %s", deploymentName)
Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
// TODO: remove this once we can delete rcs with deployment
newRC, err := deploymentutil.GetNewRC(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
}()
err = waitForDeploymentStatus(c, ns, deploymentName, 1, 0, 2, 0)