From 0d30a656ef3c2e7cd3655328a805c3196c133e5d Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 5 Sep 2014 19:22:03 -0700 Subject: [PATCH] Do interface{} -> runtime.Object rename everywhere --- examples/examples_test.go | 6 ++--- pkg/apiserver/apiserver.go | 16 ++++-------- pkg/apiserver/async.go | 7 +++--- pkg/apiserver/interfaces.go | 15 ++++++------ pkg/apiserver/operation.go | 15 ++++++------ pkg/apiserver/redirect.go | 3 ++- pkg/apiserver/resthandler.go | 10 +++----- pkg/apiserver/watch.go | 2 +- pkg/client/request.go | 13 ++++++---- pkg/kubecfg/parse.go | 2 +- pkg/kubecfg/proxy_server.go | 2 +- pkg/kubecfg/resource_printer.go | 14 +++++------ pkg/master/master.go | 2 +- pkg/registry/binding/storage.go | 15 ++++++------ pkg/registry/controller/registry.go | 4 +-- pkg/registry/controller/storage.go | 25 ++++++++++--------- pkg/registry/endpoint/registry.go | 2 +- pkg/registry/endpoint/storage.go | 13 +++++----- pkg/registry/etcd/etcd.go | 32 ++++++++++++------------ pkg/registry/minion/storage.go | 25 ++++++++++--------- pkg/registry/pod/registry.go | 4 +-- pkg/registry/pod/storage.go | 25 ++++++++++--------- pkg/registry/service/registry.go | 4 +-- pkg/registry/service/storage.go | 23 ++++++++--------- pkg/runtime/types.go | 9 ------- pkg/service/endpoints_controller.go | 2 +- pkg/tools/decoder.go | 3 ++- pkg/tools/etcd_tools.go | 38 ++++++++++------------------- pkg/tools/etcd_tools_watch.go | 19 ++++++++------- pkg/watch/iowatcher.go | 3 ++- pkg/watch/mux.go | 4 ++- pkg/watch/mux_test.go | 19 +++++++++------ pkg/watch/watch.go | 12 +++++---- 33 files changed, 190 insertions(+), 198 deletions(-) diff --git a/examples/examples_test.go b/examples/examples_test.go index 70e910f02e..3abb905cf3 100644 --- a/examples/examples_test.go +++ b/examples/examples_test.go @@ -31,7 +31,7 @@ import ( "github.com/golang/glog" ) -func validateObject(obj interface{}) (errors []error) { +func validateObject(obj runtime.Object) (errors []error) { switch t := obj.(type) { case *api.ReplicationController: errors = validation.ValidateManifest(&t.DesiredState.PodTemplate.DesiredState.Manifest) @@ -85,7 +85,7 @@ func walkJSONFiles(inDir string, fn func(name, path string, data []byte)) error } func TestApiExamples(t *testing.T) { - expected := map[string]interface{}{ + expected := map[string]runtime.Object{ "controller": &api.ReplicationController{}, "controller-list": &api.ReplicationControllerList{}, "pod": &api.Pod{}, @@ -120,7 +120,7 @@ func TestApiExamples(t *testing.T) { } func TestExamples(t *testing.T) { - expected := map[string]interface{}{ + expected := map[string]runtime.Object{ "frontend-controller": &api.ReplicationController{}, "redis-slave-controller": &api.ReplicationController{}, "redis-master": &api.Pod{}, diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index d08f5dbe00..d6e13a6991 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -27,17 +27,11 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/golang/glog" ) -// Codec defines methods for serializing and deserializing API objects. -type Codec interface { - Encode(obj interface{}) (data []byte, err error) - Decode(data []byte) (interface{}, error) - DecodeInto(data []byte, obj interface{}) error -} - // mux is an object that can register http handlers. type mux interface { Handle(pattern string, handler http.Handler) @@ -53,7 +47,7 @@ type defaultAPIServer struct { // Handle returns a Handler function that expose the provided storage interfaces // as RESTful resources at prefix, serialized by codec, and also includes the support // http resources. -func Handle(storage map[string]RESTStorage, codec Codec, prefix string) http.Handler { +func Handle(storage map[string]RESTStorage, codec runtime.Codec, prefix string) http.Handler { group := NewAPIGroup(storage, codec) mux := http.NewServeMux() @@ -78,7 +72,7 @@ type APIGroup struct { // This is a helper method for registering multiple sets of REST handlers under different // prefixes onto a server. // TODO: add multitype codec serialization -func NewAPIGroup(storage map[string]RESTStorage, codec Codec) *APIGroup { +func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec) *APIGroup { return &APIGroup{RESTHandler{ storage: storage, codec: codec, @@ -147,7 +141,7 @@ func handleVersion(w http.ResponseWriter, req *http.Request) { } // writeJSON renders an object as JSON to the response. -func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseWriter) { +func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter) { output, err := codec.Encode(object) if err != nil { errorJSON(err, codec, w) @@ -159,7 +153,7 @@ func writeJSON(statusCode int, codec Codec, object interface{}, w http.ResponseW } // errorJSON renders an error to the response. -func errorJSON(err error, codec Codec, w http.ResponseWriter) { +func errorJSON(err error, codec runtime.Codec, w http.ResponseWriter) { status := errToAPIStatus(err) writeJSON(status.Code, codec, status, w) } diff --git a/pkg/apiserver/async.go b/pkg/apiserver/async.go index a87c9766a1..97a5919d07 100644 --- a/pkg/apiserver/async.go +++ b/pkg/apiserver/async.go @@ -17,18 +17,19 @@ limitations under the License. package apiserver import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // WorkFunc is used to perform any time consuming work for an api call, after // the input has been validated. Pass one of these to MakeAsync to create an // appropriate return value for the Update, Delete, and Create methods. -type WorkFunc func() (result interface{}, err error) +type WorkFunc func() (result runtime.Object, err error) // MakeAsync takes a function and executes it, delivering the result in the way required // by RESTStorage's Update, Delete, and Create methods. -func MakeAsync(fn WorkFunc) <-chan interface{} { - channel := make(chan interface{}) +func MakeAsync(fn WorkFunc) <-chan runtime.Object { + channel := make(chan runtime.Object) go func() { defer util.HandleCrash() obj, err := fn() diff --git a/pkg/apiserver/interfaces.go b/pkg/apiserver/interfaces.go index 201436a5e9..c2a4f2c575 100644 --- a/pkg/apiserver/interfaces.go +++ b/pkg/apiserver/interfaces.go @@ -18,6 +18,7 @@ package apiserver import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -25,25 +26,25 @@ import ( // Resources which are exported to the RESTful API of apiserver need to implement this interface. type RESTStorage interface { // New returns an empty object that can be used with Create and Update after request data has been put into it. - // This object must be a pointer type for use with Codec.DecodeInto([]byte, interface{}) - New() interface{} + // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) + New() runtime.Object // List selects resources in the storage which match to the selector. // TODO: add field selector in addition to label selector. - List(labels.Selector) (interface{}, error) + List(labels.Selector) (runtime.Object, error) // Get finds a resource in the storage by id and returns it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Get(id string) (interface{}, error) + Get(id string) (runtime.Object, error) // Delete finds a resource in the storage and deletes it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. - Delete(id string) (<-chan interface{}, error) + Delete(id string) (<-chan runtime.Object, error) - Create(interface{}) (<-chan interface{}, error) - Update(interface{}) (<-chan interface{}, error) + Create(runtime.Object) (<-chan runtime.Object, error) + Update(runtime.Object) (<-chan runtime.Object, error) } // ResourceWatcher should be implemented by all RESTStorage objects that diff --git a/pkg/apiserver/operation.go b/pkg/apiserver/operation.go index 34cd12c926..e0a3e9a461 100644 --- a/pkg/apiserver/operation.go +++ b/pkg/apiserver/operation.go @@ -25,12 +25,13 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) type OperationHandler struct { ops *Operations - codec Codec + codec runtime.Codec } func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -63,8 +64,8 @@ func (h *OperationHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Operation represents an ongoing action which the server is performing. type Operation struct { ID string - result interface{} - awaiting <-chan interface{} + result runtime.Object + awaiting <-chan runtime.Object finished *time.Time lock sync.Mutex notify chan struct{} @@ -90,7 +91,7 @@ func NewOperations() *Operations { } // NewOperation adds a new operation. It is lock-free. -func (ops *Operations) NewOperation(from <-chan interface{}) *Operation { +func (ops *Operations) NewOperation(from <-chan runtime.Object) *Operation { id := atomic.AddInt64(&ops.lastID, 1) op := &Operation{ ID: strconv.FormatInt(id, 10), @@ -110,7 +111,7 @@ func (ops *Operations) insert(op *Operation) { } // List lists operations for an API client. -func (ops *Operations) List() api.ServerOpList { +func (ops *Operations) List() *api.ServerOpList { ops.lock.Lock() defer ops.lock.Unlock() @@ -119,7 +120,7 @@ func (ops *Operations) List() api.ServerOpList { ids = append(ids, id) } sort.StringSlice(ids).Sort() - ol := api.ServerOpList{} + ol := &api.ServerOpList{} for _, id := range ids { ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}}) } @@ -185,7 +186,7 @@ func (op *Operation) expired(limitTime time.Time) bool { // StatusOrResult returns status information or the result of the operation if it is complete, // with a bool indicating true in the latter case. -func (op *Operation) StatusOrResult() (description interface{}, finished bool) { +func (op *Operation) StatusOrResult() (description runtime.Object, finished bool) { op.lock.Lock() defer op.lock.Unlock() diff --git a/pkg/apiserver/redirect.go b/pkg/apiserver/redirect.go index be0a913632..57aff02d3d 100644 --- a/pkg/apiserver/redirect.go +++ b/pkg/apiserver/redirect.go @@ -20,11 +20,12 @@ import ( "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) type RedirectHandler struct { storage map[string]RESTStorage - codec Codec + codec runtime.Codec } func (r *RedirectHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 0e99c1dba0..09251b5bc4 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -23,11 +23,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) type RESTHandler struct { storage map[string]RESTStorage - codec Codec + codec runtime.Codec ops *Operations asyncOpWait time.Duration } @@ -158,7 +159,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt } // createOperation creates an operation to process a channel response. -func (h *RESTHandler) createOperation(out <-chan interface{}, sync bool, timeout time.Duration) *Operation { +func (h *RESTHandler) createOperation(out <-chan runtime.Object, sync bool, timeout time.Duration) *Operation { op := h.ops.NewOperation(out) if sync { op.WaitFor(timeout) @@ -175,11 +176,6 @@ func (h *RESTHandler) finishReq(op *Operation, w http.ResponseWriter) { if complete { status := http.StatusOK switch stat := obj.(type) { - case api.Status: - httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.") - if stat.Code != 0 { - status = stat.Code - } case *api.Status: if stat.Code != 0 { status = stat.Code diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index a4d349e663..7cd047093b 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -32,7 +32,7 @@ import ( type WatchHandler struct { storage map[string]RESTStorage - codec Codec + codec runtime.Codec } func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) { diff --git a/pkg/client/request.go b/pkg/client/request.go index f7995fd33a..8e6f41b5a9 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -187,7 +187,8 @@ func (r *Request) Timeout(d time.Duration) *Request { // If obj is a string, try to read a file of that name. // If obj is a []byte, send it directly. // If obj is an io.Reader, use it directly. -// Otherwise, assume obj is an api type and marshall it correctly. +// If obj is a runtime.Object, marshal it correctly. +// Otherwise, set an error. func (r *Request) Body(obj interface{}) *Request { if r.err != nil { return r @@ -204,13 +205,15 @@ func (r *Request) Body(obj interface{}) *Request { r.body = bytes.NewBuffer(t) case io.Reader: r.body = t - default: - data, err := runtime.DefaultCodec.Encode(obj) + case runtime.Object: + data, err := runtime.DefaultCodec.Encode(t) if err != nil { r.err = err return r } r.body = bytes.NewBuffer(data) + default: + r.err = fmt.Errorf("Unknown type used for body: %#v", obj) } return r } @@ -314,7 +317,7 @@ func (r Result) Raw() ([]byte, error) { } // Get returns the result as an object. -func (r Result) Get() (interface{}, error) { +func (r Result) Get() (runtime.Object, error) { if r.err != nil { return nil, r.err } @@ -322,7 +325,7 @@ func (r Result) Get() (interface{}, error) { } // Into stores the result into obj, if possible. -func (r Result) Into(obj interface{}) error { +func (r Result) Into(obj runtime.Object) error { if r.err != nil { return r.err } diff --git a/pkg/kubecfg/parse.go b/pkg/kubecfg/parse.go index db5f5e2283..a61ce563a0 100644 --- a/pkg/kubecfg/parse.go +++ b/pkg/kubecfg/parse.go @@ -43,7 +43,7 @@ func (p *Parser) ToWireFormat(data []byte, storage string) ([]byte, error) { return nil, fmt.Errorf("unknown storage type: %v", storage) } - obj := reflect.New(prototypeType).Interface() + obj := reflect.New(prototypeType).Interface().(runtime.Object) err := runtime.DefaultCodec.DecodeInto(data, obj) if err != nil { return nil, err diff --git a/pkg/kubecfg/proxy_server.go b/pkg/kubecfg/proxy_server.go index 5a28c3e5c2..002ba6530e 100644 --- a/pkg/kubecfg/proxy_server.go +++ b/pkg/kubecfg/proxy_server.go @@ -53,7 +53,7 @@ func (s *ProxyServer) Serve() error { func (s *ProxyServer) doError(w http.ResponseWriter, err error) { w.WriteHeader(http.StatusInternalServerError) w.Header().Add("Content-type", "application/json") - data, _ := runtime.DefaultCodec.Encode(api.Status{ + data, _ := runtime.DefaultCodec.Encode(&api.Status{ Status: api.StatusFailure, Message: fmt.Sprintf("internal error: %#v", err), }) diff --git a/pkg/kubecfg/resource_printer.go b/pkg/kubecfg/resource_printer.go index 7d966ed4d1..514161738f 100644 --- a/pkg/kubecfg/resource_printer.go +++ b/pkg/kubecfg/resource_printer.go @@ -36,7 +36,7 @@ import ( type ResourcePrinter interface { // Print receives an arbitrary JSON body, formats it and prints it to a writer. Print([]byte, io.Writer) error - PrintObj(interface{}, io.Writer) error + PrintObj(runtime.Object, io.Writer) error } // IdentityPrinter is an implementation of ResourcePrinter which simply copies the body out to the output stream. @@ -49,7 +49,7 @@ func (i *IdentityPrinter) Print(data []byte, w io.Writer) error { } // PrintObj is an implementation of ResourcePrinter.PrintObj which simply writes the object to the Writer. -func (i *IdentityPrinter) PrintObj(obj interface{}, output io.Writer) error { +func (i *IdentityPrinter) PrintObj(obj runtime.Object, output io.Writer) error { data, err := runtime.DefaultCodec.Encode(obj) if err != nil { return err @@ -62,7 +62,7 @@ type YAMLPrinter struct{} // Print parses the data as JSON, re-formats as YAML and prints the YAML. func (y *YAMLPrinter) Print(data []byte, w io.Writer) error { - var obj interface{} + var obj runtime.Object if err := json.Unmarshal(data, &obj); err != nil { return err } @@ -75,7 +75,7 @@ func (y *YAMLPrinter) Print(data []byte, w io.Writer) error { } // PrintObj prints the data as YAML. -func (y *YAMLPrinter) PrintObj(obj interface{}, w io.Writer) error { +func (y *YAMLPrinter) PrintObj(obj runtime.Object, w io.Writer) error { output, err := yaml.Marshal(obj) if err != nil { return err @@ -251,7 +251,7 @@ func printStatus(status *api.Status, w io.Writer) error { // Print parses the data as JSON, then prints the parsed data in a human-friendly // format according to the type of the data. func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error { - var mapObj map[string]interface{} + var mapObj map[string]runtime.Object if err := json.Unmarshal([]byte(data), &mapObj); err != nil { return err } @@ -268,7 +268,7 @@ func (h *HumanReadablePrinter) Print(data []byte, output io.Writer) error { } // PrintObj prints the obj in a human-friendly format according to the type of the obj. -func (h *HumanReadablePrinter) PrintObj(obj interface{}, output io.Writer) error { +func (h *HumanReadablePrinter) PrintObj(obj runtime.Object, output io.Writer) error { w := tabwriter.NewWriter(output, 20, 5, 3, ' ', 0) defer w.Flush() if handler := h.handlerMap[reflect.TypeOf(obj)]; handler != nil { @@ -300,6 +300,6 @@ func (t *TemplatePrinter) Print(data []byte, w io.Writer) error { } // PrintObj formats the obj with the Go Template. -func (t *TemplatePrinter) PrintObj(obj interface{}, w io.Writer) error { +func (t *TemplatePrinter) PrintObj(obj runtime.Object, w io.Writer) error { return t.Template.Execute(w, obj) } diff --git a/pkg/master/master.go b/pkg/master/master.go index 13a9552904..97d1a958a2 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -131,7 +131,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf } // API_v1beta1 returns the resources and codec for API version v1beta1. -func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, apiserver.Codec) { +func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec) { storage := make(map[string]apiserver.RESTStorage) for k, v := range m.storage { storage[k] = v diff --git a/pkg/registry/binding/storage.go b/pkg/registry/binding/storage.go index c6aeef564c..85cc2fc16b 100644 --- a/pkg/registry/binding/storage.go +++ b/pkg/registry/binding/storage.go @@ -23,6 +23,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // BindingStorage implements the RESTStorage interface. When bindings are written, it @@ -40,32 +41,32 @@ func NewBindingStorage(bindingRegistry Registry) *BindingStorage { } // List returns an error because bindings are write-only objects. -func (*BindingStorage) List(selector labels.Selector) (interface{}, error) { +func (*BindingStorage) List(selector labels.Selector) (runtime.Object, error) { return nil, errors.NewNotFound("binding", "list") } // Get returns an error because bindings are write-only objects. -func (*BindingStorage) Get(id string) (interface{}, error) { +func (*BindingStorage) Get(id string) (runtime.Object, error) { return nil, errors.NewNotFound("binding", id) } // Delete returns an error because bindings are write-only objects. -func (*BindingStorage) Delete(id string) (<-chan interface{}, error) { +func (*BindingStorage) Delete(id string) (<-chan runtime.Object, error) { return nil, errors.NewNotFound("binding", id) } // New returns a new binding object fit for having data unmarshalled into it. -func (*BindingStorage) New() interface{} { +func (*BindingStorage) New() runtime.Object { return &api.Binding{} } // Create attempts to make the assignment indicated by the binding it recieves. -func (b *BindingStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (b *BindingStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) { binding, ok := obj.(*api.Binding) if !ok { return nil, fmt.Errorf("incorrect type: %#v", obj) } - return apiserver.MakeAsync(func() (interface{}, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { if err := b.registry.ApplyBinding(binding); err != nil { return nil, err } @@ -74,6 +75,6 @@ func (b *BindingStorage) Create(obj interface{}) (<-chan interface{}, error) { } // Update returns an error-- this object may not be updated. -func (b *BindingStorage) Update(obj interface{}) (<-chan interface{}, error) { +func (b *BindingStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) { return nil, fmt.Errorf("Bindings may not be changed.") } diff --git a/pkg/registry/controller/registry.go b/pkg/registry/controller/registry.go index ab6d27be14..621577fd53 100644 --- a/pkg/registry/controller/registry.go +++ b/pkg/registry/controller/registry.go @@ -26,7 +26,7 @@ type Registry interface { ListControllers() (*api.ReplicationControllerList, error) WatchControllers(resourceVersion uint64) (watch.Interface, error) GetController(controllerID string) (*api.ReplicationController, error) - CreateController(controller api.ReplicationController) error - UpdateController(controller api.ReplicationController) error + CreateController(controller *api.ReplicationController) error + UpdateController(controller *api.ReplicationController) error DeleteController(controllerID string) error } diff --git a/pkg/registry/controller/storage.go b/pkg/registry/controller/storage.go index 8ce5e5fe46..9cf5dd5144 100644 --- a/pkg/registry/controller/storage.go +++ b/pkg/registry/controller/storage.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -51,7 +52,7 @@ func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.R } // Create registers the given ReplicationController. -func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -67,8 +68,8 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { controller.CreationTimestamp = util.Now() - return apiserver.MakeAsync(func() (interface{}, error) { - err := rs.registry.CreateController(*controller) + return apiserver.MakeAsync(func() (runtime.Object, error) { + err := rs.registry.CreateController(controller) if err != nil { return nil, err } @@ -77,14 +78,14 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { } // Delete asynchronously deletes the ReplicationController specified by its id. -func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() (interface{}, error) { +func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id) }), nil } // Get obtains the ReplicationController specified by its id. -func (rs *RegistryStorage) Get(id string) (interface{}, error) { +func (rs *RegistryStorage) Get(id string) (runtime.Object, error) { controller, err := rs.registry.GetController(id) if err != nil { return nil, err @@ -93,7 +94,7 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) { } // List obtains a list of ReplicationControllers that match selector. -func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) { controllers, err := rs.registry.ListControllers() if err != nil { return nil, err @@ -109,13 +110,13 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { } // New creates a new ReplicationController for use with Create and Update. -func (rs RegistryStorage) New() interface{} { +func (rs RegistryStorage) New() runtime.Object { return &api.ReplicationController{} } // Update replaces a given ReplicationController instance with an existing // instance in storage.registry. -func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) { controller, ok := obj.(*api.ReplicationController) if !ok { return nil, fmt.Errorf("not a replication controller: %#v", obj) @@ -123,8 +124,8 @@ func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { if errs := validation.ValidateReplicationController(controller); len(errs) > 0 { return nil, errors.NewInvalid("replicationController", controller.ID, errs) } - return apiserver.MakeAsync(func() (interface{}, error) { - err := rs.registry.UpdateController(*controller) + return apiserver.MakeAsync(func() (runtime.Object, error) { + err := rs.registry.UpdateController(controller) if err != nil { return nil, err } @@ -148,7 +149,7 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u }), nil } -func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) { +func (rs *RegistryStorage) waitForController(ctrl *api.ReplicationController) (runtime.Object, error) { for { pods, err := rs.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector()) if err != nil { diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go index 1cd5f2d548..ed03f0b129 100644 --- a/pkg/registry/endpoint/registry.go +++ b/pkg/registry/endpoint/registry.go @@ -27,5 +27,5 @@ type Registry interface { ListEndpoints() (*api.EndpointsList, error) GetEndpoints(name string) (*api.Endpoints, error) WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) - UpdateEndpoints(e api.Endpoints) error + UpdateEndpoints(e *api.Endpoints) error } diff --git a/pkg/registry/endpoint/storage.go b/pkg/registry/endpoint/storage.go index b0e4104b9d..3b21e14f77 100644 --- a/pkg/registry/endpoint/storage.go +++ b/pkg/registry/endpoint/storage.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -38,12 +39,12 @@ func NewStorage(registry Registry) apiserver.RESTStorage { } // Get satisfies the RESTStorage interface. -func (rs *Storage) Get(id string) (interface{}, error) { +func (rs *Storage) Get(id string) (runtime.Object, error) { return rs.registry.GetEndpoints(id) } // List satisfies the RESTStorage interface. -func (rs *Storage) List(selector labels.Selector) (interface{}, error) { +func (rs *Storage) List(selector labels.Selector) (runtime.Object, error) { if !selector.Empty() { return nil, errors.New("label selectors are not supported on endpoints") } @@ -57,21 +58,21 @@ func (rs *Storage) Watch(label, field labels.Selector, resourceVersion uint64) ( } // Create satisfies the RESTStorage interface but is unimplemented. -func (rs *Storage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *Storage) Create(obj runtime.Object) (<-chan runtime.Object, error) { return nil, errors.New("unimplemented") } // Update satisfies the RESTStorage interface but is unimplemented. -func (rs *Storage) Update(obj interface{}) (<-chan interface{}, error) { +func (rs *Storage) Update(obj runtime.Object) (<-chan runtime.Object, error) { return nil, errors.New("unimplemented") } // Delete satisfies the RESTStorage interface but is unimplemented. -func (rs *Storage) Delete(id string) (<-chan interface{}, error) { +func (rs *Storage) Delete(id string) (<-chan runtime.Object, error) { return nil, errors.New("unimplemented") } // New implements the RESTStorage interface. -func (rs Storage) New() interface{} { +func (rs Storage) New() runtime.Object { return &api.Endpoints{} } diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index c01f9ba839..0647f38247 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -82,7 +82,7 @@ func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) { // WatchPods begins watching for new, changed, or deleted pods. func (r *Registry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) { - return r.WatchList("/registry/pods", resourceVersion, func(obj interface{}) bool { + return r.WatchList("/registry/pods", resourceVersion, func(obj runtime.Object) bool { pod, ok := obj.(*api.Pod) if !ok { glog.Errorf("Unexpected object during pod watch: %#v", obj) @@ -110,14 +110,14 @@ func makeContainerKey(machine string) string { } // CreatePod creates a pod based on a specification. -func (r *Registry) CreatePod(pod api.Pod) error { +func (r *Registry) CreatePod(pod *api.Pod) error { // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. pod.DesiredState.Status = api.PodRunning pod.DesiredState.Host = "" - return r.CreateObj(makePodKey(pod.ID), &pod) + return r.CreateObj(makePodKey(pod.ID), pod) } // ApplyBinding implements binding's registry @@ -129,7 +129,7 @@ func (r *Registry) ApplyBinding(binding *api.Binding) error { // Returns the current state of the pod, or an error. func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) { podKey := makePodKey(podID) - err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) { + err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) @@ -156,13 +156,13 @@ func (r *Registry) assignPod(podID string, machine string) error { return err } contKey := makeContainerKey(machine) - err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { manifests := *in.(*api.ContainerManifestList) manifests.Items = append(manifests.Items, manifest) if !constraint.Allowed(manifests.Items) { return nil, fmt.Errorf("The assignment would cause a constraint violation") } - return manifests, nil + return &manifests, nil }) if err != nil { // Put the pod's host back the way it was. This is a terrible hack that @@ -174,7 +174,7 @@ func (r *Registry) assignPod(podID string, machine string) error { return err } -func (r *Registry) UpdatePod(pod api.Pod) error { +func (r *Registry) UpdatePod(pod *api.Pod) error { return fmt.Errorf("unimplemented!") } @@ -205,7 +205,7 @@ func (r *Registry) DeletePod(podID string) error { } // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { manifests := in.(*api.ContainerManifestList) newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) found := false @@ -258,7 +258,7 @@ func (r *Registry) GetController(controllerID string) (*api.ReplicationControlle } // CreateController creates a new ReplicationController. -func (r *Registry) CreateController(controller api.ReplicationController) error { +func (r *Registry) CreateController(controller *api.ReplicationController) error { err := r.CreateObj(makeControllerKey(controller.ID), controller) if tools.IsEtcdNodeExist(err) { return errors.NewAlreadyExists("replicationController", controller.ID) @@ -267,8 +267,8 @@ func (r *Registry) CreateController(controller api.ReplicationController) error } // UpdateController replaces an existing ReplicationController. -func (r *Registry) UpdateController(controller api.ReplicationController) error { - return r.SetObj(makeControllerKey(controller.ID), &controller) +func (r *Registry) UpdateController(controller *api.ReplicationController) error { + return r.SetObj(makeControllerKey(controller.ID), controller) } // DeleteController deletes a ReplicationController specified by its ID. @@ -293,7 +293,7 @@ func (r *Registry) ListServices() (*api.ServiceList, error) { } // CreateService creates a new Service. -func (r *Registry) CreateService(svc api.Service) error { +func (r *Registry) CreateService(svc *api.Service) error { err := r.CreateObj(makeServiceKey(svc.ID), svc) if tools.IsEtcdNodeExist(err) { return errors.NewAlreadyExists("service", svc.ID) @@ -352,8 +352,8 @@ func (r *Registry) DeleteService(name string) error { } // UpdateService replaces an existing Service. -func (r *Registry) UpdateService(svc api.Service) error { - return r.SetObj(makeServiceKey(svc.ID), &svc) +func (r *Registry) UpdateService(svc *api.Service) error { + return r.SetObj(makeServiceKey(svc.ID), svc) } // WatchServices begins watching for new, changed, or deleted service configurations. @@ -378,10 +378,10 @@ func (r *Registry) ListEndpoints() (*api.EndpointsList, error) { } // UpdateEndpoints update Endpoints of a Service. -func (r *Registry) UpdateEndpoints(e api.Endpoints) error { +func (r *Registry) UpdateEndpoints(e *api.Endpoints) error { // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, - func(input interface{}) (interface{}, error) { + func(input runtime.Object) (runtime.Object, error) { // TODO: racy - label query is returning different results for two simultaneous updaters return e, nil }) diff --git a/pkg/registry/minion/storage.go b/pkg/registry/minion/storage.go index 27580983aa..c7da335632 100644 --- a/pkg/registry/minion/storage.go +++ b/pkg/registry/minion/storage.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -37,7 +38,7 @@ func NewRegistryStorage(m Registry) apiserver.RESTStorage { } } -func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) { minion, ok := obj.(*api.Minion) if !ok { return nil, fmt.Errorf("not a minion: %#v", obj) @@ -48,7 +49,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { minion.CreationTimestamp = util.Now() - return apiserver.MakeAsync(func() (interface{}, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { err := rs.registry.Insert(minion.ID) if err != nil { return nil, err @@ -64,7 +65,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { }), nil } -func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { +func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) { exists, err := rs.registry.Contains(id) if !exists { return nil, ErrDoesNotExist @@ -72,12 +73,12 @@ func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { if err != nil { return nil, err } - return apiserver.MakeAsync(func() (interface{}, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id) }), nil } -func (rs *RegistryStorage) Get(id string) (interface{}, error) { +func (rs *RegistryStorage) Get(id string) (runtime.Object, error) { exists, err := rs.registry.Contains(id) if !exists { return nil, ErrDoesNotExist @@ -85,26 +86,26 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) { return rs.toApiMinion(id), err } -func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) { nameList, err := rs.registry.List() if err != nil { return nil, err } var list api.MinionList for _, name := range nameList { - list.Items = append(list.Items, rs.toApiMinion(name)) + list.Items = append(list.Items, *rs.toApiMinion(name)) } - return list, nil + return &list, nil } -func (rs RegistryStorage) New() interface{} { +func (rs RegistryStorage) New() runtime.Object { return &api.Minion{} } -func (rs *RegistryStorage) Update(minion interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Update(minion runtime.Object) (<-chan runtime.Object, error) { return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.") } -func (rs *RegistryStorage) toApiMinion(name string) api.Minion { - return api.Minion{JSONBase: api.JSONBase{ID: name}} +func (rs *RegistryStorage) toApiMinion(name string) *api.Minion { + return &api.Minion{JSONBase: api.JSONBase{ID: name}} } diff --git a/pkg/registry/pod/registry.go b/pkg/registry/pod/registry.go index af8fd91db4..ef12477e06 100644 --- a/pkg/registry/pod/registry.go +++ b/pkg/registry/pod/registry.go @@ -31,9 +31,9 @@ type Registry interface { // Get a specific pod GetPod(podID string) (*api.Pod, error) // Create a pod based on a specification. - CreatePod(pod api.Pod) error + CreatePod(pod *api.Pod) error // Update an existing pod - UpdatePod(pod api.Pod) error + UpdatePod(pod *api.Pod) error // Delete an existing pod DeletePod(podID string) error } diff --git a/pkg/registry/pod/storage.go b/pkg/registry/pod/storage.go index 730351067f..3dcd0aebf2 100644 --- a/pkg/registry/pod/storage.go +++ b/pkg/registry/pod/storage.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -64,7 +65,7 @@ func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage { } } -func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) { pod := obj.(*api.Pod) if len(pod.ID) == 0 { pod.ID = uuid.NewUUID().String() @@ -76,21 +77,21 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { pod.CreationTimestamp = util.Now() - return apiserver.MakeAsync(func() (interface{}, error) { - if err := rs.registry.CreatePod(*pod); err != nil { + return apiserver.MakeAsync(func() (runtime.Object, error) { + if err := rs.registry.CreatePod(pod); err != nil { return nil, err } return rs.registry.GetPod(pod.ID) }), nil } -func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { - return apiserver.MakeAsync(func() (interface{}, error) { +func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { return &api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id) }), nil } -func (rs *RegistryStorage) Get(id string) (interface{}, error) { +func (rs *RegistryStorage) Get(id string) (runtime.Object, error) { pod, err := rs.registry.GetPod(id) if err != nil { return pod, err @@ -106,7 +107,7 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) { return pod, err } -func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) { pods, err := rs.registry.ListPods(selector) if err == nil { for i := range pods.Items { @@ -131,17 +132,17 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u }) } -func (rs RegistryStorage) New() interface{} { +func (rs RegistryStorage) New() runtime.Object { return &api.Pod{} } -func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) { pod := obj.(*api.Pod) if errs := validation.ValidatePod(pod); len(errs) > 0 { return nil, errors.NewInvalid("pod", pod.ID, errs) } - return apiserver.MakeAsync(func() (interface{}, error) { - if err := rs.registry.UpdatePod(*pod); err != nil { + return apiserver.MakeAsync(func() (runtime.Object, error) { + if err := rs.registry.UpdatePod(pod); err != nil { return nil, err } return rs.registry.GetPod(pod.ID) @@ -235,7 +236,7 @@ func getPodStatus(pod *api.Pod) api.PodStatus { } } -func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) { +func (rs *RegistryStorage) waitForPodRunning(pod *api.Pod) (runtime.Object, error) { for { podObj, err := rs.Get(pod.ID) if err != nil || podObj == nil { diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 26af6077ff..1176725452 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -26,10 +26,10 @@ import ( // Registry is an interface for things that know how to store services. type Registry interface { ListServices() (*api.ServiceList, error) - CreateService(svc api.Service) error + CreateService(svc *api.Service) error GetService(name string) (*api.Service, error) DeleteService(name string) error - UpdateService(svc api.Service) error + UpdateService(svc *api.Service) error WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) // TODO: endpoints and their implementation should be separated, setting endpoints should be diff --git a/pkg/registry/service/storage.go b/pkg/registry/service/storage.go index 44da947b49..579104ff2f 100644 --- a/pkg/registry/service/storage.go +++ b/pkg/registry/service/storage.go @@ -29,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -49,7 +50,7 @@ func NewRegistryStorage(registry Registry, cloud cloudprovider.Interface, machin } } -func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Create(obj runtime.Object) (<-chan runtime.Object, error) { srv := obj.(*api.Service) if errs := validation.ValidateService(srv); len(errs) > 0 { return nil, errors.NewInvalid("service", srv.ID, errs) @@ -57,7 +58,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { srv.CreationTimestamp = util.Now() - return apiserver.MakeAsync(func() (interface{}, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers // correctly no matter what http operations happen. if srv.CreateExternalLoadBalancer { @@ -85,7 +86,7 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { return nil, err } } - err := rs.registry.CreateService(*srv) + err := rs.registry.CreateService(srv) if err != nil { return nil, err } @@ -93,18 +94,18 @@ func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) { }), nil } -func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) { +func (rs *RegistryStorage) Delete(id string) (<-chan runtime.Object, error) { service, err := rs.registry.GetService(id) if err != nil { return nil, err } - return apiserver.MakeAsync(func() (interface{}, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { rs.deleteExternalLoadBalancer(service) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id) }), nil } -func (rs *RegistryStorage) Get(id string) (interface{}, error) { +func (rs *RegistryStorage) Get(id string) (runtime.Object, error) { s, err := rs.registry.GetService(id) if err != nil { return nil, err @@ -112,7 +113,7 @@ func (rs *RegistryStorage) Get(id string) (interface{}, error) { return s, err } -func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { +func (rs *RegistryStorage) List(selector labels.Selector) (runtime.Object, error) { list, err := rs.registry.ListServices() if err != nil { return nil, err @@ -133,7 +134,7 @@ func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion u return rs.registry.WatchServices(label, field, resourceVersion) } -func (rs RegistryStorage) New() interface{} { +func (rs RegistryStorage) New() runtime.Object { return &api.Service{} } @@ -155,14 +156,14 @@ func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.En return result, nil } -func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) { +func (rs *RegistryStorage) Update(obj runtime.Object) (<-chan runtime.Object, error) { srv := obj.(*api.Service) if errs := validation.ValidateService(srv); len(errs) > 0 { return nil, errors.NewInvalid("service", srv.ID, errs) } - return apiserver.MakeAsync(func() (interface{}, error) { + return apiserver.MakeAsync(func() (runtime.Object, error) { // TODO: check to see if external load balancer status changed - err := rs.registry.UpdateService(*srv) + err := rs.registry.UpdateService(srv) if err != nil { return nil, err } diff --git a/pkg/runtime/types.go b/pkg/runtime/types.go index a4b37faeb2..1714873395 100644 --- a/pkg/runtime/types.go +++ b/pkg/runtime/types.go @@ -20,15 +20,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -// All api types must support the Object interface. It's deliberately tiny so that this is not an onerous -// burden. Implement it with a pointer reciever; this will allow us to use the go compiler to check the -// one thing about our objects that it's capable of checking for us. -type Object interface { - // This function is used only to enforce membership. It's never called. - // TODO: Consider mass rename in the future to make it do something useful. - IsAnAPIObject() -} - // Note that the types provided in this file are not versioned and are intended to be // safe to use from within all versions of every API object. diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index d7b08f5126..940c6d42a0 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -73,7 +73,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) } // TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop. - err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{ + err = e.serviceRegistry.UpdateEndpoints(&api.Endpoints{ JSONBase: api.JSONBase{ID: service.ID}, Endpoints: endpoints, }) diff --git a/pkg/tools/decoder.go b/pkg/tools/decoder.go index adc1661d70..0c1778a23c 100644 --- a/pkg/tools/decoder.go +++ b/pkg/tools/decoder.go @@ -22,6 +22,7 @@ import ( "io" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -42,7 +43,7 @@ func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder { // Decode blocks until it can return the next object in the stream. Returns an error // if the stream is closed or an object can't be decoded. -func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) { +func (d *APIEventDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { var got api.WatchEvent err = d.decoder.Decode(&got) if err != nil { diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 0e27458004..7432a49fad 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/coreos/go-etcd/etcd" ) @@ -38,19 +39,6 @@ var ( EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} ) -// Codec provides methods for transforming Etcd values into objects and back. -type Codec interface { - Encode(obj interface{}) (data []byte, err error) - Decode(data []byte) (interface{}, error) - DecodeInto(data []byte, obj interface{}) error -} - -// ResourceVersioner provides methods for managing object modification tracking. -type ResourceVersioner interface { - SetResourceVersion(obj interface{}, version uint64) error - ResourceVersion(obj interface{}) (uint64, error) -} - // EtcdClient is an injectable interface for testing. type EtcdClient interface { AddChild(key, data string, ttl uint64) (*etcd.Response, error) @@ -77,9 +65,9 @@ type EtcdGetSet interface { // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { Client EtcdGetSet - Codec Codec + Codec runtime.Codec // optional, no atomic operations can be performed without this interface - ResourceVersioner ResourceVersioner + ResourceVersioner runtime.ResourceVersioner } // IsEtcdNotFound returns true iff err is an etcd not found error. @@ -151,9 +139,9 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersi v := pv.Elem() for _, node := range nodes { obj := reflect.New(v.Type().Elem()) - err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface()) + err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)) if h.ResourceVersioner != nil { - _ = h.ResourceVersioner.SetResourceVersion(obj.Interface(), node.ModifiedIndex) + _ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex) // being unable to set the version does not prevent the object from being extracted } if err != nil { @@ -167,12 +155,12 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}, resourceVersi // ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. -func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { +func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error { _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) return err } -func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { +func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { response, err := h.Client.Get(key, false, false) if err != nil && !IsEtcdNotFound(err) { @@ -198,7 +186,7 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot } // CreateObj adds a new object at a key unless it already exists. -func (h *EtcdHelper) CreateObj(key string, obj interface{}) error { +func (h *EtcdHelper) CreateObj(key string, obj runtime.Object) error { data, err := h.Codec.Encode(obj) if err != nil { return err @@ -221,7 +209,7 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error { // SetObj marshals obj via json, and stores under key. Will do an // atomic update if obj's ResourceVersion field is set. -func (h *EtcdHelper) SetObj(key string, obj interface{}) error { +func (h *EtcdHelper) SetObj(key string, obj runtime.Object) error { data, err := h.Codec.Encode(obj) if err != nil { return err @@ -240,7 +228,7 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error { // Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. // See the comment for AtomicUpdate for more detail. -type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) +type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error) // AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. // Note, tryUpdate may be called more than once. @@ -248,7 +236,7 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) // Example: // // h := &util.EtcdHelper{client, encoding, versioning} -// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) { +// err := h.AtomicUpdate("myKey", &MyType{}, func(input runtime.Object) (runtime.Object, error) { // // Before this function is called, currentObj has been reset to etcd's current // // contents for "myKey". // @@ -261,14 +249,14 @@ type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) // return cur, nil // }) // -func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate EtcdUpdateFunc) error { +func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdate EtcdUpdateFunc) error { pt := reflect.TypeOf(ptrToType) if pt.Kind() != reflect.Ptr { // Panic is appropriate, because this is a programming error. panic("need ptr to type") } for { - obj := reflect.New(pt.Elem()).Interface() + obj := reflect.New(pt.Elem()).Interface().(runtime.Object) origBody, index, err := h.bodyAndExtractObj(key, obj, true) if err != nil { return err diff --git a/pkg/tools/etcd_tools_watch.go b/pkg/tools/etcd_tools_watch.go index bfb0bafebd..f8a4d9f845 100644 --- a/pkg/tools/etcd_tools_watch.go +++ b/pkg/tools/etcd_tools_watch.go @@ -19,6 +19,7 @@ package tools import ( "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" @@ -27,10 +28,10 @@ import ( // FilterFunc is a predicate which takes an API object and returns true // iff the object should remain in the set. -type FilterFunc func(obj interface{}) bool +type FilterFunc func(obj runtime.Object) bool // Everything is a FilterFunc which accepts all objects. -func Everything(interface{}) bool { +func Everything(runtime.Object) bool { return true } @@ -59,7 +60,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, // change or wrap the serialized etcd object. // // startTime := time.Now() -// helper.WatchAndTransform(key, version, func(input interface{}) (interface{}, error) { +// helper.WatchAndTransform(key, version, func(input runtime.Object) (runtime.Object, error) { // value := input.(TimeAwareValue) // value.Since = startTime // return value, nil @@ -72,12 +73,12 @@ func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, trans } // TransformFunc attempts to convert an object to another object for use with a watcher. -type TransformFunc func(interface{}) (interface{}, error) +type TransformFunc func(runtime.Object) (runtime.Object, error) // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { - encoding Codec - versioner ResourceVersioner + encoding runtime.Codec + versioner runtime.ResourceVersioner transform TransformFunc list bool // If we're doing a recursive watch, should be true. @@ -98,7 +99,7 @@ type etcdWatcher struct { // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec, versioner ResourceVersioner, transform TransformFunc) *etcdWatcher { +func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -192,7 +193,7 @@ func (w *etcdWatcher) translate() { } } -func (w *etcdWatcher) decodeObject(data []byte, index uint64) (interface{}, error) { +func (w *etcdWatcher) decodeObject(data []byte, index uint64) (runtime.Object, error) { obj, err := w.encoding.Decode(data) if err != nil { return nil, err @@ -260,7 +261,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { } curObjPasses := w.filter(curObj) oldObjPasses := false - var oldObj interface{} + var oldObj runtime.Object if res.PrevNode != nil && res.PrevNode.Value != "" { // Ignore problems reading the old object. if oldObj, err = w.decodeObject([]byte(res.PrevNode.Value), res.PrevNode.ModifiedIndex); err == nil { diff --git a/pkg/watch/iowatcher.go b/pkg/watch/iowatcher.go index a44eef698d..8e359e63dc 100644 --- a/pkg/watch/iowatcher.go +++ b/pkg/watch/iowatcher.go @@ -19,6 +19,7 @@ package watch import ( "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -27,7 +28,7 @@ type Decoder interface { // Decode should return the type of event, the decoded object, or an error. // An error will cause StreamWatcher to call Close(). Decode should block until // it has data or an error occurs. - Decode() (action EventType, object interface{}, err error) + Decode() (action EventType, object runtime.Object, err error) // Close should close the underlying io.Reader, signalling to the source of // the stream that it is no longer being watched. Close() must cause any diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 4935e204ff..0c87111ee1 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -18,6 +18,8 @@ package watch import ( "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // Mux distributes event notifications among any number of watchers. Every event @@ -88,7 +90,7 @@ func (m *Mux) closeAll() { } // Action distributes the given event among all watchers. -func (m *Mux) Action(action EventType, obj interface{}) { +func (m *Mux) Action(action EventType, obj runtime.Object) { m.incoming <- Event{action, obj} } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 80e5c2865c..a9433a4bea 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -22,16 +22,19 @@ import ( "testing" ) +type myType struct { + ID string + Value string +} + +func (*myType) IsAnAPIObject() {} + func TestMux(t *testing.T) { - type myType struct { - ID string - Value string - } table := []Event{ - {Added, myType{"foo", "hello world 1"}}, - {Added, myType{"bar", "hello world 2"}}, - {Modified, myType{"foo", "goodbye world 3"}}, - {Deleted, myType{"bar", "hello world 4"}}, + {Added, &myType{"foo", "hello world 1"}}, + {Added, &myType{"bar", "hello world 2"}}, + {Modified, &myType{"foo", "goodbye world 3"}}, + {Deleted, &myType{"bar", "hello world 4"}}, } // The mux we're testing diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 59ddfbc579..bed7163f14 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -18,6 +18,8 @@ package watch import ( "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // Interface can be implemented by anything that knows how to watch and report changes. @@ -47,7 +49,7 @@ type Event struct { // If Type == Deleted, then this is the state of the object // immediately before deletion. - Object interface{} + Object runtime.Object } // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. @@ -78,21 +80,21 @@ func (f *FakeWatcher) ResultChan() <-chan Event { } // Add sends an add event. -func (f *FakeWatcher) Add(obj interface{}) { +func (f *FakeWatcher) Add(obj runtime.Object) { f.result <- Event{Added, obj} } // Modify sends a modify event. -func (f *FakeWatcher) Modify(obj interface{}) { +func (f *FakeWatcher) Modify(obj runtime.Object) { f.result <- Event{Modified, obj} } // Delete sends a delete event. -func (f *FakeWatcher) Delete(lastValue interface{}) { +func (f *FakeWatcher) Delete(lastValue runtime.Object) { f.result <- Event{Deleted, lastValue} } // Action sends an event of the requested type, for table-based testing. -func (f *FakeWatcher) Action(action EventType, obj interface{}) { +func (f *FakeWatcher) Action(action EventType, obj runtime.Object) { f.result <- Event{action, obj} }