From 3b9735d787523b3f3d70bd50e635dac36c2d0e43 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 2 Jul 2014 13:51:27 -0700 Subject: [PATCH] Test atomic PUTs, and make them work. Improve apiserver/logger.go's interface (it's pretty cool now). Improve apiserver's error reporting to clients. Improve client's handling of errors from apiserver. Make failed PUTs return 409 (conflict)-- http status codes are amazingly well defined for what we're doing! --- cmd/integration/integration.go | 86 +++++++++++++++++++++++++++++++++- pkg/api/helper.go | 17 +++++++ pkg/api/types.go | 6 ++- pkg/apiserver/apiserver.go | 36 +++++++++++--- pkg/apiserver/logger.go | 68 +++++++++++++++++++++++---- pkg/client/client.go | 23 ++++++--- pkg/kubelet/kubelet_server.go | 4 +- pkg/tools/etcd_tools.go | 30 +++++++----- 8 files changed, 229 insertions(+), 41 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index e6e9f183be..12239f59a8 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -23,7 +23,9 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "reflect" "runtime" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -128,8 +130,73 @@ func runReplicationControllerTest(kubeClient *client.Client) { glog.Infof("Replication controller produced:\n\n%#v\n\n", pods) } +func runAtomicPutTest(c *client.Client) { + var svc api.Service + err := c.Post().Path("services").Body( + api.Service{ + JSONBase: api.JSONBase{ID: "atomicService"}, + Port: 12345, + Labels: map[string]string{ + "name": "atomicService", + }, + }, + ).Do().Into(&svc) + if err != nil { + glog.Fatalf("Failed creating atomicService: %v", err) + } + + testLabels := labels.Set{} + for i := 0; i < 26; i++ { + // a: z, b: y, etc... + testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)}) + } + var wg sync.WaitGroup + wg.Add(len(testLabels)) + for label, value := range testLabels { + go func(l, v string) { + for { + var tmpSvc api.Service + err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc) + if err != nil { + glog.Errorf("Error getting atomicService: %v", err) + continue + } + if tmpSvc.Selector == nil { + tmpSvc.Selector = map[string]string{l: v} + } else { + tmpSvc.Selector[l] = v + } + err = c.Put().Path("services").Path(svc.ID).Body(&tmpSvc).Do().Error() + if err != nil { + if se, ok := err.(*client.StatusErr); ok { + if se.Status.Code == http.StatusConflict { + // This is what we expect. + continue + } + } + glog.Errorf("Unexpected error putting atomicService: %v", err) + continue + } + break + } + wg.Done() + }(label, value) + } + wg.Wait() + err = c.Get().Path("services").Path(svc.ID).Do().Into(&svc) + if err != nil { + glog.Fatalf("Failed getting atomicService after writers are complete: %v", err) + } + if !reflect.DeepEqual(testLabels, labels.Set(svc.Selector)) { + glog.Fatalf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, svc.Selector) + } + glog.Info("Atomic PUTs work.") +} + +type testFunc func(*client.Client) + func main() { - runtime.GOMAXPROCS(4) + runtime.GOMAXPROCS(runtime.NumCPU()) util.ReallyCrash = true util.InitLogs() defer util.FlushLogs() @@ -150,7 +217,22 @@ func main() { time.Sleep(time.Second * 10) kubeClient := client.New(apiServerURL, nil) - runReplicationControllerTest(kubeClient) + + // Run tests in parallel + testFuncs := []testFunc{ + runReplicationControllerTest, + runAtomicPutTest, + } + var wg sync.WaitGroup + wg.Add(len(testFuncs)) + for i := range testFuncs { + f := testFuncs[i] + go func() { + f(kubeClient) + wg.Done() + }() + } + wg.Wait() // Check that kubelet tried to make the pods. // Using a set to list unique creation attempts. Our fake is diff --git a/pkg/api/helper.go b/pkg/api/helper.go index 4c92fd1944..2d031d83dd 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -56,6 +56,23 @@ func FindJSONBase(obj interface{}) (*JSONBase, error) { return jsonBase, err } +// Takes an arbitary api type, return a copy of its JSONBase field. +// obj may be a pointer to an api type, or a non-pointer struct api type. +func FindJSONBaseRO(obj interface{}) (JSONBase, error) { + v := reflect.ValueOf(obj) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + if v.Kind() != reflect.Struct { + return JSONBase{}, fmt.Errorf("expected struct, but got %v", v.Type().Name()) + } + jsonBase := v.FieldByName("JSONBase") + if !jsonBase.IsValid() { + return JSONBase{}, fmt.Errorf("struct %v lacks embedded JSON type", v.Type().Name()) + } + return jsonBase.Interface().(JSONBase), nil +} + // Encode turns the given api object into an appropriate JSON string. // Will return an error if the object doesn't have an embedded JSONBase. // Obj may be a pointer to a struct, or a struct. If a struct, a copy diff --git a/pkg/api/types.go b/pkg/api/types.go index 5104d60f06..4627481e8a 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -267,8 +267,12 @@ type Status struct { // One of: "success", "failure", "working" (for operations not yet completed) // TODO: if "working", include an operation identifier so final status can be // checked. - Status string `json:"status,omitempty" yaml:"status,omitempty"` + Status string `json:"status,omitempty" yaml:"status,omitempty"` + // Details about the status. May be an error description or an + // operation number for later polling. Details string `json:"details,omitempty" yaml:"details,omitempty"` + // Suggested HTTP return code for this status, 0 if not set. + Code int `json:"code,omitempty" yaml:"code,omitempty"` } // Values of Status.Status diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 92cfe11c72..64e43b95c1 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -54,9 +55,15 @@ func MakeAsync(fn WorkFunc) <-chan interface{} { defer util.HandleCrash() obj, err := fn() if err != nil { + status := http.StatusInternalServerError + switch { + case tools.IsEtcdConflict(err): + status = http.StatusConflict + } channel <- &api.Status{ Status: api.StatusFailure, Details: err.Error(), + Code: status, } } else { channel <- obj @@ -110,9 +117,13 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { glog.Infof("ApiServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) } }() - logger := MakeLogged(req, w) - w = logger - defer logger.Log() + defer MakeLogged(req, &w).StacktraceWhen( + StatusIsNot( + http.StatusOK, + http.StatusAccepted, + http.StatusConflict, + ), + ).Log() url, err := url.ParseRequestURI(req.RequestURI) if err != nil { server.error(err, w) @@ -141,7 +152,7 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } storage := server.storage[requestParts[0]] if storage == nil { - logger.Addf("'%v' has no storage object", requestParts[0]) + LogOf(w).Addf("'%v' has no storage object", requestParts[0]) server.notFound(req, w) return } else { @@ -171,8 +182,7 @@ func (server *ApiServer) error(err error, w http.ResponseWriter) { func (server *ApiServer) readBody(req *http.Request) ([]byte, error) { defer req.Body.Close() - body, err := ioutil.ReadAll(req.Body) - return body, err + return ioutil.ReadAll(req.Body) } // finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an @@ -184,7 +194,19 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti } obj, complete := op.StatusOrResult() if complete { - server.write(http.StatusOK, obj, w) + status := http.StatusOK + switch stat := obj.(type) { + case api.Status: + LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.") + if stat.Code != 0 { + status = stat.Code + } + case *api.Status: + if stat.Code != 0 { + status = stat.Code + } + } + server.write(status, obj, w) } else { server.write(http.StatusAccepted, obj, w) } diff --git a/pkg/apiserver/logger.go b/pkg/apiserver/logger.go index da82773cb6..3d881dee58 100644 --- a/pkg/apiserver/logger.go +++ b/pkg/apiserver/logger.go @@ -25,6 +25,9 @@ import ( "github.com/golang/glog" ) +// Return true if a stacktrace should be logged for this status +type StacktracePred func(httpStatus int) (logStacktrace bool) + // Add a layer on top of ResponseWriter, so we can track latency and error // message sources. type respLogger struct { @@ -35,17 +38,64 @@ type respLogger struct { req *http.Request w http.ResponseWriter + + logStacktracePred StacktracePred } +func DefaultStacktracePred(status int) bool { + return status != http.StatusOK && status != http.StatusAccepted +} + +// MakeLogged turns a normal response writer into a logged response writer. +// // Usage: -// logger := MakeLogged(req, w) -// w = logger // Route response writing actions through w -// defer logger.Log() -func MakeLogged(req *http.Request, w http.ResponseWriter) *respLogger { - return &respLogger{ - startTime: time.Now(), - req: req, - w: w, +// +// defer MakeLogged(req, &w).StacktraceWhen(StatusIsNot(200, 202)).Log() +// +// (Only the call to Log() is defered, so you can set everything up in one line!) +// +// Note that this *changes* your writer, to route response writing actions +// through the logger. +// +// Use LogOf(w).Addf(...) to log something along with the response result. +func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger { + rl := &respLogger{ + startTime: time.Now(), + req: req, + w: *w, + logStacktracePred: DefaultStacktracePred, + } + *w = rl // hijack caller's writer! + return rl +} + +// LogOf returns the logger hiding in w. Panics if there isn't such a logger, +// because MakeLogged() must have been previously called for the log to work. +func LogOf(w http.ResponseWriter) *respLogger { + if rl, ok := w.(*respLogger); ok { + return rl + } + panic("Logger not installed yet!") + return nil +} + +// Sets the stacktrace logging predicate, which decides when to log a stacktrace. +// There's a default, so you don't need to call this unless you don't like the default. +func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger { + rl.logStacktracePred = pred + return rl +} + +// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged +// for any status *not* in the given list. +func StatusIsNot(statuses ...int) StacktracePred { + return func(status int) bool { + for _, s := range statuses { + if status == s { + return false + } + } + return true } } @@ -73,7 +123,7 @@ func (rl *respLogger) Write(b []byte) (int, error) { // Implement http.ResponseWriter func (rl *respLogger) WriteHeader(status int) { rl.status = status - if status != http.StatusOK && status != http.StatusAccepted { + if rl.logStacktracePred(status) { // Only log stacks for errors stack := make([]byte, 2048) stack = stack[:runtime.Stack(stack, false)] diff --git a/pkg/client/client.go b/pkg/client/client.go index 6ee978abc4..effccae056 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -100,16 +100,27 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) { if err != nil { return body, err } - if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent { + + // Did the server give us a status response? + isStatusResponse := false + var status api.Status + if err := api.DecodeInto(body, &status); err == nil && status.Status != "" { + isStatusResponse = true + } + + switch { + case response.StatusCode == http.StatusConflict: + // Return error given by server, if there was one. + if isStatusResponse { + return nil, &StatusErr{status} + } + fallthrough + case response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent: return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body)) } // If the server gave us a status back, look at what it was. - var status api.Status - if err := api.DecodeInto(body, &status); err == nil && status.Status != "" { - if status.Status == api.StatusSuccess { - return body, nil - } + if isStatusResponse && status.Status != api.StatusSuccess { // "Working" requests need to be handled specially. // "Failed" requests are clearly just an error and it makes sense to return them as such. return nil, &StatusErr{status} diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 0883dc6145..41bc6d8bd4 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -46,9 +46,7 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) { } func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - logger := apiserver.MakeLogged(req, w) - w = logger - defer logger.Log() + defer apiserver.MakeLogged(req, &w).Log() u, err := url.ParseRequestURI(req.RequestURI) if err != nil { diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 631c925d0b..9f4b625e0d 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -118,14 +118,8 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { - _, index, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) - if err != nil { - return err - } - if jsonBase, err := api.FindJSONBase(objPtr); err == nil { - jsonBase.ResourceVersion = index - } - return nil + _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + return err } func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { @@ -145,20 +139,30 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) } body = response.Node.Value - return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr) + err = json.Unmarshal([]byte(body), objPtr) + if jsonBase, err := api.FindJSONBase(objPtr); err == nil { + jsonBase.ResourceVersion = response.Node.ModifiedIndex + // Note that err shadows the err returned below, so we won't + // return an error just because we failed to find a JSONBase. + // This is intentional. + } + return body, response.Node.ModifiedIndex, err } -// SetObj marshals obj via json, and stores under key. +// SetObj marshals obj via json, and stores under key. Will do an +// atomic update if obj's ResourceVersion field is set. func (h *EtcdHelper) SetObj(key string, obj interface{}) error { data, err := json.Marshal(obj) if err != nil { return err } - if jsonBase, err := api.FindJSONBase(obj); err == nil && jsonBase.ResourceVersion != 0 { + if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 { _, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion) - } else { - _, err = h.Client.Set(key, string(data), 0) + return err // err is shadowed! } + + // TODO: when client supports atomic creation, integrate this with the above. + _, err = h.Client.Set(key, string(data), 0) return err }