From f756e43e7fa6abfb0cebd68d1c8e7fae21819a59 Mon Sep 17 00:00:00 2001 From: deads2k Date: Thu, 8 Sep 2016 10:24:02 -0400 Subject: [PATCH] convert rolling updater to generated client --- pkg/client/unversioned/conditions.go | 25 ++++--- pkg/kubectl/cmd/rollingupdate.go | 26 +++---- pkg/kubectl/cmd/util/clientcache.go | 37 ++++++++++ pkg/kubectl/cmd/util/factory.go | 15 ++-- pkg/kubectl/rolling_updater.go | 85 +++++++++++----------- pkg/kubectl/scale.go | 19 +++-- pkg/kubectl/stop.go | 104 +++++++++++++-------------- 7 files changed, 174 insertions(+), 137 deletions(-) diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index cd913de61a..973fb92a8e 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -25,20 +25,24 @@ import ( "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" + appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/unversioned" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) // ControllerHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a controller's ReplicaSelector equals the Replicas count. -func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc { +func ControllerHasDesiredReplicas(rcClient coreclient.ReplicationControllersGetter, controller *api.ReplicationController) wait.ConditionFunc { // If we're given a controller where the status lags the spec, it either means that the controller is stale, // or that the rc manager hasn't noticed the update yet. Polling status.Replicas is not safe in the latter case. desiredGeneration := controller.Generation return func() (bool, error) { - ctrl, err := c.ReplicationControllers(controller.Namespace).Get(controller.Name) + ctrl, err := rcClient.ReplicationControllers(controller.Namespace).Get(controller.Name) if err != nil { return false, err } @@ -52,7 +56,7 @@ func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationContro // ReplicaSetHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a ReplicaSet's ReplicaSelector equals the Replicas count. -func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions.ReplicaSet) wait.ConditionFunc { +func ReplicaSetHasDesiredReplicas(rsClient extensionsclient.ReplicaSetsGetter, replicaSet *extensions.ReplicaSet) wait.ConditionFunc { // If we're given a ReplicaSet where the status lags the spec, it either means that the // ReplicaSet is stale, or that the ReplicaSet manager hasn't noticed the update yet. @@ -60,7 +64,7 @@ func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions. desiredGeneration := replicaSet.Generation return func() (bool, error) { - rs, err := c.ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name) + rs, err := rsClient.ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name) if err != nil { return false, err } @@ -73,10 +77,10 @@ func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions. } } -func PetSetHasDesiredPets(c AppsInterface, petset *apps.PetSet) wait.ConditionFunc { +func PetSetHasDesiredPets(psClient appsclient.PetSetsGetter, petset *apps.PetSet) wait.ConditionFunc { // TODO: Differentiate between 0 pets and a really quick scale down using generation. return func() (bool, error) { - ps, err := c.PetSets(petset.Namespace).Get(petset.Name) + ps, err := psClient.PetSets(petset.Namespace).Get(petset.Name) if err != nil { return false, err } @@ -86,10 +90,9 @@ func PetSetHasDesiredPets(c AppsInterface, petset *apps.PetSet) wait.ConditionFu // JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count // for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. -func JobHasDesiredParallelism(c BatchInterface, job *batch.Job) wait.ConditionFunc { - +func JobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc { return func() (bool, error) { - job, err := c.Jobs(job.Namespace).Get(job.Name) + job, err := jobClient.Jobs(job.Namespace).Get(job.Name) if err != nil { return false, err } @@ -112,7 +115,7 @@ func JobHasDesiredParallelism(c BatchInterface, job *batch.Job) wait.ConditionFu // DeploymentHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a deployment equals its updated replicas count. // (non-terminated pods that have the desired template spec). -func DeploymentHasDesiredReplicas(c ExtensionsInterface, deployment *extensions.Deployment) wait.ConditionFunc { +func DeploymentHasDesiredReplicas(dClient extensionsclient.DeploymentsGetter, deployment *extensions.Deployment) wait.ConditionFunc { // If we're given a deployment where the status lags the spec, it either // means that the deployment is stale, or that the deployment manager hasn't // noticed the update yet. Polling status.Replicas is not safe in the latter @@ -120,7 +123,7 @@ func DeploymentHasDesiredReplicas(c ExtensionsInterface, deployment *extensions. desiredGeneration := deployment.Generation return func() (bool, error) { - deployment, err := c.Deployments(deployment.Namespace).Get(deployment.Name) + deployment, err := dClient.Deployments(deployment.Namespace).Get(deployment.Name) if err != nil { return false, err } diff --git a/pkg/kubectl/cmd/rollingupdate.go b/pkg/kubectl/cmd/rollingupdate.go index 2f336b7b41..bd4d618d65 100644 --- a/pkg/kubectl/cmd/rollingupdate.go +++ b/pkg/kubectl/cmd/rollingupdate.go @@ -27,6 +27,7 @@ import ( "github.com/renstrom/dedent" "github.com/spf13/cobra" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" @@ -177,24 +178,25 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg return err } - client, err := f.Client() + clientset, err := f.ClientSet() if err != nil { return err } + coreClient := clientset.Core() var newRc *api.ReplicationController // fetch rc - oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName) + oldRc, err := coreClient.ReplicationControllers(cmdNamespace).Get(oldName) if err != nil { if !errors.IsNotFound(err) || len(image) == 0 || len(args) > 1 { return err } // We're in the middle of a rename, look for an RC with a source annotation of oldName - newRc, err := kubectl.FindSourceController(client, cmdNamespace, oldName) + newRc, err := kubectl.FindSourceController(coreClient, cmdNamespace, oldName) if err != nil { return err } - return kubectl.Rename(client, newRc, oldName) + return kubectl.Rename(coreClient, newRc, oldName) } var keepOldName bool @@ -248,10 +250,10 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg // than the old rc. This selector is the hash of the rc, with a suffix to provide uniqueness for // same-image updates. if len(image) != 0 { - codec := api.Codecs.LegacyCodec(client.APIVersion()) + codec := api.Codecs.LegacyCodec(clientset.CoreClient.APIVersion()) keepOldName = len(args) == 1 newName := findNewName(args, oldRc) - if newRc, err = kubectl.LoadExistingNextReplicationController(client, cmdNamespace, newName); err != nil { + if newRc, err = kubectl.LoadExistingNextReplicationController(coreClient, cmdNamespace, newName); err != nil { return err } if newRc != nil { @@ -274,7 +276,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg } config.PullPolicy = api.PullPolicy(pullPolicy) } - newRc, err = kubectl.CreateNewControllerFromCurrentController(client, codec, config) + newRc, err = kubectl.CreateNewControllerFromCurrentController(coreClient, codec, config) if err != nil { return err } @@ -287,7 +289,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg } // If new image is same as old, the hash may not be distinct, so add a suffix. oldHash += "-orig" - oldRc, err = kubectl.UpdateExistingReplicationController(client, oldRc, cmdNamespace, newRc.Name, deploymentKey, oldHash, out) + oldRc, err = kubectl.UpdateExistingReplicationController(coreClient, coreClient, oldRc, cmdNamespace, newRc.Name, deploymentKey, oldHash, out) if err != nil { return err } @@ -296,7 +298,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg if rollback { keepOldName = len(args) == 1 newName := findNewName(args, oldRc) - if newRc, err = kubectl.LoadExistingNextReplicationController(client, cmdNamespace, newName); err != nil { + if newRc, err = kubectl.LoadExistingNextReplicationController(coreClient, cmdNamespace, newName); err != nil { return err } @@ -310,7 +312,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg filename, oldName) } - updater := kubectl.NewRollingUpdater(newRc.Namespace, client) + updater := kubectl.NewRollingUpdater(newRc.Namespace, coreClient, coreClient) // To successfully pull off a rolling update the new and old rc have to differ // by at least one selector. Every new pod should have the selector and every @@ -367,7 +369,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg if err != nil { return err } - client.ReplicationControllers(config.NewRc.Namespace).Update(config.NewRc) + coreClient.ReplicationControllers(config.NewRc.Namespace).Update(config.NewRc) } err = updater.Update(config) if err != nil { @@ -380,7 +382,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg } else { message = fmt.Sprintf("rolling updated to %q", newRc.Name) } - newRc, err = client.ReplicationControllers(cmdNamespace).Get(newRc.Name) + newRc, err = coreClient.ReplicationControllers(cmdNamespace).Get(newRc.Name) if err != nil { return err } diff --git a/pkg/kubectl/cmd/util/clientcache.go b/pkg/kubectl/cmd/util/clientcache.go index 81476cabaf..923aa52b21 100644 --- a/pkg/kubectl/cmd/util/clientcache.go +++ b/pkg/kubectl/cmd/util/clientcache.go @@ -20,6 +20,7 @@ import ( fed_clientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" @@ -28,6 +29,7 @@ import ( func NewClientCache(loader clientcmd.ClientConfig) *ClientCache { return &ClientCache{ clients: make(map[unversioned.GroupVersion]*client.Client), + clientsets: make(map[unversioned.GroupVersion]*internalclientset.Clientset), configs: make(map[unversioned.GroupVersion]*restclient.Config), fedClientSets: make(map[unversioned.GroupVersion]fed_clientset.Interface), loader: loader, @@ -39,6 +41,7 @@ func NewClientCache(loader clientcmd.ClientConfig) *ClientCache { type ClientCache struct { loader clientcmd.ClientConfig clients map[unversioned.GroupVersion]*client.Client + clientsets map[unversioned.GroupVersion]*internalclientset.Clientset fedClientSets map[unversioned.GroupVersion]fed_clientset.Interface configs map[unversioned.GroupVersion]*restclient.Config defaultConfig *restclient.Config @@ -95,6 +98,40 @@ func (c *ClientCache) ClientConfigForVersion(version *unversioned.GroupVersion) return &config, nil } +// ClientSetForVersion initializes or reuses a clientset for the specified version, or returns an +// error if that is not possible +func (c *ClientCache) ClientSetForVersion(version *unversioned.GroupVersion) (*internalclientset.Clientset, error) { + if version != nil { + if clientset, ok := c.clientsets[*version]; ok { + return clientset, nil + } + } + config, err := c.ClientConfigForVersion(version) + if err != nil { + return nil, err + } + + clientset, err := internalclientset.NewForConfig(config) + if err != nil { + return nil, err + } + c.clientsets[*config.GroupVersion] = clientset + + // `version` does not necessarily equal `config.Version`. However, we know that if we call this method again with + // `version`, we should get a client based on the same config we just found. There's no guarantee that a client + // is copiable, so create a new client and save it in the cache. + if version != nil { + configCopy := *config + clientset, err := internalclientset.NewForConfig(&configCopy) + if err != nil { + return nil, err + } + c.clientsets[*version] = clientset + } + + return clientset, nil +} + // ClientForVersion initializes or reuses a client for the specified version, or returns an // error if that is not possible func (c *ClientCache) ClientForVersion(version *unversioned.GroupVersion) (*client.Client, error) { diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index dc2d1bf1d1..fb7a6f5526 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -431,12 +431,7 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { return restclient.RESTClientFor(clientConfig) }, ClientSet: func() (*internalclientset.Clientset, error) { - cfg, err := clients.ClientConfigForVersion(nil) - if err != nil { - return nil, err - } - - return internalclientset.NewForConfig(cfg) + return clients.ClientSetForVersion(nil) }, ClientConfig: func() (*restclient.Config, error) { return clients.ClientConfigForVersion(nil) @@ -706,19 +701,19 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { }, Scaler: func(mapping *meta.RESTMapping) (kubectl.Scaler, error) { mappingVersion := mapping.GroupVersionKind.GroupVersion() - client, err := clients.ClientForVersion(&mappingVersion) + clientset, err := clients.ClientSetForVersion(&mappingVersion) if err != nil { return nil, err } - return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), client) + return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset) }, Reaper: func(mapping *meta.RESTMapping) (kubectl.Reaper, error) { mappingVersion := mapping.GroupVersionKind.GroupVersion() - client, err := clients.ClientForVersion(&mappingVersion) + clientset, err := clients.ClientSetForVersion(&mappingVersion) if err != nil { return nil, err } - return kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), client) + return kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), clientset) }, HistoryViewer: func(mapping *meta.RESTMapping) (kubectl.HistoryViewer, error) { mappingVersion := mapping.GroupVersionKind.GroupVersion() diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index 4468530891..0ab9046197 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/labels" @@ -109,8 +110,8 @@ const ( // RollingUpdater provides methods for updating replicated pods in a predictable, // fault-tolerant way. type RollingUpdater struct { - // Client interface for creating and updating controllers - c client.Interface + rcClient coreclient.ReplicationControllersGetter + podClient coreclient.PodsGetter // Namespace for resources ns string // scaleAndWait scales a controller and returns its updated state. @@ -127,10 +128,11 @@ type RollingUpdater struct { } // NewRollingUpdater creates a RollingUpdater from a client. -func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdater { +func NewRollingUpdater(namespace string, rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter) *RollingUpdater { updater := &RollingUpdater{ - c: client, - ns: namespace, + rcClient: rcClient, + podClient: podClient, + ns: namespace, } // Inject real implementations. updater.scaleAndWait = updater.scaleAndWaitWithScaler @@ -189,7 +191,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { // annotation if it doesn't yet exist. _, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation] if !hasOriginalAnnotation { - existing, err := r.c.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name) + existing, err := r.rcClient.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name) if err != nil { return err } @@ -200,7 +202,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } rc.Annotations[originalReplicasAnnotation] = originReplicas } - if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil { + if oldRc, err = updateRcWithRetries(r.rcClient, existing.Namespace, existing, applyUpdate); err != nil { return err } } @@ -390,14 +392,11 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi // scalerScaleAndWait scales a controller using a Scaler and a real client. func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { - scaler, err := ScalerFor(api.Kind("ReplicationController"), r.c) - if err != nil { - return nil, fmt.Errorf("Couldn't make scaler: %s", err) - } + scaler := &ReplicationControllerScaler{r.rcClient} if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil { return nil, err } - return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name) + return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name) } // readyPods returns the old and new ready counts for their pods. @@ -415,7 +414,7 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minR controller := controllers[i] selector := labels.Set(controller.Spec.Selector).AsSelector() options := api.ListOptions{LabelSelector: selector} - pods, err := r.c.Pods(controller.Namespace).List(options) + pods, err := r.podClient.Pods(controller.Namespace).List(options) if err != nil { return 0, 0, err } @@ -460,7 +459,7 @@ func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.R controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", controller.Spec.Replicas) controller.Annotations[sourceIdAnnotation] = sourceId controller.Spec.Replicas = 0 - newRc, err := r.c.ReplicationControllers(r.ns).Create(controller) + newRc, err := r.rcClient.ReplicationControllers(r.ns).Create(controller) return newRc, false, err } // Validate and use the existing controller. @@ -480,7 +479,7 @@ func (r *RollingUpdater) existingController(controller *api.ReplicationControlle return nil, errors.NewNotFound(api.Resource("replicationcontrollers"), controller.Name) } // controller name is required to get rc back - return r.c.ReplicationControllers(controller.Namespace).Get(controller.Name) + return r.rcClient.ReplicationControllers(controller.Namespace).Get(controller.Name) } // cleanupWithClients performs cleanup tasks after the rolling update. Update @@ -489,7 +488,7 @@ func (r *RollingUpdater) existingController(controller *api.ReplicationControlle func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error { // Clean up annotations var err error - newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name) + newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name) if err != nil { return err } @@ -497,14 +496,14 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl delete(rc.Annotations, sourceIdAnnotation) delete(rc.Annotations, desiredReplicasAnnotation) } - if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil { + if newRc, err = updateRcWithRetries(r.rcClient, r.ns, newRc, applyUpdate); err != nil { return err } - if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil { + if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.rcClient, newRc)); err != nil { return err } - newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name) + newRc, err = r.rcClient.ReplicationControllers(r.ns).Get(newRc.Name) if err != nil { return err } @@ -513,15 +512,15 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl case DeleteRollingUpdateCleanupPolicy: // delete old rc fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name) - return r.c.ReplicationControllers(r.ns).Delete(oldRc.Name, nil) + return r.rcClient.ReplicationControllers(r.ns).Delete(oldRc.Name, nil) case RenameRollingUpdateCleanupPolicy: // delete old rc fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name) - if err := r.c.ReplicationControllers(r.ns).Delete(oldRc.Name, nil); err != nil { + if err := r.rcClient.ReplicationControllers(r.ns).Delete(oldRc.Name, nil); err != nil { return err } fmt.Fprintf(config.Out, "Renaming %s to %s\n", oldRc.Name, newRc.Name) - return Rename(r.c, newRc, oldRc.Name) + return Rename(r.rcClient, newRc, oldRc.Name) case PreserveRollingUpdateCleanupPolicy: return nil default: @@ -529,7 +528,7 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl } } -func Rename(c client.ReplicationControllersNamespacer, rc *api.ReplicationController, newName string) error { +func Rename(c coreclient.ReplicationControllersGetter, rc *api.ReplicationController, newName string) error { oldName := rc.Name rc.Name = newName rc.ResourceVersion = "" @@ -560,7 +559,7 @@ func Rename(c client.ReplicationControllersNamespacer, rc *api.ReplicationContro return nil } -func LoadExistingNextReplicationController(c client.ReplicationControllersNamespacer, namespace, newName string) (*api.ReplicationController, error) { +func LoadExistingNextReplicationController(c coreclient.ReplicationControllersGetter, namespace, newName string) (*api.ReplicationController, error) { if len(newName) == 0 { return nil, nil } @@ -580,10 +579,10 @@ type NewControllerConfig struct { PullPolicy api.PullPolicy } -func CreateNewControllerFromCurrentController(c client.Interface, codec runtime.Codec, cfg *NewControllerConfig) (*api.ReplicationController, error) { +func CreateNewControllerFromCurrentController(rcClient coreclient.ReplicationControllersGetter, codec runtime.Codec, cfg *NewControllerConfig) (*api.ReplicationController, error) { containerIndex := 0 // load the old RC into the "new" RC - newRc, err := c.ReplicationControllers(cfg.Namespace).Get(cfg.OldName) + newRc, err := rcClient.ReplicationControllers(cfg.Namespace).Get(cfg.OldName) if err != nil { return nil, err } @@ -669,21 +668,21 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) { rc.Annotations[nextControllerAnnotation] = name } -func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) { +func UpdateExistingReplicationController(rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) { if _, found := oldRc.Spec.Selector[deploymentKey]; !found { SetNextControllerAnnotation(oldRc, newName) - return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out) + return AddDeploymentKeyToReplicationController(oldRc, rcClient, podClient, deploymentKey, deploymentValue, namespace, out) } else { // If we didn't need to update the controller for the deployment key, we still need to write // the "next" controller. applyUpdate := func(rc *api.ReplicationController) { SetNextControllerAnnotation(rc, newName) } - return updateRcWithRetries(c, namespace, oldRc, applyUpdate) + return updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate) } } -func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) { +func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) { var err error // First, update the template label. This ensures that any newly created pods will have the new label applyUpdate := func(rc *api.ReplicationController) { @@ -692,7 +691,7 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c } rc.Spec.Template.Labels[deploymentKey] = deploymentValue } - if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil { + if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil { return nil, err } @@ -700,7 +699,7 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c // TODO: extract the code from the label command and re-use it here. selector := labels.SelectorFromSet(oldRc.Spec.Selector) options := api.ListOptions{LabelSelector: selector} - podList, err := client.Pods(namespace).List(options) + podList, err := podClient.Pods(namespace).List(options) if err != nil { return nil, err } @@ -715,7 +714,7 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c p.Labels[deploymentKey] = deploymentValue } } - if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil { + if pod, err = updatePodWithRetries(podClient, namespace, pod, applyUpdate); err != nil { return nil, err } } @@ -732,7 +731,7 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c rc.Spec.Selector[deploymentKey] = deploymentValue } // Update the selector of the rc so it manages all the pods we updated above - if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil { + if oldRc, err = updateRcWithRetries(rcClient, namespace, oldRc, applyUpdate); err != nil { return nil, err } @@ -741,11 +740,11 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c // we've finished re-adopting existing pods to the rc. selector = labels.SelectorFromSet(selectorCopy) options = api.ListOptions{LabelSelector: selector} - podList, err = client.Pods(namespace).List(options) + podList, err = podClient.Pods(namespace).List(options) for ix := range podList.Items { pod := &podList.Items[ix] if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue { - if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil { + if err := podClient.Pods(namespace).Delete(pod.Name, nil); err != nil { return nil, err } } @@ -760,7 +759,7 @@ type updateRcFunc func(controller *api.ReplicationController) // 1. Get latest resource // 2. applyUpdate // 3. Update the resource -func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) { +func updateRcWithRetries(rcClient coreclient.ReplicationControllersGetter, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) { // Deep copy the rc in case we failed on Get during retry loop obj, err := api.Scheme.Copy(rc) if err != nil { @@ -770,14 +769,14 @@ func updateRcWithRetries(c client.Interface, namespace string, rc *api.Replicati err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) { // Apply the update, then attempt to push it to the apiserver. applyUpdate(rc) - if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil { + if rc, e = rcClient.ReplicationControllers(namespace).Update(rc); e == nil { // rc contains the latest controller post update return } updateErr := e // Update the controller with the latest resource version, if the update failed we // can't trust rc so use oldRc.Name. - if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil { + if rc, e = rcClient.ReplicationControllers(namespace).Get(oldRc.Name); e != nil { // The Get failed: Value in rc cannot be trusted. rc = oldRc } @@ -795,7 +794,7 @@ type updatePodFunc func(controller *api.Pod) // 1. Get latest resource // 2. applyUpdate // 3. Update the resource -func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) { +func updatePodWithRetries(podClient coreclient.PodsGetter, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) { // Deep copy the pod in case we failed on Get during retry loop obj, err := api.Scheme.Copy(pod) if err != nil { @@ -805,11 +804,11 @@ func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, ap err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) { // Apply the update, then attempt to push it to the apiserver. applyUpdate(pod) - if pod, e = c.Pods(namespace).Update(pod); e == nil { + if pod, e = podClient.Pods(namespace).Update(pod); e == nil { return } updateErr := e - if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil { + if pod, e = podClient.Pods(namespace).Get(oldPod.Name); e != nil { pod = oldPod } // Only return the error from update @@ -820,7 +819,7 @@ func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, ap return pod, err } -func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) { +func FindSourceController(r coreclient.ReplicationControllersGetter, namespace, name string) (*api.ReplicationController, error) { list, err := r.ReplicationControllers(namespace).List(api.ListOptions{}) if err != nil { return nil, err diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 044be5706f..53781bfcf7 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -27,6 +27,11 @@ import ( "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/unversioned" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/util/wait" @@ -44,10 +49,10 @@ type Scaler interface { ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) } -func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) { +func ScalerFor(kind unversioned.GroupKind, c *internalclientset.Clientset) (Scaler, error) { switch kind { case api.Kind("ReplicationController"): - return &ReplicationControllerScaler{c}, nil + return &ReplicationControllerScaler{c.Core()}, nil case extensions.Kind("ReplicaSet"): return &ReplicaSetScaler{c.Extensions()}, nil case extensions.Kind("Job"), batch.Kind("Job"): @@ -155,7 +160,7 @@ func (precondition *ScalePrecondition) ValidateReplicationController(controller } type ReplicationControllerScaler struct { - c client.Interface + c coreclient.ReplicationControllersGetter } // ScaleSimple does a simple one-shot attempt at scaling. It returns the @@ -253,7 +258,7 @@ func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions } type ReplicaSetScaler struct { - c client.ExtensionsInterface + c extensionsclient.ReplicaSetsGetter } // ScaleSimple does a simple one-shot attempt at scaling. It returns the @@ -324,7 +329,7 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error { } type PetSetScaler struct { - c client.AppsInterface + c appsclient.PetSetsGetter } // ScaleSimple does a simple one-shot attempt at scaling. It returns the @@ -377,7 +382,7 @@ func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, precondi } type JobScaler struct { - c client.BatchInterface + c batchclient.JobsGetter } // ScaleSimple is responsible for updating job's parallelism. It returns the @@ -445,7 +450,7 @@ func (precondition *ScalePrecondition) ValidateDeployment(deployment *extensions } type DeploymentScaler struct { - c client.ExtensionsInterface + c extensionsclient.DeploymentsGetter } // ScaleSimple is responsible for updating a deployment's desired replicas diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index 99538f58a5..d64fac2ca3 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -28,6 +28,11 @@ import ( "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/unversioned" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/unversioned" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/labels" @@ -63,68 +68,71 @@ func IsNoSuchReaperError(err error) bool { return ok } -func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) { +func ReaperFor(kind unversioned.GroupKind, c *internalclientset.Clientset) (Reaper, error) { switch kind { case api.Kind("ReplicationController"): - return &ReplicationControllerReaper{c, Interval, Timeout}, nil + return &ReplicationControllerReaper{c.Core(), Interval, Timeout}, nil case extensions.Kind("ReplicaSet"): - return &ReplicaSetReaper{c, Interval, Timeout}, nil + return &ReplicaSetReaper{c.Extensions(), Interval, Timeout}, nil case extensions.Kind("DaemonSet"): - return &DaemonSetReaper{c, Interval, Timeout}, nil + return &DaemonSetReaper{c.Extensions(), Interval, Timeout}, nil case api.Kind("Pod"): - return &PodReaper{c}, nil + return &PodReaper{c.Core()}, nil case api.Kind("Service"): - return &ServiceReaper{c}, nil + return &ServiceReaper{c.Core()}, nil case extensions.Kind("Job"), batch.Kind("Job"): - return &JobReaper{c, Interval, Timeout}, nil + return &JobReaper{c.Batch(), c.Core(), Interval, Timeout}, nil case apps.Kind("PetSet"): - return &PetSetReaper{c, Interval, Timeout}, nil + return &PetSetReaper{c.Apps(), c.Core(), Interval, Timeout}, nil case extensions.Kind("Deployment"): - return &DeploymentReaper{c, Interval, Timeout}, nil + return &DeploymentReaper{c.Extensions(), c.Extensions(), Interval, Timeout}, nil } return nil, &NoSuchReaperError{kind} } -func ReaperForReplicationController(c client.Interface, timeout time.Duration) (Reaper, error) { - return &ReplicationControllerReaper{c, Interval, timeout}, nil +func ReaperForReplicationController(rcClient coreclient.ReplicationControllersGetter, timeout time.Duration) (Reaper, error) { + return &ReplicationControllerReaper{rcClient, Interval, timeout}, nil } type ReplicationControllerReaper struct { - client.Interface + client coreclient.ReplicationControllersGetter pollInterval, timeout time.Duration } type ReplicaSetReaper struct { - client.Interface + client extensionsclient.ReplicaSetsGetter pollInterval, timeout time.Duration } type DaemonSetReaper struct { - client.Interface + client extensionsclient.DaemonSetsGetter pollInterval, timeout time.Duration } type JobReaper struct { - client.Interface + client batchclient.JobsGetter + podClient coreclient.PodsGetter pollInterval, timeout time.Duration } type DeploymentReaper struct { - client.Interface + dClient extensionsclient.DeploymentsGetter + rsClient extensionsclient.ReplicaSetsGetter pollInterval, timeout time.Duration } type PodReaper struct { - client.Interface + client coreclient.PodsGetter } type ServiceReaper struct { - client.Interface + client coreclient.ServicesGetter } type PetSetReaper struct { - client.Interface + client appsclient.PetSetsGetter + podClient coreclient.PodsGetter pollInterval, timeout time.Duration } @@ -134,8 +142,8 @@ type objInterface interface { } // getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller. -func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) { - rcs, err := c.List(api.ListOptions{}) +func getOverlappingControllers(rcClient coreclient.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) { + rcs, err := rcClient.List(api.ListOptions{}) if err != nil { return nil, fmt.Errorf("error getting replication controllers: %v", err) } @@ -151,11 +159,8 @@ func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api. } func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - rc := reaper.ReplicationControllers(namespace) - scaler, err := ScalerFor(api.Kind("ReplicationController"), *reaper) - if err != nil { - return err - } + rc := reaper.client.ReplicationControllers(namespace) + scaler := &ReplicationControllerScaler{reaper.client} ctrl, err := rc.Get(name) if err != nil { return err @@ -223,11 +228,8 @@ func getOverlappingReplicaSets(c client.ReplicaSetInterface, rs *extensions.Repl } func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - rsc := reaper.Extensions().ReplicaSets(namespace) - scaler, err := ScalerFor(extensions.Kind("ReplicaSet"), *reaper) - if err != nil { - return err - } + rsc := reaper.client.ReplicaSets(namespace) + scaler := &ReplicaSetScaler{reaper.client} rs, err := rsc.Get(name) if err != nil { return err @@ -290,7 +292,7 @@ func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Durati } func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - ds, err := reaper.Extensions().DaemonSets(namespace).Get(name) + ds, err := reaper.client.DaemonSets(namespace).Get(name) if err != nil { return err } @@ -305,13 +307,13 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio // force update to avoid version conflict ds.ResourceVersion = "" - if ds, err = reaper.Extensions().DaemonSets(namespace).Update(ds); err != nil { + if ds, err = reaper.client.DaemonSets(namespace).Update(ds); err != nil { return err } // Wait for the daemon set controller to kill all the daemon pods. if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) { - updatedDS, err := reaper.Extensions().DaemonSets(namespace).Get(name) + updatedDS, err := reaper.client.DaemonSets(namespace).Get(name) if err != nil { return false, nil } @@ -321,15 +323,12 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio return err } - return reaper.Extensions().DaemonSets(namespace).Delete(name) + return reaper.client.DaemonSets(namespace).Delete(name, nil) } func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - petsets := reaper.Apps().PetSets(namespace) - scaler, err := ScalerFor(apps.Kind("PetSet"), *reaper) - if err != nil { - return err - } + petsets := reaper.client.PetSets(namespace) + scaler := &PetSetScaler{reaper.client} ps, err := petsets.Get(name) if err != nil { return err @@ -346,7 +345,7 @@ func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, // TODO: This shouldn't be needed, see corresponding TODO in PetSetHasDesiredPets. // PetSet should track generation number. - pods := reaper.Pods(namespace) + pods := reaper.podClient.Pods(namespace) selector, _ := unversioned.LabelSelectorAsSelector(ps.Spec.Selector) options := api.ListOptions{LabelSelector: selector} podList, err := pods.List(options) @@ -372,12 +371,9 @@ func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, } func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - jobs := reaper.Batch().Jobs(namespace) - pods := reaper.Pods(namespace) - scaler, err := ScalerFor(batch.Kind("Job"), *reaper) - if err != nil { - return err - } + jobs := reaper.client.Jobs(namespace) + pods := reaper.podClient.Pods(namespace) + scaler := &JobScaler{reaper.client} job, err := jobs.Get(name) if err != nil { return err @@ -418,9 +414,9 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra } func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - deployments := reaper.Extensions().Deployments(namespace) - replicaSets := reaper.Extensions().ReplicaSets(namespace) - rsReaper, _ := ReaperFor(extensions.Kind("ReplicaSet"), reaper) + deployments := reaper.dClient.Deployments(namespace) + replicaSets := reaper.rsClient.ReplicaSets(namespace) + rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout} deployment, err := reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) { // set deployment's history and scale to 0 @@ -473,7 +469,7 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati type updateDeploymentFunc func(d *extensions.Deployment) func (reaper *DeploymentReaper) updateDeploymentWithRetries(namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) { - deployments := reaper.Extensions().Deployments(namespace) + deployments := reaper.dClient.Deployments(namespace) err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { if deployment, err = deployments.Get(name); err != nil { return false, err @@ -493,7 +489,7 @@ func (reaper *DeploymentReaper) updateDeploymentWithRetries(namespace, name stri } func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - pods := reaper.Pods(namespace) + pods := reaper.client.Pods(namespace) _, err := pods.Get(name) if err != nil { return err @@ -502,10 +498,10 @@ func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gra } func (reaper *ServiceReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { - services := reaper.Services(namespace) + services := reaper.client.Services(namespace) _, err := services.Get(name) if err != nil { return err } - return services.Delete(name) + return services.Delete(name, nil) }