From 5a8afa073f6b8cbb8b09f997f6db747c39dffb6e Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 3 Oct 2018 14:16:44 -0400 Subject: [PATCH] Dynamic client watchers should be able to handle ERROR gracefully Watch can return type "ERROR" and a metav1.Status object. We need to handle that during wait, and make it easy to handle the status object. --- pkg/kubectl/cmd/wait/BUILD | 1 + pkg/kubectl/cmd/wait/wait.go | 38 +++++- pkg/kubectl/cmd/wait/wait_test.go | 127 +++++++++++++++++- .../apimachinery/pkg/api/errors/errors.go | 16 ++- .../src/k8s.io/apiserver/Godeps/Godeps.json | 4 + .../src/k8s.io/apiserver/pkg/endpoints/BUILD | 2 + .../apiserver/pkg/endpoints/watch_test.go | 127 ++++++++++++++++++ 7 files changed, 308 insertions(+), 7 deletions(-) diff --git a/pkg/kubectl/cmd/wait/BUILD b/pkg/kubectl/cmd/wait/BUILD index 3fb1238067..919e4bd95b 100644 --- a/pkg/kubectl/cmd/wait/BUILD +++ b/pkg/kubectl/cmd/wait/BUILD @@ -45,6 +45,7 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", diff --git a/pkg/kubectl/cmd/wait/wait.go b/pkg/kubectl/cmd/wait/wait.go index c8f6bfb438..c338fdec00 100644 --- a/pkg/kubectl/cmd/wait/wait.go +++ b/pkg/kubectl/cmd/wait/wait.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "io" "strings" "time" @@ -145,7 +146,7 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) { if err != nil { return nil, err } - conditionFn, err := conditionFuncFor(flags.ForCondition) + conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut) if err != nil { return nil, err } @@ -168,7 +169,7 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) { return o, nil } -func conditionFuncFor(condition string) (ConditionFunc, error) { +func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) { if strings.ToLower(condition) == "delete" { return IsDeleted, nil } @@ -183,6 +184,7 @@ func conditionFuncFor(condition string) (ConditionFunc, error) { return ConditionalWait{ conditionName: conditionName, conditionStatus: conditionValue, + errOut: errOut, }.IsConditionMet, nil } @@ -281,7 +283,7 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error } ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) - watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted) + watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, Wait{errOut: o.ErrOut}.IsDeleted) cancel() switch { case err == nil: @@ -299,14 +301,33 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error } } -func isDeleted(event watch.Event) (bool, error) { - return event.Type == watch.Deleted, nil +// Wait has helper methods for handling watches, including error handling. +type Wait struct { + errOut io.Writer +} + +// IsDeleted returns true if the object is deleted. It prints any errors it encounters. +func (w Wait) IsDeleted(event watch.Event) (bool, error) { + switch event.Type { + case watch.Error: + // keep waiting in the event we see an error - we expect the watch to be closed by + // the server if the error is unrecoverable. + err := apierrors.FromObject(event.Object) + fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err) + return false, nil + case watch.Deleted: + return true, nil + default: + return false, nil + } } // ConditionalWait hold information to check an API status condition type ConditionalWait struct { conditionName string conditionStatus string + // errOut is written to if an error occurs + errOut io.Writer } // IsConditionMet is a conditionfunc for waiting on an API condition to be met @@ -389,6 +410,13 @@ func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, e } func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) { + if event.Type == watch.Error { + // keep waiting in the event we see an error - we expect the watch to be closed by + // the server + err := apierrors.FromObject(event.Object) + fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) + return false, nil + } if event.Type == watch.Deleted { // this will chain back out, result in another get and an return false back up the chain return false, nil diff --git a/pkg/kubectl/cmd/wait/wait_test.go b/pkg/kubectl/cmd/wait/wait_test.go index 01d510f21a..ef34dec72a 100644 --- a/pkg/kubectl/cmd/wait/wait_test.go +++ b/pkg/kubectl/cmd/wait/wait_test.go @@ -17,6 +17,7 @@ limitations under the License. package wait import ( + "io/ioutil" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/davecgh/go-spew/spew" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -53,6 +55,16 @@ func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Uns } } +func newUnstructuredStatus(status *metav1.Status) runtime.Unstructured { + obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(status) + if err != nil { + panic(err) + } + return &unstructured.Unstructured{ + Object: obj, + } +} + func addCondition(in *unstructured.Unstructured, name, status string) *unstructured.Unstructured { conditions, _, _ := unstructured.NestedSlice(in.Object, "status", "conditions") conditions = append(conditions, map[string]interface{}{ @@ -286,6 +298,61 @@ func TestWaitForDeletion(t *testing.T) { } }, }, + { + name: "ignores watch error", + infos: []*resource.Info{ + { + Mapping: &meta.RESTMapping{ + Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"}, + }, + Name: "name-foo", + Namespace: "ns-foo", + }, + }, + fakeClient: func() *dynamicfakeclient.FakeDynamicClient { + fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme) + fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { + return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil + }) + count := 0 + fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { + fakeWatch := watch.NewRaceFreeFake() + if count == 0 { + fakeWatch.Error(newUnstructuredStatus(&metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: "Failure", + Code: 500, + Message: "Bad", + })) + fakeWatch.Stop() + } else { + fakeWatch.Action(watch.Deleted, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")) + } + count++ + return true, fakeWatch, nil + }) + return fakeClient + }, + timeout: 10 * time.Second, + + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 4 { + t.Fatal(spew.Sdump(actions)) + } + if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[1].Matches("watch", "theresource") { + t.Error(spew.Sdump(actions)) + } + if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[3].Matches("watch", "theresource") { + t.Error(spew.Sdump(actions)) + } + }, + }, } for _, test := range tests { @@ -544,6 +611,64 @@ func TestWaitForCondition(t *testing.T) { } }, }, + { + name: "ignores watch error", + infos: []*resource.Info{ + { + Mapping: &meta.RESTMapping{ + Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"}, + }, + Name: "name-foo", + Namespace: "ns-foo", + }, + }, + fakeClient: func() *dynamicfakeclient.FakeDynamicClient { + fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme) + fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { + return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil + }) + count := 0 + fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { + fakeWatch := watch.NewRaceFreeFake() + if count == 0 { + fakeWatch.Error(newUnstructuredStatus(&metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: "Failure", + Code: 500, + Message: "Bad", + })) + fakeWatch.Stop() + } else { + fakeWatch.Action(watch.Modified, addCondition( + newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), + "the-condition", "status-value", + )) + } + count++ + return true, fakeWatch, nil + }) + return fakeClient + }, + timeout: 10 * time.Second, + + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 4 { + t.Fatal(spew.Sdump(actions)) + } + if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[1].Matches("watch", "theresource") { + t.Error(spew.Sdump(actions)) + } + if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" { + t.Error(spew.Sdump(actions)) + } + if !actions[3].Matches("watch", "theresource") { + t.Error(spew.Sdump(actions)) + } + }, + }, } for _, test := range tests { @@ -555,7 +680,7 @@ func TestWaitForCondition(t *testing.T) { Timeout: test.timeout, Printer: printers.NewDiscardingPrinter(), - ConditionFn: ConditionalWait{conditionName: "the-condition", conditionStatus: "status-value"}.IsConditionMet, + ConditionFn: ConditionalWait{conditionName: "the-condition", conditionStatus: "status-value", errOut: ioutil.Discard}.IsConditionMet, IOStreams: genericclioptions.NewTestIOStreamsDiscard(), } err := o.RunWait() diff --git a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go index bcc032df9d..e736a98614 100644 --- a/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go +++ b/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "net/http" + "reflect" "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -82,7 +83,20 @@ func (u *UnexpectedObjectError) Error() string { func FromObject(obj runtime.Object) error { switch t := obj.(type) { case *metav1.Status: - return &StatusError{*t} + return &StatusError{ErrStatus: *t} + case runtime.Unstructured: + var status metav1.Status + obj := t.UnstructuredContent() + if !reflect.DeepEqual(obj["kind"], "Status") { + break + } + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(t.UnstructuredContent(), &status); err != nil { + return err + } + if status.APIVersion != "v1" && status.APIVersion != "meta.k8s.io/v1" { + break + } + return &StatusError{ErrStatus: status} } return &UnexpectedObjectError{obj} } diff --git a/staging/src/k8s.io/apiserver/Godeps/Godeps.json b/staging/src/k8s.io/apiserver/Godeps/Godeps.json index bb3b653bd4..bf15847c84 100644 --- a/staging/src/k8s.io/apiserver/Godeps/Godeps.json +++ b/staging/src/k8s.io/apiserver/Godeps/Godeps.json @@ -1882,6 +1882,10 @@ "ImportPath": "k8s.io/client-go/discovery", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" }, + { + "ImportPath": "k8s.io/client-go/dynamic", + "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + }, { "ImportPath": "k8s.io/client-go/informers", "Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD index d1ab6e43b6..45f63ebeb5 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD @@ -54,6 +54,8 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//vendor/github.com/emicklei/go-restful:go_default_library", "//vendor/golang.org/x/net/websocket:go_default_library", ], diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go index 802b55e732..81d141fa75 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go @@ -25,13 +25,16 @@ import ( "net/http/httptest" "net/url" "reflect" + "strings" "sync" "testing" "time" "golang.org/x/net/websocket" apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -43,6 +46,8 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers" apitesting "k8s.io/apiserver/pkg/endpoints/testing" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/client-go/dynamic" + restclient "k8s.io/client-go/rest" ) // watchJSON defines the expected JSON wire equivalent of watch.Event @@ -561,6 +566,128 @@ func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { } } +func TestWatchHTTPErrors(t *testing.T) { + watcher := watch.NewFake() + timeoutCh := make(chan time.Time) + done := make(chan struct{}) + + info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) + if !ok || info.StreamSerializer == nil { + t.Fatal(info) + } + serializer := info.StreamSerializer + + // Setup a new watchserver + watchServer := &handlers.WatchServer{ + Watching: watcher, + + MediaType: "testcase/json", + Framer: serializer.Framer, + Encoder: newCodec, + EmbeddedEncoder: newCodec, + + Fixup: func(obj runtime.Object) {}, + TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done}, + } + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + watchServer.ServeHTTP(w, req) + })) + defer s.Close() + + // Setup a client + dest, _ := url.Parse(s.URL) + dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/simple" + dest.RawQuery = "watch=true" + + req, _ := http.NewRequest("GET", dest.String(), nil) + client := http.Client{} + resp, err := client.Do(req) + errStatus := errors.NewInternalError(fmt.Errorf("we got an error")).Status() + watcher.Error(&errStatus) + watcher.Stop() + + // Make sure we can actually watch an endpoint + decoder := json.NewDecoder(resp.Body) + var got watchJSON + err = decoder.Decode(&got) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if got.Type != watch.Error { + t.Fatalf("unexpected watch type: %#v", got) + } + status := &metav1.Status{} + if err := json.Unmarshal(got.Object, status); err != nil { + t.Fatal(err) + } + if status.Kind != "Status" || status.APIVersion != "v1" || status.Code != 500 || status.Status != "Failure" || !strings.Contains(status.Message, "we got an error") { + t.Fatalf("error: %#v", status) + } +} + +func TestWatchHTTPDynamicClientErrors(t *testing.T) { + watcher := watch.NewFake() + timeoutCh := make(chan time.Time) + done := make(chan struct{}) + + info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON) + if !ok || info.StreamSerializer == nil { + t.Fatal(info) + } + serializer := info.StreamSerializer + + // Setup a new watchserver + watchServer := &handlers.WatchServer{ + Watching: watcher, + + MediaType: "testcase/json", + Framer: serializer.Framer, + Encoder: newCodec, + EmbeddedEncoder: newCodec, + + Fixup: func(obj runtime.Object) {}, + TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done}, + } + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + watchServer.ServeHTTP(w, req) + })) + defer s.Close() + + client := dynamic.NewForConfigOrDie(&restclient.Config{ + Host: s.URL, + APIPath: "/" + prefix, + }).Resource(newGroupVersion.WithResource("simple")) + + w, err := client.Watch(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + + errStatus := errors.NewInternalError(fmt.Errorf("we got an error")).Status() + watcher.Error(&errStatus) + watcher.Stop() + + got := <-w.ResultChan() + if got.Type != watch.Error { + t.Fatalf("unexpected watch type: %#v", got) + } + obj, ok := got.Object.(*unstructured.Unstructured) + if !ok { + t.Fatalf("not the correct object type: %#v", got) + } + + status := &metav1.Status{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), status); err != nil { + t.Fatal(err) + } + if status.Kind != "Status" || status.APIVersion != "v1" || status.Code != 500 || status.Status != "Failure" || !strings.Contains(status.Message, "we got an error") { + t.Fatalf("error: %#v", status) + } + t.Logf("status: %#v", status) +} + func TestWatchHTTPTimeout(t *testing.T) { watcher := watch.NewFake() timeoutCh := make(chan time.Time)