mirror of https://github.com/k3s-io/k3s
Merge pull request #49950 from irfanurrehman/fed-hpa-targetObj
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.. [Federation] Hpa controller controls target objects This is in the series of PRs over https://github.com/kubernetes/kubernetes/pull/45993. The last commit is reviewable. Probably the last PR in this chain with e2e tests for relevant scenario, including the scenario created by this PR is soon to follow. **Special notes for your reviewer**: @kubernetes/sig-federation-pr-reviews @quinton-hoole **Release note**: ```NONE ```pull/6/head
commit
8657a74e13
|
@ -44,15 +44,18 @@ go_library(
|
||||||
"//federation/apis/federation/v1beta1:go_default_library",
|
"//federation/apis/federation/v1beta1:go_default_library",
|
||||||
"//federation/client/clientset_generated/federation_clientset:go_default_library",
|
"//federation/client/clientset_generated/federation_clientset:go_default_library",
|
||||||
"//federation/pkg/federation-controller/util:go_default_library",
|
"//federation/pkg/federation-controller/util:go_default_library",
|
||||||
|
"//federation/pkg/federation-controller/util/hpa:go_default_library",
|
||||||
"//federation/pkg/federation-controller/util/planner:go_default_library",
|
"//federation/pkg/federation-controller/util/planner:go_default_library",
|
||||||
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
|
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
|
||||||
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
|
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
|
||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
|
"//pkg/apis/extensions:go_default_library",
|
||||||
"//pkg/controller/namespace/deletion:go_default_library",
|
"//pkg/controller/namespace/deletion:go_default_library",
|
||||||
"//vendor/github.com/golang/glog:go_default_library",
|
"//vendor/github.com/golang/glog:go_default_library",
|
||||||
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
|
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
|
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
|
|
@ -17,10 +17,12 @@ limitations under the License.
|
||||||
package federatedtypes
|
package federatedtypes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"fmt"
|
|
||||||
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||||
|
"k8s.io/api/extensions/v1beta1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
pkgruntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
@ -31,6 +33,10 @@ import (
|
||||||
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
|
||||||
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||||
|
hpautil "k8s.io/kubernetes/federation/pkg/federation-controller/util/hpa"
|
||||||
|
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -165,7 +171,7 @@ func (a *HpaAdapter) NewTestObject(namespace string) pkgruntime.Object {
|
||||||
},
|
},
|
||||||
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
|
Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
|
||||||
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
|
ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
|
||||||
Kind: "replicaset",
|
Kind: "ReplicaSet",
|
||||||
Name: "myrs",
|
Name: "myrs",
|
||||||
},
|
},
|
||||||
MinReplicas: &min,
|
MinReplicas: &min,
|
||||||
|
@ -443,6 +449,10 @@ func (a *HpaAdapter) UpdateFederatedStatus(obj pkgruntime.Object, schedulingInfo
|
||||||
return fmt.Errorf("Error updating hpa: %s status in federation: %v", fedHpa.Name, err)
|
return fmt.Errorf("Error updating hpa: %s status in federation: %v", fedHpa.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := a.updateClusterListOnTargetObject(fedHpa, schedulingInfo.(*hpaSchedulingInfo).scheduleState); err != nil {
|
||||||
|
return fmt.Errorf("Error updating cluster list on object targetted by hpa: %s: %v", fedHpa.Name, err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -936,3 +946,70 @@ func (a *HpaAdapter) minReplicasIncreasable(obj pkgruntime.Object) bool {
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateClusterListOnTargetObject passes the necessary info to the target object,
|
||||||
|
// so that the corresponding controller can act on that.
|
||||||
|
// This is used because if an hpa is active on a federated object it is supposed
|
||||||
|
// to control the replicas and presence/absence of target object from federated clusters.
|
||||||
|
func (a *HpaAdapter) updateClusterListOnTargetObject(fedHpa *autoscalingv1.HorizontalPodAutoscaler, scheduleStatus map[string]*replicaNums) error {
|
||||||
|
if len(fedHpa.Spec.ScaleTargetRef.Kind) <= 0 || len(fedHpa.Spec.ScaleTargetRef.Name) <= 0 {
|
||||||
|
// nothing to do
|
||||||
|
glog.Infof("Fed HPA: cluster list update on target object skipped for target obj: %s, kind: %s", fedHpa.Spec.ScaleTargetRef.Name, fedHpa.Spec.ScaleTargetRef.Kind)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
names := []string{}
|
||||||
|
for clusterName, replicas := range scheduleStatus {
|
||||||
|
if replicas != nil {
|
||||||
|
names = append(names, clusterName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clusterNames := hpautil.ClusterNames{Names: names}
|
||||||
|
qualifiedKind := extensionsinternal.Kind(fedHpa.Spec.ScaleTargetRef.Kind)
|
||||||
|
targetObj, err := getRuntimeObjectForKind(a.client, qualifiedKind, fedHpa.Namespace, fedHpa.Spec.ScaleTargetRef.Name)
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
// Nothing to do; the target object does not exist in federation.
|
||||||
|
glog.Infof("Fed HPA: cluster list update on target object skipped for target obj: %s, kind: %s. Target object missing in federation", fedHpa.Spec.ScaleTargetRef.Name, fedHpa.Spec.ScaleTargetRef.Kind)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
updatedObj := hpautil.SetHpaTargetClusterList(targetObj, clusterNames)
|
||||||
|
_, err = updateRuntimeObjectForKind(a.client, qualifiedKind, fedHpa.Namespace, updatedObj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getRuntimeObjectForKind gets the hpa targetted object from the federation control plane.
|
||||||
|
// As of now, federation only supports "ReplicaSets" and "Deployments", which is the reason
|
||||||
|
// this function only lists these two types.
|
||||||
|
// TODO: update a similar info in federated hpa documentation.
|
||||||
|
func getRuntimeObjectForKind(c federationclientset.Interface, kind schema.GroupKind, ns, name string) (pkgruntime.Object, error) {
|
||||||
|
switch kind {
|
||||||
|
case extensionsinternal.Kind("ReplicaSet"):
|
||||||
|
return c.ExtensionsV1beta1().ReplicaSets(ns).Get(name, metav1.GetOptions{})
|
||||||
|
case extensionsinternal.Kind("Deployment"):
|
||||||
|
return c.ExtensionsV1beta1().Deployments(ns).Get(name, metav1.GetOptions{})
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unsupported federated kind targeted by hpa: %v", kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateRuntimeObjectForKind updates the hpa targetted object in the federation control plane.
|
||||||
|
// As of now, federation only supports "ReplicaSets" and "Deployments", which is the reason
|
||||||
|
// this function only lists these two types.
|
||||||
|
// TODO: update a similar info in federated hpa documentation.
|
||||||
|
func updateRuntimeObjectForKind(c federationclientset.Interface, kind schema.GroupKind, ns string, obj pkgruntime.Object) (pkgruntime.Object, error) {
|
||||||
|
switch kind {
|
||||||
|
case extensionsinternal.Kind("ReplicaSet"):
|
||||||
|
return c.ExtensionsV1beta1().ReplicaSets(ns).Update(obj.(*v1beta1.ReplicaSet))
|
||||||
|
case extensionsinternal.Kind("Deployment"):
|
||||||
|
return c.ExtensionsV1beta1().Deployments(ns).Update(obj.(*v1beta1.Deployment))
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("Unsupported federated kind targeted by hpa: %v", kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
fedapi "k8s.io/kubernetes/federation/apis/federation"
|
fedapi "k8s.io/kubernetes/federation/apis/federation"
|
||||||
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
|
||||||
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||||
|
hpautil "k8s.io/kubernetes/federation/pkg/federation-controller/util/hpa"
|
||||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
|
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
|
||||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
|
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
|
||||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
|
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
|
||||||
|
@ -47,9 +48,9 @@ const (
|
||||||
ActionDelete = "delete"
|
ActionDelete = "delete"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReplicaSchedulingStatus contains the status of the replica type objects (rs or deployment)
|
// ReplicaStatus contains the details of status fields from the cluster objects,
|
||||||
// that are being scheduled into joined clusters.
|
// which need accumulation to update the status of the federated object.
|
||||||
type ReplicaSchedulingStatus struct {
|
type ReplicaStatus struct {
|
||||||
Replicas int32
|
Replicas int32
|
||||||
UpdatedReplicas int32
|
UpdatedReplicas int32
|
||||||
FullyLabeledReplicas int32
|
FullyLabeledReplicas int32
|
||||||
|
@ -57,11 +58,18 @@ type ReplicaSchedulingStatus struct {
|
||||||
AvailableReplicas int32
|
AvailableReplicas int32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplicaScheduleState is the result of adapter specific schedule() function,
|
||||||
|
// which is then used to update objects into clusters.
|
||||||
|
type ReplicaScheduleState struct {
|
||||||
|
isSelected bool
|
||||||
|
replicas int64
|
||||||
|
}
|
||||||
|
|
||||||
// ReplicaSchedulingInfo wraps the information that a replica type (rs or deployment)
|
// ReplicaSchedulingInfo wraps the information that a replica type (rs or deployment)
|
||||||
// SchedulingAdapter needs to update objects per a schedule.
|
// SchedulingAdapter needs to update objects per a schedule.
|
||||||
type ReplicaSchedulingInfo struct {
|
type ReplicaSchedulingInfo struct {
|
||||||
Schedule map[string]int64
|
ScheduleState map[string]*ReplicaScheduleState
|
||||||
Status ReplicaSchedulingStatus
|
Status ReplicaStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
// SchedulingAdapter defines operations for interacting with a
|
// SchedulingAdapter defines operations for interacting with a
|
||||||
|
@ -87,6 +95,58 @@ func (a *replicaSchedulingAdapter) IsSchedulingAdapter() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isSelected(names []string, name string) bool {
|
||||||
|
for _, val := range names {
|
||||||
|
if val == name {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func isObjHpaControlled(fedObj pkgruntime.Object) (bool, error) {
|
||||||
|
hpaSelectedClusters, error := hpautil.GetHpaTargetClusterList(fedObj)
|
||||||
|
if error != nil {
|
||||||
|
return false, error
|
||||||
|
}
|
||||||
|
|
||||||
|
if hpaSelectedClusters == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// initializeScheduleState initializes the schedule state for consumption by schedule
|
||||||
|
// functions (schedule or simple schedule). After this initialization the state would
|
||||||
|
// already have information, if only a subset of clusters targetted by hpa, or all clusters
|
||||||
|
// need to be considered by the actual scheduling functions.
|
||||||
|
// The return bool named hpaControlled tells if this object is controlled by hpa or not.
|
||||||
|
func initializeScheduleState(fedObj pkgruntime.Object, clusterNames []string) (map[string]*ReplicaScheduleState, bool, error) {
|
||||||
|
initialState := make(map[string]*ReplicaScheduleState)
|
||||||
|
hpaControlled := false
|
||||||
|
hpaSelectedClusters, error := hpautil.GetHpaTargetClusterList(fedObj)
|
||||||
|
if error != nil {
|
||||||
|
return nil, hpaControlled, error
|
||||||
|
}
|
||||||
|
|
||||||
|
if hpaSelectedClusters != nil {
|
||||||
|
hpaControlled = true
|
||||||
|
}
|
||||||
|
for _, clusterName := range clusterNames {
|
||||||
|
replicaState := ReplicaScheduleState{
|
||||||
|
isSelected: false,
|
||||||
|
replicas: 0,
|
||||||
|
}
|
||||||
|
if hpaControlled {
|
||||||
|
if isSelected(hpaSelectedClusters.Names, clusterName) {
|
||||||
|
replicaState.isSelected = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
initialState[clusterName] = &replicaState
|
||||||
|
}
|
||||||
|
return initialState, hpaControlled, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
|
func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*federationapi.Cluster, informer fedutil.FederatedInformer) (interface{}, error) {
|
||||||
var clusterNames []string
|
var clusterNames []string
|
||||||
for _, cluster := range clusters {
|
for _, cluster := range clusters {
|
||||||
|
@ -113,6 +173,23 @@ func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string
|
||||||
}
|
}
|
||||||
return clientset.Core().Pods(metadata.GetNamespace()).List(metav1.ListOptions{LabelSelector: selector.String()})
|
return clientset.Core().Pods(metadata.GetNamespace()).List(metav1.ListOptions{LabelSelector: selector.String()})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
initializedState, hpaControlled, err := initializeScheduleState(obj, clusterNames)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hpaControlled {
|
||||||
|
state, err := simpleSchedule(initializedState, key, objectGetter)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &ReplicaSchedulingInfo{
|
||||||
|
ScheduleState: state,
|
||||||
|
Status: ReplicaStatus{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
|
currentReplicasPerCluster, estimatedCapacity, err := clustersReplicaState(clusterNames, key, objectGetter, podsGetter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -133,20 +210,14 @@ func (a *replicaSchedulingAdapter) GetSchedule(obj pkgruntime.Object, key string
|
||||||
plnr := planner.NewPlanner(fedPref)
|
plnr := planner.NewPlanner(fedPref)
|
||||||
|
|
||||||
return &ReplicaSchedulingInfo{
|
return &ReplicaSchedulingInfo{
|
||||||
Schedule: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity),
|
ScheduleState: schedule(plnr, obj, key, clusterNames, currentReplicasPerCluster, estimatedCapacity, initializedState),
|
||||||
Status: ReplicaSchedulingStatus{},
|
Status: ReplicaStatus{},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) {
|
func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster, clusterObj pkgruntime.Object, federationObjCopy pkgruntime.Object, schedulingInfo interface{}) (pkgruntime.Object, ScheduleAction, error) {
|
||||||
typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo)
|
typedSchedulingInfo := schedulingInfo.(*ReplicaSchedulingInfo)
|
||||||
replicas, ok := typedSchedulingInfo.Schedule[cluster.Name]
|
clusterScheduleState := typedSchedulingInfo.ScheduleState[cluster.Name]
|
||||||
if !ok {
|
|
||||||
replicas = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
specReplicas := int32(replicas)
|
|
||||||
reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas))
|
|
||||||
|
|
||||||
if clusterObj != nil {
|
if clusterObj != nil {
|
||||||
schedulingStatusVal := reflect.ValueOf(typedSchedulingInfo).Elem().FieldByName("Status")
|
schedulingStatusVal := reflect.ValueOf(typedSchedulingInfo).Elem().FieldByName("Status")
|
||||||
|
@ -162,10 +233,19 @@ func (a *replicaSchedulingAdapter) ScheduleObject(cluster *federationapi.Cluster
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var action ScheduleAction = ""
|
var action ScheduleAction = ""
|
||||||
if replicas > 0 {
|
specReplicas := int32(0)
|
||||||
|
// If the cluster has been selected (isSelected = true; for example by hpa)
|
||||||
|
// and the obj does not get any replicas, then it should create one with
|
||||||
|
// 0 replicas (which can then be scaled by hpa in that cluster).
|
||||||
|
// On the other hand we keep the action as "unassigned" if this cluster was
|
||||||
|
// not selected, and let the sync controller decide what to do.
|
||||||
|
if clusterScheduleState.isSelected {
|
||||||
|
specReplicas = int32(clusterScheduleState.replicas)
|
||||||
action = ActionAdd
|
action = ActionAdd
|
||||||
}
|
}
|
||||||
|
reflect.ValueOf(federationObjCopy).Elem().FieldByName("Spec").FieldByName("Replicas").Set(reflect.ValueOf(&specReplicas))
|
||||||
return federationObjCopy, action, nil
|
return federationObjCopy, action, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,22 +253,48 @@ func (a *replicaSchedulingAdapter) UpdateFederatedStatus(obj pkgruntime.Object,
|
||||||
return a.updateStatusFunc(obj, schedulingInfo)
|
return a.updateStatusFunc(obj, schedulingInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64) map[string]int64 {
|
// simpleSchedule get replicas from only those clusters which are selected (by hpa scheduler).
|
||||||
|
// This aim of this is to ensure that this controller does not update objects, which are
|
||||||
|
// targetted by hpa.
|
||||||
|
func simpleSchedule(scheduleState map[string]*ReplicaScheduleState, key string, objectGetter func(clusterName string, key string) (interface{}, bool, error)) (map[string]*ReplicaScheduleState, error) {
|
||||||
|
for clusterName, state := range scheduleState {
|
||||||
|
// Get and consider replicas only for those clusters which are selected by hpa.
|
||||||
|
if state.isSelected {
|
||||||
|
obj, exists, err := objectGetter(clusterName, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
state.replicas = reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return scheduleState, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func schedule(planner *planner.Planner, obj pkgruntime.Object, key string, clusterNames []string, currentReplicasPerCluster map[string]int64, estimatedCapacity map[string]int64, initialState map[string]*ReplicaScheduleState) map[string]*ReplicaScheduleState {
|
||||||
// TODO: integrate real scheduler
|
// TODO: integrate real scheduler
|
||||||
replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int()
|
replicas := reflect.ValueOf(obj).Elem().FieldByName("Spec").FieldByName("Replicas").Elem().Int()
|
||||||
scheduleResult, overflow := planner.Plan(replicas, clusterNames, currentReplicasPerCluster, estimatedCapacity, key)
|
scheduleResult, overflow := planner.Plan(replicas, clusterNames, currentReplicasPerCluster, estimatedCapacity, key)
|
||||||
|
|
||||||
// Ensure that all current clusters end up in the scheduling result.
|
// Ensure that all current clusters end up in the scheduling result.
|
||||||
result := make(map[string]int64)
|
// initialState, is preinitialized with all isSelected to false.
|
||||||
|
result := initialState
|
||||||
for clusterName := range currentReplicasPerCluster {
|
for clusterName := range currentReplicasPerCluster {
|
||||||
result[clusterName] = 0
|
// We consider 0 replicas equaling to no need of creating a new object.
|
||||||
|
// isSchedule remains false in such case.
|
||||||
|
result[clusterName].replicas = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
for clusterName, replicas := range scheduleResult {
|
for clusterName, replicas := range scheduleResult {
|
||||||
result[clusterName] = replicas
|
result[clusterName].isSelected = true
|
||||||
|
result[clusterName].replicas = replicas
|
||||||
}
|
}
|
||||||
for clusterName, replicas := range overflow {
|
for clusterName, replicas := range overflow {
|
||||||
result[clusterName] += replicas
|
result[clusterName].isSelected = true
|
||||||
|
result[clusterName].replicas += replicas
|
||||||
}
|
}
|
||||||
|
|
||||||
if glog.V(4) {
|
if glog.V(4) {
|
||||||
|
|
|
@ -88,6 +88,7 @@ filegroup(
|
||||||
"//federation/pkg/federation-controller/util/deletionhelper:all-srcs",
|
"//federation/pkg/federation-controller/util/deletionhelper:all-srcs",
|
||||||
"//federation/pkg/federation-controller/util/eventsink:all-srcs",
|
"//federation/pkg/federation-controller/util/eventsink:all-srcs",
|
||||||
"//federation/pkg/federation-controller/util/finalizers:all-srcs",
|
"//federation/pkg/federation-controller/util/finalizers:all-srcs",
|
||||||
|
"//federation/pkg/federation-controller/util/hpa:all-srcs",
|
||||||
"//federation/pkg/federation-controller/util/planner:all-srcs",
|
"//federation/pkg/federation-controller/util/planner:all-srcs",
|
||||||
"//federation/pkg/federation-controller/util/podanalyzer:all-srcs",
|
"//federation/pkg/federation-controller/util/podanalyzer:all-srcs",
|
||||||
"//federation/pkg/federation-controller/util/replicapreferences:all-srcs",
|
"//federation/pkg/federation-controller/util/replicapreferences:all-srcs",
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["hpa.go"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["hpa_test.go"],
|
||||||
|
library = ":go_default_library",
|
||||||
|
deps = [
|
||||||
|
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||||
|
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
|
@ -0,0 +1,75 @@
|
||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package hpa
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// FederatedAnnotationOnHpaTargetObj as key, is used by hpa controller to
|
||||||
|
// set selected cluster name list as annotation on the target object.
|
||||||
|
FederatedAnnotationOnHpaTargetObj = "federation.kubernetes.io/hpa-target-cluster-list"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClusterNames stores the list of clusters represented by names as appearing on federation
|
||||||
|
// cluster objects. This is set by federation hpa and used by target objects federation
|
||||||
|
// controller to restrict that target object to only these clusters.
|
||||||
|
type ClusterNames struct {
|
||||||
|
Names []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cn *ClusterNames) String() string {
|
||||||
|
annotationBytes, _ := json.Marshal(cn)
|
||||||
|
return string(annotationBytes[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetHpaTargetClusterList is used to get the list of clusters from the target object
|
||||||
|
// annotations.
|
||||||
|
func GetHpaTargetClusterList(obj runtime.Object) (*ClusterNames, error) {
|
||||||
|
accessor, _ := meta.Accessor(obj)
|
||||||
|
targetObjAnno := accessor.GetAnnotations()
|
||||||
|
if targetObjAnno == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
targetObjAnnoString, exists := targetObjAnno[FederatedAnnotationOnHpaTargetObj]
|
||||||
|
if !exists {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
clusterNames := &ClusterNames{}
|
||||||
|
if err := json.Unmarshal([]byte(targetObjAnnoString), clusterNames); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return clusterNames, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetHpaTargetClusterList is used to set the list of clusters on the target object
|
||||||
|
// annotations.
|
||||||
|
func SetHpaTargetClusterList(obj runtime.Object, clusterNames ClusterNames) runtime.Object {
|
||||||
|
accessor, _ := meta.Accessor(obj)
|
||||||
|
anno := accessor.GetAnnotations()
|
||||||
|
if anno == nil {
|
||||||
|
anno = make(map[string]string)
|
||||||
|
accessor.SetAnnotations(anno)
|
||||||
|
}
|
||||||
|
anno[FederatedAnnotationOnHpaTargetObj] = clusterNames.String()
|
||||||
|
return obj
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package hpa
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetHpaTargetClusterList(t *testing.T) {
|
||||||
|
// Any object is fine for this test.
|
||||||
|
obj := &autoscalingv1.HorizontalPodAutoscaler{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "myhpa",
|
||||||
|
Namespace: "myNamespace",
|
||||||
|
SelfLink: "/api/mylink",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
clusterNames *ClusterNames
|
||||||
|
expectedErr bool
|
||||||
|
}{
|
||||||
|
"Wrong data set on annotations should return unmarshalling error when retrieving": {
|
||||||
|
expectedErr: true,
|
||||||
|
},
|
||||||
|
"Get clusternames on annotations with 2 clusters, should have same names, which were set": {
|
||||||
|
clusterNames: &ClusterNames{
|
||||||
|
Names: []string{
|
||||||
|
"c1",
|
||||||
|
"c2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testName, testCase := range testCases {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
accessor, _ := meta.Accessor(obj)
|
||||||
|
anno := accessor.GetAnnotations()
|
||||||
|
if anno == nil {
|
||||||
|
anno = make(map[string]string)
|
||||||
|
accessor.SetAnnotations(anno)
|
||||||
|
}
|
||||||
|
if testCase.expectedErr {
|
||||||
|
anno[FederatedAnnotationOnHpaTargetObj] = "{" //some random string
|
||||||
|
} else {
|
||||||
|
anno[FederatedAnnotationOnHpaTargetObj] = testCase.clusterNames.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
readNames, err := GetHpaTargetClusterList(obj)
|
||||||
|
|
||||||
|
if testCase.expectedErr {
|
||||||
|
require.Error(t, err, "An error was expected")
|
||||||
|
} else {
|
||||||
|
require.Equal(t, testCase.clusterNames, readNames, "Names should have been equal")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetHpaTargetClusterList(t *testing.T) {
|
||||||
|
// Any object is fine for this test.
|
||||||
|
obj := &autoscalingv1.HorizontalPodAutoscaler{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "myhpa",
|
||||||
|
Namespace: "myNamespace",
|
||||||
|
SelfLink: "/api/mylink",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
clusterNames ClusterNames
|
||||||
|
expectedErr bool
|
||||||
|
}{
|
||||||
|
"Get clusternames on annotations with 2 clusters, should have same names, which were set": {
|
||||||
|
clusterNames: ClusterNames{
|
||||||
|
Names: []string{
|
||||||
|
"c1",
|
||||||
|
"c2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for testName, testCase := range testCases {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
|
||||||
|
SetHpaTargetClusterList(obj, testCase.clusterNames)
|
||||||
|
readNames, err := GetHpaTargetClusterList(obj)
|
||||||
|
require.NoError(t, err, "An error should not have happened")
|
||||||
|
require.Equal(t, &testCase.clusterNames, readNames, "Names should have been equal")
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue