From 4196780edaa380b80f01fdcd6df30ceed559804f Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 5 Nov 2014 17:22:18 -0800 Subject: [PATCH] Add self links to objects sent down the watch channel. --- pkg/api/ref.go | 2 +- pkg/apiserver/apiserver.go | 8 ++++++- pkg/apiserver/apiserver_test.go | 22 ++++++++++++++--- pkg/apiserver/watch.go | 33 ++++++++++++++++++++++---- pkg/apiserver/watch_test.go | 42 +++++++++++++++++++++++---------- 5 files changed, 85 insertions(+), 22 deletions(-) diff --git a/pkg/api/ref.go b/pkg/api/ref.go index 36eb647eb1..64c794406a 100644 --- a/pkg/api/ref.go +++ b/pkg/api/ref.go @@ -46,7 +46,7 @@ func GetReference(obj runtime.Object) (*ObjectReference, error) { } version := versionFromSelfLink.FindStringSubmatch(meta.SelfLink()) if len(version) < 2 { - return nil, fmt.Errorf("unexpected self link format: %v", meta.SelfLink()) + return nil, fmt.Errorf("unexpected self link format: '%v'; got version '%v'", meta.SelfLink(), version) } return &ObjectReference{ Kind: kind, diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 47c27ed687..f6d0b725f7 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" + "github.com/golang/glog" ) @@ -89,7 +90,12 @@ func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalP // in a slash. func (g *APIGroup) InstallREST(mux Mux, paths ...string) { restHandler := &g.handler - watchHandler := &WatchHandler{g.handler.storage, g.handler.codec} + watchHandler := &WatchHandler{ + storage: g.handler.storage, + codec: g.handler.codec, + canonicalPrefix: g.handler.canonicalPrefix, + selfLinker: g.handler.selfLinker, + } redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec} opHandler := &OperationHandler{g.handler.ops, g.handler.codec} diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 52a055c50f..e01d4516d4 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -33,6 +33,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -44,17 +45,17 @@ func convert(obj runtime.Object) (runtime.Object, error) { return obj, nil } -var codec = latest.Codec +var codec = testapi.Codec() var selfLinker = latest.SelfLinker func init() { api.Scheme.AddKnownTypes("", &Simple{}, &SimpleList{}) - api.Scheme.AddKnownTypes(latest.Version, &Simple{}, &SimpleList{}) + api.Scheme.AddKnownTypes(testapi.Version(), &Simple{}, &SimpleList{}) } type Simple struct { api.TypeMeta `yaml:",inline" json:",inline"` - api.ObjectMeta `yaml:"metadata,inline" json:"metadata,inline"` + api.ObjectMeta `yaml:"metadata" json:"metadata"` Other string `yaml:"other,omitempty" json:"other,omitempty"` } @@ -68,6 +69,21 @@ type SimpleList struct { func (*SimpleList) IsAnAPIObject() {} +func TestSimpleSetupRight(t *testing.T) { + s := &Simple{ObjectMeta: api.ObjectMeta{Name: "aName"}} + wire, err := codec.Encode(s) + if err != nil { + t.Fatal(err) + } + s2, err := codec.Decode(wire) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(s, s2) { + t.Fatalf("encode/decode broken:\n%#v\n%#v\n", s, s2) + } +} + type SimpleRESTStorage struct { errors map[string]error list []Simple diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 9a00c62621..be70056141 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -19,21 +19,39 @@ package apiserver import ( "net/http" "net/url" + "path" "regexp" "strings" - "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" + + "code.google.com/p/go.net/websocket" + "github.com/golang/glog" ) type WatchHandler struct { - storage map[string]RESTStorage - codec runtime.Codec + storage map[string]RESTStorage + codec runtime.Codec + canonicalPrefix string + selfLinker runtime.SelfLinker +} + +// setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type. +func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { + name, err := h.selfLinker.Name(obj) + if err != nil { + return err + } + newURL := *req.URL + newURL.Path = path.Join(h.canonicalPrefix, req.URL.Path, name) + newURL.RawQuery = "" + newURL.Fragment = "" + return h.selfLinker.SetSelfLink(obj, newURL.String()) } func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion string) { @@ -84,7 +102,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // TODO: This is one watch per connection. We want to multiplex, so that // multiple watches of the same thing don't create two watches downstream. - watchServer := &WatchServer{watching, h.codec} + watchServer := &WatchServer{watching, h.codec, func(obj runtime.Object) { + if err := h.setSelfLinkAddName(obj, req); err != nil { + glog.Errorf("Failed to set self link for object %#v", obj) + } + }} if isWebsocketRequest(req) { websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) } else { @@ -100,6 +122,7 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { type WatchServer struct { watching watch.Interface codec runtime.Codec + fixup func(runtime.Object) } // HandleWS implements a websocket handler. @@ -122,6 +145,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) { // End of results. return } + w.fixup(event.Object) obj, err := watchjson.Object(w.codec, &event) if err != nil { // Client disconnect. @@ -171,6 +195,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // End of results. return } + self.fixup(event.Object) if err := encoder.Encode(&event); err != nil { // Client disconnect. self.watching.Stop() diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 4126936b47..bb37d9ca7a 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -40,9 +40,9 @@ var watchTestTable = []struct { t watch.EventType obj runtime.Object }{ - {watch.Added, &Simple{Other: "A Name"}}, - {watch.Modified, &Simple{Other: "Another Name"}}, - {watch.Deleted, &Simple{Other: "Another Name"}}, + {watch.Added, &Simple{ObjectMeta: api.ObjectMeta{Name: "foo"}}}, + {watch.Modified, &Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}}, + {watch.Deleted, &Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}}, } func TestWatchWebsocket(t *testing.T) { @@ -50,13 +50,13 @@ func TestWatchWebsocket(t *testing.T) { _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, - }, codec, "/prefix/version", selfLinker) + }, codec, "/api/version", selfLinker) server := httptest.NewServer(handler) defer server.Close() dest, _ := url.Parse(server.URL) dest.Scheme = "ws" // Required by websocket, though the server never sees it. - dest.Path = "/prefix/version/watch/foo" + dest.Path = "/api/version/watch/foo" dest.RawQuery = "" ws, err := websocket.Dial(dest.String(), "", "http://localhost") @@ -76,7 +76,14 @@ func TestWatchWebsocket(t *testing.T) { if got.Type != action { t.Errorf("Unexpected type: %v", got.Type) } - if e, a := runtime.EncodeOrDie(codec, object), string(got.Object); !reflect.DeepEqual(e, a) { + gotObj, err := codec.Decode(got.Object) + if err != nil { + t.Fatalf("Decode error: %v", err) + } + if _, err := api.GetReference(gotObj); err != nil { + t.Errorf("Unable to construct reference: %v", err) + } + if e, a := object, gotObj; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } @@ -97,13 +104,13 @@ func TestWatchHTTP(t *testing.T) { simpleStorage := &SimpleRESTStorage{} handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, - }, codec, "/prefix/version", selfLinker) + }, codec, "/api/version", selfLinker) server := httptest.NewServer(handler) defer server.Close() client := http.Client{} dest, _ := url.Parse(server.URL) - dest.Path = "/prefix/version/watch/foo" + dest.Path = "/api/version/watch/foo" dest.RawQuery = "" request, err := http.NewRequest("GET", dest.String(), nil) @@ -134,7 +141,16 @@ func TestWatchHTTP(t *testing.T) { if got.Type != item.t { t.Errorf("%d: Unexpected type: %v", i, got.Type) } - if e, a := runtime.EncodeOrDie(codec, item.obj), string(got.Object); !reflect.DeepEqual(e, a) { + t.Logf("obj: %v", string(got.Object)) + gotObj, err := codec.Decode(got.Object) + if err != nil { + t.Fatalf("Decode error: %v", err) + } + t.Logf("obj: %#v", gotObj) + if _, err := api.GetReference(gotObj); err != nil { + t.Errorf("Unable to construct reference: %v", err) + } + if e, a := item.obj, gotObj; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } @@ -151,12 +167,12 @@ func TestWatchParamParsing(t *testing.T) { simpleStorage := &SimpleRESTStorage{} handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, - }, codec, "/prefix/version", selfLinker) + }, codec, "/api/version", selfLinker) server := httptest.NewServer(handler) defer server.Close() dest, _ := url.Parse(server.URL) - dest.Path = "/prefix/version/watch/foo" + dest.Path = "/api/version/watch/foo" table := []struct { rawQuery string @@ -223,14 +239,14 @@ func TestWatchProtocolSelection(t *testing.T) { simpleStorage := &SimpleRESTStorage{} handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, - }, codec, "/prefix/version", selfLinker) + }, codec, "/api/version", selfLinker) server := httptest.NewServer(handler) defer server.Close() defer server.CloseClientConnections() client := http.Client{} dest, _ := url.Parse(server.URL) - dest.Path = "/prefix/version/watch/foo" + dest.Path = "/api/version/watch/foo" dest.RawQuery = "" table := []struct {