diff --git a/federation/cmd/federation-controller-manager/app/BUILD b/federation/cmd/federation-controller-manager/app/BUILD index 2be5e85b6d..1069570bbd 100644 --- a/federation/cmd/federation-controller-manager/app/BUILD +++ b/federation/cmd/federation-controller-manager/app/BUILD @@ -23,7 +23,6 @@ go_library( "//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library", "//federation/pkg/federatedtypes:go_default_library", "//federation/pkg/federation-controller/cluster:go_default_library", - "//federation/pkg/federation-controller/deployment:go_default_library", "//federation/pkg/federation-controller/ingress:go_default_library", "//federation/pkg/federation-controller/namespace:go_default_library", "//federation/pkg/federation-controller/service:go_default_library", diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 17b37d4004..1f8b41ec9b 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/federation/pkg/federatedtypes" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" - deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment" ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service" @@ -166,15 +165,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err } } - if controllerEnabled(s.Controllers, serverResources, deploymentcontroller.ControllerName, deploymentcontroller.RequiredResources, true) { - glog.V(3).Infof("Loading client config for deployment controller %q", deploymentcontroller.UserAgentName) - deploymentClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, deploymentcontroller.UserAgentName)) - deploymentController := deploymentcontroller.NewDeploymentController(deploymentClientset) - glog.V(3).Infof("Running deployment controller") - // TODO: rename s.ConcurrentReplicaSetSyncs - go deploymentController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) - } - if controllerEnabled(s.Controllers, serverResources, ingresscontroller.ControllerName, ingresscontroller.RequiredResources, true) { glog.V(3).Infof("Loading client config for ingress controller %q", ingresscontroller.UserAgentName) ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, ingresscontroller.UserAgentName)) diff --git a/federation/pkg/federatedtypes/BUILD b/federation/pkg/federatedtypes/BUILD index 3391d59746..8e1f8e2fbc 100644 --- a/federation/pkg/federatedtypes/BUILD +++ b/federation/pkg/federatedtypes/BUILD @@ -14,6 +14,7 @@ go_library( "adapter.go", "configmap.go", "daemonset.go", + "deployment.go", "registry.go", "replicaset.go", "scheduling.go", @@ -32,6 +33,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", @@ -58,7 +60,7 @@ filegroup( go_test( name = "go_default_test", - srcs = ["replicaset_test.go"], + srcs = ["scheduling_test.go"], library = ":go_default_library", tags = ["automanaged"], deps = [ @@ -66,5 +68,6 @@ go_test( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", ], ) diff --git a/federation/pkg/federatedtypes/deployment.go b/federation/pkg/federatedtypes/deployment.go new file mode 100644 index 0000000000..72a007126a --- /dev/null +++ b/federation/pkg/federatedtypes/deployment.go @@ -0,0 +1,188 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package federatedtypes + +import ( + apiv1 "k8s.io/api/core/v1" + extensionsv1 "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" + fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +const ( + DeploymentKind = "deployment" + DeploymentControllerName = "deployments" + FedDeploymentPreferencesAnnotation = "federation.kubernetes.io/deployment-preferences" +) + +func init() { + RegisterFederatedType(DeploymentKind, DeploymentControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(DeploymentControllerName)}, NewDeploymentAdapter) +} + +type DeploymentAdapter struct { + *schedulingAdapter + client federationclientset.Interface +} + +func NewDeploymentAdapter(client federationclientset.Interface) FederatedTypeAdapter { + schedulingAdapter := schedulingAdapter{ + preferencesAnnotationName: FedDeploymentPreferencesAnnotation, + updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { + deployment := obj.(*extensionsv1.Deployment) + if status.Replicas != deployment.Status.Replicas || status.UpdatedReplicas != deployment.Status.UpdatedReplicas || + status.ReadyReplicas != deployment.Status.ReadyReplicas || status.AvailableReplicas != deployment.Status.AvailableReplicas { + deployment.Status = extensionsv1.DeploymentStatus{ + Replicas: status.Replicas, + UpdatedReplicas: status.UpdatedReplicas, + ReadyReplicas: status.ReadyReplicas, + AvailableReplicas: status.AvailableReplicas, + } + _, err := client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment) + return err + } + return nil + }, + } + + return &DeploymentAdapter{&schedulingAdapter, client} +} + +func (a *DeploymentAdapter) Kind() string { + return DeploymentKind +} + +func (a *DeploymentAdapter) ObjectType() pkgruntime.Object { + return &extensionsv1.Deployment{} +} + +func (a *DeploymentAdapter) IsExpectedType(obj interface{}) bool { + _, ok := obj.(*extensionsv1.Deployment) + return ok +} + +func (a *DeploymentAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { + deployment := obj.(*extensionsv1.Deployment) + return fedutil.DeepCopyDeployment(deployment) +} + +func (a *DeploymentAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { + deployment1 := obj1.(*extensionsv1.Deployment) + deployment2 := obj2.(*extensionsv1.Deployment) + return fedutil.DeploymentEquivalent(deployment1, deployment2) +} + +func (a *DeploymentAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { + deployment := obj.(*extensionsv1.Deployment) + return types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name} +} + +func (a *DeploymentAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta { + return &obj.(*extensionsv1.Deployment).ObjectMeta +} + +func (a *DeploymentAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) { + deployment := obj.(*extensionsv1.Deployment) + return a.client.Extensions().Deployments(deployment.Namespace).Create(deployment) +} + +func (a *DeploymentAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error { + return a.client.Extensions().Deployments(namespacedName.Namespace).Delete(namespacedName.Name, options) +} + +func (a *DeploymentAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return a.client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *DeploymentAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return a.client.Extensions().Deployments(namespace).List(options) +} + +func (a *DeploymentAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) { + deployment := obj.(*extensionsv1.Deployment) + return a.client.Extensions().Deployments(deployment.Namespace).Update(deployment) +} + +func (a *DeploymentAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) { + return a.client.Extensions().Deployments(namespace).Watch(options) +} + +func (a *DeploymentAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + deployment := obj.(*extensionsv1.Deployment) + return client.Extensions().Deployments(deployment.Namespace).Create(deployment) +} + +func (a *DeploymentAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error { + return client.Extensions().Deployments(nsName.Namespace).Delete(nsName.Name, options) +} + +func (a *DeploymentAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) { + return client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{}) +} + +func (a *DeploymentAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) { + return client.Extensions().Deployments(namespace).List(options) +} + +func (a *DeploymentAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) { + deployment := obj.(*extensionsv1.Deployment) + return client.Extensions().Deployments(deployment.Namespace).Update(deployment) +} + +func (a *DeploymentAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) { + return client.Extensions().Deployments(namespace).Watch(options) +} + +func (a *DeploymentAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool { + deployment1 := obj1.(*extensionsv1.Deployment) + deployment2 := a.Copy(obj2).(*extensionsv1.Deployment) + deployment2.Spec.Replicas = deployment1.Spec.Replicas + return fedutil.DeploymentEquivalent(deployment1, deployment2) +} + +func (a *DeploymentAdapter) NewTestObject(namespace string) pkgruntime.Object { + replicas := int32(3) + zero := int64(0) + return &extensionsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-deployment-", + Namespace: namespace, + }, + Spec: extensionsv1.DeploymentSpec{ + Replicas: &replicas, + Template: apiv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &zero, + Containers: []apiv1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } +} diff --git a/federation/pkg/federatedtypes/replicaset.go b/federation/pkg/federatedtypes/replicaset.go index 85f18597a7..35c53306ab 100644 --- a/federation/pkg/federatedtypes/replicaset.go +++ b/federation/pkg/federatedtypes/replicaset.go @@ -17,12 +17,6 @@ limitations under the License. package federatedtypes import ( - "bytes" - "fmt" - "sort" - "time" - - "github.com/golang/glog" apiv1 "k8s.io/api/core/v1" extensionsv1 "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,13 +24,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - fedapi "k8s.io/kubernetes/federation/apis/federation" - fedv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" ) @@ -46,28 +35,35 @@ const ( FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences" ) -type replicaSetUserInfo struct { - scheduleResult (map[string]int64) - fedStatus *extensionsv1.ReplicaSetStatus -} - func init() { RegisterFederatedType(ReplicaSetKind, ReplicaSetControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(ReplicaSetControllerName)}, NewReplicaSetAdapter) } type ReplicaSetAdapter struct { - client federationclientset.Interface - defaultPlanner *planner.Planner + *schedulingAdapter + client federationclientset.Interface } func NewReplicaSetAdapter(client federationclientset.Interface) FederatedTypeAdapter { - return &ReplicaSetAdapter{ - client: client, - defaultPlanner: planner.NewPlanner(&fedapi.ReplicaAllocationPreferences{ - Clusters: map[string]fedapi.ClusterPreferences{ - "*": {Weight: 1}, - }, - })} + schedulingAdapter := schedulingAdapter{ + preferencesAnnotationName: FedReplicaSetPreferencesAnnotation, + updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error { + rs := obj.(*extensionsv1.ReplicaSet) + if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas || + status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas { + rs.Status = extensionsv1.ReplicaSetStatus{ + Replicas: status.Replicas, + FullyLabeledReplicas: status.Replicas, + ReadyReplicas: status.ReadyReplicas, + AvailableReplicas: status.AvailableReplicas, + } + _, err := client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs) + return err + } + return nil + }, + } + return &ReplicaSetAdapter{&schedulingAdapter, client} } func (a *ReplicaSetAdapter) Kind() string { @@ -92,9 +88,7 @@ func (a *ReplicaSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object { } func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool { - replicaset1 := obj1.(*extensionsv1.ReplicaSet) - replicaset2 := obj2.(*extensionsv1.ReplicaSet) - return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2) + return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2) } func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName { @@ -158,79 +152,6 @@ func (a *ReplicaSetAdapter) ClusterWatch(client kubeclientset.Interface, namespa return client.Extensions().ReplicaSets(namespace).Watch(options) } -func (a *ReplicaSetAdapter) IsSchedulingAdapter() bool { - return true -} - -func (a *ReplicaSetAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*fedv1beta1.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) { - var clusterNames []string - for _, cluster := range clusters { - clusterNames = append(clusterNames, cluster.Name) - } - - // Schedule the pods across the existing clusters. - replicaSetGetter := func(clusterName, key string) (interface{}, bool, error) { - return informer.GetTargetStore().GetByKey(clusterName, key) - } - podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) { - clientset, err := informer.GetClientsetForCluster(clusterName) - if err != nil { - return nil, err - } - selector, err := metav1.LabelSelectorAsSelector(replicaSet.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid selector: %v", err) - } - return clientset.Core().Pods(replicaSet.ObjectMeta.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) - } - current, estimatedCapacity, err := clustersReplicaState(clusterNames, key, replicaSetGetter, podsGetter) - if err != nil { - return nil, err - } - rs := obj.(*extensionsv1.ReplicaSet) - return &SchedulingInfo{ - Schedule: a.schedule(rs, clusterNames, current, estimatedCapacity), - Status: SchedulingStatus{}, - }, nil -} - -func (a *ReplicaSetAdapter) ScheduleObject(cluster *fedv1beta1.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) { - rs := federationObjCopy.(*extensionsv1.ReplicaSet) - - replicas, ok := schedulingInfo.Schedule[cluster.Name] - if !ok { - replicas = 0 - } - specReplicas := int32(replicas) - rs.Spec.Replicas = &specReplicas - - if clusterObj != nil { - clusterRs := clusterObj.(*extensionsv1.ReplicaSet) - schedulingInfo.Status.Replicas += clusterRs.Status.Replicas - schedulingInfo.Status.FullyLabeledReplicas += clusterRs.Status.FullyLabeledReplicas - schedulingInfo.Status.ReadyReplicas += clusterRs.Status.ReadyReplicas - schedulingInfo.Status.AvailableReplicas += clusterRs.Status.AvailableReplicas - } - return rs, replicas > 0, nil -} - -func (a *ReplicaSetAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error { - rs := obj.(*extensionsv1.ReplicaSet) - - if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas || - status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas { - rs.Status = extensionsv1.ReplicaSetStatus{ - Replicas: status.Replicas, - FullyLabeledReplicas: status.Replicas, - ReadyReplicas: status.ReadyReplicas, - AvailableReplicas: status.AvailableReplicas, - } - _, err := a.client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs) - return err - } - return nil -} - func (a *ReplicaSetAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool { replicaset1 := obj1.(*extensionsv1.ReplicaSet) replicaset2 := a.Copy(obj2).(*extensionsv1.ReplicaSet) @@ -238,93 +159,6 @@ func (a *ReplicaSetAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Obj return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2) } -func (a *ReplicaSetAdapter) schedule(frs *extensionsv1.ReplicaSet, clusterNames []string, - current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { - // TODO: integrate real scheduler - - plnr := a.defaultPlanner - frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation) - if err != nil { - glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err) - } - if frsPref != nil { // create a new planner if user specified a preference - plnr = planner.NewPlanner(frsPref) - } - - replicas := int64(*frs.Spec.Replicas) - scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity, - frs.Namespace+"/"+frs.Name) - // Ensure that the schedule being returned has scheduling instructions for - // all of the clusters that currently have replicas. A cluster that was in - // the previous schedule but is not in the new schedule should have zero - // replicas. - result := make(map[string]int64) - for clusterName := range current { - result[clusterName] = 0 - } - for clusterName, replicas := range scheduleResult { - result[clusterName] = replicas - } - for clusterName, replicas := range overflow { - result[clusterName] += replicas - } - if glog.V(4) { - buf := bytes.NewBufferString(fmt.Sprintf("Schedule - ReplicaSet: %s/%s\n", frs.Namespace, frs.Name)) - sort.Strings(clusterNames) - for _, clusterName := range clusterNames { - cur := current[clusterName] - target := scheduleResult[clusterName] - fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target) - if over, found := overflow[clusterName]; found { - fmt.Fprintf(buf, " overflow: %d", over) - } - if capacity, found := estimatedCapacity[clusterName]; found { - fmt.Fprintf(buf, " capacity: %d", capacity) - } - fmt.Fprintf(buf, "\n") - } - glog.V(4).Infof(buf.String()) - } - return result -} - -// clusterReplicaState returns information about the scheduling state of the pods running in the federated clusters. -func clustersReplicaState( - clusterNames []string, - replicaSetKey string, - replicaSetGetter func(clusterName string, key string) (interface{}, bool, error), - podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error)) (current map[string]int64, estimatedCapacity map[string]int64, err error) { - - current = make(map[string]int64) - estimatedCapacity = make(map[string]int64) - - for _, clusterName := range clusterNames { - rsObj, exists, err := replicaSetGetter(clusterName, replicaSetKey) - if err != nil { - return nil, nil, err - } - if !exists { - continue - } - rs := rsObj.(*extensionsv1.ReplicaSet) - if int32(*rs.Spec.Replicas) == rs.Status.ReadyReplicas { - current[clusterName] = int64(rs.Status.ReadyReplicas) - } else { - pods, err := podsGetter(clusterName, rs) - if err != nil { - return nil, nil, err - } - podStatus := podanalyzer.AnalyzePods(pods, time.Now()) - current[clusterName] = int64(podStatus.RunningAndReady) // include pending as well? - unschedulable := int64(podStatus.Unschedulable) - if unschedulable > 0 { - estimatedCapacity[clusterName] = int64(*rs.Spec.Replicas) - unschedulable - } - } - } - return current, estimatedCapacity, nil -} - func (a *ReplicaSetAdapter) NewTestObject(namespace string) pkgruntime.Object { replicas := int32(3) zero := int64(0) diff --git a/federation/pkg/federatedtypes/scheduling.go b/federation/pkg/federatedtypes/scheduling.go index c5d0b93a84..7183479c29 100644 --- a/federation/pkg/federatedtypes/scheduling.go +++ b/federation/pkg/federatedtypes/scheduling.go @@ -17,15 +17,31 @@ limitations under the License. package federatedtypes import ( + "bytes" + "fmt" + "reflect" + "sort" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgruntime "k8s.io/apimachinery/pkg/runtime" + fedapi "k8s.io/kubernetes/federation/apis/federation" federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" + + "github.com/golang/glog" ) // SchedulingStatus contains the status of the objects that are being // scheduled into joined clusters. type SchedulingStatus struct { Replicas int32 + UpdatedReplicas int32 FullyLabeledReplicas int32 ReadyReplicas int32 AvailableReplicas int32 @@ -49,3 +65,171 @@ type SchedulingAdapter interface { // equivalent ignoring differences due to scheduling. EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool } + +// schedulingAdapter is meant to be embedded in other type adapters that require +// workload scheduling. +type schedulingAdapter struct { + preferencesAnnotationName string + updateStatusFunc func(pkgruntime.Object, SchedulingStatus) error +} + +func (a *schedulingAdapter) IsSchedulingAdapter() bool { + return true +} + +func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) { + var clusterNames []string + for _, cluster := range clusters { + clusterNames = append(clusterNames, cluster.Name) + } + + // Schedule the pods across the existing clusters. + objectGetter := func(clusterName, key string) (interface{}, bool, error) { + return informer.GetTargetStore().GetByKey(clusterName, key) + } + podsGetter := func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) { + clientset, err := informer.GetClientsetForCluster(clusterName) + if err != nil { + return nil, err + } + selectorObj := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Selector").Interface().(*metav1.LabelSelector) + selector, err := metav1.LabelSelectorAsSelector(selectorObj) + if err != nil { + return nil, fmt.Errorf("invalid selector: %v", err) + } + metadata, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + return clientset.Core().Pods(metadata.GetNamespace()).List(metav1.ListOptions{LabelSelector: selector.String()}) + } + currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter) + if err != nil { + return nil, err + } + + fedPref, err := replicapreferences.GetAllocationPreferences(obj, a.preferencesAnnotationName) + if err != nil { + glog.Info("Invalid workload-type specific preference, using default. object: %v, err: %v", obj, err) + } + if fedPref == nil { + fedPref = &fedapi.ReplicaAllocationPreferences{ + Clusters: map[string]fedapi.ClusterPreferences{ + "*": {Weight: 1}, + }, + } + } + + plnr := planner.NewPlanner(fedPref) + + return &SchedulingInfo{ + Schedule: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity), + Status: SchedulingStatus{}, + }, nil +} + +func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) { + replicas, ok := schedulingInfo.Schedule[cluster.Name] + if !ok { + replicas = 0 + } + + specReplicas := int32(replicas) + reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas)) + + if clusterObj != nil { + schedulingStatusVal := reflect.ValueOf(schedulingInfo).Elem().FieldByName("Status") + objStatusVal := reflect.ValueOf(clusterObj).Elem().FieldByName("Status") + for i := 0; i < schedulingStatusVal.NumField(); i++ { + schedulingStatusField := schedulingStatusVal.Field(i) + schedulingStatusFieldName := schedulingStatusVal.Type().Field(i).Name + objStatusField := objStatusVal.FieldByName(schedulingStatusFieldName) + if objStatusField.IsValid() { + current := schedulingStatusField.Int() + additional := objStatusField.Int() + schedulingStatusField.SetInt(current + additional) + } + } + } + return federationObjCopy, replicas > 0, nil +} + +func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error { + return a.updateStatusFunc(obj, status) +} + +func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { + // TODO: integrate real scheduler + replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int() + scheduleResult, overflow := planner.Plan(replicas, clusterNames, currentReplicasPerCluster, estimatedCapacity, key) + + // Ensure that all current clusters end up in the scheduling result. + result := make(map[string]int64) + for clusterName := range currentReplicasPerCluster { + result[clusterName] = 0 + } + + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + for clusterName, replicas := range overflow { + result[clusterName] += replicas + } + + if glog.V(4) { + buf := bytes.NewBufferString(fmt.Sprintf("Schedule - %q\n", key)) + sort.Strings(clusterNames) + for _, clusterName := range clusterNames { + cur := currentReplicasPerCluster[clusterName] + target := scheduleResult[clusterName] + fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target) + if over, found := overflow[clusterName]; found { + fmt.Fprintf(buf, " overflow: %d", over) + } + if capacity, found := estimatedCapacity[clusterName]; found { + fmt.Fprintf(buf, " capacity: %d", capacity) + } + fmt.Fprintf(buf, "\n") + } + glog.V(4).Infof(buf.String()) + } + return result +} + +// clusterReplicaState returns information about the scheduling state of the pods running in the federated clusters. +func clustersReplicaState( + clusterNames []string, + key string, + objectGetter func(clusterName string, key string) (interface{}, bool, error), + podsGetter func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error)) (currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, err error) { + + currentReplicasPerCluster = make(map[string]int64) + estimatedCapacity = make(map[string]int64) + + for _, clusterName := range clusterNames { + obj, exists, err := objectGetter(clusterName, key) + if err != nil { + return nil, nil, err + } + if !exists { + continue + } + replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int() + readyReplicas := reflect.ValueOf(obj).Elem().FieldByName("Status").FieldByName("ReadyReplicas").Int() + if replicas == readyReplicas { + currentReplicasPerCluster[clusterName] = readyReplicas + } else { + pods, err := podsGetter(clusterName, obj.(pkgruntime.Object)) + if err != nil { + return nil, nil, err + } + podStatus := podanalyzer.AnalyzePods(pods, time.Now()) + currentReplicasPerCluster[clusterName] = int64(podStatus.RunningAndReady) // include pending as well? + unschedulable := int64(podStatus.Unschedulable) + if unschedulable > 0 { + estimatedCapacity[clusterName] = replicas - unschedulable + } + } + } + return currentReplicasPerCluster, estimatedCapacity, nil +} diff --git a/federation/pkg/federatedtypes/replicaset_test.go b/federation/pkg/federatedtypes/scheduling_test.go similarity index 90% rename from federation/pkg/federatedtypes/replicaset_test.go rename to federation/pkg/federatedtypes/scheduling_test.go index 271612039b..d938e525a5 100644 --- a/federation/pkg/federatedtypes/replicaset_test.go +++ b/federation/pkg/federatedtypes/scheduling_test.go @@ -24,21 +24,22 @@ import ( apiv1 "k8s.io/api/core/v1" extensionsv1 "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkgruntime "k8s.io/apimachinery/pkg/runtime" "github.com/stretchr/testify/assert" ) func TestClusterReplicaState(t *testing.T) { - uncalledPodsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) { - t.Fatal("podsGetter should not be called when replica sets are all ready.") + uncalledPodsGetter := func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) { + t.Fatal("podsGetter should not be called when workload objects are all ready.") return nil, nil } - podsByReplicaSet := make(map[*extensionsv1.ReplicaSet][]*apiv1.Pod) - podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) { - pods, ok := podsByReplicaSet[replicaSet] + podsByReplicaSet := make(map[pkgruntime.Object][]*apiv1.Pod) + podsGetter := func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) { + pods, ok := podsByReplicaSet[obj] if !ok { - t.Fatalf("No pods found in test data for replica set named %v", replicaSet.Name) + t.Fatalf("No pods found in test data for replica set %v", obj) return nil, fmt.Errorf("Not found") } var podListPods []apiv1.Pod @@ -64,7 +65,7 @@ func TestClusterReplicaState(t *testing.T) { rs2Replicas int32 rs1ReadyReplicas int32 rs2ReadyReplicas int32 - podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) + podsGetter func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) pod1Phase apiv1.PodPhase pod1Condition apiv1.PodCondition pod2Phase apiv1.PodPhase diff --git a/federation/pkg/federation-controller/BUILD b/federation/pkg/federation-controller/BUILD index 310b286024..1c5fd2d39b 100644 --- a/federation/pkg/federation-controller/BUILD +++ b/federation/pkg/federation-controller/BUILD @@ -25,7 +25,6 @@ filegroup( srcs = [ ":package-srcs", "//federation/pkg/federation-controller/cluster:all-srcs", - "//federation/pkg/federation-controller/deployment:all-srcs", "//federation/pkg/federation-controller/ingress:all-srcs", "//federation/pkg/federation-controller/namespace:all-srcs", "//federation/pkg/federation-controller/service:all-srcs", diff --git a/federation/pkg/federation-controller/deployment/BUILD b/federation/pkg/federation-controller/deployment/BUILD deleted file mode 100644 index 207236f2ea..0000000000 --- a/federation/pkg/federation-controller/deployment/BUILD +++ /dev/null @@ -1,76 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = ["deploymentcontroller.go"], - tags = ["automanaged"], - deps = [ - "//federation/apis/federation:go_default_library", - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset:go_default_library", - "//federation/pkg/federation-controller/util:go_default_library", - "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", - "//federation/pkg/federation-controller/util/eventsink:go_default_library", - "//federation/pkg/federation-controller/util/planner:go_default_library", - "//federation/pkg/federation-controller/util/replicapreferences:go_default_library", - "//pkg/api:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/controller:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["deploymentcontroller_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//federation/apis/federation/v1beta1:go_default_library", - "//federation/client/clientset_generated/federation_clientset/fake:go_default_library", - "//federation/pkg/federation-controller/util/test:go_default_library", - "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go deleted file mode 100644 index ccb1e13a48..0000000000 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller.go +++ /dev/null @@ -1,649 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deployment - -import ( - "bytes" - "fmt" - "sort" - "time" - - "github.com/golang/glog" - - apiv1 "k8s.io/api/core/v1" - clientv1 "k8s.io/api/core/v1" - extensionsv1 "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - "k8s.io/client-go/util/workqueue" - fed "k8s.io/kubernetes/federation/apis/federation" - fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" - fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" - fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/planner" - "k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences" - "k8s.io/kubernetes/pkg/api" - kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/controller" -) - -const ( - FedDeploymentPreferencesAnnotation = "federation.kubernetes.io/deployment-preferences" - allClustersKey = "THE_ALL_CLUSTER_KEY" - UserAgentName = "federation-deployment-controller" - ControllerName = "deployments" -) - -var ( - RequiredResources = []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource("deployments")} - deploymentReviewDelay = 10 * time.Second - clusterAvailableDelay = 20 * time.Second - clusterUnavailableDelay = 60 * time.Second - allDeploymentReviewDelay = 2 * time.Minute - updateTimeout = 30 * time.Second -) - -type DeploymentController struct { - fedClient fedclientset.Interface - - deploymentController cache.Controller - deploymentStore cache.Store - - fedDeploymentInformer fedutil.FederatedInformer - fedPodInformer fedutil.FederatedInformer - - deploymentDeliverer *fedutil.DelayingDeliverer - clusterDeliverer *fedutil.DelayingDeliverer - deploymentWorkQueue workqueue.Interface - // For updating members of federation. - fedUpdater fedutil.FederatedUpdater - deploymentBackoff *flowcontrol.Backoff - eventRecorder record.EventRecorder - - deletionHelper *deletionhelper.DeletionHelper - - defaultPlanner *planner.Planner -} - -// NewDeploymentController returns a new deployment controller -func NewDeploymentController(federationClient fedclientset.Interface) *DeploymentController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient)) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) - - fdc := &DeploymentController{ - fedClient: federationClient, - deploymentDeliverer: fedutil.NewDelayingDeliverer(), - clusterDeliverer: fedutil.NewDelayingDeliverer(), - deploymentWorkQueue: workqueue.New(), - deploymentBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), - defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{ - Clusters: map[string]fed.ClusterPreferences{ - "*": {Weight: 1}, - }, - }), - eventRecorder: recorder, - } - - deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return clientset.Extensions().Deployments(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.Extensions().Deployments(metav1.NamespaceAll).Watch(options) - }, - }, - &extensionsv1.Deployment{}, - controller.NoResyncPeriodFunc(), - fedutil.NewTriggerOnAllChanges( - func(obj runtime.Object) { fdc.deliverLocalDeployment(obj, deploymentReviewDelay) }, - ), - ) - } - clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ - ClusterAvailable: func(cluster *fedv1.Cluster) { - fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) - }, - ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) { - fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay) - }, - } - fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle) - - podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return clientset.Core().Pods(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return clientset.Core().Pods(metav1.NamespaceAll).Watch(options) - }, - }, - &apiv1.Pod{}, - controller.NoResyncPeriodFunc(), - fedutil.NewTriggerOnAllChanges( - func(obj runtime.Object) { - fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, allDeploymentReviewDelay) - }, - ), - ) - } - fdc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{}) - - fdc.deploymentStore, fdc.deploymentController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return fdc.fedClient.Extensions().Deployments(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return fdc.fedClient.Extensions().Deployments(metav1.NamespaceAll).Watch(options) - }, - }, - &extensionsv1.Deployment{}, - controller.NoResyncPeriodFunc(), - fedutil.NewTriggerOnMetaAndSpecChanges( - func(obj runtime.Object) { fdc.deliverFedDeploymentObj(obj, deploymentReviewDelay) }, - ), - ) - - fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", updateTimeout, fdc.eventRecorder, - func(client kubeclientset.Interface, obj runtime.Object) error { - rs := obj.(*extensionsv1.Deployment) - _, err := client.Extensions().Deployments(rs.Namespace).Create(rs) - return err - }, - func(client kubeclientset.Interface, obj runtime.Object) error { - rs := obj.(*extensionsv1.Deployment) - _, err := client.Extensions().Deployments(rs.Namespace).Update(rs) - return err - }, - func(client kubeclientset.Interface, obj runtime.Object) error { - rs := obj.(*extensionsv1.Deployment) - orphanDependents := false - err := client.Extensions().Deployments(rs.Namespace).Delete(rs.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - return err - }) - - fdc.deletionHelper = deletionhelper.NewDeletionHelper( - fdc.updateDeployment, - // objNameFunc - func(obj runtime.Object) string { - deployment := obj.(*extensionsv1.Deployment) - return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name) - }, - fdc.fedDeploymentInformer, - fdc.fedUpdater, - ) - - return fdc -} - -// Sends the given updated object to apiserver. -// Assumes that the given object is a deployment. -func (fdc *DeploymentController) updateDeployment(obj runtime.Object) (runtime.Object, error) { - deployment := obj.(*extensionsv1.Deployment) - return fdc.fedClient.Extensions().Deployments(deployment.Namespace).Update(deployment) -} - -func (fdc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { - go fdc.deploymentController.Run(stopCh) - fdc.fedDeploymentInformer.Start() - fdc.fedPodInformer.Start() - - fdc.deploymentDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { - fdc.deploymentWorkQueue.Add(item.Key) - }) - fdc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { - fdc.reconcileDeploymentsOnClusterChange() - }) - - // Wait until the cluster is synced to prevent the update storm at the very beginning. - for !fdc.isSynced() { - time.Sleep(5 * time.Millisecond) - glog.V(3).Infof("Waiting for controller to sync up") - } - - for i := 0; i < workers; i++ { - go wait.Until(fdc.worker, time.Second, stopCh) - } - - fedutil.StartBackoffGC(fdc.deploymentBackoff, stopCh) - - <-stopCh - glog.Infof("Shutting down DeploymentController") - fdc.deploymentDeliverer.Stop() - fdc.clusterDeliverer.Stop() - fdc.deploymentWorkQueue.ShutDown() - fdc.fedDeploymentInformer.Stop() - fdc.fedPodInformer.Stop() -} - -func (fdc *DeploymentController) isSynced() bool { - if !fdc.fedDeploymentInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clustersFromDeps, err := fdc.fedDeploymentInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - if !fdc.fedDeploymentInformer.GetTargetStore().ClustersSynced(clustersFromDeps) { - return false - } - - if !fdc.fedPodInformer.ClustersSynced() { - glog.V(2).Infof("Cluster list not synced") - return false - } - clustersFromPods, err := fdc.fedPodInformer.GetReadyClusters() - if err != nil { - glog.Errorf("Failed to get ready clusters: %v", err) - return false - } - - // This also checks whether podInformer and deploymentInformer have the - // same cluster lists. - if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromDeps) { - glog.V(2).Infof("Pod informer not synced") - return false - } - if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromPods) { - glog.V(2).Infof("Pod informer not synced") - return false - } - - if !fdc.deploymentController.HasSynced() { - glog.V(2).Infof("federation deployment list not synced") - return false - } - return true -} - -func (fdc *DeploymentController) deliverLocalDeployment(obj interface{}, duration time.Duration) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %v: %v", obj, err) - return - } - _, exists, err := fdc.deploymentStore.GetByKey(key) - if err != nil { - glog.Errorf("Couldn't get federation deployment %v: %v", key, err) - return - } - if exists { // ignore deployments exists only in local k8s - fdc.deliverDeploymentByKey(key, duration, false) - } -} - -func (fdc *DeploymentController) deliverFedDeploymentObj(obj interface{}, delay time.Duration) { - key, err := controller.KeyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) - return - } - fdc.deliverDeploymentByKey(key, delay, false) -} - -func (fdc *DeploymentController) deliverDeploymentByKey(key string, delay time.Duration, failed bool) { - if failed { - fdc.deploymentBackoff.Next(key, time.Now()) - delay = delay + fdc.deploymentBackoff.Get(key) - } else { - fdc.deploymentBackoff.Reset(key) - } - fdc.deploymentDeliverer.DeliverAfter(key, nil, delay) -} - -func (fdc *DeploymentController) worker() { - for { - item, quit := fdc.deploymentWorkQueue.Get() - if quit { - return - } - key := item.(string) - status, err := fdc.reconcileDeployment(key) - fdc.deploymentWorkQueue.Done(item) - if err != nil { - glog.Errorf("Error syncing cluster controller: %v", err) - fdc.deliverDeploymentByKey(key, 0, true) - } else { - switch status { - case statusAllOk: - break - case statusError: - fdc.deliverDeploymentByKey(key, 0, true) - case statusNeedRecheck: - fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false) - case statusNotSynced: - fdc.deliverDeploymentByKey(key, clusterAvailableDelay, false) - default: - glog.Errorf("Unhandled reconciliation status: %s", status) - fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false) - } - } - } -} - -type podAnalysisResult struct { - // Total number of pods created. - total int - // Number of pods that are running and ready. - runningAndReady int - // Number of pods that have been in unschedulable state for UnshedulableThreshold seconds. - unschedulable int - - // TODO: Handle other scenarios like pod waiting too long for scheduler etc. -} - -const ( - // TODO: make it configurable - unschedulableThreshold = 60 * time.Second -) - -// A function that calculates how many pods from the list are in one of -// the meaningful (from the replica set perspective) states. This function is -// a temporary workaround against the current lack of ownerRef in pods. -// TODO(perotinus): Unify this with the ReplicaSet controller. -func analyzePods(selectorv1 *metav1.LabelSelector, allPods []fedutil.FederatedObject, currentTime time.Time) (map[string]podAnalysisResult, error) { - selector, err := metav1.LabelSelectorAsSelector(selectorv1) - if err != nil { - return nil, fmt.Errorf("invalid selector: %v", err) - } - result := make(map[string]podAnalysisResult) - - for _, fedObject := range allPods { - pod, isPod := fedObject.Object.(*apiv1.Pod) - if !isPod { - return nil, fmt.Errorf("invalid arg content - not a *pod") - } - if !selector.Empty() && selector.Matches(labels.Set(pod.Labels)) { - status := result[fedObject.ClusterName] - status.total++ - for _, condition := range pod.Status.Conditions { - if pod.Status.Phase == apiv1.PodRunning { - if condition.Type == apiv1.PodReady { - status.runningAndReady++ - } - } else { - if condition.Type == apiv1.PodScheduled && - condition.Status == apiv1.ConditionFalse && - condition.Reason == apiv1.PodReasonUnschedulable && - condition.LastTransitionTime.Add(unschedulableThreshold).Before(currentTime) { - - status.unschedulable++ - } - } - } - result[fedObject.ClusterName] = status - } - } - return result, nil -} - -func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters []*fedv1.Cluster, - current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 { - // TODO: integrate real scheduler - - plannerToBeUsed := fdc.defaultPlanner - fdPref, err := replicapreferences.GetAllocationPreferences(fd, FedDeploymentPreferencesAnnotation) - if err != nil { - glog.Info("Invalid Deployment specific preference, use default. deployment: %v, err: %v", fd.Name, err) - } - if fdPref != nil { // create a new planner if user specified a preference - plannerToBeUsed = planner.NewPlanner(fdPref) - } - replicas := int64(0) - if fd.Spec.Replicas != nil { - replicas = int64(*fd.Spec.Replicas) - } - var clusterNames []string - for _, cluster := range clusters { - clusterNames = append(clusterNames, cluster.Name) - } - scheduleResult, overflow := plannerToBeUsed.Plan(replicas, clusterNames, current, estimatedCapacity, - fd.Namespace+"/"+fd.Name) - // make sure the result contains all clusters that currently have some replicas. - result := make(map[string]int64) - for clusterName := range current { - result[clusterName] = 0 - } - for clusterName, replicas := range scheduleResult { - result[clusterName] = replicas - } - for clusterName, replicas := range overflow { - result[clusterName] += replicas - } - if glog.V(4) { - buf := bytes.NewBufferString(fmt.Sprintf("Schedule - Deployment: %s/%s\n", fd.Namespace, fd.Name)) - sort.Strings(clusterNames) - for _, clusterName := range clusterNames { - cur := current[clusterName] - target := scheduleResult[clusterName] - fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target) - if over, found := overflow[clusterName]; found { - fmt.Fprintf(buf, " overflow: %d", over) - } - if capacity, found := estimatedCapacity[clusterName]; found { - fmt.Fprintf(buf, " capacity: %d", capacity) - } - fmt.Fprintf(buf, "\n") - } - glog.V(4).Infof(buf.String()) - } - return result -} - -type reconciliationStatus string - -const ( - statusAllOk = reconciliationStatus("ALL_OK") - statusNeedRecheck = reconciliationStatus("RECHECK") - statusError = reconciliationStatus("ERROR") - statusNotSynced = reconciliationStatus("NOSYNC") -) - -func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliationStatus, error) { - if !fdc.isSynced() { - return statusNotSynced, nil - } - - glog.V(4).Infof("Start reconcile deployment %q", key) - startTime := time.Now() - defer glog.V(4).Infof("Finished reconcile deployment %q (%v)", key, time.Now().Sub(startTime)) - - objFromStore, exists, err := fdc.deploymentStore.GetByKey(key) - if err != nil { - return statusError, err - } - if !exists { - // don't delete local deployments for now. Do not reconcile it anymore. - return statusAllOk, nil - } - obj, err := api.Scheme.DeepCopy(objFromStore) - fd, ok := obj.(*extensionsv1.Deployment) - if err != nil || !ok { - glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err) - return statusError, err - } - - if fd.DeletionTimestamp != nil { - if err := fdc.delete(fd); err != nil { - glog.Errorf("Failed to delete %s: %v", fd.Name, err) - fdc.eventRecorder.Eventf(fd, api.EventTypeWarning, "DeleteFailed", - "Deployment delete failed: %v", err) - return statusError, err - } - return statusAllOk, nil - } - - glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for deployment: %s", - fd.Name) - // Add the required finalizers before creating a deployment in underlying clusters. - updatedDeploymentObj, err := fdc.deletionHelper.EnsureFinalizers(fd) - if err != nil { - glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in deployment %s: %v", - fd.Name, err) - return statusError, err - } - fd = updatedDeploymentObj.(*extensionsv1.Deployment) - - glog.V(3).Infof("Syncing deployment %s in underlying clusters", fd.Name) - - clusters, err := fdc.fedDeploymentInformer.GetReadyClusters() - if err != nil { - return statusError, err - } - - // collect current status and do schedule - allPods, err := fdc.fedPodInformer.GetTargetStore().List() - if err != nil { - return statusError, err - } - podStatus, err := analyzePods(fd.Spec.Selector, allPods, time.Now()) - current := make(map[string]int64) - estimatedCapacity := make(map[string]int64) - for _, cluster := range clusters { - ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(cluster.Name, key) - if err != nil { - return statusError, err - } - if exists { - ld := ldObj.(*extensionsv1.Deployment) - current[cluster.Name] = int64(podStatus[cluster.Name].runningAndReady) // include pending as well? - unschedulable := int64(podStatus[cluster.Name].unschedulable) - if unschedulable > 0 { - estimatedCapacity[cluster.Name] = int64(*ld.Spec.Replicas) - unschedulable - } - } - } - - scheduleResult := fdc.schedule(fd, clusters, current, estimatedCapacity) - - glog.V(4).Infof("Start syncing local deployment %s: %v", key, scheduleResult) - - fedStatus := extensionsv1.DeploymentStatus{ObservedGeneration: fd.Generation} - operations := make([]fedutil.FederatedOperation, 0) - for clusterName, replicas := range scheduleResult { - - ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(clusterName, key) - if err != nil { - return statusError, err - } - - // The object can be modified. - ld := fedutil.DeepCopyDeployment(fd) - specReplicas := int32(replicas) - ld.Spec.Replicas = &specReplicas - - if !exists { - if replicas > 0 { - operations = append(operations, fedutil.FederatedOperation{ - Type: fedutil.OperationTypeAdd, - Obj: ld, - ClusterName: clusterName, - Key: key, - }) - } - } else { - // TODO: Update only one deployment at a time if update strategy is rolling update. - - currentLd := ldObj.(*extensionsv1.Deployment) - // Update existing replica set, if needed. - if !fedutil.DeploymentEquivalent(ld, currentLd) { - operations = append(operations, fedutil.FederatedOperation{ - Type: fedutil.OperationTypeUpdate, - Obj: ld, - ClusterName: clusterName, - Key: key, - }) - glog.Infof("Updating %s in %s", currentLd.Name, clusterName) - } - fedStatus.Replicas += currentLd.Status.Replicas - fedStatus.AvailableReplicas += currentLd.Status.AvailableReplicas - fedStatus.UnavailableReplicas += currentLd.Status.UnavailableReplicas - fedStatus.ReadyReplicas += currentLd.Status.ReadyReplicas - } - } - if fedStatus.Replicas != fd.Status.Replicas || - fedStatus.AvailableReplicas != fd.Status.AvailableReplicas || - fedStatus.UnavailableReplicas != fd.Status.UnavailableReplicas || - fedStatus.ReadyReplicas != fd.Status.ReadyReplicas { - fd.Status = fedStatus - _, err = fdc.fedClient.Extensions().Deployments(fd.Namespace).UpdateStatus(fd) - if err != nil { - return statusError, err - } - } - - if len(operations) == 0 { - // Everything is in order - return statusAllOk, nil - } - err = fdc.fedUpdater.Update(operations) - if err != nil { - glog.Errorf("Failed to execute updates for %s: %v", key, err) - return statusError, err - } - - // Some operations were made, reconcile after a while. - return statusNeedRecheck, nil -} - -func (fdc *DeploymentController) reconcileDeploymentsOnClusterChange() { - if !fdc.isSynced() { - fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) - } - deps := fdc.deploymentStore.List() - for _, dep := range deps { - key, _ := controller.KeyFunc(dep) - fdc.deliverDeploymentByKey(key, 0, false) - } -} - -// delete deletes the given deployment or returns error if the deletion was not complete. -func (fdc *DeploymentController) delete(deployment *extensionsv1.Deployment) error { - glog.V(3).Infof("Handling deletion of deployment: %v", *deployment) - _, err := fdc.deletionHelper.HandleObjectInUnderlyingClusters(deployment) - if err != nil { - return err - } - - err = fdc.fedClient.Extensions().Deployments(deployment.Namespace).Delete(deployment.Name, nil) - if err != nil { - // Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. - // This is expected when we are processing an update as a result of deployment finalizer deletion. - // The process that deleted the last finalizer is also going to delete the deployment and we do not have to do anything. - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete deployment: %v", err) - } - } - return nil -} diff --git a/federation/pkg/federation-controller/sync/BUILD b/federation/pkg/federation-controller/sync/BUILD index 1258db9f16..8dc31c8162 100644 --- a/federation/pkg/federation-controller/sync/BUILD +++ b/federation/pkg/federation-controller/sync/BUILD @@ -44,6 +44,7 @@ go_test( name = "go_default_test", srcs = [ "controller_test.go", + "deploymentcontroller_test.go", "replicasetcontroller_test.go", ], library = ":go_default_library", @@ -62,6 +63,8 @@ go_test( "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", ], diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller_test.go b/federation/pkg/federation-controller/sync/deploymentcontroller_test.go similarity index 83% rename from federation/pkg/federation-controller/deployment/deploymentcontroller_test.go rename to federation/pkg/federation-controller/sync/deploymentcontroller_test.go index a03b05b484..cae13c2c32 100644 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller_test.go +++ b/federation/pkg/federation-controller/sync/deploymentcontroller_test.go @@ -14,13 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package deployment +package sync import ( "flag" "fmt" "testing" - "time" apiv1 "k8s.io/api/core/v1" extensionsv1 "k8s.io/api/extensions/v1beta1" @@ -30,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake" + "k8s.io/kubernetes/federation/pkg/federatedtypes" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" @@ -39,7 +39,6 @@ import ( const ( deployments = "deployments" - pods = "pods" ) func TestDeploymentController(t *testing.T) { @@ -47,11 +46,6 @@ func TestDeploymentController(t *testing.T) { flag.Set("v", "5") flag.Parse() - deploymentReviewDelay = 500 * time.Millisecond - clusterAvailableDelay = 100 * time.Millisecond - clusterUnavailableDelay = 100 * time.Millisecond - allDeploymentReviewDelay = 500 * time.Millisecond - cluster1 := NewCluster("cluster1", apiv1.ConditionTrue) cluster2 := NewCluster("cluster2", apiv1.ConditionTrue) @@ -65,18 +59,15 @@ func TestDeploymentController(t *testing.T) { cluster1Client := &fakekubeclientset.Clientset{} cluster1Watch := RegisterFakeWatch(deployments, &cluster1Client.Fake) - _ = RegisterFakeWatch(pods, &cluster1Client.Fake) - RegisterFakeList(deployments, &cluster1Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}}) cluster1CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster1Client.Fake, cluster1Watch) cluster1UpdateChan := RegisterFakeCopyOnUpdate(deployments, &cluster1Client.Fake, cluster1Watch) cluster2Client := &fakekubeclientset.Clientset{} cluster2Watch := RegisterFakeWatch(deployments, &cluster2Client.Fake) - _ = RegisterFakeWatch(pods, &cluster2Client.Fake) - RegisterFakeList(deployments, &cluster2Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}}) cluster2CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster2Client.Fake, cluster2Watch) - deploymentController := NewDeploymentController(fakeClient) + deploymentController := newFederationSyncController(fakeClient, federatedtypes.NewDeploymentAdapter(fakeClient)) + deploymentController.minimizeLatency() clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) { switch cluster.Name { case cluster1.Name: @@ -87,11 +78,10 @@ func TestDeploymentController(t *testing.T) { return nil, fmt.Errorf("Unknown cluster") } } - ToFederatedInformerForTestOnly(deploymentController.fedDeploymentInformer).SetClientFactory(clientFactory) - ToFederatedInformerForTestOnly(deploymentController.fedPodInformer).SetClientFactory(clientFactory) + ToFederatedInformerForTestOnly(deploymentController.informer).SetClientFactory(clientFactory) stop := make(chan struct{}) - go deploymentController.Run(5, stop) + go deploymentController.Run(stop) // Create deployment. Expect to see it in cluster1. dep1 := newDeploymentWithReplicas("depA", 6) @@ -113,7 +103,7 @@ func TestDeploymentController(t *testing.T) { } assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas))) err := WaitForStoreUpdate( - deploymentController.fedDeploymentInformer.GetTargetStore(), + deploymentController.informer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: dep1.Namespace, Name: dep1.Name}.String(), wait.ForeverTestTimeout) assert.Nil(t, err, "deployment should have appeared in the informer store") @@ -132,7 +122,7 @@ func TestDeploymentController(t *testing.T) { // Add new deployment with non-default replica placement preferences. dep2 := newDeploymentWithReplicas("deployment2", 9) dep2.Annotations = make(map[string]string) - dep2.Annotations[FedDeploymentPreferencesAnnotation] = `{"rebalance": true, + dep2.Annotations[federatedtypes.FedDeploymentPreferencesAnnotation] = `{"rebalance": true, "clusters": { "cluster1": {"weight": 2}, "cluster2": {"weight": 1}