mirror of https://github.com/k3s-io/k3s
Dynamic client watchers should be able to handle ERROR gracefully
Watch can return type "ERROR" and a metav1.Status object. We need to handle that during wait, and make it easy to handle the status object.pull/58/head
parent
95c99eb052
commit
5a8afa073f
|
@ -45,6 +45,7 @@ go_test(
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -145,7 +146,7 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
conditionFn, err := conditionFuncFor(flags.ForCondition)
|
conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -168,7 +169,7 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
|
||||||
return o, nil
|
return o, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func conditionFuncFor(condition string) (ConditionFunc, error) {
|
func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
|
||||||
if strings.ToLower(condition) == "delete" {
|
if strings.ToLower(condition) == "delete" {
|
||||||
return IsDeleted, nil
|
return IsDeleted, nil
|
||||||
}
|
}
|
||||||
|
@ -183,6 +184,7 @@ func conditionFuncFor(condition string) (ConditionFunc, error) {
|
||||||
return ConditionalWait{
|
return ConditionalWait{
|
||||||
conditionName: conditionName,
|
conditionName: conditionName,
|
||||||
conditionStatus: conditionValue,
|
conditionStatus: conditionValue,
|
||||||
|
errOut: errOut,
|
||||||
}.IsConditionMet, nil
|
}.IsConditionMet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,7 +283,7 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
|
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
|
||||||
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted)
|
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, Wait{errOut: o.ErrOut}.IsDeleted)
|
||||||
cancel()
|
cancel()
|
||||||
switch {
|
switch {
|
||||||
case err == nil:
|
case err == nil:
|
||||||
|
@ -299,14 +301,33 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func isDeleted(event watch.Event) (bool, error) {
|
// Wait has helper methods for handling watches, including error handling.
|
||||||
return event.Type == watch.Deleted, nil
|
type Wait struct {
|
||||||
|
errOut io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
|
||||||
|
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Error:
|
||||||
|
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||||
|
// the server if the error is unrecoverable.
|
||||||
|
err := apierrors.FromObject(event.Object)
|
||||||
|
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
|
||||||
|
return false, nil
|
||||||
|
case watch.Deleted:
|
||||||
|
return true, nil
|
||||||
|
default:
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConditionalWait hold information to check an API status condition
|
// ConditionalWait hold information to check an API status condition
|
||||||
type ConditionalWait struct {
|
type ConditionalWait struct {
|
||||||
conditionName string
|
conditionName string
|
||||||
conditionStatus string
|
conditionStatus string
|
||||||
|
// errOut is written to if an error occurs
|
||||||
|
errOut io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
|
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
|
||||||
|
@ -389,6 +410,13 @@ func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
|
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
|
||||||
|
if event.Type == watch.Error {
|
||||||
|
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||||
|
// the server
|
||||||
|
err := apierrors.FromObject(event.Object)
|
||||||
|
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
if event.Type == watch.Deleted {
|
if event.Type == watch.Deleted {
|
||||||
// this will chain back out, result in another get and an return false back up the chain
|
// this will chain back out, result in another get and an return false back up the chain
|
||||||
return false, nil
|
return false, nil
|
||||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
package wait
|
package wait
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"time"
|
"time"
|
||||||
|
@ -26,6 +27,7 @@ import (
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
@ -53,6 +55,16 @@ func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Uns
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newUnstructuredStatus(status *metav1.Status) runtime.Unstructured {
|
||||||
|
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(status)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return &unstructured.Unstructured{
|
||||||
|
Object: obj,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func addCondition(in *unstructured.Unstructured, name, status string) *unstructured.Unstructured {
|
func addCondition(in *unstructured.Unstructured, name, status string) *unstructured.Unstructured {
|
||||||
conditions, _, _ := unstructured.NestedSlice(in.Object, "status", "conditions")
|
conditions, _, _ := unstructured.NestedSlice(in.Object, "status", "conditions")
|
||||||
conditions = append(conditions, map[string]interface{}{
|
conditions = append(conditions, map[string]interface{}{
|
||||||
|
@ -286,6 +298,61 @@ func TestWaitForDeletion(t *testing.T) {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "ignores watch error",
|
||||||
|
infos: []*resource.Info{
|
||||||
|
{
|
||||||
|
Mapping: &meta.RESTMapping{
|
||||||
|
Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"},
|
||||||
|
},
|
||||||
|
Name: "name-foo",
|
||||||
|
Namespace: "ns-foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
|
||||||
|
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
|
||||||
|
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
|
||||||
|
})
|
||||||
|
count := 0
|
||||||
|
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
fakeWatch := watch.NewRaceFreeFake()
|
||||||
|
if count == 0 {
|
||||||
|
fakeWatch.Error(newUnstructuredStatus(&metav1.Status{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
|
||||||
|
Status: "Failure",
|
||||||
|
Code: 500,
|
||||||
|
Message: "Bad",
|
||||||
|
}))
|
||||||
|
fakeWatch.Stop()
|
||||||
|
} else {
|
||||||
|
fakeWatch.Action(watch.Deleted, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"))
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
return true, fakeWatch, nil
|
||||||
|
})
|
||||||
|
return fakeClient
|
||||||
|
},
|
||||||
|
timeout: 10 * time.Second,
|
||||||
|
|
||||||
|
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||||
|
if len(actions) != 4 {
|
||||||
|
t.Fatal(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[1].Matches("watch", "theresource") {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[3].Matches("watch", "theresource") {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
@ -544,6 +611,64 @@ func TestWaitForCondition(t *testing.T) {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "ignores watch error",
|
||||||
|
infos: []*resource.Info{
|
||||||
|
{
|
||||||
|
Mapping: &meta.RESTMapping{
|
||||||
|
Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"},
|
||||||
|
},
|
||||||
|
Name: "name-foo",
|
||||||
|
Namespace: "ns-foo",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
|
||||||
|
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
|
||||||
|
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
|
||||||
|
})
|
||||||
|
count := 0
|
||||||
|
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
fakeWatch := watch.NewRaceFreeFake()
|
||||||
|
if count == 0 {
|
||||||
|
fakeWatch.Error(newUnstructuredStatus(&metav1.Status{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
|
||||||
|
Status: "Failure",
|
||||||
|
Code: 500,
|
||||||
|
Message: "Bad",
|
||||||
|
}))
|
||||||
|
fakeWatch.Stop()
|
||||||
|
} else {
|
||||||
|
fakeWatch.Action(watch.Modified, addCondition(
|
||||||
|
newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
|
||||||
|
"the-condition", "status-value",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
return true, fakeWatch, nil
|
||||||
|
})
|
||||||
|
return fakeClient
|
||||||
|
},
|
||||||
|
timeout: 10 * time.Second,
|
||||||
|
|
||||||
|
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||||
|
if len(actions) != 4 {
|
||||||
|
t.Fatal(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[1].Matches("watch", "theresource") {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
if !actions[3].Matches("watch", "theresource") {
|
||||||
|
t.Error(spew.Sdump(actions))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
@ -555,7 +680,7 @@ func TestWaitForCondition(t *testing.T) {
|
||||||
Timeout: test.timeout,
|
Timeout: test.timeout,
|
||||||
|
|
||||||
Printer: printers.NewDiscardingPrinter(),
|
Printer: printers.NewDiscardingPrinter(),
|
||||||
ConditionFn: ConditionalWait{conditionName: "the-condition", conditionStatus: "status-value"}.IsConditionMet,
|
ConditionFn: ConditionalWait{conditionName: "the-condition", conditionStatus: "status-value", errOut: ioutil.Discard}.IsConditionMet,
|
||||||
IOStreams: genericclioptions.NewTestIOStreamsDiscard(),
|
IOStreams: genericclioptions.NewTestIOStreamsDiscard(),
|
||||||
}
|
}
|
||||||
err := o.RunWait()
|
err := o.RunWait()
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
@ -82,7 +83,20 @@ func (u *UnexpectedObjectError) Error() string {
|
||||||
func FromObject(obj runtime.Object) error {
|
func FromObject(obj runtime.Object) error {
|
||||||
switch t := obj.(type) {
|
switch t := obj.(type) {
|
||||||
case *metav1.Status:
|
case *metav1.Status:
|
||||||
return &StatusError{*t}
|
return &StatusError{ErrStatus: *t}
|
||||||
|
case runtime.Unstructured:
|
||||||
|
var status metav1.Status
|
||||||
|
obj := t.UnstructuredContent()
|
||||||
|
if !reflect.DeepEqual(obj["kind"], "Status") {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(t.UnstructuredContent(), &status); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if status.APIVersion != "v1" && status.APIVersion != "meta.k8s.io/v1" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return &StatusError{ErrStatus: status}
|
||||||
}
|
}
|
||||||
return &UnexpectedObjectError{obj}
|
return &UnexpectedObjectError{obj}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1882,6 +1882,10 @@
|
||||||
"ImportPath": "k8s.io/client-go/discovery",
|
"ImportPath": "k8s.io/client-go/discovery",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "k8s.io/client-go/dynamic",
|
||||||
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/client-go/informers",
|
"ImportPath": "k8s.io/client-go/informers",
|
||||||
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
||||||
|
|
|
@ -54,6 +54,8 @@ go_test(
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||||
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
"//vendor/github.com/emicklei/go-restful:go_default_library",
|
||||||
"//vendor/golang.org/x/net/websocket:go_default_library",
|
"//vendor/golang.org/x/net/websocket:go_default_library",
|
||||||
],
|
],
|
||||||
|
|
|
@ -25,13 +25,16 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
@ -43,6 +46,8 @@ import (
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers"
|
"k8s.io/apiserver/pkg/endpoints/handlers"
|
||||||
apitesting "k8s.io/apiserver/pkg/endpoints/testing"
|
apitesting "k8s.io/apiserver/pkg/endpoints/testing"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
"k8s.io/client-go/dynamic"
|
||||||
|
restclient "k8s.io/client-go/rest"
|
||||||
)
|
)
|
||||||
|
|
||||||
// watchJSON defines the expected JSON wire equivalent of watch.Event
|
// watchJSON defines the expected JSON wire equivalent of watch.Event
|
||||||
|
@ -561,6 +566,128 @@ func (t *fakeTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchHTTPErrors(t *testing.T) {
|
||||||
|
watcher := watch.NewFake()
|
||||||
|
timeoutCh := make(chan time.Time)
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
||||||
|
if !ok || info.StreamSerializer == nil {
|
||||||
|
t.Fatal(info)
|
||||||
|
}
|
||||||
|
serializer := info.StreamSerializer
|
||||||
|
|
||||||
|
// Setup a new watchserver
|
||||||
|
watchServer := &handlers.WatchServer{
|
||||||
|
Watching: watcher,
|
||||||
|
|
||||||
|
MediaType: "testcase/json",
|
||||||
|
Framer: serializer.Framer,
|
||||||
|
Encoder: newCodec,
|
||||||
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
|
Fixup: func(obj runtime.Object) {},
|
||||||
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
watchServer.ServeHTTP(w, req)
|
||||||
|
}))
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
// Setup a client
|
||||||
|
dest, _ := url.Parse(s.URL)
|
||||||
|
dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/simple"
|
||||||
|
dest.RawQuery = "watch=true"
|
||||||
|
|
||||||
|
req, _ := http.NewRequest("GET", dest.String(), nil)
|
||||||
|
client := http.Client{}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
errStatus := errors.NewInternalError(fmt.Errorf("we got an error")).Status()
|
||||||
|
watcher.Error(&errStatus)
|
||||||
|
watcher.Stop()
|
||||||
|
|
||||||
|
// Make sure we can actually watch an endpoint
|
||||||
|
decoder := json.NewDecoder(resp.Body)
|
||||||
|
var got watchJSON
|
||||||
|
err = decoder.Decode(&got)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if got.Type != watch.Error {
|
||||||
|
t.Fatalf("unexpected watch type: %#v", got)
|
||||||
|
}
|
||||||
|
status := &metav1.Status{}
|
||||||
|
if err := json.Unmarshal(got.Object, status); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if status.Kind != "Status" || status.APIVersion != "v1" || status.Code != 500 || status.Status != "Failure" || !strings.Contains(status.Message, "we got an error") {
|
||||||
|
t.Fatalf("error: %#v", status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchHTTPDynamicClientErrors(t *testing.T) {
|
||||||
|
watcher := watch.NewFake()
|
||||||
|
timeoutCh := make(chan time.Time)
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
||||||
|
if !ok || info.StreamSerializer == nil {
|
||||||
|
t.Fatal(info)
|
||||||
|
}
|
||||||
|
serializer := info.StreamSerializer
|
||||||
|
|
||||||
|
// Setup a new watchserver
|
||||||
|
watchServer := &handlers.WatchServer{
|
||||||
|
Watching: watcher,
|
||||||
|
|
||||||
|
MediaType: "testcase/json",
|
||||||
|
Framer: serializer.Framer,
|
||||||
|
Encoder: newCodec,
|
||||||
|
EmbeddedEncoder: newCodec,
|
||||||
|
|
||||||
|
Fixup: func(obj runtime.Object) {},
|
||||||
|
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
watchServer.ServeHTTP(w, req)
|
||||||
|
}))
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
client := dynamic.NewForConfigOrDie(&restclient.Config{
|
||||||
|
Host: s.URL,
|
||||||
|
APIPath: "/" + prefix,
|
||||||
|
}).Resource(newGroupVersion.WithResource("simple"))
|
||||||
|
|
||||||
|
w, err := client.Watch(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
errStatus := errors.NewInternalError(fmt.Errorf("we got an error")).Status()
|
||||||
|
watcher.Error(&errStatus)
|
||||||
|
watcher.Stop()
|
||||||
|
|
||||||
|
got := <-w.ResultChan()
|
||||||
|
if got.Type != watch.Error {
|
||||||
|
t.Fatalf("unexpected watch type: %#v", got)
|
||||||
|
}
|
||||||
|
obj, ok := got.Object.(*unstructured.Unstructured)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("not the correct object type: %#v", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
status := &metav1.Status{}
|
||||||
|
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), status); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if status.Kind != "Status" || status.APIVersion != "v1" || status.Code != 500 || status.Status != "Failure" || !strings.Contains(status.Message, "we got an error") {
|
||||||
|
t.Fatalf("error: %#v", status)
|
||||||
|
}
|
||||||
|
t.Logf("status: %#v", status)
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatchHTTPTimeout(t *testing.T) {
|
func TestWatchHTTPTimeout(t *testing.T) {
|
||||||
watcher := watch.NewFake()
|
watcher := watch.NewFake()
|
||||||
timeoutCh := make(chan time.Time)
|
timeoutCh := make(chan time.Time)
|
||||||
|
|
Loading…
Reference in New Issue