Test atomic PUTs, and make them work.

Improve apiserver/logger.go's interface (it's pretty cool now).

Improve apiserver's error reporting to clients.

Improve client's handling of errors from apiserver.

Make failed PUTs return 409 (conflict)-- http status codes are amazingly
well defined for what we're doing!
pull/6/head
Daniel Smith 2014-07-02 13:51:27 -07:00
parent a6144f656c
commit 3b9735d787
8 changed files with 229 additions and 41 deletions

View File

@ -23,7 +23,9 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"reflect"
"runtime" "runtime"
"sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -128,8 +130,73 @@ func runReplicationControllerTest(kubeClient *client.Client) {
glog.Infof("Replication controller produced:\n\n%#v\n\n", pods) glog.Infof("Replication controller produced:\n\n%#v\n\n", pods)
} }
func runAtomicPutTest(c *client.Client) {
var svc api.Service
err := c.Post().Path("services").Body(
api.Service{
JSONBase: api.JSONBase{ID: "atomicService"},
Port: 12345,
Labels: map[string]string{
"name": "atomicService",
},
},
).Do().Into(&svc)
if err != nil {
glog.Fatalf("Failed creating atomicService: %v", err)
}
testLabels := labels.Set{}
for i := 0; i < 26; i++ {
// a: z, b: y, etc...
testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)})
}
var wg sync.WaitGroup
wg.Add(len(testLabels))
for label, value := range testLabels {
go func(l, v string) {
for {
var tmpSvc api.Service
err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc)
if err != nil {
glog.Errorf("Error getting atomicService: %v", err)
continue
}
if tmpSvc.Selector == nil {
tmpSvc.Selector = map[string]string{l: v}
} else {
tmpSvc.Selector[l] = v
}
err = c.Put().Path("services").Path(svc.ID).Body(&tmpSvc).Do().Error()
if err != nil {
if se, ok := err.(*client.StatusErr); ok {
if se.Status.Code == http.StatusConflict {
// This is what we expect.
continue
}
}
glog.Errorf("Unexpected error putting atomicService: %v", err)
continue
}
break
}
wg.Done()
}(label, value)
}
wg.Wait()
err = c.Get().Path("services").Path(svc.ID).Do().Into(&svc)
if err != nil {
glog.Fatalf("Failed getting atomicService after writers are complete: %v", err)
}
if !reflect.DeepEqual(testLabels, labels.Set(svc.Selector)) {
glog.Fatalf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, svc.Selector)
}
glog.Info("Atomic PUTs work.")
}
type testFunc func(*client.Client)
func main() { func main() {
runtime.GOMAXPROCS(4) runtime.GOMAXPROCS(runtime.NumCPU())
util.ReallyCrash = true util.ReallyCrash = true
util.InitLogs() util.InitLogs()
defer util.FlushLogs() defer util.FlushLogs()
@ -150,7 +217,22 @@ func main() {
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
kubeClient := client.New(apiServerURL, nil) kubeClient := client.New(apiServerURL, nil)
runReplicationControllerTest(kubeClient)
// Run tests in parallel
testFuncs := []testFunc{
runReplicationControllerTest,
runAtomicPutTest,
}
var wg sync.WaitGroup
wg.Add(len(testFuncs))
for i := range testFuncs {
f := testFuncs[i]
go func() {
f(kubeClient)
wg.Done()
}()
}
wg.Wait()
// Check that kubelet tried to make the pods. // Check that kubelet tried to make the pods.
// Using a set to list unique creation attempts. Our fake is // Using a set to list unique creation attempts. Our fake is

View File

@ -56,6 +56,23 @@ func FindJSONBase(obj interface{}) (*JSONBase, error) {
return jsonBase, err return jsonBase, err
} }
// Takes an arbitary api type, return a copy of its JSONBase field.
// obj may be a pointer to an api type, or a non-pointer struct api type.
func FindJSONBaseRO(obj interface{}) (JSONBase, error) {
v := reflect.ValueOf(obj)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
if v.Kind() != reflect.Struct {
return JSONBase{}, fmt.Errorf("expected struct, but got %v", v.Type().Name())
}
jsonBase := v.FieldByName("JSONBase")
if !jsonBase.IsValid() {
return JSONBase{}, fmt.Errorf("struct %v lacks embedded JSON type", v.Type().Name())
}
return jsonBase.Interface().(JSONBase), nil
}
// Encode turns the given api object into an appropriate JSON string. // Encode turns the given api object into an appropriate JSON string.
// Will return an error if the object doesn't have an embedded JSONBase. // Will return an error if the object doesn't have an embedded JSONBase.
// Obj may be a pointer to a struct, or a struct. If a struct, a copy // Obj may be a pointer to a struct, or a struct. If a struct, a copy

View File

@ -268,7 +268,11 @@ type Status struct {
// TODO: if "working", include an operation identifier so final status can be // TODO: if "working", include an operation identifier so final status can be
// checked. // checked.
Status string `json:"status,omitempty" yaml:"status,omitempty"` Status string `json:"status,omitempty" yaml:"status,omitempty"`
// Details about the status. May be an error description or an
// operation number for later polling.
Details string `json:"details,omitempty" yaml:"details,omitempty"` Details string `json:"details,omitempty" yaml:"details,omitempty"`
// Suggested HTTP return code for this status, 0 if not set.
Code int `json:"code,omitempty" yaml:"code,omitempty"`
} }
// Values of Status.Status // Values of Status.Status

View File

@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -54,9 +55,15 @@ func MakeAsync(fn WorkFunc) <-chan interface{} {
defer util.HandleCrash() defer util.HandleCrash()
obj, err := fn() obj, err := fn()
if err != nil { if err != nil {
status := http.StatusInternalServerError
switch {
case tools.IsEtcdConflict(err):
status = http.StatusConflict
}
channel <- &api.Status{ channel <- &api.Status{
Status: api.StatusFailure, Status: api.StatusFailure,
Details: err.Error(), Details: err.Error(),
Code: status,
} }
} else { } else {
channel <- obj channel <- obj
@ -110,9 +117,13 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
glog.Infof("ApiServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) glog.Infof("ApiServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
} }
}() }()
logger := MakeLogged(req, w) defer MakeLogged(req, &w).StacktraceWhen(
w = logger StatusIsNot(
defer logger.Log() http.StatusOK,
http.StatusAccepted,
http.StatusConflict,
),
).Log()
url, err := url.ParseRequestURI(req.RequestURI) url, err := url.ParseRequestURI(req.RequestURI)
if err != nil { if err != nil {
server.error(err, w) server.error(err, w)
@ -141,7 +152,7 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
storage := server.storage[requestParts[0]] storage := server.storage[requestParts[0]]
if storage == nil { if storage == nil {
logger.Addf("'%v' has no storage object", requestParts[0]) LogOf(w).Addf("'%v' has no storage object", requestParts[0])
server.notFound(req, w) server.notFound(req, w)
return return
} else { } else {
@ -171,8 +182,7 @@ func (server *ApiServer) error(err error, w http.ResponseWriter) {
func (server *ApiServer) readBody(req *http.Request) ([]byte, error) { func (server *ApiServer) readBody(req *http.Request) ([]byte, error) {
defer req.Body.Close() defer req.Body.Close()
body, err := ioutil.ReadAll(req.Body) return ioutil.ReadAll(req.Body)
return body, err
} }
// finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an // finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an
@ -184,7 +194,19 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti
} }
obj, complete := op.StatusOrResult() obj, complete := op.StatusOrResult()
if complete { if complete {
server.write(http.StatusOK, obj, w) status := http.StatusOK
switch stat := obj.(type) {
case api.Status:
LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 {
status = stat.Code
}
case *api.Status:
if stat.Code != 0 {
status = stat.Code
}
}
server.write(status, obj, w)
} else { } else {
server.write(http.StatusAccepted, obj, w) server.write(http.StatusAccepted, obj, w)
} }

View File

@ -25,6 +25,9 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// Return true if a stacktrace should be logged for this status
type StacktracePred func(httpStatus int) (logStacktrace bool)
// Add a layer on top of ResponseWriter, so we can track latency and error // Add a layer on top of ResponseWriter, so we can track latency and error
// message sources. // message sources.
type respLogger struct { type respLogger struct {
@ -35,17 +38,64 @@ type respLogger struct {
req *http.Request req *http.Request
w http.ResponseWriter w http.ResponseWriter
logStacktracePred StacktracePred
} }
func DefaultStacktracePred(status int) bool {
return status != http.StatusOK && status != http.StatusAccepted
}
// MakeLogged turns a normal response writer into a logged response writer.
//
// Usage: // Usage:
// logger := MakeLogged(req, w) //
// w = logger // Route response writing actions through w // defer MakeLogged(req, &w).StacktraceWhen(StatusIsNot(200, 202)).Log()
// defer logger.Log() //
func MakeLogged(req *http.Request, w http.ResponseWriter) *respLogger { // (Only the call to Log() is defered, so you can set everything up in one line!)
return &respLogger{ //
// Note that this *changes* your writer, to route response writing actions
// through the logger.
//
// Use LogOf(w).Addf(...) to log something along with the response result.
func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger {
rl := &respLogger{
startTime: time.Now(), startTime: time.Now(),
req: req, req: req,
w: w, w: *w,
logStacktracePred: DefaultStacktracePred,
}
*w = rl // hijack caller's writer!
return rl
}
// LogOf returns the logger hiding in w. Panics if there isn't such a logger,
// because MakeLogged() must have been previously called for the log to work.
func LogOf(w http.ResponseWriter) *respLogger {
if rl, ok := w.(*respLogger); ok {
return rl
}
panic("Logger not installed yet!")
return nil
}
// Sets the stacktrace logging predicate, which decides when to log a stacktrace.
// There's a default, so you don't need to call this unless you don't like the default.
func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger {
rl.logStacktracePred = pred
return rl
}
// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged
// for any status *not* in the given list.
func StatusIsNot(statuses ...int) StacktracePred {
return func(status int) bool {
for _, s := range statuses {
if status == s {
return false
}
}
return true
} }
} }
@ -73,7 +123,7 @@ func (rl *respLogger) Write(b []byte) (int, error) {
// Implement http.ResponseWriter // Implement http.ResponseWriter
func (rl *respLogger) WriteHeader(status int) { func (rl *respLogger) WriteHeader(status int) {
rl.status = status rl.status = status
if status != http.StatusOK && status != http.StatusAccepted { if rl.logStacktracePred(status) {
// Only log stacks for errors // Only log stacks for errors
stack := make([]byte, 2048) stack := make([]byte, 2048)
stack = stack[:runtime.Stack(stack, false)] stack = stack[:runtime.Stack(stack, false)]

View File

@ -100,16 +100,27 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) {
if err != nil { if err != nil {
return body, err return body, err
} }
if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent {
// Did the server give us a status response?
isStatusResponse := false
var status api.Status
if err := api.DecodeInto(body, &status); err == nil && status.Status != "" {
isStatusResponse = true
}
switch {
case response.StatusCode == http.StatusConflict:
// Return error given by server, if there was one.
if isStatusResponse {
return nil, &StatusErr{status}
}
fallthrough
case response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent:
return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body)) return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body))
} }
// If the server gave us a status back, look at what it was. // If the server gave us a status back, look at what it was.
var status api.Status if isStatusResponse && status.Status != api.StatusSuccess {
if err := api.DecodeInto(body, &status); err == nil && status.Status != "" {
if status.Status == api.StatusSuccess {
return body, nil
}
// "Working" requests need to be handled specially. // "Working" requests need to be handled specially.
// "Failed" requests are clearly just an error and it makes sense to return them as such. // "Failed" requests are clearly just an error and it makes sense to return them as such.
return nil, &StatusErr{status} return nil, &StatusErr{status}

View File

@ -46,9 +46,7 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) {
} }
func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
logger := apiserver.MakeLogged(req, w) defer apiserver.MakeLogged(req, &w).Log()
w = logger
defer logger.Log()
u, err := url.ParseRequestURI(req.RequestURI) u, err := url.ParseRequestURI(req.RequestURI)
if err != nil { if err != nil {

View File

@ -118,14 +118,8 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error. // empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
_, index, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
if err != nil {
return err return err
}
if jsonBase, err := api.FindJSONBase(objPtr); err == nil {
jsonBase.ResourceVersion = index
}
return nil
} }
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
@ -145,20 +139,30 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
} }
body = response.Node.Value body = response.Node.Value
return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr) err = json.Unmarshal([]byte(body), objPtr)
if jsonBase, err := api.FindJSONBase(objPtr); err == nil {
jsonBase.ResourceVersion = response.Node.ModifiedIndex
// Note that err shadows the err returned below, so we won't
// return an error just because we failed to find a JSONBase.
// This is intentional.
}
return body, response.Node.ModifiedIndex, err
} }
// SetObj marshals obj via json, and stores under key. // SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error { func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj) data, err := json.Marshal(obj)
if err != nil { if err != nil {
return err return err
} }
if jsonBase, err := api.FindJSONBase(obj); err == nil && jsonBase.ResourceVersion != 0 { if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion) _, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion)
} else { return err // err is shadowed!
_, err = h.Client.Set(key, string(data), 0)
} }
// TODO: when client supports atomic creation, integrate this with the above.
_, err = h.Client.Set(key, string(data), 0)
return err return err
} }