Merge pull request #5235 from jszczepkowski/pods-watch

Watch support in PodInterface.
pull/6/head
Daniel Smith 2015-03-10 11:59:04 -07:00
commit e1f64b2901
3 changed files with 66 additions and 3 deletions

View File

@ -19,6 +19,7 @@ package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// FakePods implements PodsInterface. Meant to be embedded into a struct to get a default
@ -53,6 +54,11 @@ func (c *FakePods) Update(pod *api.Pod) (*api.Pod, error) {
return &api.Pod{}, nil
}
func (c *FakePods) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-pods", Value: resourceVersion})
return c.Fake.Watch, c.Fake.Err
}
func (c *FakePods) Bind(bind *api.Binding) error {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
return nil

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// PodsNamespacer has methods to work with Pod resources in a namespace
@ -36,7 +37,7 @@ type PodInterface interface {
Delete(name string) error
Create(pod *api.Pod) (*api.Pod, error)
Update(pod *api.Pod) (*api.Pod, error)
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
Bind(binding *api.Binding) error
}
@ -95,6 +96,18 @@ func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
return
}
// Watch returns a watch.Interface that watches the requested pods.
func (c *pods) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("pods").
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}
// Bind applies the provided binding to the named pod in the current namespace (binding.Namespace is ignored).
func (c *pods) Bind(binding *api.Binding) error {
return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -122,27 +123,70 @@ var _ = Describe("Pods", func() {
},
}
By("setting up watch")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
if err != nil {
Fail(fmt.Sprintf("Failed to query for pods: %v", err))
}
Expect(len(pods.Items)).To(Equal(0))
w, err := podClient.Watch(
labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), labels.Everything(), pods.ListMeta.ResourceVersion)
if err != nil {
Fail(fmt.Sprintf("Failed to set up watch: %v", err))
}
By("submitting the pod to kubernetes")
// We call defer here in case there is a problem with
// the test so we can ensure that we clean up after
// ourselves
defer podClient.Delete(pod.Name)
_, err := podClient.Create(pod)
_, err = podClient.Create(pod)
if err != nil {
Fail(fmt.Sprintf("Failed to create pod: %v", err))
}
By("verifying the pod is in kubernetes")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
if err != nil {
Fail(fmt.Sprintf("Failed to query for pods: %v", err))
}
Expect(len(pods.Items)).To(Equal(1))
By("veryfying pod creation was observed")
select {
case event, _ := <-w.ResultChan():
if event.Type != watch.Added {
Fail(fmt.Sprintf("Failed to observe pod creation: %v", event))
}
case <-time.After(podStartTimeout):
Fail("Timeout while waiting for pod creation")
}
By("deleting the pod")
podClient.Delete(pod.Name)
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
if err != nil {
Fail(fmt.Sprintf("Failed to delete pod: %v", err))
}
Expect(len(pods.Items)).To(Equal(0))
By("veryfying pod deletion was observed")
deleted := false
timeout := false
timer := time.After(podStartTimeout)
for !deleted && !timeout {
select {
case event, _ := <-w.ResultChan():
if event.Type == watch.Deleted {
deleted = true
}
case <-timer:
timeout = true
}
}
if !deleted {
Fail("Failed to observe pod deletion")
}
})
It("should be updated", func() {