Add context as parameter to client interface functions

pull/6/head
derekwaynecarr 2014-10-02 12:51:36 -04:00
parent d3816069e6
commit fc67d822c6
9 changed files with 123 additions and 119 deletions

View File

@ -175,7 +175,7 @@ func main() {
}
// TODO: get the namespace context when kubecfg ns is completed
clientConfig.Context = api.NewContext()
ctx := api.NewContext()
if clientConfig.Host == "" {
// TODO: eventually apiserver should start on 443 and be secure by default
@ -242,7 +242,7 @@ func main() {
}
method := flag.Arg(0)
matchFound := executeAPIRequest(method, kubeClient) || executeControllerRequest(method, kubeClient)
matchFound := executeAPIRequest(ctx, method, kubeClient) || executeControllerRequest(ctx, method, kubeClient)
if matchFound == false {
glog.Fatalf("Unknown command %s", method)
}
@ -302,7 +302,7 @@ func getPrinter() kubecfg.ResourcePrinter {
return printer
}
func executeAPIRequest(method string, c *client.Client) bool {
func executeAPIRequest(ctx api.Context, method string, c *client.Client) bool {
storage, path, hasSuffix := storagePathFromArg(flag.Arg(1))
validStorage := checkStorage(storage)
verb := ""
@ -401,7 +401,7 @@ func executeAPIRequest(method string, c *client.Client) bool {
return true
}
func executeControllerRequest(method string, c *client.Client) bool {
func executeControllerRequest(ctx api.Context, method string, c *client.Client) bool {
parseController := func() string {
if len(flag.Args()) != 2 {
glog.Fatal("usage: kubecfg [OPTIONS] stop|rm|rollingupdate <controller>")
@ -412,11 +412,11 @@ func executeControllerRequest(method string, c *client.Client) bool {
var err error
switch method {
case "stop":
err = kubecfg.StopController(parseController(), c)
err = kubecfg.StopController(ctx, parseController(), c)
case "rm":
err = kubecfg.DeleteController(parseController(), c)
err = kubecfg.DeleteController(ctx, parseController(), c)
case "rollingupdate":
err = kubecfg.Update(parseController(), c, *updatePeriod, *imageName)
err = kubecfg.Update(ctx, parseController(), c, *updatePeriod, *imageName)
case "run":
if len(flag.Args()) != 4 {
glog.Fatal("usage: kubecfg [OPTIONS] run <image> <replicas> <controller>")
@ -427,7 +427,7 @@ func executeControllerRequest(method string, c *client.Client) bool {
glog.Fatalf("Error parsing replicas: %v", err2)
}
name := flag.Arg(3)
err = kubecfg.RunController(image, name, replicas, c, *portSpec, *servicePort)
err = kubecfg.RunController(ctx, image, name, replicas, c, *portSpec, *servicePort)
case "resize":
args := flag.Args()
if len(args) < 3 {
@ -438,7 +438,7 @@ func executeControllerRequest(method string, c *client.Client) bool {
if err2 != nil {
glog.Fatalf("Error parsing replicas: %v", err2)
}
err = kubecfg.ResizeController(name, replicas, c)
err = kubecfg.ResizeController(ctx, name, replicas, c)
default:
return false
}

View File

@ -39,38 +39,38 @@ type Interface interface {
// PodInterface has methods to work with Pod resources.
type PodInterface interface {
ListPods(selector labels.Selector) (*api.PodList, error)
GetPod(id string) (*api.Pod, error)
DeletePod(id string) error
CreatePod(*api.Pod) (*api.Pod, error)
UpdatePod(*api.Pod) (*api.Pod, error)
ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error)
GetPod(ctx api.Context, id string) (*api.Pod, error)
DeletePod(ctx api.Context, id string) error
CreatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error)
UpdatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error)
}
// ReplicationControllerInterface has methods to work with ReplicationController resources.
type ReplicationControllerInterface interface {
ListReplicationControllers(selector labels.Selector) (*api.ReplicationControllerList, error)
GetReplicationController(id string) (*api.ReplicationController, error)
CreateReplicationController(*api.ReplicationController) (*api.ReplicationController, error)
UpdateReplicationController(*api.ReplicationController) (*api.ReplicationController, error)
DeleteReplicationController(string) error
WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
ListReplicationControllers(ctx api.Context, selector labels.Selector) (*api.ReplicationControllerList, error)
GetReplicationController(ctx api.Context, id string) (*api.ReplicationController, error)
CreateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error)
UpdateReplicationController(ctx api.Context, ctrl *api.ReplicationController) (*api.ReplicationController, error)
DeleteReplicationController(ctx api.Context, id string) error
WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// ServiceInterface has methods to work with Service resources.
type ServiceInterface interface {
ListServices(selector labels.Selector) (*api.ServiceList, error)
GetService(id string) (*api.Service, error)
CreateService(*api.Service) (*api.Service, error)
UpdateService(*api.Service) (*api.Service, error)
DeleteService(string) error
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
ListServices(ctx api.Context, selector labels.Selector) (*api.ServiceList, error)
GetService(ctx api.Context, id string) (*api.Service, error)
CreateService(ctx api.Context, srv *api.Service) (*api.Service, error)
UpdateService(ctx api.Context, srv *api.Service) (*api.Service, error)
DeleteService(ctx api.Context, id string) error
WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// EndpointsInterface has methods to work with Endpoints resources
type EndpointsInterface interface {
ListEndpoints(selector labels.Selector) (*api.EndpointsList, error)
GetEndpoints(id string) (*api.Endpoints, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
ListEndpoints(ctx api.Context, selector labels.Selector) (*api.EndpointsList, error)
GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error)
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// VersionInterface has a method to retrieve the server version.
@ -94,33 +94,33 @@ type Client struct {
}
// ListPods takes a selector, and returns the list of pods that match that selector.
func (c *Client) ListPods(selector labels.Selector) (result *api.PodList, err error) {
func (c *Client) ListPods(ctx api.Context, selector labels.Selector) (result *api.PodList, err error) {
result = &api.PodList{}
err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(result)
return
}
// GetPod takes the id of the pod, and returns the corresponding Pod object, and an error if it occurs
func (c *Client) GetPod(id string) (result *api.Pod, err error) {
func (c *Client) GetPod(ctx api.Context, id string) (result *api.Pod, err error) {
result = &api.Pod{}
err = c.Get().Path("pods").Path(id).Do().Into(result)
return
}
// DeletePod takes the id of the pod, and returns an error if one occurs
func (c *Client) DeletePod(id string) error {
func (c *Client) DeletePod(ctx api.Context, id string) error {
return c.Delete().Path("pods").Path(id).Do().Error()
}
// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
func (c *Client) CreatePod(pod *api.Pod) (result *api.Pod, err error) {
func (c *Client) CreatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) {
result = &api.Pod{}
err = c.Post().Path("pods").Body(pod).Do().Into(result)
return
}
// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
func (c *Client) UpdatePod(pod *api.Pod) (result *api.Pod, err error) {
func (c *Client) UpdatePod(ctx api.Context, pod *api.Pod) (result *api.Pod, err error) {
result = &api.Pod{}
if pod.ResourceVersion == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", pod)
@ -131,28 +131,28 @@ func (c *Client) UpdatePod(pod *api.Pod) (result *api.Pod, err error) {
}
// ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector.
func (c *Client) ListReplicationControllers(selector labels.Selector) (result *api.ReplicationControllerList, err error) {
func (c *Client) ListReplicationControllers(ctx api.Context, selector labels.Selector) (result *api.ReplicationControllerList, err error) {
result = &api.ReplicationControllerList{}
err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(result)
return
}
// GetReplicationController returns information about a particular replication controller.
func (c *Client) GetReplicationController(id string) (result *api.ReplicationController, err error) {
func (c *Client) GetReplicationController(ctx api.Context, id string) (result *api.ReplicationController, err error) {
result = &api.ReplicationController{}
err = c.Get().Path("replicationControllers").Path(id).Do().Into(result)
return
}
// CreateReplicationController creates a new replication controller.
func (c *Client) CreateReplicationController(controller *api.ReplicationController) (result *api.ReplicationController, err error) {
func (c *Client) CreateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) {
result = &api.ReplicationController{}
err = c.Post().Path("replicationControllers").Body(controller).Do().Into(result)
return
}
// UpdateReplicationController updates an existing replication controller.
func (c *Client) UpdateReplicationController(controller *api.ReplicationController) (result *api.ReplicationController, err error) {
func (c *Client) UpdateReplicationController(ctx api.Context, controller *api.ReplicationController) (result *api.ReplicationController, err error) {
result = &api.ReplicationController{}
if controller.ResourceVersion == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", controller)
@ -163,12 +163,12 @@ func (c *Client) UpdateReplicationController(controller *api.ReplicationControll
}
// DeleteReplicationController deletes an existing replication controller.
func (c *Client) DeleteReplicationController(id string) error {
func (c *Client) DeleteReplicationController(ctx api.Context, id string) error {
return c.Delete().Path("replicationControllers").Path(id).Do().Error()
}
// WatchReplicationControllers returns a watch.Interface that watches the requested controllers.
func (c *Client) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Client) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("replicationControllers").
@ -179,28 +179,28 @@ func (c *Client) WatchReplicationControllers(label, field labels.Selector, resou
}
// ListServices takes a selector, and returns the list of services that match that selector
func (c *Client) ListServices(selector labels.Selector) (result *api.ServiceList, err error) {
func (c *Client) ListServices(ctx api.Context, selector labels.Selector) (result *api.ServiceList, err error) {
result = &api.ServiceList{}
err = c.Get().Path("services").SelectorParam("labels", selector).Do().Into(result)
return
}
// GetService returns information about a particular service.
func (c *Client) GetService(id string) (result *api.Service, err error) {
func (c *Client) GetService(ctx api.Context, id string) (result *api.Service, err error) {
result = &api.Service{}
err = c.Get().Path("services").Path(id).Do().Into(result)
return
}
// CreateService creates a new service.
func (c *Client) CreateService(svc *api.Service) (result *api.Service, err error) {
func (c *Client) CreateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) {
result = &api.Service{}
err = c.Post().Path("services").Body(svc).Do().Into(result)
return
}
// UpdateService updates an existing service.
func (c *Client) UpdateService(svc *api.Service) (result *api.Service, err error) {
func (c *Client) UpdateService(ctx api.Context, svc *api.Service) (result *api.Service, err error) {
result = &api.Service{}
if svc.ResourceVersion == 0 {
err = fmt.Errorf("invalid update object, missing resource version: %v", svc)
@ -211,12 +211,12 @@ func (c *Client) UpdateService(svc *api.Service) (result *api.Service, err error
}
// DeleteService deletes an existing service.
func (c *Client) DeleteService(id string) error {
func (c *Client) DeleteService(ctx api.Context, id string) error {
return c.Delete().Path("services").Path(id).Do().Error()
}
// WatchServices returns a watch.Interface that watches the requested services.
func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Client) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("services").
@ -227,21 +227,21 @@ func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uin
}
// ListEndpoints takes a selector, and returns the list of endpoints that match that selector
func (c *Client) ListEndpoints(selector labels.Selector) (result *api.EndpointsList, err error) {
func (c *Client) ListEndpoints(ctx api.Context, selector labels.Selector) (result *api.EndpointsList, err error) {
result = &api.EndpointsList{}
err = c.Get().Path("endpoints").SelectorParam("labels", selector).Do().Into(result)
return
}
// GetEndpoints returns information about the endpoints for a particular service.
func (c *Client) GetEndpoints(id string) (result *api.Endpoints, err error) {
func (c *Client) GetEndpoints(ctx api.Context, id string) (result *api.Endpoints, err error) {
result = &api.Endpoints{}
err = c.Get().Path("endpoints").Path(id).Do().Into(result)
return
}
// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Client) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("endpoints").
@ -251,13 +251,13 @@ func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion ui
Watch()
}
func (c *Client) CreateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) {
func (c *Client) CreateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) {
result := &api.Endpoints{}
err := c.Post().Path("endpoints").Body(endpoints).Do().Into(result)
return result, err
}
func (c *Client) UpdateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) {
func (c *Client) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) (*api.Endpoints, error) {
result := &api.Endpoints{}
if endpoints.ResourceVersion == 0 {
return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints)

View File

@ -26,7 +26,8 @@ import (
// for a controller's ReplicaSelector equals the Replicas count.
func (c *Client) ControllerHasDesiredReplicas(controller api.ReplicationController) wait.ConditionFunc {
return func() (bool, error) {
pods, err := c.ListPods(labels.Set(controller.DesiredState.ReplicaSelector).AsSelector())
ctx := api.WithNamespace(api.NewContext(), controller.Namespace)
pods, err := c.ListPods(ctx, labels.Set(controller.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
return false, err
}

View File

@ -42,102 +42,102 @@ type Fake struct {
Watch watch.Interface
}
func (c *Fake) ListPods(selector labels.Selector) (*api.PodList, error) {
func (c *Fake) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-pods"})
return api.Scheme.CopyOrDie(&c.Pods).(*api.PodList), nil
}
func (c *Fake) GetPod(name string) (*api.Pod, error) {
func (c *Fake) GetPod(ctx api.Context, name string) (*api.Pod, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-pod", Value: name})
return &api.Pod{}, nil
}
func (c *Fake) DeletePod(name string) error {
func (c *Fake) DeletePod(ctx api.Context, name string) error {
c.Actions = append(c.Actions, FakeAction{Action: "delete-pod", Value: name})
return nil
}
func (c *Fake) CreatePod(pod *api.Pod) (*api.Pod, error) {
func (c *Fake) CreatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error) {
c.Actions = append(c.Actions, FakeAction{Action: "create-pod"})
return &api.Pod{}, nil
}
func (c *Fake) UpdatePod(pod *api.Pod) (*api.Pod, error) {
func (c *Fake) UpdatePod(ctx api.Context, pod *api.Pod) (*api.Pod, error) {
c.Actions = append(c.Actions, FakeAction{Action: "update-pod", Value: pod.ID})
return &api.Pod{}, nil
}
func (c *Fake) ListReplicationControllers(selector labels.Selector) (*api.ReplicationControllerList, error) {
func (c *Fake) ListReplicationControllers(ctx api.Context, selector labels.Selector) (*api.ReplicationControllerList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-controllers"})
return &api.ReplicationControllerList{}, nil
}
func (c *Fake) GetReplicationController(name string) (*api.ReplicationController, error) {
func (c *Fake) GetReplicationController(ctx api.Context, name string) (*api.ReplicationController, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-controller", Value: name})
return api.Scheme.CopyOrDie(&c.Ctrl).(*api.ReplicationController), nil
}
func (c *Fake) CreateReplicationController(controller *api.ReplicationController) (*api.ReplicationController, error) {
func (c *Fake) CreateReplicationController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) {
c.Actions = append(c.Actions, FakeAction{Action: "create-controller", Value: controller})
return &api.ReplicationController{}, nil
}
func (c *Fake) UpdateReplicationController(controller *api.ReplicationController) (*api.ReplicationController, error) {
func (c *Fake) UpdateReplicationController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) {
c.Actions = append(c.Actions, FakeAction{Action: "update-controller", Value: controller})
return &api.ReplicationController{}, nil
}
func (c *Fake) DeleteReplicationController(controller string) error {
func (c *Fake) DeleteReplicationController(ctx api.Context, controller string) error {
c.Actions = append(c.Actions, FakeAction{Action: "delete-controller", Value: controller})
return nil
}
func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Fake) WatchReplicationControllers(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) ListServices(selector labels.Selector) (*api.ServiceList, error) {
func (c *Fake) ListServices(ctx api.Context, selector labels.Selector) (*api.ServiceList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-services"})
return &c.ServiceList, c.Err
}
func (c *Fake) GetService(name string) (*api.Service, error) {
func (c *Fake) GetService(ctx api.Context, name string) (*api.Service, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-service", Value: name})
return &api.Service{}, nil
}
func (c *Fake) CreateService(service *api.Service) (*api.Service, error) {
func (c *Fake) CreateService(ctx api.Context, service *api.Service) (*api.Service, error) {
c.Actions = append(c.Actions, FakeAction{Action: "create-service", Value: service})
return &api.Service{}, nil
}
func (c *Fake) UpdateService(service *api.Service) (*api.Service, error) {
func (c *Fake) UpdateService(ctx api.Context, service *api.Service) (*api.Service, error) {
c.Actions = append(c.Actions, FakeAction{Action: "update-service", Value: service})
return &api.Service{}, nil
}
func (c *Fake) DeleteService(service string) error {
func (c *Fake) DeleteService(ctx api.Context, service string) error {
c.Actions = append(c.Actions, FakeAction{Action: "delete-service", Value: service})
return nil
}
func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Fake) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion})
return c.Watch, c.Err
}
func (c *Fake) ListEndpoints(selector labels.Selector) (*api.EndpointsList, error) {
func (c *Fake) ListEndpoints(ctx api.Context, selector labels.Selector) (*api.EndpointsList, error) {
c.Actions = append(c.Actions, FakeAction{Action: "list-endpoints"})
return api.Scheme.CopyOrDie(&c.EndpointsList).(*api.EndpointsList), c.Err
}
func (c *Fake) GetEndpoints(name string) (*api.Endpoints, error) {
func (c *Fake) GetEndpoints(ctx api.Context, name string) (*api.Endpoints, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-endpoints"})
return &api.Endpoints{}, nil
}
func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (c *Fake) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Watch, c.Err
}

View File

@ -23,7 +23,6 @@ import (
"path"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
)
@ -55,10 +54,6 @@ type Config struct {
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options.
Transport http.RoundTripper
// Context is the context that should be passed down to the server. If nil, the
// context will be set to the appropriate default.
Context api.Context
}
// New creates a Kubernetes client for the given config. This client works with pods,

View File

@ -42,9 +42,9 @@ type ReplicationManager struct {
// created as an interface to allow testing.
type PodControlInterface interface {
// createReplica creates new replicated pods according to the spec.
createReplica(controllerSpec api.ReplicationController)
createReplica(ctx api.Context, controllerSpec api.ReplicationController)
// deletePod deletes the pod identified by podID.
deletePod(podID string) error
deletePod(ctx api.Context, podID string) error
}
// RealPodControl is the default implementation of PodControllerInterface.
@ -52,7 +52,7 @@ type RealPodControl struct {
kubeClient client.Interface
}
func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) {
func (r RealPodControl) createReplica(ctx api.Context, controllerSpec api.ReplicationController) {
labels := controllerSpec.DesiredState.PodTemplate.Labels
// TODO: don't fail to set this label just because the map isn't created.
if labels != nil {
@ -62,14 +62,14 @@ func (r RealPodControl) createReplica(controllerSpec api.ReplicationController)
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
Labels: controllerSpec.DesiredState.PodTemplate.Labels,
}
_, err := r.kubeClient.CreatePod(pod)
_, err := r.kubeClient.CreatePod(ctx, pod)
if err != nil {
glog.Errorf("%#v\n", err)
}
}
func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID)
func (r RealPodControl) deletePod(ctx api.Context, podID string) error {
return r.kubeClient.DeletePod(ctx, podID)
}
// NewReplicationManager creates a new ReplicationManager.
@ -93,7 +93,9 @@ func (rm *ReplicationManager) Run(period time.Duration) {
// resourceVersion is a pointer to the resource version to use/update.
func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
ctx := api.NewContext()
watching, err := rm.kubeClient.WatchReplicationControllers(
ctx,
labels.Everything(),
labels.Everything(),
*resourceVersion,
@ -143,7 +145,8 @@ func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector()
podList, err := rm.kubeClient.ListPods(s)
ctx := api.WithNamespace(api.NewContext(), controllerSpec.Namespace)
podList, err := rm.kubeClient.ListPods(ctx, s)
if err != nil {
return err
}
@ -157,7 +160,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
rm.podControl.createReplica(controllerSpec)
rm.podControl.createReplica(ctx, controllerSpec)
}()
}
wait.Wait()
@ -168,7 +171,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(filteredList[ix].ID)
rm.podControl.deletePod(ctx, filteredList[ix].ID)
}(i)
}
wait.Wait()
@ -180,7 +183,8 @@ func (rm *ReplicationManager) synchronize() {
// TODO: remove this method completely and rely on the watch.
// Add resource version tracking to watch to make this work.
var controllerSpecs []api.ReplicationController
list, err := rm.kubeClient.ListReplicationControllers(labels.Everything())
ctx := api.NewContext()
list, err := rm.kubeClient.ListReplicationControllers(ctx, labels.Everything())
if err != nil {
glog.Errorf("Synchronization error: %v (%#v)", err, err)
return

View File

@ -91,15 +91,15 @@ func LoadAuthInfo(path string, r io.Reader) (*AuthInfo, error) {
// with the first container in the pod. There is no support yet for
// updating more complex replication controllers. If this is blank then no
// update of the image is performed.
func Update(name string, client client.Interface, updatePeriod time.Duration, imageName string) error {
controller, err := client.GetReplicationController(name)
func Update(ctx api.Context, name string, client client.Interface, updatePeriod time.Duration, imageName string) error {
controller, err := client.GetReplicationController(ctx, name)
if err != nil {
return err
}
if len(imageName) != 0 {
controller.DesiredState.PodTemplate.DesiredState.Manifest.Containers[0].Image = imageName
controller, err = client.UpdateReplicationController(controller)
controller, err = client.UpdateReplicationController(ctx, controller)
if err != nil {
return err
}
@ -107,7 +107,7 @@ func Update(name string, client client.Interface, updatePeriod time.Duration, im
s := labels.Set(controller.DesiredState.ReplicaSelector).AsSelector()
podList, err := client.ListPods(s)
podList, err := client.ListPods(ctx, s)
if err != nil {
return err
}
@ -118,14 +118,14 @@ func Update(name string, client client.Interface, updatePeriod time.Duration, im
for _, pod := range podList.Items {
// We delete the pod here, the controller will recreate it. This will result in pulling
// a new Docker image. This isn't a full "update" but it's what we support for now.
err = client.DeletePod(pod.ID)
err = client.DeletePod(ctx, pod.ID)
if err != nil {
return err
}
time.Sleep(updatePeriod)
}
return wait.Poll(time.Second*5, time.Second*300, func() (bool, error) {
podList, err := client.ListPods(s)
podList, err := client.ListPods(ctx, s)
if err != nil {
return false, err
}
@ -134,18 +134,18 @@ func Update(name string, client client.Interface, updatePeriod time.Duration, im
}
// StopController stops a controller named 'name' by setting replicas to zero.
func StopController(name string, client client.Interface) error {
return ResizeController(name, 0, client)
func StopController(ctx api.Context, name string, client client.Interface) error {
return ResizeController(ctx, name, 0, client)
}
// ResizeController resizes a controller named 'name' by setting replicas to 'replicas'.
func ResizeController(name string, replicas int, client client.Interface) error {
controller, err := client.GetReplicationController(name)
func ResizeController(ctx api.Context, name string, replicas int, client client.Interface) error {
controller, err := client.GetReplicationController(ctx, name)
if err != nil {
return err
}
controller.DesiredState.Replicas = replicas
controllerOut, err := client.UpdateReplicationController(controller)
controllerOut, err := client.UpdateReplicationController(ctx, controller)
if err != nil {
return err
}
@ -198,7 +198,7 @@ func portsFromString(spec string) []api.Port {
}
// RunController creates a new replication controller named 'name' which creates 'replicas' pods running 'image'.
func RunController(image, name string, replicas int, client client.Interface, portSpec string, servicePort int) error {
func RunController(ctx api.Context, image, name string, replicas int, client client.Interface, portSpec string, servicePort int) error {
if servicePort > 0 && !util.IsDNSLabel(name) {
return fmt.Errorf("Service creation requested, but an invalid name for a service was provided (%s). Service names must be valid DNS labels.", name)
}
@ -231,7 +231,7 @@ func RunController(image, name string, replicas int, client client.Interface, po
},
}
controllerOut, err := client.CreateReplicationController(controller)
controllerOut, err := client.CreateReplicationController(ctx, controller)
if err != nil {
return err
}
@ -242,7 +242,7 @@ func RunController(image, name string, replicas int, client client.Interface, po
fmt.Print(string(data))
if servicePort > 0 {
svc, err := createService(name, servicePort, client)
svc, err := createService(ctx, name, servicePort, client)
if err != nil {
return err
}
@ -255,7 +255,7 @@ func RunController(image, name string, replicas int, client client.Interface, po
return nil
}
func createService(name string, port int, client client.Interface) (*api.Service, error) {
func createService(ctx api.Context, name string, port int, client client.Interface) (*api.Service, error) {
svc := &api.Service{
JSONBase: api.JSONBase{ID: name},
Port: port,
@ -266,19 +266,19 @@ func createService(name string, port int, client client.Interface) (*api.Service
"simpleService": name,
},
}
svc, err := client.CreateService(svc)
svc, err := client.CreateService(ctx, svc)
return svc, err
}
// DeleteController deletes a replication controller named 'name', requires that the controller
// already be stopped.
func DeleteController(name string, client client.Interface) error {
controller, err := client.GetReplicationController(name)
func DeleteController(ctx api.Context, name string, client client.Interface) error {
controller, err := client.GetReplicationController(ctx, name)
if err != nil {
return err
}
if controller.DesiredState.Replicas != 0 {
return fmt.Errorf("controller has non-zero replicas (%d), please stop it first", controller.DesiredState.Replicas)
}
return client.DeleteReplicationController(name)
return client.DeleteReplicationController(ctx, name)
}

View File

@ -29,10 +29,10 @@ import (
// Watcher is the interface needed to receive changes to services and endpoints.
type Watcher interface {
ListServices(label labels.Selector) (*api.ServiceList, error)
ListEndpoints(label labels.Selector) (*api.EndpointsList, error)
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
ListServices(ctx api.Context, label labels.Selector) (*api.ServiceList, error)
ListEndpoints(ctx api.Context, label labels.Selector) (*api.EndpointsList, error)
WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// SourceAPI implements a configuration source for services and endpoints that
@ -72,8 +72,9 @@ func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceU
// runServices loops forever looking for changes to services.
func (s *SourceAPI) runServices(resourceVersion *uint64) {
ctx := api.NewContext()
if *resourceVersion == 0 {
services, err := s.client.ListServices(labels.Everything())
services, err := s.client.ListServices(ctx, labels.Everything())
if err != nil {
glog.Errorf("Unable to load services: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
@ -83,7 +84,7 @@ func (s *SourceAPI) runServices(resourceVersion *uint64) {
s.services <- ServiceUpdate{Op: SET, Services: services.Items}
}
watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion)
watcher, err := s.client.WatchServices(ctx, labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
@ -121,8 +122,9 @@ func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates
// runEndpoints loops forever looking for changes to endpoints.
func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
ctx := api.NewContext()
if *resourceVersion == 0 {
endpoints, err := s.client.ListEndpoints(labels.Everything())
endpoints, err := s.client.ListEndpoints(ctx, labels.Everything())
if err != nil {
glog.Errorf("Unable to load endpoints: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
@ -132,7 +134,7 @@ func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
s.endpoints <- EndpointsUpdate{Op: SET, Endpoints: endpoints.Items}
}
watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion)
watcher, err := s.client.WatchEndpoints(ctx, labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))

View File

@ -47,14 +47,16 @@ func NewEndpointController(serviceRegistry service.Registry, client *client.Clie
// SyncServiceEndpoints syncs service endpoints.
func (e *EndpointController) SyncServiceEndpoints() error {
services, err := e.client.ListServices(labels.Everything())
ctx := api.NewContext()
services, err := e.client.ListServices(ctx, labels.Everything())
if err != nil {
glog.Errorf("Failed to list services: %v", err)
return err
}
var resultErr error
for _, service := range services.Items {
pods, err := e.client.ListPods(labels.Set(service.Selector).AsSelector())
nsCtx := api.WithNamespace(ctx, service.Namespace)
pods, err := e.client.ListPods(nsCtx, labels.Set(service.Selector).AsSelector())
if err != nil {
glog.Errorf("Error syncing service: %#v, skipping.", service)
resultErr = err
@ -73,7 +75,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
}
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
}
currentEndpoints, err := e.client.GetEndpoints(service.ID)
currentEndpoints, err := e.client.GetEndpoints(nsCtx, service.ID)
if err != nil {
// TODO this is brittle as all get out, refactor the client libraries to return a structured error.
if strings.Contains(err.Error(), "(404)") {
@ -93,14 +95,14 @@ func (e *EndpointController) SyncServiceEndpoints() error {
if currentEndpoints.ResourceVersion == 0 {
// No previous endpoints, create them
_, err = e.client.CreateEndpoints(newEndpoints)
_, err = e.client.CreateEndpoints(nsCtx, newEndpoints)
} else {
// Pre-existing
if endpointsEqual(currentEndpoints, endpoints) {
glog.V(2).Infof("endpoints are equal for %s, skipping update", service.ID)
continue
}
_, err = e.client.UpdateEndpoints(newEndpoints)
_, err = e.client.UpdateEndpoints(nsCtx, newEndpoints)
}
if err != nil {
glog.Errorf("Error updating endpoints: %#v", err)