diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 5214327e9a..19ba2c3724 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -181,6 +181,7 @@ func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc { } func runReplicationControllerTest(c *client.Client) { + ctx := api.NewDefaultContext() data, err := ioutil.ReadFile("api/examples/controller.json") if err != nil { glog.Fatalf("Unexpected error: %#v", err) @@ -191,7 +192,7 @@ func runReplicationControllerTest(c *client.Client) { } glog.Infof("Creating replication controllers") - if _, err := c.CreateReplicationController(&controllerRequest); err != nil { + if _, err := c.CreateReplicationController(ctx, &controllerRequest); err != nil { glog.Fatalf("Unexpected error: %#v", err) } glog.Infof("Done creating replication controllers") @@ -202,7 +203,7 @@ func runReplicationControllerTest(c *client.Client) { } // wait for minions to indicate they have info about the desired pods - pods, err := c.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector()) + pods, err := c.ListPods(ctx, labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector()) if err != nil { glog.Fatalf("FAILED: unable to get pods to list: %v", err) } diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index 0e00ba87b8..dfc6a506ee 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -175,7 +175,7 @@ func main() { } // TODO: get the namespace context when kubecfg ns is completed - clientConfig.Context = api.NewContext() + ctx := api.NewContext() if clientConfig.Host == "" { // TODO: eventually apiserver should start on 443 and be secure by default @@ -242,7 +242,7 @@ func main() { } method := flag.Arg(0) - matchFound := executeAPIRequest(method, kubeClient) || executeControllerRequest(method, kubeClient) + matchFound := executeAPIRequest(ctx, method, kubeClient) || executeControllerRequest(ctx, method, kubeClient) if matchFound == false { glog.Fatalf("Unknown command %s", method) } @@ -302,7 +302,7 @@ func getPrinter() kubecfg.ResourcePrinter { return printer } -func executeAPIRequest(method string, c *client.Client) bool { +func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool { storage, path, hasSuffix := storagePathFromArg(flag.Arg(1)) validStorage := checkStorage(storage) verb := "" @@ -401,7 +401,7 @@ func executeAPIRequest(method string, c *client.Client) bool { return true } -func executeControllerRequest(method string, c *client.Client) bool { +func executeControllerRequest(ctx api.Context, method string, c *client.Client) bool { parseController := func() string { if len(flag.Args()) != 2 { glog.Fatal("usage: kubecfg [OPTIONS] stop|rm|rollingupdate ") @@ -412,11 +412,11 @@ func executeControllerRequest(method string, c *client.Client) bool { var err error switch method { case "stop": - err = kubecfg.StopController(parseController(), c) + err = kubecfg.StopController(ctx, parseController(), c) case "rm": - err = kubecfg.DeleteController(parseController(), c) + err = kubecfg.DeleteController(ctx, parseController(), c) case "rollingupdate": - err = kubecfg.Update(parseController(), c, *updatePeriod, *imageName) + err = kubecfg.Update(ctx, parseController(), c, *updatePeriod, *imageName) case "run": if len(flag.Args()) != 4 { glog.Fatal("usage: kubecfg [OPTIONS] run ") @@ -427,7 +427,7 @@ func executeControllerRequest(method string, c *client.Client) bool { glog.Fatalf("Error parsing replicas: %v", err2) } name := flag.Arg(3) - err = kubecfg.RunController(image, name, replicas, c, *portSpec, *servicePort) + err = kubecfg.RunController(ctx, image, name, replicas, c, *portSpec, *servicePort) case "resize": args := flag.Args() if len(args) < 3 { @@ -438,7 +438,7 @@ func executeControllerRequest(method string, c *client.Client) bool { if err2 != nil { glog.Fatalf("Error parsing replicas: %v", err2) } - err = kubecfg.ResizeController(name, replicas, c) + err = kubecfg.ResizeController(ctx, name, replicas, c) default: return false } diff --git a/pkg/client/client.go b/pkg/client/client.go index eed1a14f81..c24d115ea3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -39,38 +39,38 @@ type Interface interface { // PodInterface has methods to work with Pod resources. type PodInterface interface { - ListPods(selector labels.Selector) (*api.PodList, error) - GetPod(id string) (*api.Pod, error) - DeletePod(id string) error - CreatePod(*api.Pod) (*api.Pod, error) - UpdatePod(*api.Pod) (*api.Pod, error) + ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) + GetPod(ctx api.Context, id string) (*api.Pod, error) + DeletePod(ctx api.Context, id string) error + CreatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error) + UpdatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error) } // ReplicationControllerInterface has methods to work with ReplicationController resources. type ReplicationControllerInterface interface { - ListReplicationControllers(selector labels.Selector) (*api.ReplicationControllerList, error) - GetReplicationController(id string) (*api.ReplicationController, error) - CreateReplicationController(*api.ReplicationController) (*api.ReplicationController, error) - UpdateReplicationController(*api.ReplicationController) (*api.ReplicationController, error) - DeleteReplicationController(string) error - WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + ListReplicationControllers(ctx api.Context, selector labels.Selector) (*api.ReplicationControllerList, error) + GetReplicationController(ctx api.Context, id string) (*api.ReplicationController, error) + CreateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error) + UpdateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error) + DeleteReplicationController(ctx api.Context, id string) error + WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // ServiceInterface has methods to work with Service resources. type ServiceInterface interface { - ListServices(selector labels.Selector) (*api.ServiceList, error) - GetService(id string) (*api.Service, error) - CreateService(*api.Service) (*api.Service, error) - UpdateService(*api.Service) (*api.Service, error) - DeleteService(string) error - WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + ListServices(ctx api.Context, selector labels.Selector) (*api.ServiceList, error) + GetService(ctx api.Context, id string) (*api.Service, error) + CreateService(ctx api.Context, srv *api.Service) (*api.Service, error) + UpdateService(ctx api.Context, srv *api.Service) (*api.Service, error) + DeleteService(ctx api.Context, id string) error + WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // EndpointsInterface has methods to work with Endpoints resources type EndpointsInterface interface { - ListEndpoints(selector labels.Selector) (*api.EndpointsList, error) - GetEndpoints(id string) (*api.Endpoints, error) - WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + ListEndpoints(ctx api.Context, selector labels.Selector) (*api.EndpointsList, error) + GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) + WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // VersionInterface has a method to retrieve the server version. @@ -94,33 +94,33 @@ type Client struct { } // ListPods takes a selector, and returns the list of pods that match that selector. -func (c *Client) ListPods(selector labels.Selector) (result *api.PodList, err error) { +func (c *Client) ListPods(ctx api.Context, selector labels.Selector) (result *api.PodList, err error) { result = &api.PodList{} err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(result) return } // GetPod takes the id of the pod, and returns the corresponding Pod object, and an error if it occurs -func (c *Client) GetPod(id string) (result *api.Pod, err error) { +func (c *Client) GetPod(ctx api.Context, id string) (result *api.Pod, err error) { result = &api.Pod{} err = c.Get().Path("pods").Path(id).Do().Into(result) return } // DeletePod takes the id of the pod, and returns an error if one occurs -func (c *Client) DeletePod(id string) error { +func (c *Client) DeletePod(ctx api.Context, id string) error { return c.Delete().Path("pods").Path(id).Do().Error() } // CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs. -func (c *Client) CreatePod(pod *api.Pod) (result *api.Pod, err error) { +func (c *Client) CreatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} err = c.Post().Path("pods").Body(pod).Do().Into(result) return } // UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs. -func (c *Client) UpdatePod(pod *api.Pod) (result *api.Pod, err error) { +func (c *Client) UpdatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} if pod.ResourceVersion == 0 { err = fmt.Errorf("invalid update object, missing resource version: %v", pod) @@ -131,28 +131,28 @@ func (c *Client) UpdatePod(pod *api.Pod) (result *api.Pod, err error) { } // ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector. -func (c *Client) ListReplicationControllers(selector labels.Selector) (result *api.ReplicationControllerList, err error) { +func (c *Client) ListReplicationControllers(ctx api.Context, selector labels.Selector) (result *api.ReplicationControllerList, err error) { result = &api.ReplicationControllerList{} err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(result) return } // GetReplicationController returns information about a particular replication controller. -func (c *Client) GetReplicationController(id string) (result *api.ReplicationController, err error) { +func (c *Client) GetReplicationController(ctx api.Context, id string) (result *api.ReplicationController, err error) { result = &api.ReplicationController{} err = c.Get().Path("replicationControllers").Path(id).Do().Into(result) return } // CreateReplicationController creates a new replication controller. -func (c *Client) CreateReplicationController(controller *api.ReplicationController) (result *api.ReplicationController, err error) { +func (c *Client) CreateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) { result = &api.ReplicationController{} err = c.Post().Path("replicationControllers").Body(controller).Do().Into(result) return } // UpdateReplicationController updates an existing replication controller. -func (c *Client) UpdateReplicationController(controller *api.ReplicationController) (result *api.ReplicationController, err error) { +func (c *Client) UpdateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) { result = &api.ReplicationController{} if controller.ResourceVersion == 0 { err = fmt.Errorf("invalid update object, missing resource version: %v", controller) @@ -163,12 +163,12 @@ func (c *Client) UpdateReplicationController(controller *api.ReplicationControll } // DeleteReplicationController deletes an existing replication controller. -func (c *Client) DeleteReplicationController(id string) error { +func (c *Client) DeleteReplicationController(ctx api.Context, id string) error { return c.Delete().Path("replicationControllers").Path(id).Do().Error() } // WatchReplicationControllers returns a watch.Interface that watches the requested controllers. -func (c *Client) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return c.Get(). Path("watch"). Path("replicationControllers"). @@ -179,28 +179,28 @@ func (c *Client) WatchReplicationControllers(label, field labels.Selector, resou } // ListServices takes a selector, and returns the list of services that match that selector -func (c *Client) ListServices(selector labels.Selector) (result *api.ServiceList, err error) { +func (c *Client) ListServices(ctx api.Context, selector labels.Selector) (result *api.ServiceList, err error) { result = &api.ServiceList{} err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(result) return } // GetService returns information about a particular service. -func (c *Client) GetService(id string) (result *api.Service, err error) { +func (c *Client) GetService(ctx api.Context, id string) (result *api.Service, err error) { result = &api.Service{} err = c.Get().Path("services").Path(id).Do().Into(result) return } // CreateService creates a new service. -func (c *Client) CreateService(svc *api.Service) (result *api.Service, err error) { +func (c *Client) CreateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) { result = &api.Service{} err = c.Post().Path("services").Body(svc).Do().Into(result) return } // UpdateService updates an existing service. -func (c *Client) UpdateService(svc *api.Service) (result *api.Service, err error) { +func (c *Client) UpdateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) { result = &api.Service{} if svc.ResourceVersion == 0 { err = fmt.Errorf("invalid update object, missing resource version: %v", svc) @@ -211,12 +211,12 @@ func (c *Client) UpdateService(svc *api.Service) (result *api.Service, err error } // DeleteService deletes an existing service. -func (c *Client) DeleteService(id string) error { +func (c *Client) DeleteService(ctx api.Context, id string) error { return c.Delete().Path("services").Path(id).Do().Error() } // WatchServices returns a watch.Interface that watches the requested services. -func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return c.Get(). Path("watch"). Path("services"). @@ -227,21 +227,21 @@ func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uin } // ListEndpoints takes a selector, and returns the list of endpoints that match that selector -func (c *Client) ListEndpoints(selector labels.Selector) (result *api.EndpointsList, err error) { +func (c *Client) ListEndpoints(ctx api.Context, selector labels.Selector) (result *api.EndpointsList, err error) { result = &api.EndpointsList{} err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(result) return } // GetEndpoints returns information about the endpoints for a particular service. -func (c *Client) GetEndpoints(id string) (result *api.Endpoints, err error) { +func (c *Client) GetEndpoints(ctx api.Context, id string) (result *api.Endpoints, err error) { result = &api.Endpoints{} err = c.Get().Path("endpoints").Path(id).Do().Into(result) return } // WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. -func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { return c.Get(). Path("watch"). Path("endpoints"). @@ -251,13 +251,13 @@ func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion ui Watch() } -func (c *Client) CreateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) { +func (c *Client) CreateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) { result := &api.Endpoints{} err := c.Post().Path("endpoints").Body(endpoints).Do().Into(result) return result, err } -func (c *Client) UpdateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) { +func (c *Client) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) { result := &api.Endpoints{} if endpoints.ResourceVersion == 0 { return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 7a48fc7839..e75c2289df 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -145,15 +145,17 @@ func (c *testClient) ValidateCommon(t *testing.T, err error) { } func TestListEmptyPods(t *testing.T) { + ctx := api.NewContext() c := &testClient{ Request: testRequest{Method: "GET", Path: "/pods"}, Response: Response{StatusCode: 200, Body: &api.PodList{}}, } - podList, err := c.Setup().ListPods(labels.Everything()) + podList, err := c.Setup().ListPods(ctx, labels.Everything()) c.Validate(t, podList, err) } func TestListPods(t *testing.T) { + ctx := api.NewDefaultContext() c := &testClient{ Request: testRequest{Method: "GET", Path: "/pods"}, Response: Response{StatusCode: 200, @@ -172,7 +174,7 @@ func TestListPods(t *testing.T) { }, }, } - receivedPodList, err := c.Setup().ListPods(labels.Everything()) + receivedPodList, err := c.Setup().ListPods(ctx, labels.Everything()) c.Validate(t, receivedPodList, err) } @@ -183,6 +185,7 @@ func validateLabels(a, b string) bool { } func TestListPodsLabels(t *testing.T) { + ctx := api.NewDefaultContext() c := &testClient{ Request: testRequest{Method: "GET", Path: "/pods", Query: url.Values{"labels": []string{"foo=bar,name=baz"}}}, Response: Response{ @@ -205,11 +208,12 @@ func TestListPodsLabels(t *testing.T) { c.Setup() c.QueryValidator["labels"] = validateLabels selector := labels.Set{"foo": "bar", "name": "baz"}.AsSelector() - receivedPodList, err := c.ListPods(selector) + receivedPodList, err := c.ListPods(ctx, selector) c.Validate(t, receivedPodList, err) } func TestGetPod(t *testing.T) { + ctx := api.NewDefaultContext() c := &testClient{ Request: testRequest{Method: "GET", Path: "/pods/foo"}, Response: Response{ @@ -225,7 +229,7 @@ func TestGetPod(t *testing.T) { }, }, } - receivedPod, err := c.Setup().GetPod("foo") + receivedPod, err := c.Setup().GetPod(ctx, "foo") c.Validate(t, receivedPod, err) } @@ -234,7 +238,7 @@ func TestDeletePod(t *testing.T) { Request: testRequest{Method: "DELETE", Path: "/pods/foo"}, Response: Response{StatusCode: 200}, } - err := c.Setup().DeletePod("foo") + err := c.Setup().DeletePod(api.NewDefaultContext(), "foo") c.Validate(t, nil, err) } @@ -255,7 +259,7 @@ func TestCreatePod(t *testing.T) { Body: requestPod, }, } - receivedPod, err := c.Setup().CreatePod(requestPod) + receivedPod, err := c.Setup().CreatePod(api.NewDefaultContext(), requestPod) c.Validate(t, receivedPod, err) } @@ -274,7 +278,7 @@ func TestUpdatePod(t *testing.T) { Request: testRequest{Method: "PUT", Path: "/pods/foo"}, Response: Response{StatusCode: 200, Body: requestPod}, } - receivedPod, err := c.Setup().UpdatePod(requestPod) + receivedPod, err := c.Setup().UpdatePod(api.NewDefaultContext(), requestPod) c.Validate(t, receivedPod, err) } @@ -298,7 +302,7 @@ func TestListControllers(t *testing.T) { }, }, } - receivedControllerList, err := c.Setup().ListReplicationControllers(labels.Everything()) + receivedControllerList, err := c.Setup().ListReplicationControllers(api.NewContext(), labels.Everything()) c.Validate(t, receivedControllerList, err) } @@ -320,7 +324,7 @@ func TestGetController(t *testing.T) { }, }, } - receivedController, err := c.Setup().GetReplicationController("foo") + receivedController, err := c.Setup().GetReplicationController(api.NewDefaultContext(), "foo") c.Validate(t, receivedController, err) } @@ -344,7 +348,7 @@ func TestUpdateController(t *testing.T) { }, }, } - receivedController, err := c.Setup().UpdateReplicationController(requestController) + receivedController, err := c.Setup().UpdateReplicationController(api.NewDefaultContext(), requestController) c.Validate(t, receivedController, err) } @@ -353,7 +357,7 @@ func TestDeleteController(t *testing.T) { Request: testRequest{Method: "DELETE", Path: "/replicationControllers/foo"}, Response: Response{StatusCode: 200}, } - err := c.Setup().DeleteReplicationController("foo") + err := c.Setup().DeleteReplicationController(api.NewDefaultContext(), "foo") c.Validate(t, nil, err) } @@ -377,7 +381,7 @@ func TestCreateController(t *testing.T) { }, }, } - receivedController, err := c.Setup().CreateReplicationController(requestController) + receivedController, err := c.Setup().CreateReplicationController(api.NewDefaultContext(), requestController) c.Validate(t, receivedController, err) } @@ -410,7 +414,7 @@ func TestListServices(t *testing.T) { }, }, } - receivedServiceList, err := c.Setup().ListServices(labels.Everything()) + receivedServiceList, err := c.Setup().ListServices(api.NewDefaultContext(), labels.Everything()) c.Validate(t, receivedServiceList, err) } @@ -437,7 +441,7 @@ func TestListServicesLabels(t *testing.T) { c.Setup() c.QueryValidator["labels"] = validateLabels selector := labels.Set{"foo": "bar", "name": "baz"}.AsSelector() - receivedServiceList, err := c.ListServices(selector) + receivedServiceList, err := c.ListServices(api.NewDefaultContext(), selector) c.Validate(t, receivedServiceList, err) } @@ -446,7 +450,7 @@ func TestGetService(t *testing.T) { Request: testRequest{Method: "GET", Path: "/services/1"}, Response: Response{StatusCode: 200, Body: &api.Service{JSONBase: api.JSONBase{ID: "service-1"}}}, } - response, err := c.Setup().GetService("1") + response, err := c.Setup().GetService(api.NewDefaultContext(), "1") c.Validate(t, response, err) } @@ -455,7 +459,7 @@ func TestCreateService(t *testing.T) { Request: testRequest{Method: "POST", Path: "/services", Body: &api.Service{JSONBase: api.JSONBase{ID: "service-1"}}}, Response: Response{StatusCode: 200, Body: &api.Service{JSONBase: api.JSONBase{ID: "service-1"}}}, } - response, err := c.Setup().CreateService(&api.Service{JSONBase: api.JSONBase{ID: "service-1"}}) + response, err := c.Setup().CreateService(api.NewDefaultContext(), &api.Service{JSONBase: api.JSONBase{ID: "service-1"}}) c.Validate(t, response, err) } @@ -465,7 +469,7 @@ func TestUpdateService(t *testing.T) { Request: testRequest{Method: "PUT", Path: "/services/service-1", Body: svc}, Response: Response{StatusCode: 200, Body: svc}, } - response, err := c.Setup().UpdateService(svc) + response, err := c.Setup().UpdateService(api.NewDefaultContext(), svc) c.Validate(t, response, err) } @@ -474,7 +478,7 @@ func TestDeleteService(t *testing.T) { Request: testRequest{Method: "DELETE", Path: "/services/1"}, Response: Response{StatusCode: 200}, } - err := c.Setup().DeleteService("1") + err := c.Setup().DeleteService(api.NewDefaultContext(), "1") c.Validate(t, nil, err) } @@ -492,7 +496,7 @@ func TestListEndpooints(t *testing.T) { }, }, } - receivedEndpointsList, err := c.Setup().ListEndpoints(labels.Everything()) + receivedEndpointsList, err := c.Setup().ListEndpoints(api.NewDefaultContext(), labels.Everything()) c.Validate(t, receivedEndpointsList, err) } @@ -501,7 +505,7 @@ func TestGetEndpoints(t *testing.T) { Request: testRequest{Method: "GET", Path: "/endpoints/endpoint-1"}, Response: Response{StatusCode: 200, Body: &api.Endpoints{JSONBase: api.JSONBase{ID: "endpoint-1"}}}, } - response, err := c.Setup().GetEndpoints("endpoint-1") + response, err := c.Setup().GetEndpoints(api.NewDefaultContext(), "endpoint-1") c.Validate(t, response, err) } diff --git a/pkg/client/conditions.go b/pkg/client/conditions.go index 4af01fe535..877263dedf 100644 --- a/pkg/client/conditions.go +++ b/pkg/client/conditions.go @@ -26,7 +26,8 @@ import ( // for a controller's ReplicaSelector equals the Replicas count. func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc { return func() (bool, error) { - pods, err := c.ListPods(labels.Set(controller.DesiredState.ReplicaSelector).AsSelector()) + ctx := api.WithNamespace(api.NewContext(), controller.Namespace) + pods, err := c.ListPods(ctx, labels.Set(controller.DesiredState.ReplicaSelector).AsSelector()) if err != nil { return false, err } diff --git a/pkg/client/fake.go b/pkg/client/fake.go index b172cd5294..a5538fef43 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -42,102 +42,102 @@ type Fake struct { Watch watch.Interface } -func (c *Fake) ListPods(selector labels.Selector) (*api.PodList, error) { +func (c *Fake) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) { c.Actions = append(c.Actions, FakeAction{Action: "list-pods"}) return api.Scheme.CopyOrDie(&c.Pods).(*api.PodList), nil } -func (c *Fake) GetPod(name string) (*api.Pod, error) { +func (c *Fake) GetPod(ctx api.Context, name string) (*api.Pod, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-pod", Value: name}) return &api.Pod{}, nil } -func (c *Fake) DeletePod(name string) error { +func (c *Fake) DeletePod(ctx api.Context, name string) error { c.Actions = append(c.Actions, FakeAction{Action: "delete-pod", Value: name}) return nil } -func (c *Fake) CreatePod(pod *api.Pod) (*api.Pod, error) { +func (c *Fake) CreatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error) { c.Actions = append(c.Actions, FakeAction{Action: "create-pod"}) return &api.Pod{}, nil } -func (c *Fake) UpdatePod(pod *api.Pod) (*api.Pod, error) { +func (c *Fake) UpdatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error) { c.Actions = append(c.Actions, FakeAction{Action: "update-pod", Value: pod.ID}) return &api.Pod{}, nil } -func (c *Fake) ListReplicationControllers(selector labels.Selector) (*api.ReplicationControllerList, error) { +func (c *Fake) ListReplicationControllers(ctx api.Context, selector labels.Selector) (*api.ReplicationControllerList, error) { c.Actions = append(c.Actions, FakeAction{Action: "list-controllers"}) return &api.ReplicationControllerList{}, nil } -func (c *Fake) GetReplicationController(name string) (*api.ReplicationController, error) { +func (c *Fake) GetReplicationController(ctx api.Context, name string) (*api.ReplicationController, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-controller", Value: name}) return api.Scheme.CopyOrDie(&c.Ctrl).(*api.ReplicationController), nil } -func (c *Fake) CreateReplicationController(controller *api.ReplicationController) (*api.ReplicationController, error) { +func (c *Fake) CreateReplicationController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) { c.Actions = append(c.Actions, FakeAction{Action: "create-controller", Value: controller}) return &api.ReplicationController{}, nil } -func (c *Fake) UpdateReplicationController(controller *api.ReplicationController) (*api.ReplicationController, error) { +func (c *Fake) UpdateReplicationController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) { c.Actions = append(c.Actions, FakeAction{Action: "update-controller", Value: controller}) return &api.ReplicationController{}, nil } -func (c *Fake) DeleteReplicationController(controller string) error { +func (c *Fake) DeleteReplicationController(ctx api.Context, controller string) error { c.Actions = append(c.Actions, FakeAction{Action: "delete-controller", Value: controller}) return nil } -func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Fake) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion}) return c.Watch, nil } -func (c *Fake) ListServices(selector labels.Selector) (*api.ServiceList, error) { +func (c *Fake) ListServices(ctx api.Context, selector labels.Selector) (*api.ServiceList, error) { c.Actions = append(c.Actions, FakeAction{Action: "list-services"}) return &c.ServiceList, c.Err } -func (c *Fake) GetService(name string) (*api.Service, error) { +func (c *Fake) GetService(ctx api.Context, name string) (*api.Service, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-service", Value: name}) return &api.Service{}, nil } -func (c *Fake) CreateService(service *api.Service) (*api.Service, error) { +func (c *Fake) CreateService(ctx api.Context, service *api.Service) (*api.Service, error) { c.Actions = append(c.Actions, FakeAction{Action: "create-service", Value: service}) return &api.Service{}, nil } -func (c *Fake) UpdateService(service *api.Service) (*api.Service, error) { +func (c *Fake) UpdateService(ctx api.Context, service *api.Service) (*api.Service, error) { c.Actions = append(c.Actions, FakeAction{Action: "update-service", Value: service}) return &api.Service{}, nil } -func (c *Fake) DeleteService(service string) error { +func (c *Fake) DeleteService(ctx api.Context, service string) error { c.Actions = append(c.Actions, FakeAction{Action: "delete-service", Value: service}) return nil } -func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Fake) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion}) return c.Watch, c.Err } -func (c *Fake) ListEndpoints(selector labels.Selector) (*api.EndpointsList, error) { +func (c *Fake) ListEndpoints(ctx api.Context, selector labels.Selector) (*api.EndpointsList, error) { c.Actions = append(c.Actions, FakeAction{Action: "list-endpoints"}) return api.Scheme.CopyOrDie(&c.EndpointsList).(*api.EndpointsList), c.Err } -func (c *Fake) GetEndpoints(name string) (*api.Endpoints, error) { +func (c *Fake) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-endpoints"}) return &api.Endpoints{}, nil } -func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { +func (c *Fake) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) return c.Watch, c.Err } diff --git a/pkg/client/helper.go b/pkg/client/helper.go index 6d84281654..42c6cb35f8 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -23,7 +23,6 @@ import ( "path" "strings" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" ) @@ -55,10 +54,6 @@ type Config struct { // Transport may be used for custom HTTP behavior. This attribute may not // be specified with the TLS client certificate options. Transport http.RoundTripper - - // Context is the context that should be passed down to the server. If nil, the - // context will be set to the appropriate default. - Context api.Context } // New creates a Kubernetes client for the given config. This client works with pods, diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index bf80784727..f8ff2d7e7c 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -42,9 +42,9 @@ type ReplicationManager struct { // created as an interface to allow testing. type PodControlInterface interface { // createReplica creates new replicated pods according to the spec. - createReplica(controllerSpec api.ReplicationController) + createReplica(ctx api.Context, controllerSpec api.ReplicationController) // deletePod deletes the pod identified by podID. - deletePod(podID string) error + deletePod(ctx api.Context, podID string) error } // RealPodControl is the default implementation of PodControllerInterface. @@ -52,7 +52,7 @@ type RealPodControl struct { kubeClient client.Interface } -func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) { +func (r RealPodControl) createReplica(ctx api.Context, controllerSpec api.ReplicationController) { labels := controllerSpec.DesiredState.PodTemplate.Labels // TODO: don't fail to set this label just because the map isn't created. if labels != nil { @@ -62,14 +62,14 @@ func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState, Labels: controllerSpec.DesiredState.PodTemplate.Labels, } - _, err := r.kubeClient.CreatePod(pod) + _, err := r.kubeClient.CreatePod(ctx, pod) if err != nil { glog.Errorf("%#v\n", err) } } -func (r RealPodControl) deletePod(podID string) error { - return r.kubeClient.DeletePod(podID) +func (r RealPodControl) deletePod(ctx api.Context, podID string) error { + return r.kubeClient.DeletePod(ctx, podID) } // NewReplicationManager creates a new ReplicationManager. @@ -93,7 +93,9 @@ func (rm *ReplicationManager) Run(period time.Duration) { // resourceVersion is a pointer to the resource version to use/update. func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) { + ctx := api.NewContext() watching, err := rm.kubeClient.WatchReplicationControllers( + ctx, labels.Everything(), labels.Everything(), *resourceVersion, @@ -143,7 +145,8 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error { s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector() - podList, err := rm.kubeClient.ListPods(s) + ctx := api.WithNamespace(api.NewContext(), controllerSpec.Namespace) + podList, err := rm.kubeClient.ListPods(ctx, s) if err != nil { return err } @@ -157,7 +160,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli for i := 0; i < diff; i++ { go func() { defer wait.Done() - rm.podControl.createReplica(controllerSpec) + rm.podControl.createReplica(ctx, controllerSpec) }() } wait.Wait() @@ -168,7 +171,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli for i := 0; i < diff; i++ { go func(ix int) { defer wait.Done() - rm.podControl.deletePod(filteredList[ix].ID) + rm.podControl.deletePod(ctx, filteredList[ix].ID) }(i) } wait.Wait() @@ -180,7 +183,8 @@ func (rm *ReplicationManager) synchronize() { // TODO: remove this method completely and rely on the watch. // Add resource version tracking to watch to make this work. var controllerSpecs []api.ReplicationController - list, err := rm.kubeClient.ListReplicationControllers(labels.Everything()) + ctx := api.NewContext() + list, err := rm.kubeClient.ListReplicationControllers(ctx, labels.Everything()) if err != nil { glog.Errorf("Synchronization error: %v (%#v)", err, err) return diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index fb96b9d915..58780b77e3 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -49,13 +49,13 @@ type FakePodControl struct { lock sync.Mutex } -func (f *FakePodControl) createReplica(spec api.ReplicationController) { +func (f *FakePodControl) createReplica(ctx api.Context, spec api.ReplicationController) { f.lock.Lock() defer f.lock.Unlock() f.controllerSpec = append(f.controllerSpec, spec) } -func (f *FakePodControl) deletePod(podID string) error { +func (f *FakePodControl) deletePod(ctx api.Context, podID string) error { f.lock.Lock() defer f.lock.Unlock() f.deletePodID = append(f.deletePodID, podID) @@ -169,6 +169,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) { } func TestCreateReplica(t *testing.T) { + ctx := api.NewDefaultContext() body := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Pod{}) fakeHandler := util.FakeHandler{ StatusCode: 200, @@ -204,7 +205,7 @@ func TestCreateReplica(t *testing.T) { }, } - podControl.createReplica(controllerSpec) + podControl.createReplica(ctx, controllerSpec) expectedPod := api.Pod{ JSONBase: api.JSONBase{ @@ -323,7 +324,7 @@ type FakeWatcher struct { *client.Fake } -func (fw FakeWatcher) WatchReplicationControllers(l, f labels.Selector, rv uint64) (watch.Interface, error) { +func (fw FakeWatcher) WatchReplicationControllers(ctx api.Context, l, f labels.Selector, rv uint64) (watch.Interface, error) { return fw.w, nil } diff --git a/pkg/kubecfg/kubecfg.go b/pkg/kubecfg/kubecfg.go index cd1ac41565..c211d71792 100644 --- a/pkg/kubecfg/kubecfg.go +++ b/pkg/kubecfg/kubecfg.go @@ -91,15 +91,15 @@ func LoadAuthInfo(path string, r io.Reader) (*AuthInfo, error) { // with the first container in the pod. There is no support yet for // updating more complex replication controllers. If this is blank then no // update of the image is performed. -func Update(name string, client client.Interface, updatePeriod time.Duration, imageName string) error { - controller, err := client.GetReplicationController(name) +func Update(ctx api.Context, name string, client client.Interface, updatePeriod time.Duration, imageName string) error { + controller, err := client.GetReplicationController(ctx, name) if err != nil { return err } if len(imageName) != 0 { controller.DesiredState.PodTemplate.DesiredState.Manifest.Containers[0].Image = imageName - controller, err = client.UpdateReplicationController(controller) + controller, err = client.UpdateReplicationController(ctx, controller) if err != nil { return err } @@ -107,7 +107,7 @@ func Update(name string, client client.Interface, updatePeriod time.Duration, im s := labels.Set(controller.DesiredState.ReplicaSelector).AsSelector() - podList, err := client.ListPods(s) + podList, err := client.ListPods(ctx, s) if err != nil { return err } @@ -118,14 +118,14 @@ func Update(name string, client client.Interface, updatePeriod time.Duration, im for _, pod := range podList.Items { // We delete the pod here, the controller will recreate it. This will result in pulling // a new Docker image. This isn't a full "update" but it's what we support for now. - err = client.DeletePod(pod.ID) + err = client.DeletePod(ctx, pod.ID) if err != nil { return err } time.Sleep(updatePeriod) } return wait.Poll(time.Second*5, time.Second*300, func() (bool, error) { - podList, err := client.ListPods(s) + podList, err := client.ListPods(ctx, s) if err != nil { return false, err } @@ -134,18 +134,18 @@ func Update(name string, client client.Interface, updatePeriod time.Duration, im } // StopController stops a controller named 'name' by setting replicas to zero. -func StopController(name string, client client.Interface) error { - return ResizeController(name, 0, client) +func StopController(ctx api.Context, name string, client client.Interface) error { + return ResizeController(ctx, name, 0, client) } // ResizeController resizes a controller named 'name' by setting replicas to 'replicas'. -func ResizeController(name string, replicas int, client client.Interface) error { - controller, err := client.GetReplicationController(name) +func ResizeController(ctx api.Context, name string, replicas int, client client.Interface) error { + controller, err := client.GetReplicationController(ctx, name) if err != nil { return err } controller.DesiredState.Replicas = replicas - controllerOut, err := client.UpdateReplicationController(controller) + controllerOut, err := client.UpdateReplicationController(ctx, controller) if err != nil { return err } @@ -198,7 +198,7 @@ func portsFromString(spec string) []api.Port { } // RunController creates a new replication controller named 'name' which creates 'replicas' pods running 'image'. -func RunController(image, name string, replicas int, client client.Interface, portSpec string, servicePort int) error { +func RunController(ctx api.Context, image, name string, replicas int, client client.Interface, portSpec string, servicePort int) error { if servicePort > 0 && !util.IsDNSLabel(name) { return fmt.Errorf("Service creation requested, but an invalid name for a service was provided (%s). Service names must be valid DNS labels.", name) } @@ -231,7 +231,7 @@ func RunController(image, name string, replicas int, client client.Interface, po }, } - controllerOut, err := client.CreateReplicationController(controller) + controllerOut, err := client.CreateReplicationController(ctx, controller) if err != nil { return err } @@ -242,7 +242,7 @@ func RunController(image, name string, replicas int, client client.Interface, po fmt.Print(string(data)) if servicePort > 0 { - svc, err := createService(name, servicePort, client) + svc, err := createService(ctx, name, servicePort, client) if err != nil { return err } @@ -255,7 +255,7 @@ func RunController(image, name string, replicas int, client client.Interface, po return nil } -func createService(name string, port int, client client.Interface) (*api.Service, error) { +func createService(ctx api.Context, name string, port int, client client.Interface) (*api.Service, error) { svc := &api.Service{ JSONBase: api.JSONBase{ID: name}, Port: port, @@ -266,19 +266,19 @@ func createService(name string, port int, client client.Interface) (*api.Service "simpleService": name, }, } - svc, err := client.CreateService(svc) + svc, err := client.CreateService(ctx, svc) return svc, err } // DeleteController deletes a replication controller named 'name', requires that the controller // already be stopped. -func DeleteController(name string, client client.Interface) error { - controller, err := client.GetReplicationController(name) +func DeleteController(ctx api.Context, name string, client client.Interface) error { + controller, err := client.GetReplicationController(ctx, name) if err != nil { return err } if controller.DesiredState.Replicas != 0 { return fmt.Errorf("controller has non-zero replicas (%d), please stop it first", controller.DesiredState.Replicas) } - return client.DeleteReplicationController(name) + return client.DeleteReplicationController(ctx, name) } diff --git a/pkg/kubecfg/kubecfg_test.go b/pkg/kubecfg/kubecfg_test.go index 912cbfa723..84b8d7c0ab 100644 --- a/pkg/kubecfg/kubecfg_test.go +++ b/pkg/kubecfg/kubecfg_test.go @@ -43,7 +43,7 @@ func TestUpdateWithPods(t *testing.T) { }, }, } - Update("foo", &fakeClient, 0, "") + Update(api.NewDefaultContext(), "foo", &fakeClient, 0, "") if len(fakeClient.Actions) != 5 { t.Fatalf("Unexpected action list %#v", fakeClient.Actions) } @@ -57,7 +57,7 @@ func TestUpdateWithPods(t *testing.T) { func TestUpdateNoPods(t *testing.T) { fakeClient := client.Fake{} - Update("foo", &fakeClient, 0, "") + Update(api.NewDefaultContext(), "foo", &fakeClient, 0, "") if len(fakeClient.Actions) != 2 { t.Errorf("Unexpected action list %#v", fakeClient.Actions) } @@ -87,7 +87,7 @@ func TestUpdateWithNewImage(t *testing.T) { }, }, } - Update("foo", &fakeClient, 0, "fooImage:2") + Update(api.NewDefaultContext(), "foo", &fakeClient, 0, "fooImage:2") if len(fakeClient.Actions) != 6 { t.Errorf("Unexpected action list %#v", fakeClient.Actions) } @@ -109,7 +109,7 @@ func TestRunController(t *testing.T) { name := "name" image := "foo/bar" replicas := 3 - RunController(image, name, replicas, &fakeClient, "8080:80", -1) + RunController(api.NewDefaultContext(), image, name, replicas, &fakeClient, "8080:80", -1) if len(fakeClient.Actions) != 1 || fakeClient.Actions[0].Action != "create-controller" { t.Errorf("Unexpected actions: %#v", fakeClient.Actions) } @@ -126,7 +126,7 @@ func TestRunControllerWithService(t *testing.T) { name := "name" image := "foo/bar" replicas := 3 - RunController(image, name, replicas, &fakeClient, "", 8000) + RunController(api.NewDefaultContext(), image, name, replicas, &fakeClient, "", 8000) if len(fakeClient.Actions) != 2 || fakeClient.Actions[0].Action != "create-controller" || fakeClient.Actions[1].Action != "create-service" { @@ -143,7 +143,7 @@ func TestRunControllerWithService(t *testing.T) { func TestStopController(t *testing.T) { fakeClient := client.Fake{} name := "name" - StopController(name, &fakeClient) + StopController(api.NewDefaultContext(), name, &fakeClient) if len(fakeClient.Actions) != 2 { t.Errorf("Unexpected actions: %#v", fakeClient.Actions) } @@ -162,7 +162,7 @@ func TestResizeController(t *testing.T) { fakeClient := client.Fake{} name := "name" replicas := 17 - ResizeController(name, replicas, &fakeClient) + ResizeController(api.NewDefaultContext(), name, replicas, &fakeClient) if len(fakeClient.Actions) != 2 { t.Errorf("Unexpected actions: %#v", fakeClient.Actions) } @@ -180,7 +180,7 @@ func TestResizeController(t *testing.T) { func TestCloudCfgDeleteController(t *testing.T) { fakeClient := client.Fake{} name := "name" - err := DeleteController(name, &fakeClient) + err := DeleteController(api.NewDefaultContext(), name, &fakeClient) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -206,7 +206,7 @@ func TestCloudCfgDeleteControllerWithReplicas(t *testing.T) { }, } name := "name" - err := DeleteController(name, &fakeClient) + err := DeleteController(api.NewDefaultContext(), name, &fakeClient) if len(fakeClient.Actions) != 1 { t.Errorf("Unexpected actions: %#v", fakeClient.Actions) } diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 995851dc3a..e1e28c55ce 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -29,10 +29,10 @@ import ( // Watcher is the interface needed to receive changes to services and endpoints. type Watcher interface { - ListServices(label labels.Selector) (*api.ServiceList, error) - ListEndpoints(label labels.Selector) (*api.EndpointsList, error) - WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) - WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + ListServices(ctx api.Context, label labels.Selector) (*api.ServiceList, error) + ListEndpoints(ctx api.Context, label labels.Selector) (*api.EndpointsList, error) + WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // SourceAPI implements a configuration source for services and endpoints that @@ -72,8 +72,9 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU // runServices loops forever looking for changes to services. func (s *SourceAPI) runServices(resourceVersion *uint64) { + ctx := api.NewContext() if *resourceVersion == 0 { - services, err := s.client.ListServices(labels.Everything()) + services, err := s.client.ListServices(ctx, labels.Everything()) if err != nil { glog.Errorf("Unable to load services: %v", err) time.Sleep(wait.Jitter(s.waitDuration, 0.0)) @@ -83,7 +84,7 @@ func (s *SourceAPI) runServices(resourceVersion *uint64) { s.services <- ServiceUpdate{Op: SET, Services: services.Items} } - watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion) + watcher, err := s.client.WatchServices(ctx, labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for services changes: %v", err) time.Sleep(wait.Jitter(s.waitDuration, 0.0)) @@ -121,8 +122,9 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates // runEndpoints loops forever looking for changes to endpoints. func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { + ctx := api.NewContext() if *resourceVersion == 0 { - endpoints, err := s.client.ListEndpoints(labels.Everything()) + endpoints, err := s.client.ListEndpoints(ctx, labels.Everything()) if err != nil { glog.Errorf("Unable to load endpoints: %v", err) time.Sleep(wait.Jitter(s.waitDuration, 0.0)) @@ -132,7 +134,7 @@ func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items} } - watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion) + watcher, err := s.client.WatchEndpoints(ctx, labels.Everything(), labels.Everything(), *resourceVersion) if err != nil { glog.Errorf("Unable to watch for endpoints changes: %v", err) time.Sleep(wait.Jitter(s.waitDuration, 0.0)) diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index acfac20e25..48810c175b 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -47,14 +47,16 @@ func NewEndpointController(serviceRegistry service.Registry, client *client.Clie // SyncServiceEndpoints syncs service endpoints. func (e *EndpointController) SyncServiceEndpoints() error { - services, err := e.client.ListServices(labels.Everything()) + ctx := api.NewContext() + services, err := e.client.ListServices(ctx, labels.Everything()) if err != nil { glog.Errorf("Failed to list services: %v", err) return err } var resultErr error for _, service := range services.Items { - pods, err := e.client.ListPods(labels.Set(service.Selector).AsSelector()) + nsCtx := api.WithNamespace(ctx, service.Namespace) + pods, err := e.client.ListPods(nsCtx, labels.Set(service.Selector).AsSelector()) if err != nil { glog.Errorf("Error syncing service: %#v, skipping.", service) resultErr = err @@ -73,7 +75,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { } endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) } - currentEndpoints, err := e.client.GetEndpoints(service.ID) + currentEndpoints, err := e.client.GetEndpoints(nsCtx, service.ID) if err != nil { // TODO this is brittle as all get out, refactor the client libraries to return a structured error. if strings.Contains(err.Error(), "(404)") { @@ -93,14 +95,14 @@ func (e *EndpointController) SyncServiceEndpoints() error { if currentEndpoints.ResourceVersion == 0 { // No previous endpoints, create them - _, err = e.client.CreateEndpoints(newEndpoints) + _, err = e.client.CreateEndpoints(nsCtx, newEndpoints) } else { // Pre-existing if endpointsEqual(currentEndpoints, endpoints) { glog.V(2).Infof("endpoints are equal for %s, skipping update", service.ID) continue } - _, err = e.client.UpdateEndpoints(newEndpoints) + _, err = e.client.UpdateEndpoints(nsCtx, newEndpoints) } if err != nil { glog.Errorf("Error updating endpoints: %#v", err) diff --git a/test/integration/client_test.go b/test/integration/client_test.go index 7deca9b6cb..b535349233 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -59,6 +59,7 @@ func TestClient(t *testing.T) { } for apiVersion, values := range testCases { + ctx := api.NewDefaultContext() deleteAllEtcdKeys() s := httptest.NewServer(apiserver.Handle(values.Storage, values.Codec, fmt.Sprintf("/api/%s/", apiVersion), values.selfLinker)) client := client.NewOrDie(&client.Config{Host: s.URL, Version: apiVersion}) @@ -71,7 +72,7 @@ func TestClient(t *testing.T) { t.Errorf("expected %#v, got %#v", e, a) } - pods, err := client.ListPods(labels.Everything()) + pods, err := client.ListPods(ctx, labels.Everything()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -92,14 +93,14 @@ func TestClient(t *testing.T) { }, }, } - got, err := client.CreatePod(pod) + got, err := client.CreatePod(ctx, pod) if err == nil { t.Fatalf("unexpected non-error: %v", err) } // get a created pod pod.DesiredState.Manifest.Containers[0].Image = "an-image" - got, err = client.CreatePod(pod) + got, err = client.CreatePod(ctx, pod) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -108,7 +109,7 @@ func TestClient(t *testing.T) { } // pod is shown, but not scheduled - pods, err = client.ListPods(labels.Everything()) + pods, err = client.ListPods(ctx, labels.Everything()) if err != nil { t.Fatalf("unexpected error: %v", err) }