Switch kubectl rollout status to UntilWithSync to avoid premature timeouts
pull/8/head
Tomas Nozicka 2018-08-24 12:22:56 +02:00
parent f2a6fd394a
commit 7793211669
1 changed file with 72 additions and 42 deletions
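
Background on the motivation: watchtools.UntilWithoutRetry consumes a single watch.Interface, so if the API server closes the stream before the rollout finishes, the wait ends early even though nothing is wrong with the rollout. watchtools.UntilWithSync instead drives an informer over a ListerWatcher, re-listing and re-watching as needed. A minimal runnable sketch of the old failure mode, with a fake watcher standing in for a dropped server connection (no cluster needed; the delay and timeout values are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/watch"
	watchtools "k8s.io/client-go/tools/watch"
)

func main() {
	fw := watch.NewFake()
	go func() {
		// Simulate the API server dropping the watch before any event arrives.
		time.Sleep(100 * time.Millisecond)
		fw.Stop()
	}()

	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// UntilWithoutRetry cannot resume a closed watch; it gives up immediately.
	_, err := watchtools.UntilWithoutRetry(ctx, fw, func(e watch.Event) (bool, error) {
		return e.Type == watch.Modified, nil
	})
	fmt.Println(err) // "watch closed before UntilWithoutRetry timeout"
}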

@@ -23,10 +23,17 @@ import (
 	"github.com/spf13/cobra"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/cli-runtime/pkg/genericclioptions/resource"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/tools/cache"
 	watchtools "k8s.io/client-go/tools/watch"
 	"k8s.io/kubernetes/pkg/kubectl"
 	"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
@@ -62,9 +69,11 @@ type RolloutStatusOptions struct {
 	Watch    bool
 	Revision int64
+	Timeout  time.Duration

-	StatusViewer func(*meta.RESTMapping) (kubectl.StatusViewer, error)
-	Builder      func() *resource.Builder
+	StatusViewer  func(*meta.RESTMapping) (kubectl.StatusViewer, error)
+	Builder       func() *resource.Builder
+	DynamicClient dynamic.Interface

 	FilenameOptions *resource.FilenameOptions
 	genericclioptions.IOStreams
@@ -76,6 +85,7 @@ func NewRolloutStatusOptions(streams genericclioptions.IOStreams) *RolloutStatusOptions {
 		FilenameOptions: &resource.FilenameOptions{},
 		IOStreams:       streams,
 		Watch:           true,
+		Timeout:         0,
 	}
 }
@@ -102,6 +112,7 @@ func NewCmdRolloutStatus(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
 	cmdutil.AddFilenameOptionFlags(cmd, o.FilenameOptions, usage)
 	cmd.Flags().BoolVarP(&o.Watch, "watch", "w", o.Watch, "Watch the status of the rollout until it's done.")
 	cmd.Flags().Int64Var(&o.Revision, "revision", o.Revision, "Pin to a specific revision for showing its status. Defaults to 0 (last revision).")
+	cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
 	return cmd
 }
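
A note on the new --timeout flag: the help text's "zero means never" falls out of watchtools.ContextWithOptionalTimeout, which returns a plain cancellable context when the duration is zero and a deadline-bound one otherwise. A small sketch of that behavior (the 30s value is illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	watchtools "k8s.io/client-go/tools/watch"
)

func main() {
	// Zero duration: no deadline, so the watch runs until done or interrupted.
	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0)
	defer cancel()
	_, hasDeadline := ctx.Deadline()
	fmt.Println(hasDeadline) // false

	// Non-zero duration: the context, and thus the watch, ends after 30s.
	ctx, cancel2 := watchtools.ContextWithOptionalTimeout(context.Background(), 30*time.Second)
	defer cancel2()
	_, hasDeadline = ctx.Deadline()
	fmt.Println(hasDeadline) // true
}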
@@ -119,6 +130,17 @@ func (o *RolloutStatusOptions) Complete(f cmdutil.Factory, args []string) error {
 	o.StatusViewer = func(mapping *meta.RESTMapping) (kubectl.StatusViewer, error) {
 		return polymorphichelpers.StatusViewerFn(f, mapping)
 	}
+
+	clientConfig, err := f.ToRESTConfig()
+	if err != nil {
+		return err
+	}
+
+	o.DynamicClient, err = dynamic.NewForConfig(clientConfig)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
@@ -158,60 +180,68 @@ func (o *RolloutStatusOptions) Run() error {
 	info := infos[0]
 	mapping := info.ResourceMapping()

-	obj, err := r.Object()
-	if err != nil {
-		return err
-	}
-	rv, err := meta.NewAccessor().ResourceVersion(obj)
-	if err != nil {
-		return err
-	}
-
 	statusViewer, err := o.StatusViewer(mapping)
 	if err != nil {
 		return err
 	}

-	// check if deployment's has finished the rollout
-	status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
-	if err != nil {
-		return err
-	}
-	fmt.Fprintf(o.Out, "%s", status)
-	if done {
-		return nil
-	}
-
-	shouldWatch := o.Watch
-	if !shouldWatch {
-		return nil
+	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			options.FieldSelector = fieldSelector
+			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			options.FieldSelector = fieldSelector
+			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(options)
+		},
 	}

-	// watch for changes to the deployment
-	w, err := r.Watch(rv)
-	if err != nil {
-		return err
+	preconditionFunc := func(store cache.Store) (bool, error) {
+		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
+		if err != nil {
+			return true, err
+		}
+		if !exists {
+			// We need to make sure we see the object in the cache before we start waiting for events
+			// or we would be waiting for the timeout if such object didn't exist.
+			return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
+		}
+
+		return false, nil
 	}

 	// if the rollout isn't done yet, keep watching deployment status
-	// TODO: expose timeout
-	timeout := 0 * time.Second
-	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
+	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
 	defer cancel()
 	intr := interrupt.New(nil, cancel)
 	return intr.Run(func() error {
-		_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
-			// print deployment's status
-			status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
-			if err != nil {
-				return false, err
-			}
-			fmt.Fprintf(o.Out, "%s", status)
-			// Quit waiting if the rollout is done
-			if done {
-				return true, nil
-			}
-			return false, nil
+		_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, preconditionFunc, func(e watch.Event) (bool, error) {
+			switch t := e.Type; t {
+			case watch.Added, watch.Modified:
+				status, done, err := statusViewer.Status(e.Object.(runtime.Unstructured), o.Revision)
+				if err != nil {
+					return false, err
+				}
+				fmt.Fprintf(o.Out, "%s", status)
+				// Quit waiting if the rollout is done
+				if done {
+					return true, nil
+				}
+
+				shouldWatch := o.Watch
+				if !shouldWatch {
+					return true, nil
+				}
+
+				return false, nil
+			case watch.Deleted:
+				// We need to abort to avoid cases of recreation and not to silently watch the wrong (new) object
+				return true, fmt.Errorf("object has been deleted")
+			default:
+				return true, fmt.Errorf("internal error: unexpected event %#v", e)
+			}
 		})
 		return err
 	})
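
To see the whole pattern in one place, here is a condensed standalone sketch of the new wait loop. It is not this commit's code: it waits for a named ConfigMap (a stand-in for the rollout target) instead of driving a StatusViewer, the "default"/"example" names and 2-minute timeout are illustrative, it assumes a reachable cluster via the default kubeconfig, and it uses the pre-context client-go signatures this commit targets (List/Watch without a ctx argument):

package main

import (
	"context"
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	watchtools "k8s.io/client-go/tools/watch"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	gvr := schema.GroupVersionResource{Version: "v1", Resource: "configmaps"}
	namespace, name := "default", "example" // illustrative target

	// Scope the list/watch to the single object by name, as Run() does.
	fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return client.Resource(gvr).Namespace(namespace).List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return client.Resource(gvr).Namespace(namespace).Watch(options)
		},
	}

	// Fail fast if the object is absent from the initial cache sync, instead
	// of silently waiting out the timeout on a name that does not exist.
	precondition := func(store cache.Store) (bool, error) {
		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: namespace, Name: name})
		if err != nil {
			return true, err
		}
		if !exists {
			return true, apierrors.NewNotFound(schema.GroupResource{Resource: gvr.Resource}, name)
		}
		return false, nil
	}

	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, precondition,
		func(e watch.Event) (bool, error) {
			switch e.Type {
			case watch.Added, watch.Modified:
				fmt.Printf("observed %s\n", e.Object.(*unstructured.Unstructured).GetName())
				return true, nil // a real caller would check rollout status here
			case watch.Deleted:
				// Abort rather than risk watching a recreated object.
				return true, fmt.Errorf("object has been deleted")
			default:
				return true, fmt.Errorf("internal error: unexpected event %#v", e)
			}
		})
	if err != nil {
		panic(err)
	}
}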