From bbf3b55e76a6fe2d79c83392902004bbbc1cc313 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 9 Aug 2014 17:12:55 -0400 Subject: [PATCH] Extract RESTHandler and allow API groupings Prepare for running multiple API versions on the same HTTP server by decoupling some of the mechanics of apiserver. Define a new APIGroup object which represents a version of the API. --- cmd/apiserver/apiserver.go | 11 +- cmd/integration/integration.go | 10 +- pkg/apiserver/apiserver.go | 308 ++++++++---------------------- pkg/apiserver/apiserver_test.go | 40 ++-- pkg/apiserver/minionproxy_test.go | 2 +- pkg/apiserver/operation_test.go | 32 ++-- pkg/apiserver/resthandler.go | 192 +++++++++++++++++++ pkg/apiserver/watch_test.go | 6 +- pkg/healthz/healthz.go | 7 +- pkg/master/master.go | 22 +-- 10 files changed, 340 insertions(+), 290 deletions(-) create mode 100644 pkg/apiserver/resthandler.go diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 9fe1de44e9..cbd03f9002 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -25,6 +25,7 @@ import ( "strconv" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" @@ -114,5 +115,13 @@ func main() { }) } - glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix)) + storage, codec := m.API_v1beta1() + s := &http.Server{ + Addr: net.JoinHostPort(*address, strconv.Itoa(int(*port))), + Handler: apiserver.Handle(storage, codec, *apiPrefix), + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + glog.Fatal(s.ListenAndServe()) } diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index b05474d842..c330646119 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -29,6 +29,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" @@ -86,11 +87,11 @@ func startComponents(manifestURL string) (apiServerURL string) { machineList := []string{"localhost", "machine"} handler := delegateHandler{} - apiserver := httptest.NewServer(&handler) + apiServer := httptest.NewServer(&handler) etcdClient := etcd.NewClient(servers) - cl := client.New(apiserver.URL, nil) + cl := client.New(apiServer.URL, nil) cl.PollPeriod = time.Second * 1 cl.Sync = true @@ -101,7 +102,8 @@ func startComponents(manifestURL string) (apiServerURL string) { Minions: machineList, PodInfoGetter: fakePodInfoGetter{}, }) - handler.delegate = m.ConstructHandler("/api/v1beta1") + storage, codec := m.API_v1beta1() + handler.delegate = apiserver.Handle(storage, codec, "/api/v1beta1") controllerManager := controller.MakeReplicationManager(cl) @@ -130,7 +132,7 @@ func startComponents(manifestURL string) (apiServerURL string) { kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251) }, 0) - return apiserver.URL + return apiServer.URL } func runReplicationControllerTest(kubeClient *client.Client) { diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index cc0b0c9d2c..4539dd63f9 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -21,15 +21,12 @@ import ( "fmt" "io/ioutil" "net/http" - "path" "runtime/debug" "strings" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/golang/glog" ) @@ -42,215 +39,104 @@ type Codec interface { DecodeInto(data []byte, obj interface{}) error } -// APIServer is an HTTPHandler that delegates to RESTStorage objects. -// It handles URLs of the form: -// ${prefix}/${storage_key}[/${object_name}] -// Where 'prefix' is an arbitrary string, and 'storage_key' points to a RESTStorage object stored in storage. -// -// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing. -type APIServer struct { - storage map[string]RESTStorage - codec Codec - ops *Operations - asyncOpWait time.Duration - handler http.Handler +// mux is an object that can register http handlers +type mux interface { + Handle(pattern string, handler http.Handler) + HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) } -// New creates a new APIServer object. 'storage' contains a map of handlers. 'codec' -// is an interface for decoding to and from JSON. 'prefix' is the hosting path prefix. +// defaultAPIServer exposes nested objects for testability +type defaultAPIServer struct { + http.Handler + group *APIGroup +} + +// Handle returns a Handler function that expose the provided storage interfaces +// as RESTful resources at prefix, serialized by codec, and also includes the support +// http resources. +func Handle(storage map[string]RESTStorage, codec Codec, prefix string) http.Handler { + group := NewAPIGroup(storage, codec) + + mux := http.NewServeMux() + group.InstallREST(mux, prefix) + InstallSupport(mux) + + return &defaultAPIServer{RecoverPanics(mux), group} +} + +// APIGroup is a http.Handler that exposes multiple RESTStorage objects +// It handles URLs of the form: +// /${storage_key}[/${object_name}] +// Where 'storage_key' points to a RESTStorage object stored in storage. // -// The codec will be used to decode the request body into an object pointer returned by -// RESTStorage.New(). The Create() and Update() methods should cast their argument to -// the type returned by New(). +// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing. +type APIGroup struct { + handler RESTHandler +} + +// NewAPIGroup returns an object that will serve a set of REST resources and their +// associated operations. The provided codec controls serialization and deserialization. +// This is a helper method for registering multiple sets of REST handlers under different +// prefixes onto a server. // TODO: add multitype codec serialization -func New(storage map[string]RESTStorage, codec Codec, prefix string) *APIServer { - s := &APIServer{ +func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup { + return &APIGroup{RESTHandler{ storage: storage, codec: codec, ops: NewOperations(), // Delay just long enough to handle most simple write operations asyncOpWait: time.Millisecond * 25, + }} +} + +// InstallREST registers the REST handlers (storage, watch, and operations) into a mux. +// It is expected that the provided prefix will serve all operations. Path MUST NOT end +// in a slash. +func (g *APIGroup) InstallREST(mux mux, paths ...string) { + restHandler := &g.handler + watchHandler := &WatchHandler{g.handler.storage, g.handler.codec} + opHandler := &OperationHandler{g.handler.ops, g.handler.codec} + + for _, prefix := range paths { + prefix = strings.TrimRight(prefix, "/") + mux.Handle(prefix+"/", http.StripPrefix(prefix, restHandler)) + mux.Handle(prefix+"/watch/", http.StripPrefix(prefix+"/watch/", watchHandler)) + mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler)) + mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler)) } +} - mux := http.NewServeMux() - - prefix = strings.TrimRight(prefix, "/") - - // Primary API handlers - restPrefix := prefix + "/" - mux.Handle(restPrefix, http.StripPrefix(restPrefix, http.HandlerFunc(s.handleREST))) - - // Watch API handlers - watchPrefix := path.Join(prefix, "watch") + "/" - mux.Handle(watchPrefix, http.StripPrefix(watchPrefix, &WatchHandler{storage, codec})) - - // Support services for the apiserver - logsPrefix := "/logs/" - mux.Handle(logsPrefix, http.StripPrefix(logsPrefix, http.FileServer(http.Dir("/var/log/")))) +// InstallSupport registers the APIServer support functions into a mux. +func InstallSupport(mux mux) { healthz.InstallHandler(mux) + mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))) + mux.Handle("/proxy/minion/", http.StripPrefix("/proxy/minion", http.HandlerFunc(handleProxyMinion))) mux.HandleFunc("/version", handleVersion) mux.HandleFunc("/", handleIndex) - - // Handle both operations and operations/* with the same handler - handler := &OperationHandler{s.ops, s.codec} - operationPrefix := path.Join(prefix, "operations") - mux.Handle(operationPrefix, http.StripPrefix(operationPrefix, handler)) - operationsPrefix := operationPrefix + "/" - mux.Handle(operationsPrefix, http.StripPrefix(operationsPrefix, handler)) - - // Proxy minion requests - mux.Handle("/proxy/minion/", http.StripPrefix("/proxy/minion", http.HandlerFunc(handleProxyMinion))) - - s.handler = mux - - return s } -// ServeHTTP implements the standard net/http interface. -func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - defer func() { - if x := recover(); x != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(w, "apis panic. Look in log for details.") - glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) - } - }() - defer httplog.MakeLogged(req, &w).StacktraceWhen( - httplog.StatusIsNot( - http.StatusOK, - http.StatusAccepted, - http.StatusConflict, - http.StatusNotFound, - ), - ).Log() - - // Dispatch to the internal handler - s.handler.ServeHTTP(w, req) -} - -// handleREST handles requests to all our RESTStorage objects. -func (s *APIServer) handleREST(w http.ResponseWriter, req *http.Request) { - parts := splitPath(req.URL.Path) - if len(parts) < 1 { - notFound(w, req) - return - } - storage := s.storage[parts[0]] - if storage == nil { - httplog.LogOf(w).Addf("'%v' has no storage object", parts[0]) - notFound(w, req) - return - } - - s.handleRESTStorage(parts, req, w, storage) -} - -// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then -// on path length, according to the following table: -// Method Path Action -// GET /foo list -// GET /foo/bar get 'bar' -// POST /foo create -// PUT /foo/bar update 'bar' -// DELETE /foo/bar delete 'bar' -// Returns 404 if the method/pattern doesn't match one of these entries -// The s accepts several query parameters: -// sync=[false|true] Synchronous request (only applies to create, update, delete operations) -// timeout= Timeout for synchronous requests, only applies if sync=true -// labels= Used for filtering list operations -func (s *APIServer) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { - sync := req.URL.Query().Get("sync") == "true" - timeout := parseTimeout(req.URL.Query().Get("timeout")) - switch req.Method { - case "GET": - switch len(parts) { - case 1: - selector, err := labels.ParseSelector(req.URL.Query().Get("labels")) - if err != nil { - errorJSON(err, s.codec, w) - return +// RecoverPanics wraps an http Handler to recover and log panics +func RecoverPanics(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer func() { + if x := recover(); x != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, "apis panic. Look in log for details.") + glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) } - list, err := storage.List(selector) - if err != nil { - errorJSON(err, s.codec, w) - return - } - writeJSON(http.StatusOK, s.codec, list, w) - case 2: - item, err := storage.Get(parts[1]) - if err != nil { - errorJSON(err, s.codec, w) - return - } - writeJSON(http.StatusOK, s.codec, item, w) - default: - notFound(w, req) - } + }() + defer httplog.MakeLogged(req, &w).StacktraceWhen( + httplog.StatusIsNot( + http.StatusOK, + http.StatusAccepted, + http.StatusConflict, + http.StatusNotFound, + ), + ).Log() - case "POST": - if len(parts) != 1 { - notFound(w, req) - return - } - body, err := readBody(req) - if err != nil { - errorJSON(err, s.codec, w) - return - } - obj := storage.New() - err = s.codec.DecodeInto(body, obj) - if err != nil { - errorJSON(err, s.codec, w) - return - } - out, err := storage.Create(obj) - if err != nil { - errorJSON(err, s.codec, w) - return - } - op := s.createOperation(out, sync, timeout) - s.finishReq(op, w) - - case "DELETE": - if len(parts) != 2 { - notFound(w, req) - return - } - out, err := storage.Delete(parts[1]) - if err != nil { - errorJSON(err, s.codec, w) - return - } - op := s.createOperation(out, sync, timeout) - s.finishReq(op, w) - - case "PUT": - if len(parts) != 2 { - notFound(w, req) - return - } - body, err := readBody(req) - if err != nil { - errorJSON(err, s.codec, w) - return - } - obj := storage.New() - err = s.codec.DecodeInto(body, obj) - if err != nil { - errorJSON(err, s.codec, w) - return - } - out, err := storage.Update(obj) - if err != nil { - errorJSON(err, s.codec, w) - return - } - op := s.createOperation(out, sync, timeout) - s.finishReq(op, w) - - default: - notFound(w, req) - } + // Dispatch to the internal handler + handler.ServeHTTP(w, req) + }) } // handleVersionReq writes the server's version information. @@ -258,40 +144,6 @@ func handleVersion(w http.ResponseWriter, req *http.Request) { writeRawJSON(http.StatusOK, version.Get(), w) } -// createOperation creates an operation to process a channel response -func (s *APIServer) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation { - op := s.ops.NewOperation(out) - if sync { - op.WaitFor(timeout) - } else if s.asyncOpWait != 0 { - op.WaitFor(s.asyncOpWait) - } - return op -} - -// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an -// Operation to receive the result and returning its ID down the writer. -func (s *APIServer) finishReq(op *Operation, w http.ResponseWriter) { - obj, complete := op.StatusOrResult() - if complete { - status := http.StatusOK - switch stat := obj.(type) { - case api.Status: - httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.") - if stat.Code != 0 { - status = stat.Code - } - case *api.Status: - if stat.Code != 0 { - status = stat.Code - } - } - writeJSON(status, s.codec, obj, w) - } else { - writeJSON(http.StatusAccepted, s.codec, obj, w) - } -} - // writeJSON renders an object as JSON to the response func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseWriter) { output, err := codec.Encode(object) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 4e3a9c0a20..59809d1d19 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -170,7 +170,7 @@ func TestNotFound(t *testing.T) { "watch missing storage": {"GET", "/prefix/version/watch/"}, "watch with bad method": {"POST", "/prefix/version/watch/foo/bar"}, } - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": &SimpleRESTStorage{}, }, codec, "/prefix/version") server := httptest.NewServer(handler) @@ -193,7 +193,7 @@ func TestNotFound(t *testing.T) { } func TestVersion(t *testing.T) { - handler := New(map[string]RESTStorage{}, codec, "/prefix/version") + handler := Handle(map[string]RESTStorage{}, codec, "/prefix/version") server := httptest.NewServer(handler) client := http.Client{} @@ -222,7 +222,7 @@ func TestSimpleList(t *testing.T) { storage := map[string]RESTStorage{} simpleStorage := SimpleRESTStorage{} storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) resp, err := http.Get(server.URL + "/prefix/version/simple") @@ -241,7 +241,7 @@ func TestErrorList(t *testing.T) { errors: map[string]error{"list": fmt.Errorf("test Error")}, } storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) resp, err := http.Get(server.URL + "/prefix/version/simple") @@ -265,7 +265,7 @@ func TestNonEmptyList(t *testing.T) { }, } storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) resp, err := http.Get(server.URL + "/prefix/version/simple") @@ -300,7 +300,7 @@ func TestGet(t *testing.T) { }, } storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) resp, err := http.Get(server.URL + "/prefix/version/simple/id") @@ -321,7 +321,7 @@ func TestGetMissing(t *testing.T) { errors: map[string]error{"get": NewNotFoundErr("simple", "id")}, } storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) resp, err := http.Get(server.URL + "/prefix/version/simple/id") @@ -339,7 +339,7 @@ func TestDelete(t *testing.T) { simpleStorage := SimpleRESTStorage{} ID := "id" storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) client := http.Client{} @@ -361,7 +361,7 @@ func TestDeleteMissing(t *testing.T) { errors: map[string]error{"delete": NewNotFoundErr("simple", ID)}, } storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) client := http.Client{} @@ -381,7 +381,7 @@ func TestUpdate(t *testing.T) { simpleStorage := SimpleRESTStorage{} ID := "id" storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) item := Simple{ @@ -411,7 +411,7 @@ func TestUpdateMissing(t *testing.T) { errors: map[string]error{"update": NewNotFoundErr("simple", ID)}, } storage["simple"] = &simpleStorage - handler := New(storage, codec, "/prefix/version") + handler := Handle(storage, codec, "/prefix/version") server := httptest.NewServer(handler) item := Simple{ @@ -436,10 +436,10 @@ func TestUpdateMissing(t *testing.T) { func TestCreate(t *testing.T) { simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") - handler.asyncOpWait = 0 + handler.(*defaultAPIServer).group.handler.asyncOpWait = 0 server := httptest.NewServer(handler) client := http.Client{} @@ -473,7 +473,7 @@ func TestCreate(t *testing.T) { } func TestCreateNotFound(t *testing.T) { - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "simple": &SimpleRESTStorage{ // storage.Create can fail with not found error in theory. // See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092. @@ -519,7 +519,7 @@ func TestSyncCreate(t *testing.T) { return obj, nil }, } - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": &storage, }, codec, "/prefix/version") server := httptest.NewServer(handler) @@ -621,8 +621,8 @@ func TestAsyncDelayReturnsError(t *testing.T) { return nil, NewAlreadyExistsErr("foo", "bar") }, } - handler := New(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version") - handler.asyncOpWait = time.Millisecond / 2 + handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version") + handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2 server := httptest.NewServer(handler) status := expectApiStatus(t, "DELETE", fmt.Sprintf("%s/prefix/version/foo/bar", server.URL), nil, http.StatusConflict) @@ -639,8 +639,8 @@ func TestAsyncCreateError(t *testing.T) { return nil, NewAlreadyExistsErr("foo", "bar") }, } - handler := New(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version") - handler.asyncOpWait = 0 + handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version") + handler.(*defaultAPIServer).group.handler.asyncOpWait = 0 server := httptest.NewServer(handler) simple := Simple{Name: "foo"} @@ -728,7 +728,7 @@ func TestSyncCreateTimeout(t *testing.T) { return obj, nil }, } - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": &storage, }, codec, "/prefix/version") server := httptest.NewServer(handler) diff --git a/pkg/apiserver/minionproxy_test.go b/pkg/apiserver/minionproxy_test.go index e298a9b361..1ceeedf3a6 100644 --- a/pkg/apiserver/minionproxy_test.go +++ b/pkg/apiserver/minionproxy_test.go @@ -127,7 +127,7 @@ func TestApiServerMinionProxy(t *testing.T) { proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { w.Write([]byte(req.URL.Path)) })) - server := httptest.NewServer(New(nil, nil, "/prefix")) + server := httptest.NewServer(Handle(nil, nil, "/prefix")) proxy, _ := url.Parse(proxyServer.URL) resp, err := http.Get(fmt.Sprintf("%s/proxy/minion/%s%s", server.URL, proxy.Host, "/test")) if err != nil { diff --git a/pkg/apiserver/operation_test.go b/pkg/apiserver/operation_test.go index bd082084c2..d596f75d29 100644 --- a/pkg/apiserver/operation_test.go +++ b/pkg/apiserver/operation_test.go @@ -93,10 +93,10 @@ func TestOperation(t *testing.T) { func TestOperationsList(t *testing.T) { simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") - handler.asyncOpWait = 0 + handler.(*defaultAPIServer).group.handler.asyncOpWait = 0 server := httptest.NewServer(handler) client := http.Client{} @@ -105,26 +105,26 @@ func TestOperationsList(t *testing.T) { } data, err := codec.Encode(simple) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } response, err := client.Post(server.URL+"/prefix/version/foo", "application/json", bytes.NewBuffer(data)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } if response.StatusCode != http.StatusAccepted { - t.Errorf("Unexpected response %#v", response) + t.Fatalf("Unexpected response %#v", response) } response, err = client.Get(server.URL + "/prefix/version/operations") if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } if response.StatusCode != http.StatusOK { t.Fatalf("unexpected status code %#v", response) } body, err := ioutil.ReadAll(response.Body) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } obj, err := codec.Decode(body) if err != nil { @@ -149,10 +149,10 @@ func TestOpGet(t *testing.T) { return obj, nil }, } - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") - handler.asyncOpWait = 0 + handler.(*defaultAPIServer).group.handler.asyncOpWait = 0 server := httptest.NewServer(handler) client := http.Client{} @@ -162,27 +162,27 @@ func TestOpGet(t *testing.T) { data, err := codec.Encode(simple) t.Log(string(data)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } response, err := client.Do(request) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } if response.StatusCode != http.StatusAccepted { - t.Errorf("Unexpected response %#v", response) + t.Fatalf("Unexpected response %#v", response) } var itemOut api.Status body, err := extractBody(response, &itemOut) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" { @@ -191,12 +191,12 @@ func TestOpGet(t *testing.T) { req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details.ID, nil) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } _, err = client.Do(req2) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } if response.StatusCode != http.StatusAccepted { diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go new file mode 100644 index 0000000000..b1770c1464 --- /dev/null +++ b/pkg/apiserver/resthandler.go @@ -0,0 +1,192 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "net/http" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type RESTHandler struct { + storage map[string]RESTStorage + codec Codec + ops *Operations + asyncOpWait time.Duration +} + +// ServeHTTP handles requests to all RESTStorage objects. +func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + parts := splitPath(req.URL.Path) + if len(parts) < 1 { + notFound(w, req) + return + } + storage := h.storage[parts[0]] + if storage == nil { + httplog.LogOf(w).Addf("'%v' has no storage object", parts[0]) + notFound(w, req) + return + } + + h.handleRESTStorage(parts, req, w, storage) +} + +// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then +// on path length, according to the following table: +// Method Path Action +// GET /foo list +// GET /foo/bar get 'bar' +// POST /foo create +// PUT /foo/bar update 'bar' +// DELETE /foo/bar delete 'bar' +// Returns 404 if the method/pattern doesn't match one of these entries +// The s accepts several query parameters: +// sync=[false|true] Synchronous request (only applies to create, update, delete operations) +// timeout= Timeout for synchronous requests, only applies if sync=true +// labels= Used for filtering list operations +func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) { + sync := req.URL.Query().Get("sync") == "true" + timeout := parseTimeout(req.URL.Query().Get("timeout")) + switch req.Method { + case "GET": + switch len(parts) { + case 1: + selector, err := labels.ParseSelector(req.URL.Query().Get("labels")) + if err != nil { + errorJSON(err, h.codec, w) + return + } + list, err := storage.List(selector) + if err != nil { + errorJSON(err, h.codec, w) + return + } + writeJSON(http.StatusOK, h.codec, list, w) + case 2: + item, err := storage.Get(parts[1]) + if err != nil { + errorJSON(err, h.codec, w) + return + } + writeJSON(http.StatusOK, h.codec, item, w) + default: + notFound(w, req) + } + + case "POST": + if len(parts) != 1 { + notFound(w, req) + return + } + body, err := readBody(req) + if err != nil { + errorJSON(err, h.codec, w) + return + } + obj := storage.New() + err = h.codec.DecodeInto(body, obj) + if err != nil { + errorJSON(err, h.codec, w) + return + } + out, err := storage.Create(obj) + if err != nil { + errorJSON(err, h.codec, w) + return + } + op := h.createOperation(out, sync, timeout) + h.finishReq(op, w) + + case "DELETE": + if len(parts) != 2 { + notFound(w, req) + return + } + out, err := storage.Delete(parts[1]) + if err != nil { + errorJSON(err, h.codec, w) + return + } + op := h.createOperation(out, sync, timeout) + h.finishReq(op, w) + + case "PUT": + if len(parts) != 2 { + notFound(w, req) + return + } + body, err := readBody(req) + if err != nil { + errorJSON(err, h.codec, w) + return + } + obj := storage.New() + err = h.codec.DecodeInto(body, obj) + if err != nil { + errorJSON(err, h.codec, w) + return + } + out, err := storage.Update(obj) + if err != nil { + errorJSON(err, h.codec, w) + return + } + op := h.createOperation(out, sync, timeout) + h.finishReq(op, w) + + default: + notFound(w, req) + } +} + +// createOperation creates an operation to process a channel response +func (h *RESTHandler) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation { + op := h.ops.NewOperation(out) + if sync { + op.WaitFor(timeout) + } else if h.asyncOpWait != 0 { + op.WaitFor(h.asyncOpWait) + } + return op +} + +// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an +// Operation to receive the result and returning its ID down the writer. +func (h *RESTHandler) finishReq(op *Operation, w http.ResponseWriter) { + obj, complete := op.StatusOrResult() + if complete { + status := http.StatusOK + switch stat := obj.(type) { + case api.Status: + httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.") + if stat.Code != 0 { + status = stat.Code + } + case *api.Status: + if stat.Code != 0 { + status = stat.Code + } + } + writeJSON(status, h.codec, obj, w) + } else { + writeJSON(http.StatusAccepted, h.codec, obj, w) + } +} diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index dbbfa7deda..09f7952f7d 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -41,7 +41,7 @@ var watchTestTable = []struct { func TestWatchWebsocket(t *testing.T) { simpleStorage := &SimpleRESTStorage{} _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") server := httptest.NewServer(handler) @@ -87,7 +87,7 @@ func TestWatchWebsocket(t *testing.T) { func TestWatchHTTP(t *testing.T) { simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") server := httptest.NewServer(handler) @@ -144,7 +144,7 @@ func TestWatchHTTP(t *testing.T) { func TestWatchParamParsing(t *testing.T) { simpleStorage := &SimpleRESTStorage{} - handler := New(map[string]RESTStorage{ + handler := Handle(map[string]RESTStorage{ "foo": simpleStorage, }, codec, "/prefix/version") server := httptest.NewServer(handler) diff --git a/pkg/healthz/healthz.go b/pkg/healthz/healthz.go index 7671fed52c..f4542e5d0b 100644 --- a/pkg/healthz/healthz.go +++ b/pkg/healthz/healthz.go @@ -20,6 +20,11 @@ import ( "net/http" ) +// mux is an interface describing the methods InstallHandler requires. +type mux interface { + HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) +} + func init() { http.HandleFunc("/healthz", handleHealthz) } @@ -31,6 +36,6 @@ func handleHealthz(w http.ResponseWriter, r *http.Request) { } // InstallHandler registers a handler for health checking on the path "/healthz" to mux. -func InstallHandler(mux *http.ServeMux) { +func InstallHandler(mux mux) { mux.HandleFunc("/healthz", handleHealthz) } diff --git a/pkg/master/master.go b/pkg/master/master.go index e3cafe5963..917ec11408 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -139,21 +139,11 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf } } -// Run begins serving the Kubernetes API. It never returns. -func (m *Master) Run(myAddress, apiPrefix string) error { - s := &http.Server{ - Addr: myAddress, - Handler: m.ConstructHandler(apiPrefix), - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, +// API_v1beta1 returns the resources and codec for API version v1beta1 +func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, apiserver.Codec) { + storage := make(map[string]apiserver.RESTStorage) + for k, v := range m.storage { + storage[k] = v } - return s.ListenAndServe() -} - -// ConstructHandler returns an http.Handler which serves the Kubernetes API. -// Instead of calling Run, you can call this function to get a handler for your own server. -// It is intended for testing. Only call once. -func (m *Master) ConstructHandler(apiPrefix string) http.Handler { - return apiserver.New(m.storage, api.Codec, apiPrefix) + return storage, api.Codec }