Merge pull request #46260 from perotinus/depsyncconv2

Automatic merge from submit-queue (batch tested with PRs 47619, 47951, 46260, 48277)

[Federation] Convert the deployment controller to a sync controller.

This is based on the work done for the ReplicaSet controller. It extracts a schedulingAdapter that handles the logic shared between the two controllers.

Targets #40989

**Release note**:

```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-06-29 14:05:28 -07:00 committed by GitHub
commit 406c79cf1f
12 changed files with 417 additions and 951 deletions

View File

@ -23,7 +23,6 @@ go_library(
"//federation/pkg/dnsprovider/providers/google/clouddns:go_default_library",
"//federation/pkg/federatedtypes:go_default_library",
"//federation/pkg/federation-controller/cluster:go_default_library",
"//federation/pkg/federation-controller/deployment:go_default_library",
"//federation/pkg/federation-controller/ingress:go_default_library",
"//federation/pkg/federation-controller/namespace:go_default_library",
"//federation/pkg/federation-controller/service:go_default_library",

View File

@ -37,7 +37,6 @@ import (
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment"
ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
@ -166,15 +165,6 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
}
}
if controllerEnabled(s.Controllers, serverResources, deploymentcontroller.ControllerName, deploymentcontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for deployment controller %q", deploymentcontroller.UserAgentName)
deploymentClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, deploymentcontroller.UserAgentName))
deploymentController := deploymentcontroller.NewDeploymentController(deploymentClientset)
glog.V(3).Infof("Running deployment controller")
// TODO: rename s.ConcurrentReplicaSetSyncs
go deploymentController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)
}
if controllerEnabled(s.Controllers, serverResources, ingresscontroller.ControllerName, ingresscontroller.RequiredResources, true) {
glog.V(3).Infof("Loading client config for ingress controller %q", ingresscontroller.UserAgentName)
ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, ingresscontroller.UserAgentName))

View File

@ -14,6 +14,7 @@ go_library(
"adapter.go",
"configmap.go",
"daemonset.go",
"deployment.go",
"registry.go",
"replicaset.go",
"scheduling.go",
@ -32,6 +33,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
@ -58,7 +60,7 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["replicaset_test.go"],
srcs = ["scheduling_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
@ -66,5 +68,6 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)

View File

@ -0,0 +1,188 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package federatedtypes
import (
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
// Identifiers for the federated Deployment type.
const (
DeploymentKind = "deployment"
DeploymentControllerName = "deployments"
// FedDeploymentPreferencesAnnotation carries per-cluster replica placement
// preferences for a federated Deployment; it is read by the scheduling
// adapter via replicapreferences.GetAllocationPreferences.
FedDeploymentPreferencesAnnotation = "federation.kubernetes.io/deployment-preferences"
)
// init registers the Deployment adapter with the shared federated-type
// registry so the generic sync controller can manage Deployments.
func init() {
RegisterFederatedType(DeploymentKind, DeploymentControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(DeploymentControllerName)}, NewDeploymentAdapter)
}
// DeploymentAdapter implements FederatedTypeAdapter for Deployments. It
// embeds schedulingAdapter to share replica-scheduling logic with the
// ReplicaSet adapter.
type DeploymentAdapter struct {
*schedulingAdapter
client federationclientset.Interface
}
// NewDeploymentAdapter returns a Deployment-specific FederatedTypeAdapter.
// The embedded schedulingAdapter is configured with the Deployment
// preferences annotation and a status-update callback that writes the
// aggregated SchedulingStatus back to the federated Deployment.
func NewDeploymentAdapter(client federationclientset.Interface) FederatedTypeAdapter {
schedulingAdapter := schedulingAdapter{
preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error {
deployment := obj.(*extensionsv1.Deployment)
// Only issue an UpdateStatus call when some aggregated field actually
// differs, to avoid no-op API writes.
if status.Replicas != deployment.Status.Replicas || status.UpdatedReplicas != deployment.Status.UpdatedReplicas ||
status.ReadyReplicas != deployment.Status.ReadyReplicas || status.AvailableReplicas != deployment.Status.AvailableReplicas {
// NOTE(review): this replaces the whole status struct, so any other
// DeploymentStatus fields are reset to their zero values — confirm
// that is intended.
deployment.Status = extensionsv1.DeploymentStatus{
Replicas: status.Replicas,
UpdatedReplicas: status.UpdatedReplicas,
ReadyReplicas: status.ReadyReplicas,
AvailableReplicas: status.AvailableReplicas,
}
_, err := client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
return err
}
return nil
},
}
return &DeploymentAdapter{&schedulingAdapter, client}
}
// Kind returns the registered kind name for Deployments.
func (a *DeploymentAdapter) Kind() string {
return DeploymentKind
}
// ObjectType returns an empty Deployment used for decoding/informers.
func (a *DeploymentAdapter) ObjectType() pkgruntime.Object {
return &extensionsv1.Deployment{}
}
// IsExpectedType reports whether obj is a *extensionsv1.Deployment.
func (a *DeploymentAdapter) IsExpectedType(obj interface{}) bool {
_, ok := obj.(*extensionsv1.Deployment)
return ok
}
// Copy returns a deep copy of the given Deployment.
func (a *DeploymentAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
deployment := obj.(*extensionsv1.Deployment)
return fedutil.DeepCopyDeployment(deployment)
}
// Equivalent reports whether two Deployments are equivalent per
// fedutil.DeploymentEquivalent.
func (a *DeploymentAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
deployment1 := obj1.(*extensionsv1.Deployment)
deployment2 := obj2.(*extensionsv1.Deployment)
return fedutil.DeploymentEquivalent(deployment1, deployment2)
}
// NamespacedName returns the namespace/name key of the Deployment.
func (a *DeploymentAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
deployment := obj.(*extensionsv1.Deployment)
return types.NamespacedName{Namespace: deployment.Namespace, Name: deployment.Name}
}
// ObjectMeta returns a pointer to the Deployment's ObjectMeta.
func (a *DeploymentAdapter) ObjectMeta(obj pkgruntime.Object) *metav1.ObjectMeta {
return &obj.(*extensionsv1.Deployment).ObjectMeta
}
// FedCreate creates the Deployment in the federation control plane.
func (a *DeploymentAdapter) FedCreate(obj pkgruntime.Object) (pkgruntime.Object, error) {
deployment := obj.(*extensionsv1.Deployment)
return a.client.Extensions().Deployments(deployment.Namespace).Create(deployment)
}
// FedDelete deletes the named Deployment from the federation control plane.
func (a *DeploymentAdapter) FedDelete(namespacedName types.NamespacedName, options *metav1.DeleteOptions) error {
return a.client.Extensions().Deployments(namespacedName.Namespace).Delete(namespacedName.Name, options)
}
// FedGet fetches the named Deployment from the federation control plane.
func (a *DeploymentAdapter) FedGet(namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return a.client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
}
// FedList lists Deployments in the namespace from the federation control plane.
func (a *DeploymentAdapter) FedList(namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return a.client.Extensions().Deployments(namespace).List(options)
}
// FedUpdate updates the Deployment in the federation control plane.
func (a *DeploymentAdapter) FedUpdate(obj pkgruntime.Object) (pkgruntime.Object, error) {
deployment := obj.(*extensionsv1.Deployment)
return a.client.Extensions().Deployments(deployment.Namespace).Update(deployment)
}
// FedWatch watches Deployments in the namespace in the federation control plane.
func (a *DeploymentAdapter) FedWatch(namespace string, options metav1.ListOptions) (watch.Interface, error) {
return a.client.Extensions().Deployments(namespace).Watch(options)
}
// ClusterCreate creates the Deployment in a member cluster via the given client.
func (a *DeploymentAdapter) ClusterCreate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
deployment := obj.(*extensionsv1.Deployment)
return client.Extensions().Deployments(deployment.Namespace).Create(deployment)
}
// ClusterDelete deletes the named Deployment from a member cluster.
func (a *DeploymentAdapter) ClusterDelete(client kubeclientset.Interface, nsName types.NamespacedName, options *metav1.DeleteOptions) error {
return client.Extensions().Deployments(nsName.Namespace).Delete(nsName.Name, options)
}
// ClusterGet fetches the named Deployment from a member cluster.
func (a *DeploymentAdapter) ClusterGet(client kubeclientset.Interface, namespacedName types.NamespacedName) (pkgruntime.Object, error) {
return client.Extensions().Deployments(namespacedName.Namespace).Get(namespacedName.Name, metav1.GetOptions{})
}
// ClusterList lists Deployments in the namespace in a member cluster.
func (a *DeploymentAdapter) ClusterList(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (pkgruntime.Object, error) {
return client.Extensions().Deployments(namespace).List(options)
}
// ClusterUpdate updates the Deployment in a member cluster.
func (a *DeploymentAdapter) ClusterUpdate(client kubeclientset.Interface, obj pkgruntime.Object) (pkgruntime.Object, error) {
deployment := obj.(*extensionsv1.Deployment)
return client.Extensions().Deployments(deployment.Namespace).Update(deployment)
}
// ClusterWatch watches Deployments in the namespace in a member cluster.
func (a *DeploymentAdapter) ClusterWatch(client kubeclientset.Interface, namespace string, options metav1.ListOptions) (watch.Interface, error) {
return client.Extensions().Deployments(namespace).Watch(options)
}
// EquivalentIgnoringSchedule reports whether the two Deployments are
// equivalent once scheduling-driven differences are masked: obj2 is deep
// copied and its Spec.Replicas is overwritten with obj1's before comparing,
// so a replica count set by the scheduler does not count as a difference.
func (a *DeploymentAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool {
deployment1 := obj1.(*extensionsv1.Deployment)
deployment2 := a.Copy(obj2).(*extensionsv1.Deployment)
deployment2.Spec.Replicas = deployment1.Spec.Replicas
return fedutil.DeploymentEquivalent(deployment1, deployment2)
}
// NewTestObject returns a minimal Deployment fixture (3 replicas, a single
// nginx container, zero termination grace period) in the given namespace,
// for use by integration/e2e tests of the sync controller.
func (a *DeploymentAdapter) NewTestObject(namespace string) pkgruntime.Object {
replicas := int32(3)
zero := int64(0)
return &extensionsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-deployment-",
Namespace: namespace,
},
Spec: extensionsv1.DeploymentSpec{
Replicas: &replicas,
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"foo": "bar"},
},
Spec: apiv1.PodSpec{
TerminationGracePeriodSeconds: &zero,
Containers: []apiv1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
}

View File

@ -17,12 +17,6 @@ limitations under the License.
package federatedtypes
import (
"bytes"
"fmt"
"sort"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -30,13 +24,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
fedapi "k8s.io/kubernetes/federation/apis/federation"
fedv1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)
@ -46,28 +35,35 @@ const (
FedReplicaSetPreferencesAnnotation = "federation.kubernetes.io/replica-set-preferences"
)
type replicaSetUserInfo struct {
scheduleResult (map[string]int64)
fedStatus *extensionsv1.ReplicaSetStatus
}
func init() {
RegisterFederatedType(ReplicaSetKind, ReplicaSetControllerName, []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource(ReplicaSetControllerName)}, NewReplicaSetAdapter)
}
// NOTE(review): this span is a diff hunk that interleaves the pre-change and
// post-change versions of ReplicaSetAdapter/NewReplicaSetAdapter (the old
// client/defaultPlanner fields and planner-based constructor body are the
// removed lines; the schedulingAdapter embedding and updateStatusFunc body
// are the added lines).
type ReplicaSetAdapter struct {
client federationclientset.Interface
defaultPlanner *planner.Planner
*schedulingAdapter
client federationclientset.Interface
}
func NewReplicaSetAdapter(client federationclientset.Interface) FederatedTypeAdapter {
return &ReplicaSetAdapter{
client: client,
defaultPlanner: planner.NewPlanner(&fedapi.ReplicaAllocationPreferences{
Clusters: map[string]fedapi.ClusterPreferences{
"*": {Weight: 1},
},
})}
schedulingAdapter := schedulingAdapter{
preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, status SchedulingStatus) error {
rs := obj.(*extensionsv1.ReplicaSet)
if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas ||
status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas {
rs.Status = extensionsv1.ReplicaSetStatus{
Replicas: status.Replicas,
// NOTE(review): FullyLabeledReplicas is assigned from status.Replicas —
// this looks like a copy/paste bug carried over from the old code; it
// should presumably be status.FullyLabeledReplicas. Confirm and fix.
FullyLabeledReplicas: status.Replicas,
ReadyReplicas: status.ReadyReplicas,
AvailableReplicas: status.AvailableReplicas,
}
_, err := client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs)
return err
}
return nil
},
}
return &ReplicaSetAdapter{&schedulingAdapter, client}
}
func (a *ReplicaSetAdapter) Kind() string {
@ -92,9 +88,7 @@ func (a *ReplicaSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
}
func (a *ReplicaSetAdapter) Equivalent(obj1, obj2 pkgruntime.Object) bool {
replicaset1 := obj1.(*extensionsv1.ReplicaSet)
replicaset2 := obj2.(*extensionsv1.ReplicaSet)
return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2)
return fedutil.ObjectMetaAndSpecEquivalent(obj1, obj2)
}
func (a *ReplicaSetAdapter) NamespacedName(obj pkgruntime.Object) types.NamespacedName {
@ -158,79 +152,6 @@ func (a *ReplicaSetAdapter) ClusterWatch(client kubeclientset.Interface, namespa
return client.Extensions().ReplicaSets(namespace).Watch(options)
}
func (a *ReplicaSetAdapter) IsSchedulingAdapter() bool {
return true
}
func (a *ReplicaSetAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*fedv1beta1.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) {
var clusterNames []string
for _, cluster := range clusters {
clusterNames = append(clusterNames, cluster.Name)
}
// Schedule the pods across the existing clusters.
replicaSetGetter := func(clusterName, key string) (interface{}, bool, error) {
return informer.GetTargetStore().GetByKey(clusterName, key)
}
podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) {
clientset, err := informer.GetClientsetForCluster(clusterName)
if err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(replicaSet.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid selector: %v", err)
}
return clientset.Core().Pods(replicaSet.ObjectMeta.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
}
current, estimatedCapacity, err := clustersReplicaState(clusterNames, key, replicaSetGetter, podsGetter)
if err != nil {
return nil, err
}
rs := obj.(*extensionsv1.ReplicaSet)
return &SchedulingInfo{
Schedule: a.schedule(rs, clusterNames, current, estimatedCapacity),
Status: SchedulingStatus{},
}, nil
}
func (a *ReplicaSetAdapter) ScheduleObject(cluster *fedv1beta1.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) {
rs := federationObjCopy.(*extensionsv1.ReplicaSet)
replicas, ok := schedulingInfo.Schedule[cluster.Name]
if !ok {
replicas = 0
}
specReplicas := int32(replicas)
rs.Spec.Replicas = &specReplicas
if clusterObj != nil {
clusterRs := clusterObj.(*extensionsv1.ReplicaSet)
schedulingInfo.Status.Replicas += clusterRs.Status.Replicas
schedulingInfo.Status.FullyLabeledReplicas += clusterRs.Status.FullyLabeledReplicas
schedulingInfo.Status.ReadyReplicas += clusterRs.Status.ReadyReplicas
schedulingInfo.Status.AvailableReplicas += clusterRs.Status.AvailableReplicas
}
return rs, replicas > 0, nil
}
func (a *ReplicaSetAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error {
rs := obj.(*extensionsv1.ReplicaSet)
if status.Replicas != rs.Status.Replicas || status.FullyLabeledReplicas != rs.Status.FullyLabeledReplicas ||
status.ReadyReplicas != rs.Status.ReadyReplicas || status.AvailableReplicas != rs.Status.AvailableReplicas {
rs.Status = extensionsv1.ReplicaSetStatus{
Replicas: status.Replicas,
FullyLabeledReplicas: status.Replicas,
ReadyReplicas: status.ReadyReplicas,
AvailableReplicas: status.AvailableReplicas,
}
_, err := a.client.Extensions().ReplicaSets(rs.Namespace).UpdateStatus(rs)
return err
}
return nil
}
func (a *ReplicaSetAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool {
replicaset1 := obj1.(*extensionsv1.ReplicaSet)
replicaset2 := a.Copy(obj2).(*extensionsv1.ReplicaSet)
@ -238,93 +159,6 @@ func (a *ReplicaSetAdapter) EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Obj
return fedutil.ObjectMetaAndSpecEquivalent(replicaset1, replicaset2)
}
func (a *ReplicaSetAdapter) schedule(frs *extensionsv1.ReplicaSet, clusterNames []string,
current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
plnr := a.defaultPlanner
frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation)
if err != nil {
glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err)
}
if frsPref != nil { // create a new planner if user specified a preference
plnr = planner.NewPlanner(frsPref)
}
replicas := int64(*frs.Spec.Replicas)
scheduleResult, overflow := plnr.Plan(replicas, clusterNames, current, estimatedCapacity,
frs.Namespace+"/"+frs.Name)
// Ensure that the schedule being returned has scheduling instructions for
// all of the clusters that currently have replicas. A cluster that was in
// the previous schedule but is not in the new schedule should have zero
// replicas.
result := make(map[string]int64)
for clusterName := range current {
result[clusterName] = 0
}
for clusterName, replicas := range scheduleResult {
result[clusterName] = replicas
}
for clusterName, replicas := range overflow {
result[clusterName] += replicas
}
if glog.V(4) {
buf := bytes.NewBufferString(fmt.Sprintf("Schedule - ReplicaSet: %s/%s\n", frs.Namespace, frs.Name))
sort.Strings(clusterNames)
for _, clusterName := range clusterNames {
cur := current[clusterName]
target := scheduleResult[clusterName]
fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
if over, found := overflow[clusterName]; found {
fmt.Fprintf(buf, " overflow: %d", over)
}
if capacity, found := estimatedCapacity[clusterName]; found {
fmt.Fprintf(buf, " capacity: %d", capacity)
}
fmt.Fprintf(buf, "\n")
}
glog.V(4).Infof(buf.String())
}
return result
}
// clusterReplicaState returns information about the scheduling state of the pods running in the federated clusters.
func clustersReplicaState(
clusterNames []string,
replicaSetKey string,
replicaSetGetter func(clusterName string, key string) (interface{}, bool, error),
podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error)) (current map[string]int64, estimatedCapacity map[string]int64, err error) {
current = make(map[string]int64)
estimatedCapacity = make(map[string]int64)
for _, clusterName := range clusterNames {
rsObj, exists, err := replicaSetGetter(clusterName, replicaSetKey)
if err != nil {
return nil, nil, err
}
if !exists {
continue
}
rs := rsObj.(*extensionsv1.ReplicaSet)
if int32(*rs.Spec.Replicas) == rs.Status.ReadyReplicas {
current[clusterName] = int64(rs.Status.ReadyReplicas)
} else {
pods, err := podsGetter(clusterName, rs)
if err != nil {
return nil, nil, err
}
podStatus := podanalyzer.AnalyzePods(pods, time.Now())
current[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
unschedulable := int64(podStatus.Unschedulable)
if unschedulable > 0 {
estimatedCapacity[clusterName] = int64(*rs.Spec.Replicas) - unschedulable
}
}
}
return current, estimatedCapacity, nil
}
func (a *ReplicaSetAdapter) NewTestObject(namespace string) pkgruntime.Object {
replicas := int32(3)
zero := int64(0)

View File

@ -17,15 +17,31 @@ limitations under the License.
package federatedtypes
import (
"bytes"
"fmt"
"reflect"
"sort"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
fedapi "k8s.io/kubernetes/federation/apis/federation"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"github.com/golang/glog"
)
// SchedulingStatus contains the status of the objects that are being
// scheduled into joined clusters.
type SchedulingStatus struct {
Replicas int32
UpdatedReplicas int32
FullyLabeledReplicas int32
ReadyReplicas int32
AvailableReplicas int32
@ -49,3 +65,171 @@ type SchedulingAdapter interface {
// equivalent ignoring differences due to scheduling.
EquivalentIgnoringSchedule(obj1, obj2 pkgruntime.Object) bool
}
// schedulingAdapter is meant to be embedded in other type adapters that require
// workload scheduling.
type schedulingAdapter struct {
// preferencesAnnotationName is the annotation key holding the workload's
// replica allocation preferences (e.g. the ReplicaSet or Deployment one).
preferencesAnnotationName string
// updateStatusFunc writes an aggregated SchedulingStatus back to the
// federated object; supplied by the embedding adapter.
updateStatusFunc func(pkgruntime.Object, SchedulingStatus) error
}
// IsSchedulingAdapter marks this adapter as participating in scheduling.
func (a *schedulingAdapter) IsSchedulingAdapter() bool {
return true
}
// GetSchedule computes the per-cluster replica schedule for the given
// federated workload object. It gathers the current replica state from each
// joined cluster (falling back to per-pod analysis when a cluster's replicas
// are not all ready), reads the object's replica-allocation preferences from
// the adapter's annotation (defaulting to equal weight across all clusters),
// and plans the target replica distribution.
//
// obj is the federated workload (e.g. *extensionsv1.ReplicaSet or
// *extensionsv1.Deployment); key is its namespace/name store key; clusters
// are the currently joined clusters; informer provides per-cluster stores
// and clientsets.
func (a *schedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (*SchedulingInfo, error) {
	var clusterNames []string
	for _, cluster := range clusters {
		clusterNames = append(clusterNames, cluster.Name)
	}

	// Schedule the pods across the existing clusters.
	objectGetter := func(clusterName, key string) (interface{}, bool, error) {
		return informer.GetTargetStore().GetByKey(clusterName, key)
	}
	podsGetter := func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) {
		clientset, err := informer.GetClientsetForCluster(clusterName)
		if err != nil {
			return nil, err
		}
		// Reflection: assumes the workload type has Spec.Selector of type
		// *metav1.LabelSelector (true for ReplicaSet and Deployment).
		selectorObj := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Selector").Interface().(*metav1.LabelSelector)
		selector, err := metav1.LabelSelectorAsSelector(selectorObj)
		if err != nil {
			return nil, fmt.Errorf("invalid selector: %v", err)
		}
		metadata, err := meta.Accessor(obj)
		if err != nil {
			return nil, err
		}
		return clientset.Core().Pods(metadata.GetNamespace()).List(metav1.ListOptions{LabelSelector: selector.String()})
	}
	currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
	if err != nil {
		return nil, err
	}

	// An invalid preference annotation is tolerated: log and fall back to the
	// default equal-weight allocation.
	fedPref, err := replicapreferences.GetAllocationPreferences(obj, a.preferencesAnnotationName)
	if err != nil {
		// BUGFIX: glog.Info does not interpret format verbs; use Infof so the
		// %v placeholders are actually expanded.
		glog.Infof("Invalid workload-type specific preference, using default. object: %v, err: %v", obj, err)
	}
	if fedPref == nil {
		fedPref = &fedapi.ReplicaAllocationPreferences{
			Clusters: map[string]fedapi.ClusterPreferences{
				"*": {Weight: 1},
			},
		}
	}
	plnr := planner.NewPlanner(fedPref)

	return &SchedulingInfo{
		Schedule: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity),
		Status:   SchedulingStatus{},
	}, nil
}
// ScheduleObject applies the computed schedule to the copy of the federated
// object destined for one cluster, and aggregates that cluster's current
// status into schedulingInfo.Status. It returns the per-cluster object and
// whether it should exist in the cluster (replicas > 0).
func (a *schedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo *SchedulingInfo) (pkgruntime.Object, bool, error) {
replicas, ok := schedulingInfo.Schedule[cluster.Name]
if !ok {
// Clusters absent from the schedule get zero replicas.
replicas = 0
}
specReplicas := int32(replicas)
// Reflection: assumes the workload type has Spec.Replicas of type *int32.
reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas))
if clusterObj != nil {
// Sum each SchedulingStatus field with the same-named field of the
// cluster object's Status, when present. Assumes both fields have an
// integer kind — TODO confirm this holds for all scheduled types.
schedulingStatusVal := reflect.ValueOf(schedulingInfo).Elem().FieldByName("Status")
objStatusVal := reflect.ValueOf(clusterObj).Elem().FieldByName("Status")
for i := 0; i < schedulingStatusVal.NumField(); i++ {
schedulingStatusField := schedulingStatusVal.Field(i)
schedulingStatusFieldName := schedulingStatusVal.Type().Field(i).Name
objStatusField := objStatusVal.FieldByName(schedulingStatusFieldName)
if objStatusField.IsValid() {
current := schedulingStatusField.Int()
additional := objStatusField.Int()
schedulingStatusField.SetInt(current + additional)
}
}
}
return federationObjCopy, replicas > 0, nil
}
// UpdateFederatedStatus delegates to the embedding adapter's type-specific
// status writer.
func (a *schedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object, status SchedulingStatus) error {
return a.updateStatusFunc(obj, status)
}
// schedule plans the per-cluster replica counts for a federated workload.
// It reads the desired replica count from obj (via reflection; assumes
// Spec.Replicas is *int32), asks the planner for a distribution given the
// current per-cluster replicas and estimated capacities, and folds any
// planner overflow back into the result. Every cluster that currently has
// replicas appears in the result (with 0 if it lost its allocation), so
// stale clusters are scaled down rather than left untouched.
func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
// TODO: integrate real scheduler
replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int()
scheduleResult, overflow := planner.Plan(replicas, clusterNames, currentReplicasPerCluster, estimatedCapacity, key)
// Ensure that all current clusters end up in the scheduling result.
result := make(map[string]int64)
for clusterName := range currentReplicasPerCluster {
result[clusterName] = 0
}
for clusterName, replicas := range scheduleResult {
result[clusterName] = replicas
}
for clusterName, replicas := range overflow {
result[clusterName] += replicas
}
// Verbose trace of the planned distribution, gated on -v=4.
if glog.V(4) {
buf := bytes.NewBufferString(fmt.Sprintf("Schedule - %q\n", key))
sort.Strings(clusterNames)
for _, clusterName := range clusterNames {
cur := currentReplicasPerCluster[clusterName]
target := scheduleResult[clusterName]
fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
if over, found := overflow[clusterName]; found {
fmt.Fprintf(buf, " overflow: %d", over)
}
if capacity, found := estimatedCapacity[clusterName]; found {
fmt.Fprintf(buf, " capacity: %d", capacity)
}
fmt.Fprintf(buf, "\n")
}
glog.V(4).Infof(buf.String())
}
return result
}
// clustersReplicaState returns information about the scheduling state of the
// pods running in the federated clusters: the current replica count per
// cluster and, for clusters with unschedulable pods, an estimate of how many
// replicas they can actually hold (desired minus unschedulable).
//
// Reflection note: assumes the workload object has Spec.Replicas (*int32)
// and Status.ReadyReplicas (int32), as ReplicaSet and Deployment do.
func clustersReplicaState(
clusterNames []string,
key string,
objectGetter func(clusterName string, key string) (interface{}, bool, error),
podsGetter func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error)) (currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, err error) {
currentReplicasPerCluster = make(map[string]int64)
estimatedCapacity = make(map[string]int64)
for _, clusterName := range clusterNames {
obj, exists, err := objectGetter(clusterName, key)
if err != nil {
return nil, nil, err
}
// A cluster that doesn't have the object yet contributes nothing.
if !exists {
continue
}
replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int()
readyReplicas := reflect.ValueOf(obj).Elem().FieldByName("Status").FieldByName("ReadyReplicas").Int()
if replicas == readyReplicas {
// All desired replicas are ready: no need to inspect individual pods.
currentReplicasPerCluster[clusterName] = readyReplicas
} else {
// Some replicas aren't ready; analyze the pods to distinguish
// running-and-ready from unschedulable ones.
pods, err := podsGetter(clusterName, obj.(pkgruntime.Object))
if err != nil {
return nil, nil, err
}
podStatus := podanalyzer.AnalyzePods(pods, time.Now())
currentReplicasPerCluster[clusterName] = int64(podStatus.RunningAndReady) // include pending as well?
unschedulable := int64(podStatus.Unschedulable)
if unschedulable > 0 {
estimatedCapacity[clusterName] = replicas - unschedulable
}
}
}
return currentReplicasPerCluster, estimatedCapacity, nil
}

View File

@ -24,21 +24,22 @@ import (
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"github.com/stretchr/testify/assert"
)
func TestClusterReplicaState(t *testing.T) {
uncalledPodsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) {
t.Fatal("podsGetter should not be called when replica sets are all ready.")
uncalledPodsGetter := func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) {
t.Fatal("podsGetter should not be called when workload objects are all ready.")
return nil, nil
}
podsByReplicaSet := make(map[*extensionsv1.ReplicaSet][]*apiv1.Pod)
podsGetter := func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error) {
pods, ok := podsByReplicaSet[replicaSet]
podsByReplicaSet := make(map[pkgruntime.Object][]*apiv1.Pod)
podsGetter := func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error) {
pods, ok := podsByReplicaSet[obj]
if !ok {
t.Fatalf("No pods found in test data for replica set named %v", replicaSet.Name)
t.Fatalf("No pods found in test data for replica set %v", obj)
return nil, fmt.Errorf("Not found")
}
var podListPods []apiv1.Pod
@ -64,7 +65,7 @@ func TestClusterReplicaState(t *testing.T) {
rs2Replicas int32
rs1ReadyReplicas int32
rs2ReadyReplicas int32
podsGetter func(clusterName string, replicaSet *extensionsv1.ReplicaSet) (*apiv1.PodList, error)
podsGetter func(clusterName string, obj pkgruntime.Object) (*apiv1.PodList, error)
pod1Phase apiv1.PodPhase
pod1Condition apiv1.PodCondition
pod2Phase apiv1.PodPhase

View File

@ -25,7 +25,6 @@ filegroup(
srcs = [
":package-srcs",
"//federation/pkg/federation-controller/cluster:all-srcs",
"//federation/pkg/federation-controller/deployment:all-srcs",
"//federation/pkg/federation-controller/ingress:all-srcs",
"//federation/pkg/federation-controller/namespace:all-srcs",
"//federation/pkg/federation-controller/service:all-srcs",

View File

@ -1,76 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["deploymentcontroller.go"],
tags = ["automanaged"],
deps = [
"//federation/apis/federation:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset:go_default_library",
"//federation/pkg/federation-controller/util:go_default_library",
"//federation/pkg/federation-controller/util/deletionhelper:go_default_library",
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["deploymentcontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/client/clientset_generated/federation_clientset/fake:go_default_library",
"//federation/pkg/federation-controller/util/test:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,649 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"bytes"
"fmt"
"sort"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
clientv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
fed "k8s.io/kubernetes/federation/apis/federation"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"k8s.io/kubernetes/pkg/api"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
)
const (
	// FedDeploymentPreferencesAnnotation is the annotation key under which a
	// federated Deployment carries its per-cluster replica placement
	// preferences (parsed by replicapreferences.GetAllocationPreferences).
	FedDeploymentPreferencesAnnotation = "federation.kubernetes.io/deployment-preferences"
	// allClustersKey is the sentinel delivery key used to request a
	// reconciliation pass over all deployments (e.g. on cluster changes).
	allClustersKey = "THE_ALL_CLUSTER_KEY"
	// UserAgentName identifies this controller in API-server user agents and
	// recorded events.
	UserAgentName = "federation-deployment-controller"
	// ControllerName is the name under which this controller can be
	// enabled/disabled in the controller-manager options.
	ControllerName = "deployments"
)
var (
	// RequiredResources lists the API resources that must be available on the
	// federation apiserver for this controller to run.
	RequiredResources = []schema.GroupVersionResource{extensionsv1.SchemeGroupVersion.WithResource("deployments")}
	// deploymentReviewDelay is how long to wait before re-examining a
	// deployment after a local (member-cluster) change.
	deploymentReviewDelay = 10 * time.Second
	// clusterAvailableDelay is the delay before reconciling everything after a
	// cluster becomes available.
	clusterAvailableDelay = 20 * time.Second
	// clusterUnavailableDelay is the delay before reconciling everything after
	// a cluster becomes unavailable.
	clusterUnavailableDelay = 60 * time.Second
	// allDeploymentReviewDelay is the delay for the periodic full review
	// triggered by pod changes.
	allDeploymentReviewDelay = 2 * time.Minute
	// updateTimeout bounds how long the federated updater may take to apply
	// one batch of operations to member clusters.
	updateTimeout = 30 * time.Second
)
// DeploymentController synchronizes federated Deployment objects from the
// federation apiserver into the member clusters, scheduling replicas across
// clusters according to placement preferences.
type DeploymentController struct {
	// fedClient talks to the federation apiserver.
	fedClient fedclientset.Interface

	// deploymentController/deploymentStore form the informer on federated
	// deployments in the federation apiserver.
	deploymentController cache.Controller
	deploymentStore      cache.Store

	// fedDeploymentInformer watches deployments in every member cluster.
	fedDeploymentInformer fedutil.FederatedInformer
	// fedPodInformer watches pods in every member cluster (used to estimate
	// per-cluster capacity and running replicas).
	fedPodInformer fedutil.FederatedInformer

	// deploymentDeliverer delays delivery of deployment keys to the work queue.
	deploymentDeliverer *fedutil.DelayingDeliverer
	// clusterDeliverer delays cluster-wide reconciliation requests.
	clusterDeliverer *fedutil.DelayingDeliverer
	// deploymentWorkQueue holds the keys workers reconcile.
	deploymentWorkQueue workqueue.Interface
	// For updating members of federation.
	fedUpdater fedutil.FederatedUpdater
	// deploymentBackoff tracks per-key retry backoff after failures.
	deploymentBackoff *flowcontrol.Backoff

	// eventRecorder records events against the federation apiserver.
	eventRecorder record.EventRecorder

	// deletionHelper manages finalizers and cascading deletion into member
	// clusters.
	deletionHelper *deletionhelper.DeletionHelper

	// defaultPlanner is used when a deployment has no placement preferences.
	defaultPlanner *planner.Planner
}
// NewDeploymentController returns a new deployment controller
func NewDeploymentController(federationClient fedclientset.Interface) *DeploymentController {
	// Events are recorded against the federation apiserver under this
	// controller's component name.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient))
	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})

	fdc := &DeploymentController{
		fedClient:           federationClient,
		deploymentDeliverer: fedutil.NewDelayingDeliverer(),
		clusterDeliverer:    fedutil.NewDelayingDeliverer(),
		deploymentWorkQueue: workqueue.New(),
		deploymentBackoff:   flowcontrol.NewBackOff(5*time.Second, time.Minute),
		// Default placement: spread replicas evenly, weight 1 per cluster.
		defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{
			Clusters: map[string]fed.ClusterPreferences{
				"*": {Weight: 1},
			},
		}),
		eventRecorder: recorder,
	}

	// Factory producing a deployment informer for each member cluster; any
	// change to a local deployment re-enqueues the corresponding federated
	// deployment after deploymentReviewDelay.
	deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
		return cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					return clientset.Extensions().Deployments(metav1.NamespaceAll).List(options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return clientset.Extensions().Deployments(metav1.NamespaceAll).Watch(options)
				},
			},
			&extensionsv1.Deployment{},
			controller.NoResyncPeriodFunc(),
			fedutil.NewTriggerOnAllChanges(
				func(obj runtime.Object) { fdc.deliverLocalDeployment(obj, deploymentReviewDelay) },
			),
		)
	}
	// Cluster availability changes trigger a delayed full reconciliation.
	clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{
		ClusterAvailable: func(cluster *fedv1.Cluster) {
			fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
		},
		ClusterUnavailable: func(cluster *fedv1.Cluster, _ []interface{}) {
			fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterUnavailableDelay)
		},
	}
	fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle)

	// Factory producing a pod informer for each member cluster; pod churn
	// triggers a (rate-limited) review of all deployments, since pod state
	// feeds the replica scheduling estimates.
	podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
		return cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					return clientset.Core().Pods(metav1.NamespaceAll).List(options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return clientset.Core().Pods(metav1.NamespaceAll).Watch(options)
				},
			},
			&apiv1.Pod{},
			controller.NoResyncPeriodFunc(),
			fedutil.NewTriggerOnAllChanges(
				func(obj runtime.Object) {
					fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, allDeploymentReviewDelay)
				},
			),
		)
	}
	fdc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})

	// Informer on federated deployments in the federation apiserver itself;
	// meta/spec changes enqueue the deployment for reconciliation.
	fdc.deploymentStore, fdc.deploymentController = cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				return fdc.fedClient.Extensions().Deployments(metav1.NamespaceAll).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				return fdc.fedClient.Extensions().Deployments(metav1.NamespaceAll).Watch(options)
			},
		},
		&extensionsv1.Deployment{},
		controller.NoResyncPeriodFunc(),
		fedutil.NewTriggerOnMetaAndSpecChanges(
			func(obj runtime.Object) { fdc.deliverFedDeploymentObj(obj, deploymentReviewDelay) },
		),
	)

	// Updater that performs create/update/delete of deployments in member
	// clusters. Deletes do not orphan dependents (local ReplicaSets/Pods are
	// removed along with the deployment).
	fdc.fedUpdater = fedutil.NewFederatedUpdater(fdc.fedDeploymentInformer, "deployment", updateTimeout, fdc.eventRecorder,
		func(client kubeclientset.Interface, obj runtime.Object) error {
			rs := obj.(*extensionsv1.Deployment)
			_, err := client.Extensions().Deployments(rs.Namespace).Create(rs)
			return err
		},
		func(client kubeclientset.Interface, obj runtime.Object) error {
			rs := obj.(*extensionsv1.Deployment)
			_, err := client.Extensions().Deployments(rs.Namespace).Update(rs)
			return err
		},
		func(client kubeclientset.Interface, obj runtime.Object) error {
			rs := obj.(*extensionsv1.Deployment)
			orphanDependents := false
			err := client.Extensions().Deployments(rs.Namespace).Delete(rs.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
			return err
		})

	// Deletion helper handles finalizers and cascading deletion across
	// member clusters.
	fdc.deletionHelper = deletionhelper.NewDeletionHelper(
		fdc.updateDeployment,
		// objNameFunc
		func(obj runtime.Object) string {
			deployment := obj.(*extensionsv1.Deployment)
			return fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)
		},
		fdc.fedDeploymentInformer,
		fdc.fedUpdater,
	)

	return fdc
}
// updateDeployment writes the given object back to the federation apiserver.
// The argument must be a *extensionsv1.Deployment.
func (fdc *DeploymentController) updateDeployment(obj runtime.Object) (runtime.Object, error) {
	d := obj.(*extensionsv1.Deployment)
	updated, err := fdc.fedClient.Extensions().Deployments(d.Namespace).Update(d)
	return updated, err
}
// Run starts the controller's informers, delivery handlers, and the given
// number of reconciliation workers, then blocks until stopCh is closed.
func (fdc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	go fdc.deploymentController.Run(stopCh)
	fdc.fedDeploymentInformer.Start()
	fdc.fedPodInformer.Start()
	// Deployment deliveries feed the work queue that workers drain.
	fdc.deploymentDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
		fdc.deploymentWorkQueue.Add(item.Key)
	})
	// Cluster-level deliveries trigger a full reconciliation pass.
	fdc.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
		fdc.reconcileDeploymentsOnClusterChange()
	})
	// Wait until the cluster is synced to prevent the update storm at the very beginning.
	for !fdc.isSynced() {
		time.Sleep(5 * time.Millisecond)
		glog.V(3).Infof("Waiting for controller to sync up")
	}
	for i := 0; i < workers; i++ {
		go wait.Until(fdc.worker, time.Second, stopCh)
	}
	// Periodically garbage-collect stale backoff entries until stopped.
	fedutil.StartBackoffGC(fdc.deploymentBackoff, stopCh)
	<-stopCh
	glog.Infof("Shutting down DeploymentController")
	// Tear down deliverers, queue, and informers in reverse order of startup.
	fdc.deploymentDeliverer.Stop()
	fdc.clusterDeliverer.Stop()
	fdc.deploymentWorkQueue.ShutDown()
	fdc.fedDeploymentInformer.Stop()
	fdc.fedPodInformer.Stop()
}
// isSynced reports whether every informer this controller depends on (the
// federated deployment informer, the federated pod informer, and the informer
// on the federation apiserver) has completed its initial sync, and that the
// two federated informers agree on the set of ready clusters.
func (fdc *DeploymentController) isSynced() bool {
	if !fdc.fedDeploymentInformer.ClustersSynced() {
		glog.V(2).Infof("Cluster list not synced")
		return false
	}
	clustersFromDeps, err := fdc.fedDeploymentInformer.GetReadyClusters()
	if err != nil {
		glog.Errorf("Failed to get ready clusters: %v", err)
		return false
	}
	if !fdc.fedDeploymentInformer.GetTargetStore().ClustersSynced(clustersFromDeps) {
		return false
	}

	if !fdc.fedPodInformer.ClustersSynced() {
		glog.V(2).Infof("Cluster list not synced")
		return false
	}
	clustersFromPods, err := fdc.fedPodInformer.GetReadyClusters()
	if err != nil {
		glog.Errorf("Failed to get ready clusters: %v", err)
		return false
	}

	// This also checks whether podInformer and deploymentInformer have the
	// same cluster lists.
	if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromDeps) {
		glog.V(2).Infof("Pod informer not synced")
		return false
	}
	if !fdc.fedPodInformer.GetTargetStore().ClustersSynced(clustersFromPods) {
		glog.V(2).Infof("Pod informer not synced")
		return false
	}

	if !fdc.deploymentController.HasSynced() {
		glog.V(2).Infof("federation deployment list not synced")
		return false
	}
	return true
}
// deliverLocalDeployment enqueues a deployment observed in a member cluster,
// but only when a federated deployment with the same key exists; purely local
// deployments are ignored.
func (fdc *DeploymentController) deliverLocalDeployment(obj interface{}, duration time.Duration) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %v: %v", obj, err)
		return
	}
	_, found, err := fdc.deploymentStore.GetByKey(key)
	if err != nil {
		glog.Errorf("Couldn't get federation deployment %v: %v", key, err)
		return
	}
	if !found {
		// Deployment exists only in the local cluster; nothing to reconcile.
		return
	}
	fdc.deliverDeploymentByKey(key, duration, false)
}
// deliverFedDeploymentObj enqueues a federated deployment object for
// reconciliation after the given delay.
func (fdc *DeploymentController) deliverFedDeploymentObj(obj interface{}, delay time.Duration) {
	key, keyErr := controller.KeyFunc(obj)
	if keyErr != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, keyErr)
		return
	}
	fdc.deliverDeploymentByKey(key, delay, false)
}
// deliverDeploymentByKey schedules a reconciliation of the deployment
// identified by key after delay. When failed is true the per-key backoff is
// advanced and added to the delay; otherwise the backoff is reset.
func (fdc *DeploymentController) deliverDeploymentByKey(key string, delay time.Duration, failed bool) {
	if !failed {
		fdc.deploymentBackoff.Reset(key)
	} else {
		fdc.deploymentBackoff.Next(key, time.Now())
		delay += fdc.deploymentBackoff.Get(key)
	}
	fdc.deploymentDeliverer.DeliverAfter(key, nil, delay)
}
// worker drains the work queue, reconciling one deployment per iteration and
// re-enqueueing (with backoff on error) according to the reconciliation
// status. It returns only when the queue is shut down.
func (fdc *DeploymentController) worker() {
	for {
		item, quit := fdc.deploymentWorkQueue.Get()
		if quit {
			return
		}
		key := item.(string)
		status, err := fdc.reconcileDeployment(key)
		fdc.deploymentWorkQueue.Done(item)
		if err != nil {
			glog.Errorf("Error syncing cluster controller: %v", err)
			// Redeliver with backoff.
			fdc.deliverDeploymentByKey(key, 0, true)
		} else {
			switch status {
			case statusAllOk:
				// Nothing further to do for this key.
				break
			case statusError:
				// Reconciliation reported an error without returning one;
				// redeliver with backoff.
				fdc.deliverDeploymentByKey(key, 0, true)
			case statusNeedRecheck:
				fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false)
			case statusNotSynced:
				fdc.deliverDeploymentByKey(key, clusterAvailableDelay, false)
			default:
				glog.Errorf("Unhandled reconciliation status: %s", status)
				fdc.deliverDeploymentByKey(key, deploymentReviewDelay, false)
			}
		}
	}
}
// podAnalysisResult aggregates, per cluster, the pod counts that drive
// replica scheduling decisions.
type podAnalysisResult struct {
	// Total number of pods created.
	total int
	// Number of pods that are running and ready.
	runningAndReady int
	// Number of pods that have been in unschedulable state for UnshedulableThreshold seconds.
	unschedulable int

	// TODO: Handle other scenarios like pod waiting too long for scheduler etc.
}
const (
	// unschedulableThreshold is how long a pod must remain unschedulable
	// before it is counted against the cluster's estimated capacity.
	// TODO: make it configurable
	unschedulableThreshold = 60 * time.Second
)
// A function that calculates how many pods from the list are in one of
// the meaningful (from the replica set perspective) states. This function is
// a temporary workaround against the current lack of ownerRef in pods.
// TODO(perotinus): Unify this with the ReplicaSet controller.
//
// analyzePods returns a per-cluster podAnalysisResult for all pods in allPods
// that match selectorv1. Pods are only counted when the selector is non-empty.
func analyzePods(selectorv1 *metav1.LabelSelector, allPods []fedutil.FederatedObject, currentTime time.Time) (map[string]podAnalysisResult, error) {
	selector, err := metav1.LabelSelectorAsSelector(selectorv1)
	if err != nil {
		return nil, fmt.Errorf("invalid selector: %v", err)
	}
	result := make(map[string]podAnalysisResult)

	for _, fedObject := range allPods {
		pod, isPod := fedObject.Object.(*apiv1.Pod)
		if !isPod {
			return nil, fmt.Errorf("invalid arg content - not a *pod")
		}
		if !selector.Empty() && selector.Matches(labels.Set(pod.Labels)) {
			status := result[fedObject.ClusterName]
			status.total++
			for _, condition := range pod.Status.Conditions {
				if pod.Status.Phase == apiv1.PodRunning {
					// NOTE(review): only condition.Type is compared here;
					// condition.Status is not checked for ConditionTrue —
					// confirm this matches the intended "ready" semantics.
					if condition.Type == apiv1.PodReady {
						status.runningAndReady++
					}
				} else {
					// A pod is counted as unschedulable once it has carried a
					// PodScheduled=False/Unschedulable condition for longer
					// than unschedulableThreshold.
					if condition.Type == apiv1.PodScheduled &&
						condition.Status == apiv1.ConditionFalse &&
						condition.Reason == apiv1.PodReasonUnschedulable &&
						condition.LastTransitionTime.Add(unschedulableThreshold).Before(currentTime) {
						status.unschedulable++
					}
				}
			}
			result[fedObject.ClusterName] = status
		}
	}
	return result, nil
}
// schedule computes the desired replica count per cluster for the federated
// deployment fd, using either the deployment's annotated placement
// preferences or the controller's default planner.
//
// current maps cluster name -> currently running-and-ready replicas;
// estimatedCapacity maps cluster name -> estimated schedulable replicas for
// clusters with unschedulable pods. The result contains an entry for every
// cluster that currently has replicas (possibly 0) plus every cluster the
// planner assigned replicas or overflow to.
func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters []*fedv1.Cluster,
	current map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
	// TODO: integrate real scheduler

	plannerToBeUsed := fdc.defaultPlanner
	fdPref, err := replicapreferences.GetAllocationPreferences(fd, FedDeploymentPreferencesAnnotation)
	if err != nil {
		// Infof, not Info: the message contains printf verbs that must be
		// expanded (glog.Info would print the verbs literally).
		glog.Infof("Invalid Deployment specific preference, use default. deployment: %v, err: %v", fd.Name, err)
	}
	if fdPref != nil { // create a new planner if user specified a preference
		plannerToBeUsed = planner.NewPlanner(fdPref)
	}

	replicas := int64(0)
	if fd.Spec.Replicas != nil {
		replicas = int64(*fd.Spec.Replicas)
	}
	var clusterNames []string
	for _, cluster := range clusters {
		clusterNames = append(clusterNames, cluster.Name)
	}
	scheduleResult, overflow := plannerToBeUsed.Plan(replicas, clusterNames, current, estimatedCapacity,
		fd.Namespace+"/"+fd.Name)

	// make sure the result contains all clusters that currently have some replicas.
	result := make(map[string]int64)
	for clusterName := range current {
		result[clusterName] = 0
	}
	for clusterName, replicas := range scheduleResult {
		result[clusterName] = replicas
	}
	for clusterName, replicas := range overflow {
		result[clusterName] += replicas
	}

	if glog.V(4) {
		buf := bytes.NewBufferString(fmt.Sprintf("Schedule - Deployment: %s/%s\n", fd.Namespace, fd.Name))
		sort.Strings(clusterNames)
		for _, clusterName := range clusterNames {
			cur := current[clusterName]
			target := scheduleResult[clusterName]
			fmt.Fprintf(buf, "%s: current: %d target: %d", clusterName, cur, target)
			if over, found := overflow[clusterName]; found {
				fmt.Fprintf(buf, " overflow: %d", over)
			}
			if capacity, found := estimatedCapacity[clusterName]; found {
				fmt.Fprintf(buf, " capacity: %d", capacity)
			}
			fmt.Fprintf(buf, "\n")
		}
		// Info, not Infof: buf is data, not a format string — cluster names
		// containing '%' would otherwise be misinterpreted as verbs.
		glog.V(4).Info(buf.String())
	}
	return result
}
// reconciliationStatus describes the outcome of one reconcileDeployment call
// and determines whether/when the key is re-enqueued by the worker.
type reconciliationStatus string

const (
	// statusAllOk: the deployment is fully reconciled; no redelivery needed.
	statusAllOk = reconciliationStatus("ALL_OK")
	// statusNeedRecheck: operations were issued; recheck after a short delay.
	statusNeedRecheck = reconciliationStatus("RECHECK")
	// statusError: reconciliation failed; redeliver with backoff.
	statusError = reconciliationStatus("ERROR")
	// statusNotSynced: informers not yet synced; retry once clusters are ready.
	statusNotSynced = reconciliationStatus("NOSYNC")
)
// reconcileDeployment drives the federated deployment identified by key
// toward its desired state: it ensures finalizers, schedules replicas across
// ready clusters based on pod analysis, issues create/update operations to
// member clusters, and aggregates member statuses back into the federated
// object. It returns a reconciliationStatus telling the worker how to
// re-enqueue the key, plus any error encountered.
func (fdc *DeploymentController) reconcileDeployment(key string) (reconciliationStatus, error) {
	if !fdc.isSynced() {
		return statusNotSynced, nil
	}

	glog.V(4).Infof("Start reconcile deployment %q", key)
	startTime := time.Now()
	defer glog.V(4).Infof("Finished reconcile deployment %q (%v)", key, time.Now().Sub(startTime))

	objFromStore, exists, err := fdc.deploymentStore.GetByKey(key)
	if err != nil {
		return statusError, err
	}
	if !exists {
		// don't delete local deployments for now. Do not reconcile it anymore.
		return statusAllOk, nil
	}
	// Work on a deep copy so the informer's cached object is never mutated.
	obj, err := api.Scheme.DeepCopy(objFromStore)
	fd, ok := obj.(*extensionsv1.Deployment)
	if err != nil || !ok {
		glog.Errorf("Error in retrieving obj from store: %v, %v", ok, err)
		return statusError, err
	}

	// A deletion timestamp means the federated object is being torn down:
	// cascade the delete into member clusters and stop reconciling.
	if fd.DeletionTimestamp != nil {
		if err := fdc.delete(fd); err != nil {
			glog.Errorf("Failed to delete %s: %v", fd.Name, err)
			fdc.eventRecorder.Eventf(fd, api.EventTypeWarning, "DeleteFailed",
				"Deployment delete failed: %v", err)
			return statusError, err
		}
		return statusAllOk, nil
	}

	glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for deployment: %s",
		fd.Name)
	// Add the required finalizers before creating a deployment in underlying clusters.
	updatedDeploymentObj, err := fdc.deletionHelper.EnsureFinalizers(fd)
	if err != nil {
		glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in deployment %s: %v",
			fd.Name, err)
		return statusError, err
	}
	fd = updatedDeploymentObj.(*extensionsv1.Deployment)

	glog.V(3).Infof("Syncing deployment %s in underlying clusters", fd.Name)

	clusters, err := fdc.fedDeploymentInformer.GetReadyClusters()
	if err != nil {
		return statusError, err
	}

	// collect current status and do schedule
	allPods, err := fdc.fedPodInformer.GetTargetStore().List()
	if err != nil {
		return statusError, err
	}
	podStatus, err := analyzePods(fd.Spec.Selector, allPods, time.Now())
	// FIX: this error was previously dropped, which could feed an empty pod
	// analysis into scheduling (e.g. on an invalid selector).
	if err != nil {
		return statusError, err
	}
	current := make(map[string]int64)
	estimatedCapacity := make(map[string]int64)
	for _, cluster := range clusters {
		ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(cluster.Name, key)
		if err != nil {
			return statusError, err
		}
		if exists {
			ld := ldObj.(*extensionsv1.Deployment)
			current[cluster.Name] = int64(podStatus[cluster.Name].runningAndReady) // include pending as well?
			unschedulable := int64(podStatus[cluster.Name].unschedulable)
			if unschedulable > 0 {
				// Estimated capacity: the local spec's replicas minus the pods
				// the cluster could not schedule.
				estimatedCapacity[cluster.Name] = int64(*ld.Spec.Replicas) - unschedulable
			}
		}
	}

	scheduleResult := fdc.schedule(fd, clusters, current, estimatedCapacity)

	glog.V(4).Infof("Start syncing local deployment %s: %v", key, scheduleResult)

	fedStatus := extensionsv1.DeploymentStatus{ObservedGeneration: fd.Generation}
	operations := make([]fedutil.FederatedOperation, 0)
	for clusterName, replicas := range scheduleResult {
		ldObj, exists, err := fdc.fedDeploymentInformer.GetTargetStore().GetByKey(clusterName, key)
		if err != nil {
			return statusError, err
		}

		// The object can be modified.
		ld := fedutil.DeepCopyDeployment(fd)
		specReplicas := int32(replicas)
		ld.Spec.Replicas = &specReplicas

		if !exists {
			if replicas > 0 {
				operations = append(operations, fedutil.FederatedOperation{
					Type:        fedutil.OperationTypeAdd,
					Obj:         ld,
					ClusterName: clusterName,
					Key:         key,
				})
			}
		} else {
			// TODO: Update only one deployment at a time if update strategy is rolling update.
			currentLd := ldObj.(*extensionsv1.Deployment)
			// Update existing replica set, if needed.
			if !fedutil.DeploymentEquivalent(ld, currentLd) {
				operations = append(operations, fedutil.FederatedOperation{
					Type:        fedutil.OperationTypeUpdate,
					Obj:         ld,
					ClusterName: clusterName,
					Key:         key,
				})
				glog.Infof("Updating %s in %s", currentLd.Name, clusterName)
			}
			// Aggregate member-cluster status into the federated status.
			fedStatus.Replicas += currentLd.Status.Replicas
			fedStatus.AvailableReplicas += currentLd.Status.AvailableReplicas
			fedStatus.UnavailableReplicas += currentLd.Status.UnavailableReplicas
			fedStatus.ReadyReplicas += currentLd.Status.ReadyReplicas
		}
	}
	// Push the aggregated status to the federation apiserver only if changed.
	if fedStatus.Replicas != fd.Status.Replicas ||
		fedStatus.AvailableReplicas != fd.Status.AvailableReplicas ||
		fedStatus.UnavailableReplicas != fd.Status.UnavailableReplicas ||
		fedStatus.ReadyReplicas != fd.Status.ReadyReplicas {
		fd.Status = fedStatus
		_, err = fdc.fedClient.Extensions().Deployments(fd.Namespace).UpdateStatus(fd)
		if err != nil {
			return statusError, err
		}
	}

	if len(operations) == 0 {
		// Everything is in order
		return statusAllOk, nil
	}
	err = fdc.fedUpdater.Update(operations)
	if err != nil {
		glog.Errorf("Failed to execute updates for %s: %v", key, err)
		return statusError, err
	}

	// Some operations were made, reconcile after a while.
	return statusNeedRecheck, nil
}
// reconcileDeploymentsOnClusterChange re-enqueues every federated deployment
// for immediate reconciliation, typically after the set of ready clusters
// changed. If the controller is not yet synced, another cluster-wide pass is
// also scheduled for later.
func (fdc *DeploymentController) reconcileDeploymentsOnClusterChange() {
	if !fdc.isSynced() {
		fdc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
	}
	for _, dep := range fdc.deploymentStore.List() {
		key, _ := controller.KeyFunc(dep)
		fdc.deliverDeploymentByKey(key, 0, false)
	}
}
// delete deletes the given deployment or returns error if the deletion was not complete.
func (fdc *DeploymentController) delete(deployment *extensionsv1.Deployment) error {
	glog.V(3).Infof("Handling deletion of deployment: %v", *deployment)
	if _, err := fdc.deletionHelper.HandleObjectInUnderlyingClusters(deployment); err != nil {
		return err
	}

	err := fdc.fedClient.Extensions().Deployments(deployment.Namespace).Delete(deployment.Name, nil)
	switch {
	case err == nil:
		return nil
	case errors.IsNotFound(err):
		// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
		// This is expected when we are processing an update as a result of deployment finalizer deletion.
		// The process that deleted the last finalizer is also going to delete the deployment and we do not have to do anything.
		return nil
	default:
		return fmt.Errorf("failed to delete deployment: %v", err)
	}
}

View File

@ -44,6 +44,7 @@ go_test(
name = "go_default_test",
srcs = [
"controller_test.go",
"deploymentcontroller_test.go",
"replicasetcontroller_test.go",
],
library = ":go_default_library",
@ -62,6 +63,8 @@ go_test(
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],

View File

@ -14,13 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
package sync
import (
"flag"
"fmt"
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
@ -30,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
fedv1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
@ -39,7 +39,6 @@ import (
const (
deployments = "deployments"
pods = "pods"
)
func TestDeploymentController(t *testing.T) {
@ -47,11 +46,6 @@ func TestDeploymentController(t *testing.T) {
flag.Set("v", "5")
flag.Parse()
deploymentReviewDelay = 500 * time.Millisecond
clusterAvailableDelay = 100 * time.Millisecond
clusterUnavailableDelay = 100 * time.Millisecond
allDeploymentReviewDelay = 500 * time.Millisecond
cluster1 := NewCluster("cluster1", apiv1.ConditionTrue)
cluster2 := NewCluster("cluster2", apiv1.ConditionTrue)
@ -65,18 +59,15 @@ func TestDeploymentController(t *testing.T) {
cluster1Client := &fakekubeclientset.Clientset{}
cluster1Watch := RegisterFakeWatch(deployments, &cluster1Client.Fake)
_ = RegisterFakeWatch(pods, &cluster1Client.Fake)
RegisterFakeList(deployments, &cluster1Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}})
cluster1CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate(deployments, &cluster1Client.Fake, cluster1Watch)
cluster2Client := &fakekubeclientset.Clientset{}
cluster2Watch := RegisterFakeWatch(deployments, &cluster2Client.Fake)
_ = RegisterFakeWatch(pods, &cluster2Client.Fake)
RegisterFakeList(deployments, &cluster2Client.Fake, &extensionsv1.DeploymentList{Items: []extensionsv1.Deployment{}})
cluster2CreateChan := RegisterFakeCopyOnCreate(deployments, &cluster2Client.Fake, cluster2Watch)
deploymentController := NewDeploymentController(fakeClient)
deploymentController := newFederationSyncController(fakeClient, federatedtypes.NewDeploymentAdapter(fakeClient))
deploymentController.minimizeLatency()
clientFactory := func(cluster *fedv1.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
@ -87,11 +78,10 @@ func TestDeploymentController(t *testing.T) {
return nil, fmt.Errorf("Unknown cluster")
}
}
ToFederatedInformerForTestOnly(deploymentController.fedDeploymentInformer).SetClientFactory(clientFactory)
ToFederatedInformerForTestOnly(deploymentController.fedPodInformer).SetClientFactory(clientFactory)
ToFederatedInformerForTestOnly(deploymentController.informer).SetClientFactory(clientFactory)
stop := make(chan struct{})
go deploymentController.Run(5, stop)
go deploymentController.Run(stop)
// Create deployment. Expect to see it in cluster1.
dep1 := newDeploymentWithReplicas("depA", 6)
@ -113,7 +103,7 @@ func TestDeploymentController(t *testing.T) {
}
assert.NoError(t, CheckObjectFromChan(cluster1CreateChan, checkDeployment(dep1, *dep1.Spec.Replicas)))
err := WaitForStoreUpdate(
deploymentController.fedDeploymentInformer.GetTargetStore(),
deploymentController.informer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: dep1.Namespace, Name: dep1.Name}.String(), wait.ForeverTestTimeout)
assert.Nil(t, err, "deployment should have appeared in the informer store")
@ -132,7 +122,7 @@ func TestDeploymentController(t *testing.T) {
// Add new deployment with non-default replica placement preferences.
dep2 := newDeploymentWithReplicas("deployment2", 9)
dep2.Annotations = make(map[string]string)
dep2.Annotations[FedDeploymentPreferencesAnnotation] = `{"rebalance": true,
dep2.Annotations[federatedtypes.FedDeploymentPreferencesAnnotation] = `{"rebalance": true,
"clusters": {
"cluster1": {"weight": 2},
"cluster2": {"weight": 1}