diff --git a/pkg/kubectl/cmd/rollout/rollout_status.go b/pkg/kubectl/cmd/rollout/rollout_status.go index e231497d6a..160f4e845b 100644 --- a/pkg/kubectl/cmd/rollout/rollout_status.go +++ b/pkg/kubectl/cmd/rollout/rollout_status.go @@ -23,10 +23,17 @@ import ( "github.com/spf13/cobra" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" @@ -62,9 +69,11 @@ type RolloutStatusOptions struct { Watch bool Revision int64 + Timeout time.Duration - StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error) - Builder func() *resource.Builder + StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error) + Builder func() *resource.Builder + DynamicClient dynamic.Interface FilenameOptions *resource.FilenameOptions genericclioptions.IOStreams @@ -76,6 +85,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatus FilenameOptions: &resource.FilenameOptions{}, IOStreams: streams, Watch: true, + Timeout: 0, } } @@ -102,6 +112,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams) cmdutil.AddFilenameOptionFlags(cmd, o.FilenameOptions, usage) cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.") cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).") + cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).") return cmd } @@ -119,6 +130,17 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error o.StatusViewer = func(mapping *meta.RESTMapping) (kubectl.StatusViewer, error) { return polymorphichelpers.StatusViewerFn(f, mapping) } + + clientConfig, err := f.ToRESTConfig() + if err != nil { + return err + } + + o.DynamicClient, err = dynamic.NewForConfig(clientConfig) + if err != nil { + return err + } + return nil } @@ -158,60 +180,68 @@ func (o *RolloutStatusOptions) Run() error { info := infos[0] mapping := info.ResourceMapping() - obj, err := r.Object() - if err != nil { - return err - } - rv, err := meta.NewAccessor().ResourceVersion(obj) - if err != nil { - return err - } - statusViewer, err := o.StatusViewer(mapping) if err != nil { return err } - // check if deployment's has finished the rollout - status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision) - if err != nil { - return err - } - fmt.Fprintf(o.Out, "%s", status) - if done { - return nil + fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(options) + }, } - shouldWatch := o.Watch - if !shouldWatch { - return nil - } + preconditionFunc := func(store cache.Store) (bool, error) { + _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) + if err != nil { + return true, err + } + if !exists { + // We need to make sure we see the object in the cache before we start waiting for events + // or we would be waiting for the timeout if such object didn't exist. + return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name) + } - // watch for changes to the deployment - w, err := r.Watch(rv) - if err != nil { - return err + return false, nil } // if the rollout isn't done yet, keep watching deployment status - // TODO: expose timeout - timeout := 0 * time.Second - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) intr := interrupt.New(nil, cancel) return intr.Run(func() error { - _, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { - // print deployment's status - status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision) - if err != nil { - return false, err + _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) { + switch t := e.Type; t { + case watch.Added, watch.Modified: + status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision) + if err != nil { + return false, err + } + fmt.Fprintf(o.Out, "%s", status) + // Quit waiting if the rollout is done + if done { + return true, nil + } + + shouldWatch := o.Watch + if !shouldWatch { + return true, nil + } + + return false, nil + + case watch.Deleted: + // We need to abort to avoid cases of recreation and not to silently watch the wrong (new) object + return true, fmt.Errorf("object has been deleted") + + default: + return true, fmt.Errorf("internal error: unexpected event %#v", e) } - fmt.Fprintf(o.Out, "%s", status) - // Quit waiting if the rollout is done - if done { - return true, nil - } - return false, nil }) return err })