switching rolling update to external clients

pull/8/head
David Eads 2018-08-02 16:32:30 -04:00
parent dab04dc6e0
commit 029b4388fe
7 changed files with 240 additions and 364 deletions

View File

@ -877,19 +877,6 @@ func WaitForObservedDeployment(getDeploymentFunc func() (*apps.Deployment, error
})
}
// TODO: remove the duplicate
// WaitForObservedInternalDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
// Returns error if polling timesout.
func WaitForObservedDeploymentInternal(getDeploymentFunc func() (*internalextensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
return wait.Poll(interval, timeout, func() (bool, error) {
deployment, err := getDeploymentFunc()
if err != nil {
return false, err
}
return deployment.Status.ObservedGeneration >= desiredGeneration, nil
})
}
// ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one
// step. For example:
//

View File

@ -40,8 +40,7 @@ go_test(
"//pkg/api/testapi:go_default_library",
"//pkg/api/testing:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/kubectl/scheme:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/apps/v1beta1:go_default_library",
@ -63,8 +62,10 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/rest/fake:go_default_library",
@ -118,9 +119,6 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/apps/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/credentialprovider:go_default_library",
"//pkg/kubectl/apps:go_default_library",
@ -160,6 +158,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/util/integer:go_default_library",

View File

@ -58,7 +58,6 @@ go_library(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubectl/apply/parse:go_default_library",
"//pkg/kubectl/apply/strategy:go_default_library",

View File

@ -25,14 +25,14 @@ import (
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@ -99,13 +99,13 @@ type RollingUpdateOptions struct {
EnforceNamespace bool
ScaleClient scaleclient.ScalesGetter
ClientSet internalclientset.Interface
ClientSet kubernetes.Interface
Builder *resource.Builder
ShouldValidate bool
Validator func(bool) (validation.Schema, error)
FindNewName func(*api.ReplicationController) string
FindNewName func(*corev1.ReplicationController) string
PrintFlags *genericclioptions.PrintFlags
ToPrinter func(string) (printers.ResourcePrinter, error)
@ -204,7 +204,7 @@ func (o *RollingUpdateOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, a
o.ShouldValidate = cmdutil.GetFlagBool(cmd, "validate")
o.Validator = f.Validator
o.FindNewName = func(obj *api.ReplicationController) string {
o.FindNewName = func(obj *corev1.ReplicationController) string {
return findNewName(args, obj)
}
@ -219,11 +219,7 @@ func (o *RollingUpdateOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, a
return err
}
clientConfig, err := f.ToRESTConfig()
if err != nil {
return err
}
o.ClientSet, err = internalclientset.NewForConfig(clientConfig)
o.ClientSet, err = f.KubernetesClientSet()
if err != nil {
return err
}
@ -251,9 +247,9 @@ func (o *RollingUpdateOptions) Run() error {
filename = o.FilenameOptions.Filenames[0]
}
coreClient := o.ClientSet.Core()
coreClient := o.ClientSet.CoreV1()
var newRc *api.ReplicationController
var newRc *corev1.ReplicationController
// fetch rc
oldRc, err := coreClient.ReplicationControllers(o.Namespace).Get(o.OldName, metav1.GetOptions{})
if err != nil {
@ -295,7 +291,7 @@ func (o *RollingUpdateOptions) Run() error {
return fmt.Errorf("please make sure %s exists and is not empty", filename)
}
uncastVersionedObj, err := legacyscheme.Scheme.ConvertToVersion(infos[0].Object, v1.SchemeGroupVersion)
uncastVersionedObj, err := scheme.Scheme.ConvertToVersion(infos[0].Object, corev1.SchemeGroupVersion)
if err != nil {
glog.V(4).Infof("Object %T is not a ReplicationController", infos[0].Object)
return fmt.Errorf("%s contains a %v not a ReplicationController", filename, infos[0].Object.GetObjectKind().GroupVersionKind())
@ -303,12 +299,7 @@ func (o *RollingUpdateOptions) Run() error {
switch t := uncastVersionedObj.(type) {
case *v1.ReplicationController:
replicasDefaulted = t.Spec.Replicas == nil
// previous code ignored the error. Seem like it's very unlikely to fail, so ok for now.
uncastObj, err := legacyscheme.Scheme.ConvertToVersion(uncastVersionedObj, api.SchemeGroupVersion)
if err == nil {
newRc, _ = uncastObj.(*api.ReplicationController)
}
newRc = t
}
if newRc == nil {
glog.V(4).Infof("Object %T is not a ReplicationController", infos[0].Object)
@ -343,7 +334,7 @@ func (o *RollingUpdateOptions) Run() error {
if len(o.PullPolicy) == 0 {
return fmt.Errorf("--image-pull-policy (Always|Never|IfNotPresent) must be provided when --image is the same as existing container image")
}
config.PullPolicy = api.PullPolicy(o.PullPolicy)
config.PullPolicy = corev1.PullPolicy(o.PullPolicy)
}
newRc, err = kubectl.CreateNewControllerFromCurrentController(coreClient, codec, config)
if err != nil {
@ -398,7 +389,8 @@ func (o *RollingUpdateOptions) Run() error {
}
// TODO: handle scales during rolling update
if replicasDefaulted {
newRc.Spec.Replicas = oldRc.Spec.Replicas
t := *oldRc.Spec.Replicas
newRc.Spec.Replicas = &t
}
if o.DryRun {
@ -467,7 +459,7 @@ func (o *RollingUpdateOptions) Run() error {
return printer.PrintObj(cmdutil.AsDefaultVersionedOrOriginal(newRc, nil), o.Out)
}
func findNewName(args []string, oldRc *api.ReplicationController) string {
func findNewName(args []string, oldRc *corev1.ReplicationController) string {
if len(args) >= 2 {
return args[1]
}

View File

@ -25,19 +25,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/api/pod"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/apps"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
)
// ControllerHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
func ControllerHasDesiredReplicas(rcClient coreclient.ReplicationControllersGetter, controller *api.ReplicationController) wait.ConditionFunc {
func ControllerHasDesiredReplicas(rcClient corev1client.ReplicationControllersGetter, controller *corev1.ReplicationController) wait.ConditionFunc {
// If we're given a controller where the status lags the spec, it either means that the controller is stale,
// or that the rc manager hasn't noticed the update yet. Polling status.Replicas is not safe in the latter case.
@ -52,70 +48,7 @@ func ControllerHasDesiredReplicas(rcClient coreclient.ReplicationControllersGett
// or, after this check has passed, a modification causes the rc manager to create more pods.
// This will not be an issue once we've implemented graceful delete for rcs, but till then
// concurrent stop operations on the same rc might have unintended side effects.
return ctrl.Status.ObservedGeneration >= desiredGeneration && ctrl.Status.Replicas == ctrl.Spec.Replicas, nil
}
}
// ReplicaSetHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a ReplicaSet's ReplicaSelector equals the Replicas count.
func ReplicaSetHasDesiredReplicas(rsClient extensionsclient.ReplicaSetsGetter, replicaSet *extensions.ReplicaSet) wait.ConditionFunc {
// If we're given a ReplicaSet where the status lags the spec, it either means that the
// ReplicaSet is stale, or that the ReplicaSet manager hasn't noticed the update yet.
// Polling status.Replicas is not safe in the latter case.
desiredGeneration := replicaSet.Generation
return func() (bool, error) {
rs, err := rsClient.ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// There's a chance a concurrent update modifies the Spec.Replicas causing this check to
// pass, or, after this check has passed, a modification causes the ReplicaSet manager to
// create more pods. This will not be an issue once we've implemented graceful delete for
// ReplicaSets, but till then concurrent stop operations on the same ReplicaSet might have
// unintended side effects.
return rs.Status.ObservedGeneration >= desiredGeneration && rs.Status.Replicas == rs.Spec.Replicas, nil
}
}
// StatefulSetHasDesiredReplicas returns a condition that checks the number of StatefulSet replicas
func StatefulSetHasDesiredReplicas(ssClient appsclient.StatefulSetsGetter, ss *apps.StatefulSet) wait.ConditionFunc {
// If we're given a StatefulSet where the status lags the spec, it either means that the
// StatefulSet is stale, or that the StatefulSet manager hasn't noticed the update yet.
// Polling status.Replicas is not safe in the latter case.
desiredGeneration := ss.Generation
return func() (bool, error) {
ss, err := ssClient.StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// There's a chance a concurrent update modifies the Spec.Replicas causing this check to
// pass, or, after this check has passed, a modification causes the StatefulSet manager to
// create more pods. This will not be an issue once we've implemented graceful delete for
// StatefulSet, but till then concurrent stop operations on the same StatefulSet might have
// unintended side effects.
return ss.Status.ObservedGeneration != nil && *ss.Status.ObservedGeneration >= desiredGeneration && ss.Status.Replicas == ss.Spec.Replicas, nil
}
}
// DeploymentHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a deployment equals its updated replicas count.
// (non-terminated pods that have the desired template spec).
func DeploymentHasDesiredReplicas(dClient extensionsclient.DeploymentsGetter, deployment *extensions.Deployment) wait.ConditionFunc {
// If we're given a deployment where the status lags the spec, it either
// means that the deployment is stale, or that the deployment manager hasn't
// noticed the update yet. Polling status.Replicas is not safe in the latter
// case.
desiredGeneration := deployment.Generation
return func() (bool, error) {
deployment, err := dClient.Deployments(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return deployment.Status.ObservedGeneration >= desiredGeneration &&
deployment.Status.UpdatedReplicas == deployment.Spec.Replicas, nil
return ctrl.Status.ObservedGeneration >= desiredGeneration && ctrl.Status.Replicas == valOrZero(ctrl.Spec.Replicas), nil
}
}
@ -220,58 +153,3 @@ func PodNotPending(event watch.Event) (bool, error) {
}
return false, nil
}
// PodContainerRunning returns false until the named container has ContainerStatus running (at least once),
// and will return an error if the pod is deleted, runs to completion, or the container pod is not available.
func PodContainerRunning(containerName string) watch.ConditionFunc {
return func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, "")
}
switch t := event.Object.(type) {
case *api.Pod:
switch t.Status.Phase {
case api.PodRunning, api.PodPending:
case api.PodFailed, api.PodSucceeded:
return false, ErrPodCompleted
default:
return false, nil
}
for _, s := range t.Status.ContainerStatuses {
if s.Name != containerName {
continue
}
if s.State.Terminated != nil {
return false, ErrContainerTerminated
}
return s.State.Running != nil, nil
}
for _, s := range t.Status.InitContainerStatuses {
if s.Name != containerName {
continue
}
if s.State.Terminated != nil {
return false, ErrContainerTerminated
}
return s.State.Running != nil, nil
}
return false, nil
}
return false, nil
}
}
// ServiceAccountHasSecrets returns true if the service account has at least one secret,
// false if it does not, or an error.
func ServiceAccountHasSecrets(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
return false, errors.NewNotFound(schema.GroupResource{Resource: "serviceaccounts"}, "")
}
switch t := event.Object.(type) {
case *api.ServiceAccount:
return len(t.Secrets) > 0, nil
}
return false, nil
}

View File

@ -23,7 +23,7 @@ import (
"strings"
"time"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -31,17 +31,27 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/retry"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
apiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/kubectl/util"
)
func newInt32Ptr(val int) *int32 {
ret := int32(val)
return &ret
}
func valOrZero(val *int32) int32 {
if val == nil {
return int32(0)
}
return *val
}
const (
kubectlAnnotationPrefix = "kubectl.kubernetes.io/"
sourceIdAnnotation = kubectlAnnotationPrefix + "update-source-id"
@ -55,10 +65,10 @@ type RollingUpdaterConfig struct {
// Out is a writer for progress output.
Out io.Writer
// OldRC is an existing controller to be replaced.
OldRc *api.ReplicationController
OldRc *corev1.ReplicationController
// NewRc is a controller that will take ownership of updated pods (will be
// created if needed).
NewRc *api.ReplicationController
NewRc *corev1.ReplicationController
// UpdatePeriod is the time to wait between individual pod updates.
UpdatePeriod time.Duration
// Interval is the time to wait between polling controller status after
@ -96,7 +106,7 @@ type RollingUpdaterConfig struct {
// OnProgress is invoked if set during each scale cycle, to allow the caller to perform additional logic or
// abort the scale. If an error is returned the cleanup method will not be invoked. The percentage value
// is a synthetic "progress" calculation that represents the approximate percentage completion.
OnProgress func(oldRc, newRc *api.ReplicationController, percentage int) error
OnProgress func(oldRc, newRc *corev1.ReplicationController, percentage int) error
}
// RollingUpdaterCleanupPolicy is a cleanup action to take after the
@ -116,26 +126,26 @@ const (
// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
rcClient coreclient.ReplicationControllersGetter
podClient coreclient.PodsGetter
rcClient corev1client.ReplicationControllersGetter
podClient corev1client.PodsGetter
scaleClient scaleclient.ScalesGetter
// Namespace for resources
ns string
// scaleAndWait scales a controller and returns its updated state.
scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error)
scaleAndWait func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error)
//getOrCreateTargetController gets and validates an existing controller or
//makes a new one.
getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error)
getOrCreateTargetController func(controller *corev1.ReplicationController, sourceId string) (*corev1.ReplicationController, bool, error)
// cleanup performs post deployment cleanup tasks for newRc and oldRc.
cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error
cleanup func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error
// getReadyPods returns the amount of old and new ready pods.
getReadyPods func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error)
getReadyPods func(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error)
// nowFn returns the current time used to calculate the minReadySeconds
nowFn func() metav1.Time
}
// NewRollingUpdater creates a RollingUpdater from a client.
func NewRollingUpdater(namespace string, rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter, sc scaleclient.ScalesGetter) *RollingUpdater {
func NewRollingUpdater(namespace string, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, sc scaleclient.ScalesGetter) *RollingUpdater {
updater := &RollingUpdater{
rcClient: rcClient,
podClient: podClient,
@ -203,8 +213,8 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
if err != nil {
return err
}
originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
applyUpdate := func(rc *api.ReplicationController) {
originReplicas := strconv.Itoa(int(valOrZero(existing.Spec.Replicas)))
applyUpdate := func(rc *corev1.ReplicationController) {
if rc.Annotations == nil {
rc.Annotations = map[string]string{}
}
@ -231,15 +241,15 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
// the effective scale of the old RC regardless of the configuration
// (equivalent to 100% maxUnavailable).
if desired == 0 {
maxUnavailable = oldRc.Spec.Replicas
maxUnavailable = valOrZero(oldRc.Spec.Replicas)
minAvailable = 0
}
fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n",
newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, minAvailable, desired+maxSurge)
newRc.Name, valOrZero(newRc.Spec.Replicas), desired, oldRc.Name, valOrZero(oldRc.Spec.Replicas), minAvailable, desired+maxSurge)
// give a caller incremental notification and allow them to exit early
goal := desired - newRc.Spec.Replicas
goal := desired - valOrZero(newRc.Spec.Replicas)
if goal < 0 {
goal = -goal
}
@ -247,7 +257,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
if config.OnProgress == nil {
return nil
}
progress := desired - newRc.Spec.Replicas
progress := desired - valOrZero(newRc.Spec.Replicas)
if progress < 0 {
progress = -progress
}
@ -261,10 +271,10 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
// Scale newRc and oldRc until newRc has the desired number of replicas and
// oldRc has 0 replicas.
progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds()
for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 {
for valOrZero(newRc.Spec.Replicas) != desired || valOrZero(oldRc.Spec.Replicas) != 0 {
// Store the existing replica counts for progress timeout tracking.
newReplicas := newRc.Spec.Replicas
oldReplicas := oldRc.Spec.Replicas
newReplicas := valOrZero(newRc.Spec.Replicas)
oldReplicas := valOrZero(oldRc.Spec.Replicas)
// Scale up as much as possible.
scaledRc, err := r.scaleUp(newRc, oldRc, desired, maxSurge, maxUnavailable, scaleRetryParams, config)
@ -295,7 +305,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
// If we are making progress, continue to advance the progress deadline.
// Otherwise, time out with an error.
progressMade := (newRc.Spec.Replicas != newReplicas) || (oldRc.Spec.Replicas != oldReplicas)
progressMade := (valOrZero(newRc.Spec.Replicas) != newReplicas) || (valOrZero(oldRc.Spec.Replicas) != oldReplicas)
if progressMade {
progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds()
} else if time.Now().UnixNano() > progressDeadline {
@ -315,29 +325,30 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
// scaleUp scales up newRc to desired by whatever increment is possible given
// the configured surge threshold. scaleUp will safely no-op as necessary when
// it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
func (r *RollingUpdater) scaleUp(newRc, oldRc *corev1.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
// If we're already at the desired, do nothing.
if newRc.Spec.Replicas == desired {
if valOrZero(newRc.Spec.Replicas) == desired {
return newRc, nil
}
// Scale up as far as we can based on the surge limit.
increment := (desired + maxSurge) - (oldRc.Spec.Replicas + newRc.Spec.Replicas)
increment := (desired + maxSurge) - (valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas))
// If the old is already scaled down, go ahead and scale all the way up.
if oldRc.Spec.Replicas == 0 {
increment = desired - newRc.Spec.Replicas
if valOrZero(oldRc.Spec.Replicas) == 0 {
increment = desired - valOrZero(newRc.Spec.Replicas)
}
// We can't scale up without violating the surge limit, so do nothing.
if increment <= 0 {
return newRc, nil
}
// Increase the replica count, and deal with fenceposts.
newRc.Spec.Replicas += increment
if newRc.Spec.Replicas > desired {
newRc.Spec.Replicas = desired
nextVal := valOrZero(newRc.Spec.Replicas) + increment
newRc.Spec.Replicas = &nextVal
if valOrZero(newRc.Spec.Replicas) > desired {
newRc.Spec.Replicas = &desired
}
// Perform the scale-up.
fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas)
fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, valOrZero(newRc.Spec.Replicas))
scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams)
if err != nil {
return nil, err
@ -348,9 +359,9 @@ func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desire
// scaleDown scales down oldRc to 0 at whatever decrement possible given the
// thresholds defined on the config. scaleDown will safely no-op as necessary
// when it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int32, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
func (r *RollingUpdater) scaleDown(newRc, oldRc *corev1.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int32, config *RollingUpdaterConfig) (*corev1.ReplicationController, error) {
// Already scaled down; do nothing.
if oldRc.Spec.Replicas == 0 {
if valOrZero(oldRc.Spec.Replicas) == 0 {
return oldRc, nil
}
// Get ready pods. We shouldn't block, otherwise in case both old and new
@ -363,8 +374,8 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
// The old controller is considered as part of the total because we want to
// maintain minimum availability even with a volatile old controller.
// Scale down as much as possible while maintaining minimum availability
allPods := oldRc.Spec.Replicas + newRc.Spec.Replicas
newUnavailable := newRc.Spec.Replicas - newAvailable
allPods := valOrZero(oldRc.Spec.Replicas) + valOrZero(newRc.Spec.Replicas)
newUnavailable := valOrZero(newRc.Spec.Replicas) - newAvailable
decrement := allPods - minAvailable - newUnavailable
// The decrement normally shouldn't drop below 0 because the available count
// always starts below the old replica count, but the old replica count can
@ -379,17 +390,18 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
return oldRc, nil
}
// Reduce the replica count, and deal with fenceposts.
oldRc.Spec.Replicas -= decrement
if oldRc.Spec.Replicas < 0 {
oldRc.Spec.Replicas = 0
nextOldVal := valOrZero(oldRc.Spec.Replicas) - decrement
oldRc.Spec.Replicas = &nextOldVal
if valOrZero(oldRc.Spec.Replicas) < 0 {
oldRc.Spec.Replicas = newInt32Ptr(0)
}
// If the new is already fully scaled and available up to the desired size, go
// ahead and scale old all the way down.
if newRc.Spec.Replicas == desired && newAvailable == desired {
oldRc.Spec.Replicas = 0
if valOrZero(newRc.Spec.Replicas) == desired && newAvailable == desired {
oldRc.Spec.Replicas = newInt32Ptr(0)
}
// Perform the scale-down.
fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas)
fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, valOrZero(oldRc.Spec.Replicas))
retryWait := &RetryParams{config.Interval, config.Timeout}
scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
if err != nil {
@ -399,9 +411,9 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
}
// scalerScaleAndWait scales a controller using a Scaler and a real client.
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
scaler := NewScaler(r.scaleClient)
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
if err := scaler.Scale(rc.Namespace, rc.Name, uint(valOrZero(rc.Spec.Replicas)), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
return nil, err
}
return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{})
@ -410,8 +422,8 @@ func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, r
// readyPods returns the old and new ready counts for their pods.
// If a pod is observed as being ready, it's considered ready even
// if it later becomes notReady.
func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
controllers := []*api.ReplicationController{oldRc, newRc}
func (r *RollingUpdater) readyPods(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error) {
controllers := []*corev1.ReplicationController{oldRc, newRc}
oldReady := int32(0)
newReady := int32(0)
if r.nowFn == nil {
@ -426,16 +438,12 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minR
if err != nil {
return 0, 0, err
}
for _, pod := range pods.Items {
v1Pod := &v1.Pod{}
if err := apiv1.Convert_core_Pod_To_v1_Pod(&pod, v1Pod, nil); err != nil {
return 0, 0, err
}
for _, v1Pod := range pods.Items {
// Do not count deleted pods as ready
if v1Pod.DeletionTimestamp != nil {
continue
}
if !podutil.IsPodAvailable(v1Pod, minReadySeconds, r.nowFn()) {
if !podutil.IsPodAvailable(&v1Pod, minReadySeconds, r.nowFn()) {
continue
}
switch controller.Name {
@ -457,7 +465,7 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minR
//
// Existing controllers are validated to ensure their sourceIdAnnotation
// matches sourceId; if there's a mismatch, an error is returned.
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *corev1.ReplicationController, sourceId string) (*corev1.ReplicationController, bool, error) {
existingRc, err := r.existingController(controller)
if err != nil {
if !errors.IsNotFound(err) {
@ -465,16 +473,16 @@ func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.R
// should create it.
return nil, false, err
}
if controller.Spec.Replicas <= 0 {
return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", controller.Name, controller.Spec.Replicas)
if valOrZero(controller.Spec.Replicas) <= 0 {
return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", controller.Name, valOrZero(controller.Spec.Replicas))
}
// The controller wasn't found, so create it.
if controller.Annotations == nil {
controller.Annotations = map[string]string{}
}
controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", controller.Spec.Replicas)
controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", valOrZero(controller.Spec.Replicas))
controller.Annotations[sourceIdAnnotation] = sourceId
controller.Spec.Replicas = 0
controller.Spec.Replicas = newInt32Ptr(0)
newRc, err := r.rcClient.ReplicationControllers(r.ns).Create(controller)
return newRc, false, err
}
@ -489,10 +497,10 @@ func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.R
}
// existingController verifies if the controller already exists
func (r *RollingUpdater) existingController(controller *api.ReplicationController) (*api.ReplicationController, error) {
func (r *RollingUpdater) existingController(controller *corev1.ReplicationController) (*corev1.ReplicationController, error) {
// without rc name but generate name, there's no existing rc
if len(controller.Name) == 0 && len(controller.GenerateName) > 0 {
return nil, errors.NewNotFound(api.Resource("replicationcontrollers"), controller.Name)
return nil, errors.NewNotFound(corev1.Resource("replicationcontrollers"), controller.Name)
}
// controller name is required to get rc back
return r.rcClient.ReplicationControllers(controller.Namespace).Get(controller.Name, metav1.GetOptions{})
@ -501,14 +509,14 @@ func (r *RollingUpdater) existingController(controller *api.ReplicationControlle
// cleanupWithClients performs cleanup tasks after the rolling update. Update
// process related annotations are removed from oldRc and newRc. The
// CleanupPolicy on config is executed.
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
// Clean up annotations
var err error
newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name, metav1.GetOptions{})
if err != nil {
return err
}
applyUpdate := func(rc *api.ReplicationController) {
applyUpdate := func(rc *corev1.ReplicationController) {
delete(rc.Annotations, sourceIdAnnotation)
delete(rc.Annotations, desiredReplicasAnnotation)
}
@ -544,7 +552,7 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl
}
}
func Rename(c coreclient.ReplicationControllersGetter, rc *api.ReplicationController, newName string) error {
func Rename(c corev1client.ReplicationControllersGetter, rc *corev1.ReplicationController, newName string) error {
oldName := rc.Name
rc.Name = newName
rc.ResourceVersion = ""
@ -572,7 +580,7 @@ func Rename(c coreclient.ReplicationControllersGetter, rc *api.ReplicationContro
return err
}
func LoadExistingNextReplicationController(c coreclient.ReplicationControllersGetter, namespace, newName string) (*api.ReplicationController, error) {
func LoadExistingNextReplicationController(c corev1client.ReplicationControllersGetter, namespace, newName string) (*corev1.ReplicationController, error) {
if len(newName) == 0 {
return nil, nil
}
@ -589,10 +597,10 @@ type NewControllerConfig struct {
Image string
Container string
DeploymentKey string
PullPolicy api.PullPolicy
PullPolicy corev1.PullPolicy
}
func CreateNewControllerFromCurrentController(rcClient coreclient.ReplicationControllersGetter, codec runtime.Codec, cfg *NewControllerConfig) (*api.ReplicationController, error) {
func CreateNewControllerFromCurrentController(rcClient corev1client.ReplicationControllersGetter, codec runtime.Codec, cfg *NewControllerConfig) (*corev1.ReplicationController, error) {
containerIndex := 0
// load the old RC into the "new" RC
newRc, err := rcClient.ReplicationControllers(cfg.Namespace).Get(cfg.OldName, metav1.GetOptions{})
@ -669,19 +677,19 @@ func AbortRollingUpdate(c *RollingUpdaterConfig) error {
return nil
}
func GetNextControllerAnnotation(rc *api.ReplicationController) (string, bool) {
func GetNextControllerAnnotation(rc *corev1.ReplicationController) (string, bool) {
res, found := rc.Annotations[nextControllerAnnotation]
return res, found
}
func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
func SetNextControllerAnnotation(rc *corev1.ReplicationController, name string) {
if rc.Annotations == nil {
rc.Annotations = map[string]string{}
}
rc.Annotations[nextControllerAnnotation] = name
}
func UpdateExistingReplicationController(rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
func UpdateExistingReplicationController(rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, oldRc *corev1.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*corev1.ReplicationController, error) {
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
SetNextControllerAnnotation(oldRc, newName)
return AddDeploymentKeyToReplicationController(oldRc, rcClient, podClient, deploymentKey, deploymentValue, namespace, out)
@ -689,16 +697,16 @@ func UpdateExistingReplicationController(rcClient coreclient.ReplicationControll
// If we didn't need to update the controller for the deployment key, we still need to write
// the "next" controller.
applyUpdate := func(rc *api.ReplicationController) {
applyUpdate := func(rc *corev1.ReplicationController) {
SetNextControllerAnnotation(rc, newName)
}
return updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate)
}
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
func AddDeploymentKeyToReplicationController(oldRc *corev1.ReplicationController, rcClient corev1client.ReplicationControllersGetter, podClient corev1client.PodsGetter, deploymentKey, deploymentValue, namespace string, out io.Writer) (*corev1.ReplicationController, error) {
var err error
// First, update the template label. This ensures that any newly created pods will have the new label
applyUpdate := func(rc *api.ReplicationController) {
applyUpdate := func(rc *corev1.ReplicationController) {
if rc.Spec.Template.Labels == nil {
rc.Spec.Template.Labels = map[string]string{}
}
@ -718,7 +726,7 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, r
}
for ix := range podList.Items {
pod := &podList.Items[ix]
applyUpdate := func(p *api.Pod) {
applyUpdate := func(p *corev1.Pod) {
if p.Labels == nil {
p.Labels = map[string]string{
deploymentKey: deploymentValue,
@ -740,7 +748,7 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, r
for k, v := range oldRc.Spec.Selector {
selectorCopy[k] = v
}
applyUpdate = func(rc *api.ReplicationController) {
applyUpdate = func(rc *corev1.ReplicationController) {
rc.Spec.Selector[deploymentKey] = deploymentValue
}
// Update the selector of the rc so it manages all the pods we updated above
@ -768,13 +776,13 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, r
return oldRc, nil
}
type updateRcFunc func(controller *api.ReplicationController)
type updateRcFunc func(controller *corev1.ReplicationController)
// updateRcWithRetries retries updating the given rc on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updateRcWithRetries(rcClient coreclient.ReplicationControllersGetter, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
func updateRcWithRetries(rcClient corev1client.ReplicationControllersGetter, namespace string, rc *corev1.ReplicationController, applyUpdate updateRcFunc) (*corev1.ReplicationController, error) {
// Deep copy the rc in case we failed on Get during retry loop
oldRc := rc.DeepCopy()
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
@ -799,13 +807,13 @@ func updateRcWithRetries(rcClient coreclient.ReplicationControllersGetter, names
return rc, err
}
type updatePodFunc func(controller *api.Pod)
type updatePodFunc func(controller *corev1.Pod)
// updatePodWithRetries retries updating the given pod on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updatePodWithRetries(podClient coreclient.PodsGetter, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
func updatePodWithRetries(podClient corev1client.PodsGetter, namespace string, pod *corev1.Pod, applyUpdate updatePodFunc) (*corev1.Pod, error) {
// Deep copy the pod in case we failed on Get during retry loop
oldPod := pod.DeepCopy()
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (e error) {
@ -826,7 +834,7 @@ func updatePodWithRetries(podClient coreclient.PodsGetter, namespace string, pod
return pod, err
}
func FindSourceController(r coreclient.ReplicationControllersGetter, namespace, name string) (*api.ReplicationController, error) {
func FindSourceController(r corev1client.ReplicationControllersGetter, namespace, name string) (*corev1.ReplicationController, error) {
list, err := r.ReplicationControllers(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, err

View File

@ -26,27 +26,30 @@ import (
"testing"
"time"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
manualfake "k8s.io/client-go/rest/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/kubectl/scheme"
"k8s.io/kubernetes/pkg/kubectl/util"
)
func oldRc(replicas int, original int) *api.ReplicationController {
return &api.ReplicationController{
func oldRc(replicas int, original int) *corev1.ReplicationController {
t := replicas
replicasCopy := int32(t)
return &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo-v1",
@ -55,25 +58,25 @@ func oldRc(replicas int, original int) *api.ReplicationController {
originalReplicasAnnotation: fmt.Sprintf("%d", original),
},
},
Spec: api.ReplicationControllerSpec{
Replicas: int32(replicas),
Spec: corev1.ReplicationControllerSpec{
Replicas: &replicasCopy,
Selector: map[string]string{"version": "v1"},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-v1",
Labels: map[string]string{"version": "v1"},
},
},
},
Status: api.ReplicationControllerStatus{
Status: corev1.ReplicationControllerStatus{
Replicas: int32(replicas),
},
}
}
func newRc(replicas int, desired int) *api.ReplicationController {
func newRc(replicas int, desired int) *corev1.ReplicationController {
rc := oldRc(replicas, replicas)
rc.Spec.Template = &api.PodTemplateSpec{
rc.Spec.Template = &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-v2",
Labels: map[string]string{"version": "v2"},
@ -119,9 +122,9 @@ func TestUpdate(t *testing.T) {
tests := []struct {
name string
// oldRc is the "from" deployment
oldRc *api.ReplicationController
oldRc *corev1.ReplicationController
// newRc is the "to" deployment
newRc *api.ReplicationController
newRc *corev1.ReplicationController
// whether newRc existed (false means it was created)
newRcExists bool
maxUnavail intstr.IntOrString
@ -789,7 +792,7 @@ Scaling foo-v2 up to 2
t.Logf("running test %d (%s) (up: %v, down: %v, oldReady: %v, newReady: %v)", i, tt.name, upTo, downTo, oldReady, newReady)
updater := &RollingUpdater{
ns: "default",
scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
scaleAndWait: func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
// Return a scale up or scale down expectation depending on the rc,
// and throw errors if there is no expectation expressed for this
// call.
@ -804,23 +807,23 @@ Scaling foo-v2 up to 2
}
if expected == -1 {
t.Fatalf("unexpected scale of %s to %d", rc.Name, rc.Spec.Replicas)
} else if e, a := expected, int(rc.Spec.Replicas); e != a {
} else if e, a := expected, int(*rc.Spec.Replicas); e != a {
t.Fatalf("expected scale of %s to %d, got %d", rc.Name, e, a)
}
// Simulate the scale.
rc.Status.Replicas = rc.Spec.Replicas
rc.Status.Replicas = *rc.Spec.Replicas
return rc, nil
},
getOrCreateTargetController: func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
getOrCreateTargetController: func(controller *corev1.ReplicationController, sourceId string) (*corev1.ReplicationController, bool, error) {
// Simulate a create vs. update of an existing controller.
return tt.newRc, tt.newRcExists, nil
},
cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
cleanup: func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
return nil
},
}
// Set up a mock readiness check which handles the test assertions.
updater.getReadyPods = func(oldRc, newRc *api.ReplicationController, minReadySecondsDeadline int32) (int32, int32, error) {
updater.getReadyPods = func(oldRc, newRc *corev1.ReplicationController, minReadySecondsDeadline int32) (int32, int32, error) {
// Return simulated readiness, and throw an error if this call has no
// expectations defined.
oldReady := next(&oldReady)
@ -860,18 +863,18 @@ func TestUpdate_progressTimeout(t *testing.T) {
newRc := newRc(0, 2)
updater := &RollingUpdater{
ns: "default",
scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
scaleAndWait: func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
// Do nothing.
return rc, nil
},
getOrCreateTargetController: func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
getOrCreateTargetController: func(controller *corev1.ReplicationController, sourceId string) (*corev1.ReplicationController, bool, error) {
return newRc, false, nil
},
cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
cleanup: func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
return nil
},
}
updater.getReadyPods = func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
updater.getReadyPods = func(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error) {
// Coerce a timeout by pods never becoming ready.
return 0, 0, nil
}
@ -905,16 +908,16 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) {
rcClient: fake.Core(),
podClient: fake.Core(),
ns: "default",
scaleAndWait: func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
scaleAndWait: func(rc *corev1.ReplicationController, retry *RetryParams, wait *RetryParams) (*corev1.ReplicationController, error) {
return rc, nil
},
getOrCreateTargetController: func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
getOrCreateTargetController: func(controller *corev1.ReplicationController, sourceId string) (*corev1.ReplicationController, bool, error) {
return newRc, false, nil
},
cleanup: func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
cleanup: func(oldRc, newRc *corev1.ReplicationController, config *RollingUpdaterConfig) error {
return nil
},
getReadyPods: func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
getReadyPods: func(oldRc, newRc *corev1.ReplicationController, minReadySeconds int32) (int32, int32, error) {
return 1, 1, nil
},
}
@ -934,10 +937,10 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
updateAction := fake.Actions()[1].(testcore.UpdateAction)
if updateAction.GetResource().GroupResource() != api.Resource("replicationcontrollers") {
if updateAction.GetResource().GroupResource() != corev1.Resource("replicationcontrollers") {
t.Fatalf("expected rc to be updated: %#v", updateAction)
}
if e, a := "1", updateAction.GetObject().(*api.ReplicationController).Annotations[originalReplicasAnnotation]; e != a {
if e, a := "1", updateAction.GetObject().(*corev1.ReplicationController).Annotations[originalReplicasAnnotation]; e != a {
t.Fatalf("expected annotation value %s, got %s", e, a)
}
}
@ -945,31 +948,31 @@ func TestUpdate_assignOriginalAnnotation(t *testing.T) {
func TestRollingUpdater_multipleContainersInPod(t *testing.T) {
tests := []struct {
name string
oldRc *api.ReplicationController
newRc *api.ReplicationController
oldRc *corev1.ReplicationController
newRc *corev1.ReplicationController
container string
image string
deploymentKey string
}{
{
name: "test1",
oldRc: &api.ReplicationController{
oldRc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"dk": "old",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "old",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container1",
Image: "image1",
@ -983,23 +986,23 @@ func TestRollingUpdater_multipleContainersInPod(t *testing.T) {
},
},
},
newRc: &api.ReplicationController{
newRc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"dk": "old",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "old",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container1",
Image: "newimage",
@ -1019,23 +1022,23 @@ func TestRollingUpdater_multipleContainersInPod(t *testing.T) {
},
{
name: "test2",
oldRc: &api.ReplicationController{
oldRc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "bar",
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"dk": "old",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "old",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container1",
Image: "image1",
@ -1045,23 +1048,23 @@ func TestRollingUpdater_multipleContainersInPod(t *testing.T) {
},
},
},
newRc: &api.ReplicationController{
newRc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "bar",
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"dk": "old",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "old",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "container1",
Image: "newimage",
@ -1081,7 +1084,7 @@ func TestRollingUpdater_multipleContainersInPod(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
fake := fake.NewSimpleClientset(tt.oldRc)
codec := testapi.Default.Codec()
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
deploymentHash, err := util.HashObject(tt.newRc, codec)
if err != nil {
@ -1238,7 +1241,7 @@ func TestRollingUpdater_cleanupWithClients_Rename(t *testing.T) {
}
func TestFindSourceController(t *testing.T) {
ctrl1 := api.ReplicationController{
ctrl1 := corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
@ -1247,7 +1250,7 @@ func TestFindSourceController(t *testing.T) {
},
},
}
ctrl2 := api.ReplicationController{
ctrl2 := corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "bar",
@ -1256,7 +1259,7 @@ func TestFindSourceController(t *testing.T) {
},
},
}
ctrl3 := api.ReplicationController{
ctrl3 := corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "baz",
@ -1266,46 +1269,46 @@ func TestFindSourceController(t *testing.T) {
},
}
tests := []struct {
list *api.ReplicationControllerList
expectedController *api.ReplicationController
list *corev1.ReplicationControllerList
expectedController *corev1.ReplicationController
name string
expectError bool
}{
{
list: &api.ReplicationControllerList{},
list: &corev1.ReplicationControllerList{},
expectError: true,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1},
list: &corev1.ReplicationControllerList{
Items: []corev1.ReplicationController{ctrl1},
},
name: "foo",
expectError: true,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1},
list: &corev1.ReplicationControllerList{
Items: []corev1.ReplicationController{ctrl1},
},
name: "bar",
expectedController: &ctrl1,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1, ctrl2},
list: &corev1.ReplicationControllerList{
Items: []corev1.ReplicationController{ctrl1, ctrl2},
},
name: "bar",
expectedController: &ctrl1,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1, ctrl2},
list: &corev1.ReplicationControllerList{
Items: []corev1.ReplicationController{ctrl1, ctrl2},
},
name: "foo",
expectedController: &ctrl2,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1, ctrl2, ctrl3},
list: &corev1.ReplicationControllerList{
Items: []corev1.ReplicationController{ctrl1, ctrl2, ctrl3},
},
name: "baz",
expectedController: &ctrl3,
@ -1330,29 +1333,29 @@ func TestFindSourceController(t *testing.T) {
func TestUpdateExistingReplicationController(t *testing.T) {
tests := []struct {
rc *api.ReplicationController
rc *corev1.ReplicationController
name string
deploymentKey string
deploymentValue string
expectedRc *api.ReplicationController
expectedRc *corev1.ReplicationController
expectErr bool
}{
{
rc: &api.ReplicationController{
rc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: api.ReplicationControllerSpec{
Template: &api.PodTemplateSpec{},
Spec: corev1.ReplicationControllerSpec{
Template: &corev1.PodTemplateSpec{},
},
},
name: "foo",
deploymentKey: "dk",
deploymentValue: "some-hash",
expectedRc: &api.ReplicationController{
expectedRc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
@ -1360,11 +1363,11 @@ func TestUpdateExistingReplicationController(t *testing.T) {
"kubectl.kubernetes.io/next-controller-id": "foo",
},
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"dk": "some-hash",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "some-hash",
@ -1375,13 +1378,13 @@ func TestUpdateExistingReplicationController(t *testing.T) {
},
},
{
rc: &api.ReplicationController{
rc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: api.ReplicationControllerSpec{
Template: &api.PodTemplateSpec{
Spec: corev1.ReplicationControllerSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "some-other-hash",
@ -1397,7 +1400,7 @@ func TestUpdateExistingReplicationController(t *testing.T) {
deploymentKey: "dk",
deploymentValue: "some-hash",
expectedRc: &api.ReplicationController{
expectedRc: &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
@ -1405,11 +1408,11 @@ func TestUpdateExistingReplicationController(t *testing.T) {
"kubectl.kubernetes.io/next-controller-id": "foo",
},
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"dk": "some-other-hash",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"dk": "some-other-hash",
@ -1439,27 +1442,34 @@ func TestUpdateExistingReplicationController(t *testing.T) {
}
func TestUpdateRcWithRetries(t *testing.T) {
codec := testapi.Default.Codec()
rc := &api.ReplicationController{
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
one := int32(1)
rc := &corev1.ReplicationController{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ReplicationController",
},
ObjectMeta: metav1.ObjectMeta{Name: "rc",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Replicas: &one,
Selector: map[string]string{
"foo": "bar",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: apitesting.V1DeepEqualSafePodSpec(),
},
},
}
rc.Spec.Template.Spec.SchedulerName = "default-scheduler"
// Test end to end updating of the rc with retries. Essentially make sure the update handler
// sees the right updates, failures in update/get are handled properly, and that the updated
@ -1471,17 +1481,17 @@ func TestUpdateRcWithRetries(t *testing.T) {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
updates := []*http.Response{
{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
{StatusCode: 409, Header: header, Body: objBody(codec, &corev1.ReplicationController{})}, // conflict
{StatusCode: 409, Header: header, Body: objBody(codec, &corev1.ReplicationController{})}, // conflict
{StatusCode: 200, Header: header, Body: objBody(codec, &newRc)},
}
gets := []*http.Response{
{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
{StatusCode: 500, Header: header, Body: objBody(codec, &corev1.ReplicationController{})},
{StatusCode: 200, Header: header, Body: objBody(codec, rc)},
}
fakeClient := &manualfake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},
NegotiatedSerializer: legacyscheme.Codecs,
NegotiatedSerializer: scheme.Codecs,
Client: manualfake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == testapi.Default.ResourcePath("replicationcontrollers", "default", "rc") && m == "PUT":
@ -1489,8 +1499,9 @@ func TestUpdateRcWithRetries(t *testing.T) {
updates = updates[1:]
// We should always get an update with a valid rc even when the get fails. The rc should always
// contain the update.
if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !apiequality.Semantic.DeepEqual(rc, c) {
if c, ok := readOrDie(t, req, codec).(*corev1.ReplicationController); !ok || !apiequality.Semantic.DeepEqual(rc, c) {
t.Errorf("Unexpected update body, got %+v expected %+v", c, rc)
t.Error(diff.ObjectDiff(rc, c))
} else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" {
t.Errorf("Expected selector label update, got %+v", c.Spec.Selector)
} else {
@ -1507,13 +1518,13 @@ func TestUpdateRcWithRetries(t *testing.T) {
}
}),
}
clientConfig := &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs, GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}
clientConfig := &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: scheme.Codecs, GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}
restClient, _ := restclient.RESTClientFor(clientConfig)
restClient.Client = fakeClient.Client
clientset := internalclientset.New(restClient)
clientset := kubernetes.New(restClient)
if rc, err := updateRcWithRetries(
clientset.Core(), "default", rc, func(c *api.ReplicationController) {
clientset.CoreV1(), "default", rc, func(c *corev1.ReplicationController) {
c.Spec.Selector["baz"] = "foobar"
}); err != nil {
t.Errorf("unexpected error: %v", err)
@ -1531,8 +1542,10 @@ func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Obj
t.Errorf("Error reading: %v", err)
t.FailNow()
}
obj, err := runtime.Decode(codec, data)
codec2 := scheme.Codecs.UniversalDecoder(scheme.Scheme.PrioritizedVersionsAllGroups()...)
obj, err := runtime.Decode(codec2, data)
if err != nil {
t.Log(string(data))
t.Errorf("error decoding: %v", err)
t.FailNow()
}
@ -1545,14 +1558,14 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
func TestAddDeploymentHash(t *testing.T) {
buf := &bytes.Buffer{}
codec := testapi.Default.Codec()
rc := &api.ReplicationController{
codec := scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...)
rc := &corev1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{Name: "rc"},
Spec: api.ReplicationControllerSpec{
Spec: corev1.ReplicationControllerSpec{
Selector: map[string]string{
"foo": "bar",
},
Template: &api.PodTemplateSpec{
Template: &corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
@ -1562,8 +1575,8 @@ func TestAddDeploymentHash(t *testing.T) {
},
}
podList := &api.PodList{
Items: []api.Pod{
podList := &corev1.PodList{
Items: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz"}},
@ -1574,7 +1587,7 @@ func TestAddDeploymentHash(t *testing.T) {
updatedRc := false
fakeClient := &manualfake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},
NegotiatedSerializer: legacyscheme.Codecs,
NegotiatedSerializer: scheme.Codecs,
Client: manualfake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
@ -1587,17 +1600,17 @@ func TestAddDeploymentHash(t *testing.T) {
case p == testapi.Default.ResourcePath("pods", "default", "foo") && m == "PUT":
seen.Insert("foo")
obj := readOrDie(t, req, codec)
podList.Items[0] = *(obj.(*api.Pod))
podList.Items[0] = *(obj.(*corev1.Pod))
return &http.Response{StatusCode: 200, Header: header, Body: objBody(codec, &podList.Items[0])}, nil
case p == testapi.Default.ResourcePath("pods", "default", "bar") && m == "PUT":
seen.Insert("bar")
obj := readOrDie(t, req, codec)
podList.Items[1] = *(obj.(*api.Pod))
podList.Items[1] = *(obj.(*corev1.Pod))
return &http.Response{StatusCode: 200, Header: header, Body: objBody(codec, &podList.Items[1])}, nil
case p == testapi.Default.ResourcePath("pods", "default", "baz") && m == "PUT":
seen.Insert("baz")
obj := readOrDie(t, req, codec)
podList.Items[2] = *(obj.(*api.Pod))
podList.Items[2] = *(obj.(*corev1.Pod))
return &http.Response{StatusCode: 200, Header: header, Body: objBody(codec, &podList.Items[2])}, nil
case p == testapi.Default.ResourcePath("replicationcontrollers", "default", "rc") && m == "PUT":
updatedRc = true
@ -1608,12 +1621,12 @@ func TestAddDeploymentHash(t *testing.T) {
}
}),
}
clientConfig := &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs, GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}
clientConfig := &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: scheme.Codecs, GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}
restClient, _ := restclient.RESTClientFor(clientConfig)
restClient.Client = fakeClient.Client
clientset := internalclientset.New(restClient)
clientset := kubernetes.New(restClient)
if _, err := AddDeploymentKeyToReplicationController(rc, clientset.Core(), clientset.Core(), "dk", "hash", metav1.NamespaceDefault, buf); err != nil {
if _, err := AddDeploymentKeyToReplicationController(rc, clientset.CoreV1(), clientset.CoreV1(), "dk", "hash", metav1.NamespaceDefault, buf); err != nil {
t.Errorf("unexpected error: %v", err)
}
for _, pod := range podList.Items {
@ -1629,26 +1642,26 @@ func TestAddDeploymentHash(t *testing.T) {
func TestRollingUpdater_readyPods(t *testing.T) {
count := 0
now := metav1.Date(2016, time.April, 1, 1, 0, 0, 0, time.UTC)
mkpod := func(owner *api.ReplicationController, ready bool, readyTime metav1.Time) *api.Pod {
mkpod := func(owner *corev1.ReplicationController, ready bool, readyTime metav1.Time) *corev1.Pod {
count = count + 1
labels := map[string]string{}
for k, v := range owner.Spec.Selector {
labels[k] = v
}
status := api.ConditionTrue
status := corev1.ConditionTrue
if !ready {
status = api.ConditionFalse
status = corev1.ConditionFalse
}
return &api.Pod{
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: fmt.Sprintf("pod-%d", count),
Labels: labels,
},
Status: api.PodStatus{
Conditions: []api.PodCondition{
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: api.PodReady,
Type: corev1.PodReady,
Status: status,
LastTransitionTime: readyTime,
},
@ -1659,8 +1672,8 @@ func TestRollingUpdater_readyPods(t *testing.T) {
tests := []struct {
name string
oldRc *api.ReplicationController
newRc *api.ReplicationController
oldRc *corev1.ReplicationController
newRc *corev1.ReplicationController
// expectated old/new ready counts
oldReady int32
newReady int32