mirror of https://github.com/k3s-io/k3s
Enforce unique constraint at namespace boundary in etcd, make client and server namespace aware
parent
b63974bd21
commit
085ca40291
|
@ -61,6 +61,8 @@ var (
|
|||
imageName = flag.String("image", "", "Image used when updating a replicationController. Will apply to the first container in the pod template.")
|
||||
clientConfig = &client.Config{}
|
||||
openBrowser = flag.Bool("open_browser", true, "If true, and -proxy is specified, open a browser pointed at the Kubernetes UX. Default true.")
|
||||
ns = flag.String("ns", "", "If present, the namespace scope for this request.")
|
||||
nsFile = flag.String("ns_file", os.Getenv("HOME")+"/.kubernetes_ns", "Path to the namespace file")
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -97,6 +99,9 @@ on the given image:
|
|||
|
||||
kubecfg [OPTIONS] [-p <port spec>] run <image> <replicas> <controller>
|
||||
|
||||
Manage namespace:
|
||||
kubecfg [OPTIONS] ns [<namespace>]
|
||||
|
||||
Options:
|
||||
`, prettyWireStorage())
|
||||
flag.PrintDefaults()
|
||||
|
@ -178,8 +183,18 @@ func main() {
|
|||
clientConfig.Host = os.Getenv("KUBERNETES_MASTER")
|
||||
}
|
||||
|
||||
// TODO: get the namespace context when kubecfg ns is completed
|
||||
ctx := api.NewContext()
|
||||
// Load namespace information for requests
|
||||
// Check if the namespace was overriden by the -ns argument
|
||||
ctx := api.NewDefaultContext()
|
||||
if len(*ns) > 0 {
|
||||
ctx = api.WithNamespace(ctx, *ns)
|
||||
} else {
|
||||
nsInfo, err := kubecfg.LoadNamespaceInfo(*nsFile)
|
||||
if err != nil {
|
||||
glog.Fatalf("Error loading current namespace: %v", err)
|
||||
}
|
||||
ctx = api.WithNamespace(ctx, nsInfo.Namespace)
|
||||
}
|
||||
|
||||
if clientConfig.Host == "" {
|
||||
// TODO: eventually apiserver should start on 443 and be secure by default
|
||||
|
@ -255,7 +270,7 @@ func main() {
|
|||
}
|
||||
method := flag.Arg(0)
|
||||
|
||||
matchFound := executeAPIRequest(ctx, method, kubeClient) || executeControllerRequest(ctx, method, kubeClient)
|
||||
matchFound := executeAPIRequest(ctx, method, kubeClient) || executeControllerRequest(ctx, method, kubeClient) || executeNamespaceRequest(method, kubeClient)
|
||||
if matchFound == false {
|
||||
glog.Fatalf("Unknown command %s", method)
|
||||
}
|
||||
|
@ -347,7 +362,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool {
|
|||
glog.Fatalf("usage: kubecfg [OPTIONS] %s <%s>", method, prettyWireStorage())
|
||||
}
|
||||
case "update":
|
||||
obj, err := c.Verb("GET").Path(path).Do().Get()
|
||||
obj, err := c.Verb("GET").Namespace(api.Namespace(ctx)).Path(path).Do().Get()
|
||||
if err != nil {
|
||||
glog.Fatalf("error obtaining resource version for update: %v", err)
|
||||
}
|
||||
|
@ -373,7 +388,7 @@ func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
r := c.Verb(verb).Path(path)
|
||||
r := c.Verb(verb).Namespace(api.Namespace(ctx)).Path(path)
|
||||
if len(*selector) > 0 {
|
||||
r.ParseSelectorParam("labels", *selector)
|
||||
}
|
||||
|
@ -464,6 +479,32 @@ func executeControllerRequest(ctx api.Context, method string, c *client.Client)
|
|||
return true
|
||||
}
|
||||
|
||||
// executeNamespaceRequest handles client operations for namespaces
|
||||
func executeNamespaceRequest(method string, c *client.Client) bool {
|
||||
var err error
|
||||
var ns *kubecfg.NamespaceInfo
|
||||
switch method {
|
||||
case "ns":
|
||||
args := flag.Args()
|
||||
switch len(args) {
|
||||
case 1:
|
||||
ns, err = kubecfg.LoadNamespaceInfo(*nsFile)
|
||||
case 2:
|
||||
ns = &kubecfg.NamespaceInfo{Namespace: args[1]}
|
||||
err = kubecfg.SaveNamespaceInfo(*nsFile, ns)
|
||||
default:
|
||||
glog.Fatalf("usage: kubecfg ns [<namespace>]")
|
||||
}
|
||||
default:
|
||||
return false
|
||||
}
|
||||
if err != nil {
|
||||
glog.Fatalf("Error: %v", err)
|
||||
}
|
||||
fmt.Printf("Using namespace %s\n", ns.Namespace)
|
||||
return true
|
||||
}
|
||||
|
||||
func humanReadablePrinter() *kubecfg.HumanReadablePrinter {
|
||||
printer := kubecfg.NewHumanReadablePrinter()
|
||||
// Add Handler calls here to support additional types
|
||||
|
|
|
@ -63,6 +63,12 @@ func NamespaceFrom(ctx Context) (string, bool) {
|
|||
return namespace, ok
|
||||
}
|
||||
|
||||
// Namespace returns the value of the namespace key on the ctx, or the empty string if none
|
||||
func Namespace(ctx Context) string {
|
||||
namespace, _ := NamespaceFrom(ctx)
|
||||
return namespace
|
||||
}
|
||||
|
||||
// ValidNamespace returns false if the namespace on the context differs from the resource. If the resource has no namespace, it is set to the value in the context.
|
||||
func ValidNamespace(ctx Context, resource *TypeMeta) bool {
|
||||
ns, ok := NamespaceFrom(ctx)
|
||||
|
@ -71,3 +77,12 @@ func ValidNamespace(ctx Context, resource *TypeMeta) bool {
|
|||
}
|
||||
return ns == resource.Namespace && ok
|
||||
}
|
||||
|
||||
// WithNamespaceDefaultIfNone returns a context whose namespace is the default if and only if the parent context has no namespace value
|
||||
func WithNamespaceDefaultIfNone(parent Context) Context {
|
||||
namespace, ok := NamespaceFrom(parent)
|
||||
if !ok || len(namespace) == 0 {
|
||||
return WithNamespace(parent, NamespaceDefault)
|
||||
}
|
||||
return parent
|
||||
}
|
||||
|
|
|
@ -59,4 +59,10 @@ func TestValidNamespace(t *testing.T) {
|
|||
if api.ValidNamespace(ctx, &resource.TypeMeta) {
|
||||
t.Errorf("Expected error that resource and context errors do not match since context has no namespace")
|
||||
}
|
||||
|
||||
ctx = api.NewContext()
|
||||
ns := api.Namespace(ctx)
|
||||
if ns != "" {
|
||||
t.Errorf("Expected the empty string")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,10 +100,17 @@ func curry(f func(runtime.Object, *http.Request) error, req *http.Request) func(
|
|||
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true
|
||||
// labels=<label-selector> Used for filtering list operations
|
||||
func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
|
||||
// TODO for now, we perform all operations in the default namespace
|
||||
ctx := api.NewDefaultContext()
|
||||
ctx := api.NewContext()
|
||||
sync := req.URL.Query().Get("sync") == "true"
|
||||
timeout := parseTimeout(req.URL.Query().Get("timeout"))
|
||||
// TODO for now, we pull namespace from query parameter, but according to spec, it must go in resource path in future PR
|
||||
// if a namespace if specified, it's always used.
|
||||
// for list/watch operations, a namespace is not required if omitted.
|
||||
// for all other operations, if namespace is omitted, we will default to default namespace.
|
||||
namespace := req.URL.Query().Get("namespace")
|
||||
if len(namespace) > 0 {
|
||||
ctx = api.WithNamespace(ctx, namespace)
|
||||
}
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
switch len(parts) {
|
||||
|
@ -129,7 +136,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||
}
|
||||
writeJSON(http.StatusOK, h.codec, list, w)
|
||||
case 2:
|
||||
item, err := storage.Get(ctx, parts[1])
|
||||
item, err := storage.Get(api.WithNamespaceDefaultIfNone(ctx), parts[1])
|
||||
if err != nil {
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
|
@ -159,7 +166,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
out, err := storage.Create(ctx, obj)
|
||||
out, err := storage.Create(api.WithNamespaceDefaultIfNone(ctx), obj)
|
||||
if err != nil {
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
|
@ -172,7 +179,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||
notFound(w, req)
|
||||
return
|
||||
}
|
||||
out, err := storage.Delete(ctx, parts[1])
|
||||
out, err := storage.Delete(api.WithNamespaceDefaultIfNone(ctx), parts[1])
|
||||
if err != nil {
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
|
@ -196,7 +203,7 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
}
|
||||
out, err := storage.Update(ctx, obj)
|
||||
out, err := storage.Update(api.WithNamespaceDefaultIfNone(ctx), obj)
|
||||
if err != nil {
|
||||
errorJSON(err, h.codec, w)
|
||||
return
|
||||
|
|
|
@ -104,26 +104,26 @@ type Client struct {
|
|||
// ListPods takes a selector, and returns the list of pods that match that selector.
|
||||
func (c *Client) ListPods(ctx api.Context, selector labels.Selector) (result *api.PodList, err error) {
|
||||
result = &api.PodList{}
|
||||
err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("pods").SelectorParam("labels", selector).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// GetPod takes the id of the pod, and returns the corresponding Pod object, and an error if it occurs
|
||||
func (c *Client) GetPod(ctx api.Context, id string) (result *api.Pod, err error) {
|
||||
result = &api.Pod{}
|
||||
err = c.Get().Path("pods").Path(id).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("pods").Path(id).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// DeletePod takes the id of the pod, and returns an error if one occurs
|
||||
func (c *Client) DeletePod(ctx api.Context, id string) error {
|
||||
return c.Delete().Path("pods").Path(id).Do().Error()
|
||||
return c.Delete().Namespace(api.Namespace(ctx)).Path("pods").Path(id).Do().Error()
|
||||
}
|
||||
|
||||
// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
|
||||
func (c *Client) CreatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) {
|
||||
result = &api.Pod{}
|
||||
err = c.Post().Path("pods").Body(pod).Do().Into(result)
|
||||
err = c.Post().Namespace(api.Namespace(ctx)).Path("pods").Body(pod).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -134,28 +134,28 @@ func (c *Client) UpdatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err
|
|||
err = fmt.Errorf("invalid update object, missing resource version: %v", pod)
|
||||
return
|
||||
}
|
||||
err = c.Put().Path("pods").Path(pod.ID).Body(pod).Do().Into(result)
|
||||
err = c.Put().Namespace(api.Namespace(ctx)).Path("pods").Path(pod.ID).Body(pod).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector.
|
||||
func (c *Client) ListReplicationControllers(ctx api.Context, selector labels.Selector) (result *api.ReplicationControllerList, err error) {
|
||||
result = &api.ReplicationControllerList{}
|
||||
err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("replicationControllers").SelectorParam("labels", selector).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// GetReplicationController returns information about a particular replication controller.
|
||||
func (c *Client) GetReplicationController(ctx api.Context, id string) (result *api.ReplicationController, err error) {
|
||||
result = &api.ReplicationController{}
|
||||
err = c.Get().Path("replicationControllers").Path(id).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("replicationControllers").Path(id).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// CreateReplicationController creates a new replication controller.
|
||||
func (c *Client) CreateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) {
|
||||
result = &api.ReplicationController{}
|
||||
err = c.Post().Path("replicationControllers").Body(controller).Do().Into(result)
|
||||
err = c.Post().Namespace(api.Namespace(ctx)).Path("replicationControllers").Body(controller).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -166,18 +166,19 @@ func (c *Client) UpdateReplicationController(ctx api.Context, controller *api.Re
|
|||
err = fmt.Errorf("invalid update object, missing resource version: %v", controller)
|
||||
return
|
||||
}
|
||||
err = c.Put().Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(result)
|
||||
err = c.Put().Namespace(api.Namespace(ctx)).Path("replicationControllers").Path(controller.ID).Body(controller).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteReplicationController deletes an existing replication controller.
|
||||
func (c *Client) DeleteReplicationController(ctx api.Context, id string) error {
|
||||
return c.Delete().Path("replicationControllers").Path(id).Do().Error()
|
||||
return c.Delete().Namespace(api.Namespace(ctx)).Path("replicationControllers").Path(id).Do().Error()
|
||||
}
|
||||
|
||||
// WatchReplicationControllers returns a watch.Interface that watches the requested controllers.
|
||||
func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
return c.Get().
|
||||
Namespace(api.Namespace(ctx)).
|
||||
Path("watch").
|
||||
Path("replicationControllers").
|
||||
Param("resourceVersion", resourceVersion).
|
||||
|
@ -189,21 +190,21 @@ func (c *Client) WatchReplicationControllers(ctx api.Context, label, field label
|
|||
// ListServices takes a selector, and returns the list of services that match that selector
|
||||
func (c *Client) ListServices(ctx api.Context, selector labels.Selector) (result *api.ServiceList, err error) {
|
||||
result = &api.ServiceList{}
|
||||
err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("services").SelectorParam("labels", selector).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// GetService returns information about a particular service.
|
||||
func (c *Client) GetService(ctx api.Context, id string) (result *api.Service, err error) {
|
||||
result = &api.Service{}
|
||||
err = c.Get().Path("services").Path(id).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("services").Path(id).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// CreateService creates a new service.
|
||||
func (c *Client) CreateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) {
|
||||
result = &api.Service{}
|
||||
err = c.Post().Path("services").Body(svc).Do().Into(result)
|
||||
err = c.Post().Namespace(api.Namespace(ctx)).Path("services").Body(svc).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -214,18 +215,19 @@ func (c *Client) UpdateService(ctx api.Context, svc *api.Service) (result *api.S
|
|||
err = fmt.Errorf("invalid update object, missing resource version: %v", svc)
|
||||
return
|
||||
}
|
||||
err = c.Put().Path("services").Path(svc.ID).Body(svc).Do().Into(result)
|
||||
err = c.Put().Namespace(api.Namespace(ctx)).Path("services").Path(svc.ID).Body(svc).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteService deletes an existing service.
|
||||
func (c *Client) DeleteService(ctx api.Context, id string) error {
|
||||
return c.Delete().Path("services").Path(id).Do().Error()
|
||||
return c.Delete().Namespace(api.Namespace(ctx)).Path("services").Path(id).Do().Error()
|
||||
}
|
||||
|
||||
// WatchServices returns a watch.Interface that watches the requested services.
|
||||
func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
return c.Get().
|
||||
Namespace(api.Namespace(ctx)).
|
||||
Path("watch").
|
||||
Path("services").
|
||||
Param("resourceVersion", resourceVersion).
|
||||
|
@ -237,20 +239,21 @@ func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, re
|
|||
// ListEndpoints takes a selector, and returns the list of endpoints that match that selector
|
||||
func (c *Client) ListEndpoints(ctx api.Context, selector labels.Selector) (result *api.EndpointsList, err error) {
|
||||
result = &api.EndpointsList{}
|
||||
err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("endpoints").SelectorParam("labels", selector).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// GetEndpoints returns information about the endpoints for a particular service.
|
||||
func (c *Client) GetEndpoints(ctx api.Context, id string) (result *api.Endpoints, err error) {
|
||||
result = &api.Endpoints{}
|
||||
err = c.Get().Path("endpoints").Path(id).Do().Into(result)
|
||||
err = c.Get().Namespace(api.Namespace(ctx)).Path("endpoints").Path(id).Do().Into(result)
|
||||
return
|
||||
}
|
||||
|
||||
// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
|
||||
func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
return c.Get().
|
||||
Namespace(api.Namespace(ctx)).
|
||||
Path("watch").
|
||||
Path("endpoints").
|
||||
Param("resourceVersion", resourceVersion).
|
||||
|
@ -261,7 +264,7 @@ func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, r
|
|||
|
||||
func (c *Client) CreateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) {
|
||||
result := &api.Endpoints{}
|
||||
err := c.Post().Path("endpoints").Body(endpoints).Do().Into(result)
|
||||
err := c.Post().Namespace(api.Namespace(ctx)).Path("endpoints").Body(endpoints).Do().Into(result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
|
@ -271,6 +274,7 @@ func (c *Client) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*ap
|
|||
return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints)
|
||||
}
|
||||
err := c.Put().
|
||||
Namespace(api.Namespace(ctx)).
|
||||
Path("endpoints").
|
||||
Path(endpoints.ID).
|
||||
Body(endpoints).
|
||||
|
|
|
@ -74,6 +74,14 @@ func (r *Request) Sync(sync bool) *Request {
|
|||
return r
|
||||
}
|
||||
|
||||
// Namespace applies the namespace scope to a request
|
||||
func (r *Request) Namespace(namespace string) *Request {
|
||||
if len(namespace) > 0 {
|
||||
return r.setParam("namespace", namespace)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// AbsPath overwrites an existing path with the path parameter.
|
||||
func (r *Request) AbsPath(path string) *Request {
|
||||
if r.err != nil {
|
||||
|
@ -196,6 +204,7 @@ func (r *Request) finalURL() string {
|
|||
for key, value := range r.params {
|
||||
query.Add(key, value)
|
||||
}
|
||||
|
||||
// sync and timeout are handled specially here, to allow setting them
|
||||
// in any order.
|
||||
if r.sync {
|
||||
|
|
|
@ -215,7 +215,7 @@ func TestCreateReplica(t *testing.T) {
|
|||
Labels: controllerSpec.DesiredState.PodTemplate.Labels,
|
||||
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
|
||||
}
|
||||
fakeHandler.ValidateRequest(t, makeURL("/pods"), "POST", nil)
|
||||
fakeHandler.ValidateRequest(t, makeURL("/pods?namespace=default"), "POST", nil)
|
||||
actualPod := api.Pod{}
|
||||
if err := json.Unmarshal([]byte(fakeHandler.RequestBody), &actualPod); err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
|
|
|
@ -60,6 +60,10 @@ type AuthInfo struct {
|
|||
Insecure *bool
|
||||
}
|
||||
|
||||
type NamespaceInfo struct {
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// LoadAuthInfo parses an AuthInfo object from a file path. It prompts user and creates file if it doesn't exist.
|
||||
func LoadAuthInfo(path string, r io.Reader) (*AuthInfo, error) {
|
||||
var auth AuthInfo
|
||||
|
@ -84,6 +88,35 @@ func LoadAuthInfo(path string, r io.Reader) (*AuthInfo, error) {
|
|||
return &auth, err
|
||||
}
|
||||
|
||||
// LoadNamespaceInfo parses a NamespaceInfo object from a file path. It creates a file at the specified path if it doesn't exist with the default namespace.
|
||||
func LoadNamespaceInfo(path string) (*NamespaceInfo, error) {
|
||||
var ns NamespaceInfo
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
ns.Namespace = api.NamespaceDefault
|
||||
err = SaveNamespaceInfo(path, &ns)
|
||||
return &ns, err
|
||||
}
|
||||
data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = json.Unmarshal(data, &ns)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ns, err
|
||||
}
|
||||
|
||||
// SaveNamespaceInfo saves a NamespaceInfo object at the specified file path.
|
||||
func SaveNamespaceInfo(path string, ns *NamespaceInfo) error {
|
||||
if !util.IsDNSLabel(ns.Namespace) {
|
||||
return fmt.Errorf("Namespace %s is not a valid DNS Label", ns.Namespace)
|
||||
}
|
||||
data, err := json.Marshal(ns)
|
||||
err = ioutil.WriteFile(path, data, 0600)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update performs a rolling update of a collection of pods.
|
||||
// 'name' points to a replication controller.
|
||||
// 'client' is used for updating pods.
|
||||
|
|
|
@ -240,6 +240,58 @@ func TestCloudCfgDeleteControllerWithReplicas(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLoadNamespaceInfo(t *testing.T) {
|
||||
loadNamespaceInfoTests := []struct {
|
||||
nsData string
|
||||
nsInfo *NamespaceInfo
|
||||
}{
|
||||
{
|
||||
`{"Namespace":"test"}`,
|
||||
&NamespaceInfo{Namespace: "test"},
|
||||
},
|
||||
{
|
||||
"", nil,
|
||||
},
|
||||
{
|
||||
"missing",
|
||||
&NamespaceInfo{Namespace: "default"},
|
||||
},
|
||||
}
|
||||
for _, loadNamespaceInfoTest := range loadNamespaceInfoTests {
|
||||
tt := loadNamespaceInfoTest
|
||||
nsfile, err := ioutil.TempFile("", "testNamespaceInfo")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if tt.nsData != "missing" {
|
||||
defer os.Remove(nsfile.Name())
|
||||
defer nsfile.Close()
|
||||
_, err := nsfile.WriteString(tt.nsData)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
} else {
|
||||
nsfile.Close()
|
||||
os.Remove(nsfile.Name())
|
||||
}
|
||||
nsInfo, err := LoadNamespaceInfo(nsfile.Name())
|
||||
if len(tt.nsData) == 0 && tt.nsData != "missing" {
|
||||
if err == nil {
|
||||
t.Error("LoadNamespaceInfo didn't fail on an empty file")
|
||||
}
|
||||
continue
|
||||
}
|
||||
if tt.nsData != "missing" {
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v, %v", tt.nsData, err)
|
||||
}
|
||||
if !reflect.DeepEqual(nsInfo, tt.nsInfo) {
|
||||
t.Errorf("Expected %v, got %v", tt.nsInfo, nsInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadAuthInfo(t *testing.T) {
|
||||
loadAuthInfoTests := []struct {
|
||||
authData string
|
||||
|
|
|
@ -34,6 +34,17 @@ import (
|
|||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
// PodPath is the path to pod resources in etcd
|
||||
PodPath string = "/registry/pods"
|
||||
// ControllerPath is the path to controller resources in etcd
|
||||
ControllerPath string = "/registry/controllers"
|
||||
// ServicePath is the path to service resources in etcd
|
||||
ServicePath string = "/registry/services/specs"
|
||||
// ServiceEndpointPath is the path to service endpoints resources in etcd
|
||||
ServiceEndpointPath string = "/registry/services/endpoints"
|
||||
)
|
||||
|
||||
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
|
||||
// kubelet (and vice versa)
|
||||
|
||||
|
@ -52,8 +63,38 @@ func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) *
|
|||
return registry
|
||||
}
|
||||
|
||||
func makePodKey(podID string) string {
|
||||
return "/registry/pods/" + podID
|
||||
// MakeEtcdListKey constructs etcd paths to resource directories enforcing namespace rules
|
||||
func MakeEtcdListKey(ctx api.Context, prefix string) string {
|
||||
key := prefix
|
||||
ns, ok := api.NamespaceFrom(ctx)
|
||||
if ok && len(ns) > 0 {
|
||||
key = key + "/" + ns
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// MakeEtcdItemKey constructs etcd paths to a resource relative to prefix enforcing namespace rules. If no namespace is on context, it errors.
|
||||
func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error) {
|
||||
key := MakeEtcdListKey(ctx, prefix)
|
||||
ns, ok := api.NamespaceFrom(ctx)
|
||||
if !ok || len(ns) == 0 {
|
||||
return "", fmt.Errorf("Invalid request. Unable to address and item without a namespace on context")
|
||||
}
|
||||
if len(id) == 0 {
|
||||
return "", fmt.Errorf("Invalid request. Id parameter required")
|
||||
}
|
||||
key = key + "/" + id
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// makePodListKey constructs etcd paths to pod directories enforcing namespace rules.
|
||||
func makePodListKey(ctx api.Context) string {
|
||||
return MakeEtcdListKey(ctx, PodPath)
|
||||
}
|
||||
|
||||
// makePodKey constructs etcd paths to pod items enforcing namespace rules.
|
||||
func makePodKey(ctx api.Context, id string) (string, error) {
|
||||
return MakeEtcdItemKey(ctx, PodPath, id)
|
||||
}
|
||||
|
||||
// parseWatchResourceVersion takes a resource version argument and converts it to
|
||||
|
@ -81,7 +122,8 @@ func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.Pod
|
|||
// ListPodsPredicate obtains a list of pods that match filter.
|
||||
func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error) {
|
||||
allPods := api.PodList{}
|
||||
err := r.ExtractToList("/registry/pods", &allPods)
|
||||
key := makePodListKey(ctx)
|
||||
err := r.ExtractToList(key, &allPods)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -105,7 +147,8 @@ func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter fun
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.WatchList("/registry/pods", version, func(obj runtime.Object) bool {
|
||||
key := makePodListKey(ctx)
|
||||
return r.WatchList(key, version, func(obj runtime.Object) bool {
|
||||
switch t := obj.(type) {
|
||||
case *api.Pod:
|
||||
return filter(t)
|
||||
|
@ -117,10 +160,14 @@ func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter fun
|
|||
}
|
||||
|
||||
// GetPod gets a specific pod specified by its ID.
|
||||
func (r *Registry) GetPod(ctx api.Context, podID string) (*api.Pod, error) {
|
||||
func (r *Registry) GetPod(ctx api.Context, id string) (*api.Pod, error) {
|
||||
var pod api.Pod
|
||||
if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "pod", podID)
|
||||
key, err := makePodKey(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = r.ExtractObj(key, &pod, false); err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "pod", id)
|
||||
}
|
||||
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
|
||||
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
|
||||
|
@ -141,19 +188,26 @@ func (r *Registry) CreatePod(ctx api.Context, pod *api.Pod) error {
|
|||
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
|
||||
pod.DesiredState.Status = api.PodRunning
|
||||
pod.DesiredState.Host = ""
|
||||
err := r.CreateObj(makePodKey(pod.ID), pod, 0)
|
||||
key, err := makePodKey(ctx, pod.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.CreateObj(key, pod, 0)
|
||||
return etcderr.InterpretCreateError(err, "pod", pod.ID)
|
||||
}
|
||||
|
||||
// ApplyBinding implements binding's registry
|
||||
func (r *Registry) ApplyBinding(ctx api.Context, binding *api.Binding) error {
|
||||
return etcderr.InterpretCreateError(r.assignPod(binding.PodID, binding.Host), "binding", "")
|
||||
return etcderr.InterpretCreateError(r.assignPod(ctx, binding.PodID, binding.Host), "binding", "")
|
||||
}
|
||||
|
||||
// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
|
||||
// Returns the current state of the pod, or an error.
|
||||
func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
|
||||
podKey := makePodKey(podID)
|
||||
func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
|
||||
podKey, err := makePodKey(ctx, podID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
if !ok {
|
||||
|
@ -170,8 +224,8 @@ func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *ap
|
|||
}
|
||||
|
||||
// assignPod assigns the given pod to the given machine.
|
||||
func (r *Registry) assignPod(podID string, machine string) error {
|
||||
finalPod, err := r.setPodHostTo(podID, "", machine)
|
||||
func (r *Registry) assignPod(ctx api.Context, podID string, machine string) error {
|
||||
finalPod, err := r.setPodHostTo(ctx, podID, "", machine)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -192,7 +246,7 @@ func (r *Registry) assignPod(podID string, machine string) error {
|
|||
if err != nil {
|
||||
// Put the pod's host back the way it was. This is a terrible hack that
|
||||
// won't be needed if we convert this to a rectification loop.
|
||||
if _, err2 := r.setPodHostTo(podID, machine, ""); err2 != nil {
|
||||
if _, err2 := r.setPodHostTo(ctx, podID, machine, ""); err2 != nil {
|
||||
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
|
||||
}
|
||||
}
|
||||
|
@ -201,8 +255,11 @@ func (r *Registry) assignPod(podID string, machine string) error {
|
|||
|
||||
func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
|
||||
var podOut api.Pod
|
||||
podKey := makePodKey(pod.ID)
|
||||
err := r.EtcdHelper.ExtractObj(podKey, &podOut, false)
|
||||
podKey, err := makePodKey(ctx, pod.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.EtcdHelper.ExtractObj(podKey, &podOut, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -243,8 +300,11 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
|
|||
// DeletePod deletes an existing pod specified by its ID.
|
||||
func (r *Registry) DeletePod(ctx api.Context, podID string) error {
|
||||
var pod api.Pod
|
||||
podKey := makePodKey(podID)
|
||||
err := r.ExtractObj(podKey, &pod, false)
|
||||
podKey, err := makePodKey(ctx, podID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.ExtractObj(podKey, &pod, false)
|
||||
if err != nil {
|
||||
return etcderr.InterpretDeleteError(err, "pod", podID)
|
||||
}
|
||||
|
@ -286,7 +346,8 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
|
|||
// ListControllers obtains a list of ReplicationControllers.
|
||||
func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
|
||||
controllers := &api.ReplicationControllerList{}
|
||||
err := r.ExtractToList("/registry/controllers", controllers)
|
||||
key := makeControllerListKey(ctx)
|
||||
err := r.ExtractToList(key, controllers)
|
||||
return controllers, err
|
||||
}
|
||||
|
||||
|
@ -296,18 +357,28 @@ func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (wa
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.WatchList("/registry/controllers", version, tools.Everything)
|
||||
key := makeControllerListKey(ctx)
|
||||
return r.WatchList(key, version, tools.Everything)
|
||||
}
|
||||
|
||||
func makeControllerKey(id string) string {
|
||||
return "/registry/controllers/" + id
|
||||
// makeControllerListKey constructs etcd paths to controller directories enforcing namespace rules.
|
||||
func makeControllerListKey(ctx api.Context) string {
|
||||
return MakeEtcdListKey(ctx, ControllerPath)
|
||||
}
|
||||
|
||||
// makeControllerKey constructs etcd paths to controller items enforcing namespace rules.
|
||||
func makeControllerKey(ctx api.Context, id string) (string, error) {
|
||||
return MakeEtcdItemKey(ctx, ControllerPath, id)
|
||||
}
|
||||
|
||||
// GetController gets a specific ReplicationController specified by its ID.
|
||||
func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) {
|
||||
var controller api.ReplicationController
|
||||
key := makeControllerKey(controllerID)
|
||||
err := r.ExtractObj(key, &controller, false)
|
||||
key, err := makeControllerKey(ctx, controllerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = r.ExtractObj(key, &controller, false)
|
||||
if err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "replicationController", controllerID)
|
||||
}
|
||||
|
@ -316,45 +387,69 @@ func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.Rep
|
|||
|
||||
// CreateController creates a new ReplicationController.
|
||||
func (r *Registry) CreateController(ctx api.Context, controller *api.ReplicationController) error {
|
||||
err := r.CreateObj(makeControllerKey(controller.ID), controller, 0)
|
||||
key, err := makeControllerKey(ctx, controller.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.CreateObj(key, controller, 0)
|
||||
return etcderr.InterpretCreateError(err, "replicationController", controller.ID)
|
||||
}
|
||||
|
||||
// UpdateController replaces an existing ReplicationController.
|
||||
func (r *Registry) UpdateController(ctx api.Context, controller *api.ReplicationController) error {
|
||||
err := r.SetObj(makeControllerKey(controller.ID), controller)
|
||||
key, err := makeControllerKey(ctx, controller.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.SetObj(key, controller)
|
||||
return etcderr.InterpretUpdateError(err, "replicationController", controller.ID)
|
||||
}
|
||||
|
||||
// DeleteController deletes a ReplicationController specified by its ID.
|
||||
func (r *Registry) DeleteController(ctx api.Context, controllerID string) error {
|
||||
key := makeControllerKey(controllerID)
|
||||
err := r.Delete(key, false)
|
||||
key, err := makeControllerKey(ctx, controllerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.Delete(key, false)
|
||||
return etcderr.InterpretDeleteError(err, "replicationController", controllerID)
|
||||
}
|
||||
|
||||
func makeServiceKey(name string) string {
|
||||
return "/registry/services/specs/" + name
|
||||
// makePodListKey constructs etcd paths to service directories enforcing namespace rules.
|
||||
func makeServiceListKey(ctx api.Context) string {
|
||||
return MakeEtcdListKey(ctx, ServicePath)
|
||||
}
|
||||
|
||||
// makePodKey constructs etcd paths to service items enforcing namespace rules.
|
||||
func makeServiceKey(ctx api.Context, name string) (string, error) {
|
||||
return MakeEtcdItemKey(ctx, ServicePath, name)
|
||||
}
|
||||
|
||||
// ListServices obtains a list of Services.
|
||||
func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) {
|
||||
list := &api.ServiceList{}
|
||||
err := r.ExtractToList("/registry/services/specs", list)
|
||||
err := r.ExtractToList(makeServiceListKey(ctx), list)
|
||||
return list, err
|
||||
}
|
||||
|
||||
// CreateService creates a new Service.
|
||||
func (r *Registry) CreateService(ctx api.Context, svc *api.Service) error {
|
||||
err := r.CreateObj(makeServiceKey(svc.ID), svc, 0)
|
||||
key, err := makeServiceKey(ctx, svc.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.CreateObj(key, svc, 0)
|
||||
return etcderr.InterpretCreateError(err, "service", svc.ID)
|
||||
}
|
||||
|
||||
// GetService obtains a Service specified by its name.
|
||||
func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error) {
|
||||
key := makeServiceKey(name)
|
||||
key, err := makeServiceKey(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var svc api.Service
|
||||
err := r.ExtractObj(key, &svc, false)
|
||||
err = r.ExtractObj(key, &svc, false)
|
||||
if err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "service", name)
|
||||
}
|
||||
|
@ -363,30 +458,45 @@ func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error
|
|||
|
||||
// GetEndpoints obtains the endpoints for the service identified by 'name'.
|
||||
func (r *Registry) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) {
|
||||
key := makeServiceEndpointsKey(name)
|
||||
var endpoints api.Endpoints
|
||||
err := r.ExtractObj(key, &endpoints, false)
|
||||
key, err := makeServiceEndpointsKey(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = r.ExtractObj(key, &endpoints, false)
|
||||
if err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "endpoints", name)
|
||||
}
|
||||
return &endpoints, nil
|
||||
}
|
||||
|
||||
func makeServiceEndpointsKey(name string) string {
|
||||
return "/registry/services/endpoints/" + name
|
||||
// makeServiceEndpointsListKey constructs etcd paths to service endpoint directories enforcing namespace rules.
|
||||
func makeServiceEndpointsListKey(ctx api.Context) string {
|
||||
return MakeEtcdListKey(ctx, ServiceEndpointPath)
|
||||
}
|
||||
|
||||
// makeServiceEndpointsListKey constructs etcd paths to service endpoint items enforcing namespace rules.
|
||||
func makeServiceEndpointsKey(ctx api.Context, name string) (string, error) {
|
||||
return MakeEtcdItemKey(ctx, ServiceEndpointPath, name)
|
||||
}
|
||||
|
||||
// DeleteService deletes a Service specified by its name.
|
||||
func (r *Registry) DeleteService(ctx api.Context, name string) error {
|
||||
key := makeServiceKey(name)
|
||||
err := r.Delete(key, true)
|
||||
key, err := makeServiceKey(ctx, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.Delete(key, true)
|
||||
if err != nil {
|
||||
return etcderr.InterpretDeleteError(err, "service", name)
|
||||
}
|
||||
|
||||
// TODO: can leave dangling endpoints, and potentially return incorrect
|
||||
// endpoints if a new service is created with the same name
|
||||
key = makeServiceEndpointsKey(name)
|
||||
key, err = makeServiceEndpointsKey(ctx, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Delete(key, true); err != nil && !tools.IsEtcdNotFound(err) {
|
||||
return etcderr.InterpretDeleteError(err, "endpoints", name)
|
||||
}
|
||||
|
@ -395,7 +505,11 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error {
|
|||
|
||||
// UpdateService replaces an existing Service.
|
||||
func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
|
||||
err := r.SetObj(makeServiceKey(svc.ID), svc)
|
||||
key, err := makeServiceKey(ctx, svc.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.SetObj(key, svc)
|
||||
return etcderr.InterpretUpdateError(err, "service", svc.ID)
|
||||
}
|
||||
|
||||
|
@ -409,10 +523,14 @@ func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector,
|
|||
return nil, fmt.Errorf("label selectors are not supported on services")
|
||||
}
|
||||
if value, found := field.RequiresExactMatch("ID"); found {
|
||||
return r.Watch(makeServiceKey(value), version), nil
|
||||
key, err := makeServiceKey(ctx, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.Watch(key, version), nil
|
||||
}
|
||||
if field.Empty() {
|
||||
return r.WatchList("/registry/services/specs", version, tools.Everything)
|
||||
return r.WatchList(makeServiceListKey(ctx), version, tools.Everything)
|
||||
}
|
||||
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
|
||||
}
|
||||
|
@ -420,14 +538,19 @@ func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector,
|
|||
// ListEndpoints obtains a list of Services.
|
||||
func (r *Registry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
|
||||
list := &api.EndpointsList{}
|
||||
err := r.ExtractToList("/registry/services/endpoints", list)
|
||||
key := makeServiceEndpointsListKey(ctx)
|
||||
err := r.ExtractToList(key, list)
|
||||
return list, err
|
||||
}
|
||||
|
||||
// UpdateEndpoints update Endpoints of a Service.
|
||||
func (r *Registry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
|
||||
key, err := makeServiceEndpointsKey(ctx, e.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
|
||||
err := r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
|
||||
err = r.AtomicUpdate(key, &api.Endpoints{},
|
||||
func(input runtime.Object) (runtime.Object, error) {
|
||||
// TODO: racy - label query is returning different results for two simultaneous updaters
|
||||
return e, nil
|
||||
|
@ -445,10 +568,18 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector,
|
|||
return nil, fmt.Errorf("label selectors are not supported on endpoints")
|
||||
}
|
||||
if value, found := field.RequiresExactMatch("ID"); found {
|
||||
return r.Watch(makeServiceEndpointsKey(value), version), nil
|
||||
key, err := makeServiceEndpointsKey(ctx, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.Watch(key, version), nil
|
||||
}
|
||||
if field.Empty() {
|
||||
return r.WatchList("/registry/services/endpoints", version, tools.Everything)
|
||||
key, err := makeServiceEndpointsKey(ctx, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.WatchList(key, version, tools.Everything)
|
||||
}
|
||||
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package etcd
|
|||
import (
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
|
@ -76,10 +77,50 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEtcdGetPod(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash
|
||||
func TestEtcdGetPodDifferentNamespace(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Set("/registry/pods/foo", runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
|
||||
ctx1 := api.NewDefaultContext()
|
||||
ctx2 := api.WithNamespace(api.NewContext(), "other")
|
||||
|
||||
key1, _ := makePodKey(ctx1, "foo")
|
||||
key2, _ := makePodKey(ctx2, "foo")
|
||||
|
||||
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{Namespace: "default", ID: "foo"}}), 0)
|
||||
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{Namespace: "other", ID: "foo"}}), 0)
|
||||
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
|
||||
pod1, err := registry.GetPod(ctx1, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if pod1.ID != "foo" {
|
||||
t.Errorf("Unexpected pod: %#v", pod1)
|
||||
}
|
||||
if pod1.Namespace != "default" {
|
||||
t.Errorf("Unexpected pod: %#v", pod1)
|
||||
}
|
||||
|
||||
pod2, err := registry.GetPod(ctx2, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if pod2.ID != "foo" {
|
||||
t.Errorf("Unexpected pod: %#v", pod2)
|
||||
}
|
||||
if pod2.Namespace != "other" {
|
||||
t.Errorf("Unexpected pod: %#v", pod2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEtcdGetPod(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
pod, err := registry.GetPod(ctx, "foo")
|
||||
if err != nil {
|
||||
|
@ -92,9 +133,10 @@ func TestEtcdGetPod(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdGetPodNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -108,10 +150,11 @@ func TestEtcdGetPodNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreatePod(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -138,12 +181,12 @@ func TestEtcdCreatePod(t *testing.T) {
|
|||
}
|
||||
|
||||
// Suddenly, a wild scheduler appears:
|
||||
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine"})
|
||||
err = registry.ApplyBinding(ctx, &api.Binding{PodID: "foo", Host: "machine", TypeMeta: api.TypeMeta{Namespace: api.NamespaceDefault}})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
|
@ -168,10 +211,34 @@ func TestEtcdCreatePod(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
func TestEtcdCreatePodFailsWithoutNamespace(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||
fakeClient.TestIndex = true
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
err := registry.CreatePod(api.NewContext(), &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
ID: "foo",
|
||||
},
|
||||
DesiredState: api.PodState{
|
||||
Manifest: api.ContainerManifest{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err == nil || !strings.Contains(err.Error(), "namespace") {
|
||||
t.Fatalf("expected error that namespace was missing from context")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.Pod{TypeMeta: api.TypeMeta{ID: "foo"}}),
|
||||
|
@ -191,10 +258,11 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -232,10 +300,11 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -273,7 +342,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
|
@ -299,10 +368,11 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -339,7 +409,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
|
@ -365,11 +435,11 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdUpdatePodNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
key := "/registry/pods/foo"
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
|
@ -389,11 +459,11 @@ func TestEtcdUpdatePodNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdUpdatePodNotScheduled(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
key := "/registry/pods/foo"
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
|
||||
TypeMeta: api.TypeMeta{ID: "foo"},
|
||||
}), 1)
|
||||
|
@ -421,11 +491,11 @@ func TestEtcdUpdatePodNotScheduled(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdUpdatePodScheduled(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
key := "/registry/pods/foo"
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
|
||||
TypeMeta: api.TypeMeta{ID: "foo"},
|
||||
DesiredState: api.PodState{
|
||||
|
@ -507,11 +577,11 @@ func TestEtcdUpdatePodScheduled(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdDeletePod(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
key := "/registry/pods/foo"
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
|
||||
TypeMeta: api.TypeMeta{ID: "foo"},
|
||||
DesiredState: api.PodState{Host: "machine"},
|
||||
|
@ -544,11 +614,10 @@ func TestEtcdDeletePod(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdDeletePodMultipleContainers(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
key := "/registry/pods/foo"
|
||||
key, _ := makePodKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Pod{
|
||||
TypeMeta: api.TypeMeta{ID: "foo"},
|
||||
DesiredState: api.PodState{Host: "machine"},
|
||||
|
@ -587,7 +656,8 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
|
|||
|
||||
func TestEtcdEmptyListPods(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/pods"
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makePodListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
|
@ -597,12 +667,10 @@ func TestEtcdEmptyListPods(t *testing.T) {
|
|||
E: nil,
|
||||
}
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
ctx := api.NewContext()
|
||||
pods, err := registry.ListPods(ctx, labels.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(pods.Items) != 0 {
|
||||
t.Errorf("Unexpected pod list: %#v", pods)
|
||||
}
|
||||
|
@ -610,13 +678,13 @@ func TestEtcdEmptyListPods(t *testing.T) {
|
|||
|
||||
func TestEtcdListPodsNotFound(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/pods"
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makePodListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
}
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
ctx := api.NewContext()
|
||||
pods, err := registry.ListPods(ctx, labels.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -629,7 +697,8 @@ func TestEtcdListPodsNotFound(t *testing.T) {
|
|||
|
||||
func TestEtcdListPods(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/pods"
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makePodListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
|
@ -652,7 +721,6 @@ func TestEtcdListPods(t *testing.T) {
|
|||
E: nil,
|
||||
}
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
ctx := api.NewContext()
|
||||
pods, err := registry.ListPods(ctx, labels.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -668,9 +736,9 @@ func TestEtcdListPods(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdListControllersNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/controllers"
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makeControllerListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
|
@ -687,9 +755,9 @@ func TestEtcdListControllersNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdListServicesNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/services/specs"
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makeServiceListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
|
@ -706,9 +774,9 @@ func TestEtcdListServicesNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdListControllers(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/controllers"
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makeControllerListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
|
@ -735,10 +803,50 @@ func TestEtcdListControllers(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEtcdGetController(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
// TestEtcdGetControllerDifferentNamespace ensures same-name controllers in different namespaces do not clash
|
||||
func TestEtcdGetControllerDifferentNamespace(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
|
||||
ctx1 := api.NewDefaultContext()
|
||||
ctx2 := api.WithNamespace(api.NewContext(), "other")
|
||||
|
||||
key1, _ := makeControllerKey(ctx1, "foo")
|
||||
key2, _ := makeControllerKey(ctx2, "foo")
|
||||
|
||||
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{Namespace: "default", ID: "foo"}}), 0)
|
||||
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{Namespace: "other", ID: "foo"}}), 0)
|
||||
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
|
||||
ctrl1, err := registry.GetController(ctx1, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if ctrl1.ID != "foo" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl1)
|
||||
}
|
||||
if ctrl1.Namespace != "default" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl1)
|
||||
}
|
||||
|
||||
ctrl2, err := registry.GetController(ctx2, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if ctrl2.ID != "foo" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl2)
|
||||
}
|
||||
if ctrl2.Namespace != "other" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEtcdGetController(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
ctrl, err := registry.GetController(ctx, "foo")
|
||||
if err != nil {
|
||||
|
@ -751,9 +859,10 @@ func TestEtcdGetController(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdGetControllerNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -770,9 +879,10 @@ func TestEtcdGetControllerNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdDeleteController(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
err := registry.DeleteController(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -781,16 +891,16 @@ func TestEtcdDeleteController(t *testing.T) {
|
|||
if len(fakeClient.DeletedKeys) != 1 {
|
||||
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
|
||||
}
|
||||
key := "/registry/controllers/foo"
|
||||
if fakeClient.DeletedKeys[0] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreateController(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
err := registry.CreateController(ctx, &api.ReplicationController{
|
||||
TypeMeta: api.TypeMeta{
|
||||
ID: "foo",
|
||||
|
@ -799,8 +909,7 @@ func TestEtcdCreateController(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
resp, err := fakeClient.Get("/registry/controllers/foo", false, false)
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
|
@ -816,9 +925,10 @@ func TestEtcdCreateController(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
err := registry.CreateController(ctx, &api.ReplicationController{
|
||||
|
@ -832,11 +942,11 @@ func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdUpdateController(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
resp, _ := fakeClient.Set("/registry/controllers/foo", runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
err := registry.UpdateController(ctx, &api.ReplicationController{
|
||||
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)},
|
||||
|
@ -855,9 +965,9 @@ func TestEtcdUpdateController(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdListServices(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/services/specs"
|
||||
key := makeServiceListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
|
@ -885,7 +995,7 @@ func TestEtcdListServices(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreateService(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
err := registry.CreateService(ctx, &api.Service{
|
||||
|
@ -895,7 +1005,8 @@ func TestEtcdCreateService(t *testing.T) {
|
|||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
resp, err := fakeClient.Get("/registry/services/specs/foo", false, false)
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -912,9 +1023,10 @@ func TestEtcdCreateService(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
err := registry.CreateService(ctx, &api.Service{
|
||||
TypeMeta: api.TypeMeta{ID: "foo"},
|
||||
|
@ -924,10 +1036,50 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEtcdGetService(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
// TestEtcdGetServiceDifferentNamespace ensures same-name services in different namespaces do not clash
|
||||
func TestEtcdGetServiceDifferentNamespace(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
|
||||
ctx1 := api.NewDefaultContext()
|
||||
ctx2 := api.WithNamespace(api.NewContext(), "other")
|
||||
|
||||
key1, _ := makeServiceKey(ctx1, "foo")
|
||||
key2, _ := makeServiceKey(ctx2, "foo")
|
||||
|
||||
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{Namespace: "default", ID: "foo"}}), 0)
|
||||
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{Namespace: "other", ID: "foo"}}), 0)
|
||||
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
|
||||
service1, err := registry.GetService(ctx1, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if service1.ID != "foo" {
|
||||
t.Errorf("Unexpected service: %#v", service1)
|
||||
}
|
||||
if service1.Namespace != "default" {
|
||||
t.Errorf("Unexpected service: %#v", service1)
|
||||
}
|
||||
|
||||
service2, err := registry.GetService(ctx2, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if service2.ID != "foo" {
|
||||
t.Errorf("Unexpected service: %#v", service2)
|
||||
}
|
||||
if service2.Namespace != "other" {
|
||||
t.Errorf("Unexpected service: %#v", service2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEtcdGetService(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
service, err := registry.GetService(ctx, "foo")
|
||||
if err != nil {
|
||||
|
@ -940,9 +1092,10 @@ func TestEtcdGetService(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdGetServiceNotFound(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
|
@ -956,7 +1109,7 @@ func TestEtcdGetServiceNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdDeleteService(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
err := registry.DeleteService(ctx, "foo")
|
||||
|
@ -967,22 +1120,22 @@ func TestEtcdDeleteService(t *testing.T) {
|
|||
if len(fakeClient.DeletedKeys) != 2 {
|
||||
t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
|
||||
}
|
||||
key := "/registry/services/specs/foo"
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
if fakeClient.DeletedKeys[0] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||
}
|
||||
key = "/registry/services/endpoints/foo"
|
||||
key, _ = makeServiceEndpointsKey(ctx, "foo")
|
||||
if fakeClient.DeletedKeys[1] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdUpdateService(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
|
||||
resp, _ := fakeClient.Set("/registry/services/specs/foo", runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
key, _ := makeServiceKey(ctx, "foo")
|
||||
resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
testService := api.Service{
|
||||
TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)},
|
||||
|
@ -1012,9 +1165,9 @@ func TestEtcdUpdateService(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdListEndpoints(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
key := "/registry/services/endpoints"
|
||||
key := makeServiceEndpointsListKey(ctx)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
|
@ -1042,7 +1195,7 @@ func TestEtcdListEndpoints(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdGetEndpoints(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
endpoints := &api.Endpoints{
|
||||
|
@ -1050,7 +1203,8 @@ func TestEtcdGetEndpoints(t *testing.T) {
|
|||
Endpoints: []string{"127.0.0.1:34855"},
|
||||
}
|
||||
|
||||
fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, endpoints), 0)
|
||||
key, _ := makeServiceEndpointsKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, endpoints), 0)
|
||||
|
||||
got, err := registry.GetEndpoints(ctx, "foo")
|
||||
if err != nil {
|
||||
|
@ -1063,7 +1217,7 @@ func TestEtcdGetEndpoints(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdUpdateEndpoints(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
|
@ -1072,14 +1226,15 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
|
|||
Endpoints: []string{"baz", "bar"},
|
||||
}
|
||||
|
||||
fakeClient.Set("/registry/services/endpoints/foo", runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0)
|
||||
key, _ := makeServiceEndpointsKey(ctx, "foo")
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{}), 0)
|
||||
|
||||
err := registry.UpdateEndpoints(ctx, &endpoints)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
response, err := fakeClient.Get("/registry/services/endpoints/foo", false, false)
|
||||
response, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
|
@ -1091,7 +1246,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdWatchServices(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchServices(ctx,
|
||||
|
@ -1119,7 +1274,7 @@ func TestEtcdWatchServices(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdWatchServicesBadSelector(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
_, err := registry.WatchServices(
|
||||
|
@ -1144,7 +1299,7 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEtcdWatchEndpoints(t *testing.T) {
|
||||
ctx := api.NewContext()
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchEndpoints(
|
||||
|
|
|
@ -179,18 +179,18 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
|||
for _, node := range nodes {
|
||||
if node.Dir {
|
||||
h.decodeNodeList(node.Nodes, slicePtr)
|
||||
} else {
|
||||
obj := reflect.New(v.Type().Elem())
|
||||
err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object))
|
||||
if h.ResourceVersioner != nil {
|
||||
_ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex)
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
continue
|
||||
}
|
||||
obj := reflect.New(v.Type().Elem())
|
||||
err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object))
|
||||
if h.ResourceVersioner != nil {
|
||||
_ = h.ResourceVersioner.SetResourceVersion(obj.Interface().(runtime.Object), node.ModifiedIndex)
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -141,10 +141,10 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
|
|||
},
|
||||
}
|
||||
expect := api.PodList{
|
||||
JSONBase: api.JSONBase{ResourceVersion: 10},
|
||||
TypeMeta: api.TypeMeta{ResourceVersion: "10"},
|
||||
Items: []api.Pod{
|
||||
{JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}},
|
||||
{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 2}},
|
||||
{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}},
|
||||
{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -187,11 +187,11 @@ func TestExtractToListExcludesDirectories(t *testing.T) {
|
|||
},
|
||||
}
|
||||
expect := api.PodList{
|
||||
JSONBase: api.JSONBase{ResourceVersion: 10},
|
||||
TypeMeta: api.TypeMeta{ResourceVersion: "10"},
|
||||
Items: []api.Pod{
|
||||
{JSONBase: api.JSONBase{ID: "foo", ResourceVersion: 1}},
|
||||
{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 2}},
|
||||
{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 3}},
|
||||
{TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}},
|
||||
{TypeMeta: api.TypeMeta{ID: "bar", ResourceVersion: "2"}},
|
||||
{TypeMeta: api.TypeMeta{ID: "baz", ResourceVersion: "3"}},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -184,7 +184,8 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue
|
|||
backoff.wait(podID)
|
||||
// Get the pod again; it may have changed/been scheduled already.
|
||||
pod = &api.Pod{}
|
||||
err := factory.Client.Get().Path("pods").Path(podID).Do().Into(pod)
|
||||
ctx := api.WithNamespace(api.NewContext(), pod.Namespace)
|
||||
err := factory.Client.Get().Namespace(api.Namespace(ctx)).Path("pods").Path(podID).Do().Into(pod)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting pod %v for retry: %v; abandoning", podID, err)
|
||||
return
|
||||
|
@ -256,7 +257,8 @@ type binder struct {
|
|||
// Bind just does a POST binding RPC.
|
||||
func (b *binder) Bind(binding *api.Binding) error {
|
||||
glog.V(2).Infof("Attempting to bind %v to %v", binding.PodID, binding.Host)
|
||||
return b.Post().Path("bindings").Body(binding).Do().Error()
|
||||
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
|
||||
return b.Post().Namespace(api.Namespace(ctx)).Path("bindings").Body(binding).Do().Error()
|
||||
}
|
||||
|
||||
type clock interface {
|
||||
|
|
|
@ -73,8 +73,9 @@ func (s *Scheduler) scheduleOne() {
|
|||
return
|
||||
}
|
||||
b := &api.Binding{
|
||||
PodID: pod.ID,
|
||||
Host: dest,
|
||||
TypeMeta: api.TypeMeta{Namespace: pod.Namespace},
|
||||
PodID: pod.ID,
|
||||
Host: dest,
|
||||
}
|
||||
if err := s.config.Binder.Bind(b); err != nil {
|
||||
record.Eventf(pod, "", string(api.PodWaiting), "failedScheduling", "Binding rejected: %v", err)
|
||||
|
|
Loading…
Reference in New Issue