Extract RESTHandler and allow API groupings

Prepare for running multiple API versions on the same HTTP server
by decoupling some of the mechanics of apiserver.  Define a new
APIGroup object which represents a version of the API.
pull/6/head
Clayton Coleman 2014-08-09 17:12:55 -04:00
parent aeea1b1e06
commit bbf3b55e76
10 changed files with 340 additions and 290 deletions

View File

@ -25,6 +25,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
@ -114,5 +115,13 @@ func main() {
}) })
} }
glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix)) storage, codec := m.API_v1beta1()
s := &http.Server{
Addr: net.JoinHostPort(*address, strconv.Itoa(int(*port))),
Handler: apiserver.Handle(storage, codec, *apiPrefix),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
glog.Fatal(s.ListenAndServe())
} }

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
@ -86,11 +87,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
machineList := []string{"localhost", "machine"} machineList := []string{"localhost", "machine"}
handler := delegateHandler{} handler := delegateHandler{}
apiserver := httptest.NewServer(&handler) apiServer := httptest.NewServer(&handler)
etcdClient := etcd.NewClient(servers) etcdClient := etcd.NewClient(servers)
cl := client.New(apiserver.URL, nil) cl := client.New(apiServer.URL, nil)
cl.PollPeriod = time.Second * 1 cl.PollPeriod = time.Second * 1
cl.Sync = true cl.Sync = true
@ -101,7 +102,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
Minions: machineList, Minions: machineList,
PodInfoGetter: fakePodInfoGetter{}, PodInfoGetter: fakePodInfoGetter{},
}) })
handler.delegate = m.ConstructHandler("/api/v1beta1") storage, codec := m.API_v1beta1()
handler.delegate = apiserver.Handle(storage, codec, "/api/v1beta1")
controllerManager := controller.MakeReplicationManager(cl) controllerManager := controller.MakeReplicationManager(cl)
@ -130,7 +132,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251) kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
}, 0) }, 0)
return apiserver.URL return apiServer.URL
} }
func runReplicationControllerTest(kubeClient *client.Client) { func runReplicationControllerTest(kubeClient *client.Client) {

View File

@ -21,15 +21,12 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path"
"runtime/debug" "runtime/debug"
"strings" "strings"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -42,215 +39,104 @@ type Codec interface {
DecodeInto(data []byte, obj interface{}) error DecodeInto(data []byte, obj interface{}) error
} }
// APIServer is an HTTPHandler that delegates to RESTStorage objects. // mux is an object that can register http handlers
// It handles URLs of the form: type mux interface {
// ${prefix}/${storage_key}[/${object_name}] Handle(pattern string, handler http.Handler)
// Where 'prefix' is an arbitrary string, and 'storage_key' points to a RESTStorage object stored in storage. HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
//
// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing.
type APIServer struct {
storage map[string]RESTStorage
codec Codec
ops *Operations
asyncOpWait time.Duration
handler http.Handler
} }
// New creates a new APIServer object. 'storage' contains a map of handlers. 'codec' // defaultAPIServer exposes nested objects for testability
// is an interface for decoding to and from JSON. 'prefix' is the hosting path prefix. type defaultAPIServer struct {
http.Handler
group *APIGroup
}
// Handle returns a Handler function that expose the provided storage interfaces
// as RESTful resources at prefix, serialized by codec, and also includes the support
// http resources.
func Handle(storage map[string]RESTStorage, codec Codec, prefix string) http.Handler {
group := NewAPIGroup(storage, codec)
mux := http.NewServeMux()
group.InstallREST(mux, prefix)
InstallSupport(mux)
return &defaultAPIServer{RecoverPanics(mux), group}
}
// APIGroup is a http.Handler that exposes multiple RESTStorage objects
// It handles URLs of the form:
// /${storage_key}[/${object_name}]
// Where 'storage_key' points to a RESTStorage object stored in storage.
// //
// The codec will be used to decode the request body into an object pointer returned by // TODO: consider migrating this to go-restful which is a more full-featured version of the same thing.
// RESTStorage.New(). The Create() and Update() methods should cast their argument to type APIGroup struct {
// the type returned by New(). handler RESTHandler
}
// NewAPIGroup returns an object that will serve a set of REST resources and their
// associated operations. The provided codec controls serialization and deserialization.
// This is a helper method for registering multiple sets of REST handlers under different
// prefixes onto a server.
// TODO: add multitype codec serialization // TODO: add multitype codec serialization
func New(storage map[string]RESTStorage, codec Codec, prefix string) *APIServer { func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup {
s := &APIServer{ return &APIGroup{RESTHandler{
storage: storage, storage: storage,
codec: codec, codec: codec,
ops: NewOperations(), ops: NewOperations(),
// Delay just long enough to handle most simple write operations // Delay just long enough to handle most simple write operations
asyncOpWait: time.Millisecond * 25, asyncOpWait: time.Millisecond * 25,
}}
}
// InstallREST registers the REST handlers (storage, watch, and operations) into a mux.
// It is expected that the provided prefix will serve all operations. Path MUST NOT end
// in a slash.
func (g *APIGroup) InstallREST(mux mux, paths ...string) {
restHandler := &g.handler
watchHandler := &WatchHandler{g.handler.storage, g.handler.codec}
opHandler := &OperationHandler{g.handler.ops, g.handler.codec}
for _, prefix := range paths {
prefix = strings.TrimRight(prefix, "/")
mux.Handle(prefix+"/", http.StripPrefix(prefix, restHandler))
mux.Handle(prefix+"/watch/", http.StripPrefix(prefix+"/watch/", watchHandler))
mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
} }
}
mux := http.NewServeMux() // InstallSupport registers the APIServer support functions into a mux.
func InstallSupport(mux mux) {
prefix = strings.TrimRight(prefix, "/")
// Primary API handlers
restPrefix := prefix + "/"
mux.Handle(restPrefix, http.StripPrefix(restPrefix, http.HandlerFunc(s.handleREST)))
// Watch API handlers
watchPrefix := path.Join(prefix, "watch") + "/"
mux.Handle(watchPrefix, http.StripPrefix(watchPrefix, &WatchHandler{storage, codec}))
// Support services for the apiserver
logsPrefix := "/logs/"
mux.Handle(logsPrefix, http.StripPrefix(logsPrefix, http.FileServer(http.Dir("/var/log/"))))
healthz.InstallHandler(mux) healthz.InstallHandler(mux)
mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))))
mux.Handle("/proxy/minion/", http.StripPrefix("/proxy/minion", http.HandlerFunc(handleProxyMinion)))
mux.HandleFunc("/version", handleVersion) mux.HandleFunc("/version", handleVersion)
mux.HandleFunc("/", handleIndex) mux.HandleFunc("/", handleIndex)
// Handle both operations and operations/* with the same handler
handler := &OperationHandler{s.ops, s.codec}
operationPrefix := path.Join(prefix, "operations")
mux.Handle(operationPrefix, http.StripPrefix(operationPrefix, handler))
operationsPrefix := operationPrefix + "/"
mux.Handle(operationsPrefix, http.StripPrefix(operationsPrefix, handler))
// Proxy minion requests
mux.Handle("/proxy/minion/", http.StripPrefix("/proxy/minion", http.HandlerFunc(handleProxyMinion)))
s.handler = mux
return s
} }
// ServeHTTP implements the standard net/http interface. // RecoverPanics wraps an http Handler to recover and log panics
func (s *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { func RecoverPanics(handler http.Handler) http.Handler {
defer func() { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if x := recover(); x != nil { defer func() {
w.WriteHeader(http.StatusInternalServerError) if x := recover(); x != nil {
fmt.Fprint(w, "apis panic. Look in log for details.") w.WriteHeader(http.StatusInternalServerError)
glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) fmt.Fprint(w, "apis panic. Look in log for details.")
} glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
}()
defer httplog.MakeLogged(req, &w).StacktraceWhen(
httplog.StatusIsNot(
http.StatusOK,
http.StatusAccepted,
http.StatusConflict,
http.StatusNotFound,
),
).Log()
// Dispatch to the internal handler
s.handler.ServeHTTP(w, req)
}
// handleREST handles requests to all our RESTStorage objects.
func (s *APIServer) handleREST(w http.ResponseWriter, req *http.Request) {
parts := splitPath(req.URL.Path)
if len(parts) < 1 {
notFound(w, req)
return
}
storage := s.storage[parts[0]]
if storage == nil {
httplog.LogOf(w).Addf("'%v' has no storage object", parts[0])
notFound(w, req)
return
}
s.handleRESTStorage(parts, req, w, storage)
}
// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then
// on path length, according to the following table:
// Method Path Action
// GET /foo list
// GET /foo/bar get 'bar'
// POST /foo create
// PUT /foo/bar update 'bar'
// DELETE /foo/bar delete 'bar'
// Returns 404 if the method/pattern doesn't match one of these entries
// The s accepts several query parameters:
// sync=[false|true] Synchronous request (only applies to create, update, delete operations)
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true
// labels=<label-selector> Used for filtering list operations
func (s *APIServer) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
sync := req.URL.Query().Get("sync") == "true"
timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method {
case "GET":
switch len(parts) {
case 1:
selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil {
errorJSON(err, s.codec, w)
return
} }
list, err := storage.List(selector) }()
if err != nil { defer httplog.MakeLogged(req, &w).StacktraceWhen(
errorJSON(err, s.codec, w) httplog.StatusIsNot(
return http.StatusOK,
} http.StatusAccepted,
writeJSON(http.StatusOK, s.codec, list, w) http.StatusConflict,
case 2: http.StatusNotFound,
item, err := storage.Get(parts[1]) ),
if err != nil { ).Log()
errorJSON(err, s.codec, w)
return
}
writeJSON(http.StatusOK, s.codec, item, w)
default:
notFound(w, req)
}
case "POST": // Dispatch to the internal handler
if len(parts) != 1 { handler.ServeHTTP(w, req)
notFound(w, req) })
return
}
body, err := readBody(req)
if err != nil {
errorJSON(err, s.codec, w)
return
}
obj := storage.New()
err = s.codec.DecodeInto(body, obj)
if err != nil {
errorJSON(err, s.codec, w)
return
}
out, err := storage.Create(obj)
if err != nil {
errorJSON(err, s.codec, w)
return
}
op := s.createOperation(out, sync, timeout)
s.finishReq(op, w)
case "DELETE":
if len(parts) != 2 {
notFound(w, req)
return
}
out, err := storage.Delete(parts[1])
if err != nil {
errorJSON(err, s.codec, w)
return
}
op := s.createOperation(out, sync, timeout)
s.finishReq(op, w)
case "PUT":
if len(parts) != 2 {
notFound(w, req)
return
}
body, err := readBody(req)
if err != nil {
errorJSON(err, s.codec, w)
return
}
obj := storage.New()
err = s.codec.DecodeInto(body, obj)
if err != nil {
errorJSON(err, s.codec, w)
return
}
out, err := storage.Update(obj)
if err != nil {
errorJSON(err, s.codec, w)
return
}
op := s.createOperation(out, sync, timeout)
s.finishReq(op, w)
default:
notFound(w, req)
}
} }
// handleVersionReq writes the server's version information. // handleVersionReq writes the server's version information.
@ -258,40 +144,6 @@ func handleVersion(w http.ResponseWriter, req *http.Request) {
writeRawJSON(http.StatusOK, version.Get(), w) writeRawJSON(http.StatusOK, version.Get(), w)
} }
// createOperation creates an operation to process a channel response
func (s *APIServer) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation {
op := s.ops.NewOperation(out)
if sync {
op.WaitFor(timeout)
} else if s.asyncOpWait != 0 {
op.WaitFor(s.asyncOpWait)
}
return op
}
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
// Operation to receive the result and returning its ID down the writer.
func (s *APIServer) finishReq(op *Operation, w http.ResponseWriter) {
obj, complete := op.StatusOrResult()
if complete {
status := http.StatusOK
switch stat := obj.(type) {
case api.Status:
httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 {
status = stat.Code
}
case *api.Status:
if stat.Code != 0 {
status = stat.Code
}
}
writeJSON(status, s.codec, obj, w)
} else {
writeJSON(http.StatusAccepted, s.codec, obj, w)
}
}
// writeJSON renders an object as JSON to the response // writeJSON renders an object as JSON to the response
func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseWriter) { func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseWriter) {
output, err := codec.Encode(object) output, err := codec.Encode(object)

View File

@ -170,7 +170,7 @@ func TestNotFound(t *testing.T) {
"watch missing storage": {"GET", "/prefix/version/watch/"}, "watch missing storage": {"GET", "/prefix/version/watch/"},
"watch with bad method": {"POST", "/prefix/version/watch/foo/bar"}, "watch with bad method": {"POST", "/prefix/version/watch/foo/bar"},
} }
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": &SimpleRESTStorage{}, "foo": &SimpleRESTStorage{},
}, codec, "/prefix/version") }, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
@ -193,7 +193,7 @@ func TestNotFound(t *testing.T) {
} }
func TestVersion(t *testing.T) { func TestVersion(t *testing.T) {
handler := New(map[string]RESTStorage{}, codec, "/prefix/version") handler := Handle(map[string]RESTStorage{}, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
client := http.Client{} client := http.Client{}
@ -222,7 +222,7 @@ func TestSimpleList(t *testing.T) {
storage := map[string]RESTStorage{} storage := map[string]RESTStorage{}
simpleStorage := SimpleRESTStorage{} simpleStorage := SimpleRESTStorage{}
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
resp, err := http.Get(server.URL + "/prefix/version/simple") resp, err := http.Get(server.URL + "/prefix/version/simple")
@ -241,7 +241,7 @@ func TestErrorList(t *testing.T) {
errors: map[string]error{"list": fmt.Errorf("test Error")}, errors: map[string]error{"list": fmt.Errorf("test Error")},
} }
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
resp, err := http.Get(server.URL + "/prefix/version/simple") resp, err := http.Get(server.URL + "/prefix/version/simple")
@ -265,7 +265,7 @@ func TestNonEmptyList(t *testing.T) {
}, },
} }
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
resp, err := http.Get(server.URL + "/prefix/version/simple") resp, err := http.Get(server.URL + "/prefix/version/simple")
@ -300,7 +300,7 @@ func TestGet(t *testing.T) {
}, },
} }
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
resp, err := http.Get(server.URL + "/prefix/version/simple/id") resp, err := http.Get(server.URL + "/prefix/version/simple/id")
@ -321,7 +321,7 @@ func TestGetMissing(t *testing.T) {
errors: map[string]error{"get": NewNotFoundErr("simple", "id")}, errors: map[string]error{"get": NewNotFoundErr("simple", "id")},
} }
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
resp, err := http.Get(server.URL + "/prefix/version/simple/id") resp, err := http.Get(server.URL + "/prefix/version/simple/id")
@ -339,7 +339,7 @@ func TestDelete(t *testing.T) {
simpleStorage := SimpleRESTStorage{} simpleStorage := SimpleRESTStorage{}
ID := "id" ID := "id"
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
client := http.Client{} client := http.Client{}
@ -361,7 +361,7 @@ func TestDeleteMissing(t *testing.T) {
errors: map[string]error{"delete": NewNotFoundErr("simple", ID)}, errors: map[string]error{"delete": NewNotFoundErr("simple", ID)},
} }
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
client := http.Client{} client := http.Client{}
@ -381,7 +381,7 @@ func TestUpdate(t *testing.T) {
simpleStorage := SimpleRESTStorage{} simpleStorage := SimpleRESTStorage{}
ID := "id" ID := "id"
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
item := Simple{ item := Simple{
@ -411,7 +411,7 @@ func TestUpdateMissing(t *testing.T) {
errors: map[string]error{"update": NewNotFoundErr("simple", ID)}, errors: map[string]error{"update": NewNotFoundErr("simple", ID)},
} }
storage["simple"] = &simpleStorage storage["simple"] = &simpleStorage
handler := New(storage, codec, "/prefix/version") handler := Handle(storage, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
item := Simple{ item := Simple{
@ -436,10 +436,10 @@ func TestUpdateMissing(t *testing.T) {
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
handler.asyncOpWait = 0 handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
client := http.Client{} client := http.Client{}
@ -473,7 +473,7 @@ func TestCreate(t *testing.T) {
} }
func TestCreateNotFound(t *testing.T) { func TestCreateNotFound(t *testing.T) {
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"simple": &SimpleRESTStorage{ "simple": &SimpleRESTStorage{
// storage.Create can fail with not found error in theory. // storage.Create can fail with not found error in theory.
// See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092. // See https://github.com/GoogleCloudPlatform/kubernetes/pull/486#discussion_r15037092.
@ -519,7 +519,7 @@ func TestSyncCreate(t *testing.T) {
return obj, nil return obj, nil
}, },
} }
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": &storage, "foo": &storage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
@ -621,8 +621,8 @@ func TestAsyncDelayReturnsError(t *testing.T) {
return nil, NewAlreadyExistsErr("foo", "bar") return nil, NewAlreadyExistsErr("foo", "bar")
}, },
} }
handler := New(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version") handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
handler.asyncOpWait = time.Millisecond / 2 handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
status := expectApiStatus(t, "DELETE", fmt.Sprintf("%s/prefix/version/foo/bar", server.URL), nil, http.StatusConflict) status := expectApiStatus(t, "DELETE", fmt.Sprintf("%s/prefix/version/foo/bar", server.URL), nil, http.StatusConflict)
@ -639,8 +639,8 @@ func TestAsyncCreateError(t *testing.T) {
return nil, NewAlreadyExistsErr("foo", "bar") return nil, NewAlreadyExistsErr("foo", "bar")
}, },
} }
handler := New(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version") handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix/version")
handler.asyncOpWait = 0 handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
simple := Simple{Name: "foo"} simple := Simple{Name: "foo"}
@ -728,7 +728,7 @@ func TestSyncCreateTimeout(t *testing.T) {
return obj, nil return obj, nil
}, },
} }
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": &storage, "foo": &storage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)

View File

@ -127,7 +127,7 @@ func TestApiServerMinionProxy(t *testing.T) {
proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(req.URL.Path)) w.Write([]byte(req.URL.Path))
})) }))
server := httptest.NewServer(New(nil, nil, "/prefix")) server := httptest.NewServer(Handle(nil, nil, "/prefix"))
proxy, _ := url.Parse(proxyServer.URL) proxy, _ := url.Parse(proxyServer.URL)
resp, err := http.Get(fmt.Sprintf("%s/proxy/minion/%s%s", server.URL, proxy.Host, "/test")) resp, err := http.Get(fmt.Sprintf("%s/proxy/minion/%s%s", server.URL, proxy.Host, "/test"))
if err != nil { if err != nil {

View File

@ -93,10 +93,10 @@ func TestOperation(t *testing.T) {
func TestOperationsList(t *testing.T) { func TestOperationsList(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
handler.asyncOpWait = 0 handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
client := http.Client{} client := http.Client{}
@ -105,26 +105,26 @@ func TestOperationsList(t *testing.T) {
} }
data, err := codec.Encode(simple) data, err := codec.Encode(simple)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
response, err := client.Post(server.URL+"/prefix/version/foo", "application/json", bytes.NewBuffer(data)) response, err := client.Post(server.URL+"/prefix/version/foo", "application/json", bytes.NewBuffer(data))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if response.StatusCode != http.StatusAccepted { if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response) t.Fatalf("Unexpected response %#v", response)
} }
response, err = client.Get(server.URL + "/prefix/version/operations") response, err = client.Get(server.URL + "/prefix/version/operations")
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code %#v", response) t.Fatalf("unexpected status code %#v", response)
} }
body, err := ioutil.ReadAll(response.Body) body, err := ioutil.ReadAll(response.Body)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
obj, err := codec.Decode(body) obj, err := codec.Decode(body)
if err != nil { if err != nil {
@ -149,10 +149,10 @@ func TestOpGet(t *testing.T) {
return obj, nil return obj, nil
}, },
} }
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
handler.asyncOpWait = 0 handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
client := http.Client{} client := http.Client{}
@ -162,27 +162,27 @@ func TestOpGet(t *testing.T) {
data, err := codec.Encode(simple) data, err := codec.Encode(simple)
t.Log(string(data)) t.Log(string(data))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data)) request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
response, err := client.Do(request) response, err := client.Do(request)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if response.StatusCode != http.StatusAccepted { if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response) t.Fatalf("Unexpected response %#v", response)
} }
var itemOut api.Status var itemOut api.Status
body, err := extractBody(response, &itemOut) body, err := extractBody(response, &itemOut)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" { if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
@ -191,12 +191,12 @@ func TestOpGet(t *testing.T) {
req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details.ID, nil) req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details.ID, nil)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
_, err = client.Do(req2) _, err = client.Do(req2)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
if response.StatusCode != http.StatusAccepted { if response.StatusCode != http.StatusAccepted {

View File

@ -0,0 +1,192 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"net/http"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
type RESTHandler struct {
storage map[string]RESTStorage
codec Codec
ops *Operations
asyncOpWait time.Duration
}
// ServeHTTP handles requests to all RESTStorage objects.
func (h *RESTHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
parts := splitPath(req.URL.Path)
if len(parts) < 1 {
notFound(w, req)
return
}
storage := h.storage[parts[0]]
if storage == nil {
httplog.LogOf(w).Addf("'%v' has no storage object", parts[0])
notFound(w, req)
return
}
h.handleRESTStorage(parts, req, w, storage)
}
// handleRESTStorage is the main dispatcher for a storage object. It switches on the HTTP method, and then
// on path length, according to the following table:
// Method Path Action
// GET /foo list
// GET /foo/bar get 'bar'
// POST /foo create
// PUT /foo/bar update 'bar'
// DELETE /foo/bar delete 'bar'
// Returns 404 if the method/pattern doesn't match one of these entries
// The s accepts several query parameters:
// sync=[false|true] Synchronous request (only applies to create, update, delete operations)
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true
// labels=<label-selector> Used for filtering list operations
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
sync := req.URL.Query().Get("sync") == "true"
timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method {
case "GET":
switch len(parts) {
case 1:
selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil {
errorJSON(err, h.codec, w)
return
}
list, err := storage.List(selector)
if err != nil {
errorJSON(err, h.codec, w)
return
}
writeJSON(http.StatusOK, h.codec, list, w)
case 2:
item, err := storage.Get(parts[1])
if err != nil {
errorJSON(err, h.codec, w)
return
}
writeJSON(http.StatusOK, h.codec, item, w)
default:
notFound(w, req)
}
case "POST":
if len(parts) != 1 {
notFound(w, req)
return
}
body, err := readBody(req)
if err != nil {
errorJSON(err, h.codec, w)
return
}
obj := storage.New()
err = h.codec.DecodeInto(body, obj)
if err != nil {
errorJSON(err, h.codec, w)
return
}
out, err := storage.Create(obj)
if err != nil {
errorJSON(err, h.codec, w)
return
}
op := h.createOperation(out, sync, timeout)
h.finishReq(op, w)
case "DELETE":
if len(parts) != 2 {
notFound(w, req)
return
}
out, err := storage.Delete(parts[1])
if err != nil {
errorJSON(err, h.codec, w)
return
}
op := h.createOperation(out, sync, timeout)
h.finishReq(op, w)
case "PUT":
if len(parts) != 2 {
notFound(w, req)
return
}
body, err := readBody(req)
if err != nil {
errorJSON(err, h.codec, w)
return
}
obj := storage.New()
err = h.codec.DecodeInto(body, obj)
if err != nil {
errorJSON(err, h.codec, w)
return
}
out, err := storage.Update(obj)
if err != nil {
errorJSON(err, h.codec, w)
return
}
op := h.createOperation(out, sync, timeout)
h.finishReq(op, w)
default:
notFound(w, req)
}
}
// createOperation creates an operation to process a channel response
func (h *RESTHandler) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation {
op := h.ops.NewOperation(out)
if sync {
op.WaitFor(timeout)
} else if h.asyncOpWait != 0 {
op.WaitFor(h.asyncOpWait)
}
return op
}
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
// Operation to receive the result and returning its ID down the writer.
func (h *RESTHandler) finishReq(op *Operation, w http.ResponseWriter) {
obj, complete := op.StatusOrResult()
if complete {
status := http.StatusOK
switch stat := obj.(type) {
case api.Status:
httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 {
status = stat.Code
}
case *api.Status:
if stat.Code != 0 {
status = stat.Code
}
}
writeJSON(status, h.codec, obj, w)
} else {
writeJSON(http.StatusAccepted, h.codec, obj, w)
}
}

View File

@ -41,7 +41,7 @@ var watchTestTable = []struct {
func TestWatchWebsocket(t *testing.T) { func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work. _ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
@ -87,7 +87,7 @@ func TestWatchWebsocket(t *testing.T) {
func TestWatchHTTP(t *testing.T) { func TestWatchHTTP(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
@ -144,7 +144,7 @@ func TestWatchHTTP(t *testing.T) {
func TestWatchParamParsing(t *testing.T) { func TestWatchParamParsing(t *testing.T) {
simpleStorage := &SimpleRESTStorage{} simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{ handler := Handle(map[string]RESTStorage{
"foo": simpleStorage, "foo": simpleStorage,
}, codec, "/prefix/version") }, codec, "/prefix/version")
server := httptest.NewServer(handler) server := httptest.NewServer(handler)

View File

@ -20,6 +20,11 @@ import (
"net/http" "net/http"
) )
// mux is an interface describing the methods InstallHandler requires.
type mux interface {
HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))
}
func init() { func init() {
http.HandleFunc("/healthz", handleHealthz) http.HandleFunc("/healthz", handleHealthz)
} }
@ -31,6 +36,6 @@ func handleHealthz(w http.ResponseWriter, r *http.Request) {
} }
// InstallHandler registers a handler for health checking on the path "/healthz" to mux. // InstallHandler registers a handler for health checking on the path "/healthz" to mux.
func InstallHandler(mux *http.ServeMux) { func InstallHandler(mux mux) {
mux.HandleFunc("/healthz", handleHealthz) mux.HandleFunc("/healthz", handleHealthz)
} }

View File

@ -139,21 +139,11 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
} }
} }
// Run begins serving the Kubernetes API. It never returns. // API_v1beta1 returns the resources and codec for API version v1beta1
func (m *Master) Run(myAddress, apiPrefix string) error { func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, apiserver.Codec) {
s := &http.Server{ storage := make(map[string]apiserver.RESTStorage)
Addr: myAddress, for k, v := range m.storage {
Handler: m.ConstructHandler(apiPrefix), storage[k] = v
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
} }
return s.ListenAndServe() return storage, api.Codec
}
// ConstructHandler returns an http.Handler which serves the Kubernetes API.
// Instead of calling Run, you can call this function to get a handler for your own server.
// It is intended for testing. Only call once.
func (m *Master) ConstructHandler(apiPrefix string) http.Handler {
return apiserver.New(m.storage, api.Codec, apiPrefix)
} }