Added field selector for listing pods.

pull/6/head
Ravi Gadde 2015-04-20 11:53:06 -07:00
parent a45b5c3ebf
commit bf8f258471
18 changed files with 43 additions and 37 deletions

View File

@ -43,6 +43,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
@ -260,7 +261,7 @@ func podsOnMinions(c *client.Client, podNamespace string, labelSelector labels.S
podInfo := fakeKubeletClient{}
// wait for minions to indicate they have info about the desired pods
return func() (bool, error) {
pods, err := c.Pods(podNamespace).List(labelSelector)
pods, err := c.Pods(podNamespace).List(labelSelector, fields.Everything())
if err != nil {
glog.Infof("Unable to get pods to list: %v", err)
return false, nil
@ -384,7 +385,7 @@ containers:
namespace := kubelet.NamespaceDefault
if err := wait.Poll(time.Second, time.Minute*2,
podRunning(c, namespace, podName)); err != nil {
if pods, err := c.Pods(namespace).List(labels.Everything()); err == nil {
if pods, err := c.Pods(namespace).List(labels.Everything(), fields.Everything()); err == nil {
for _, pod := range pods.Items {
glog.Infof("pod found: %s/%s", namespace, pod.Name)
}

View File

@ -30,7 +30,7 @@ type PodsNamespacer interface {
// PodInterface has methods to work with Pod resources.
type PodInterface interface {
List(selector labels.Selector) (*api.PodList, error)
List(label labels.Selector, field fields.Selector) (*api.PodList, error)
Get(name string) (*api.Pod, error)
Delete(name string) error
Create(pod *api.Pod) (*api.Pod, error)
@ -54,10 +54,10 @@ func newPods(c *Client, namespace string) *pods {
}
}
// List takes a selector, and returns the list of pods that match that selector.
func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) {
// List takes label and field selectors, and returns the list of pods that match those selectors.
func (c *pods) List(label labels.Selector, field fields.Selector) (result *api.PodList, err error) {
result = &api.PodList{}
err = c.r.Get().Namespace(c.ns).Resource("pods").LabelsSelectorParam(selector).Do().Into(result)
err = c.r.Get().Namespace(c.ns).Resource("pods").LabelsSelectorParam(label).FieldsSelectorParam(field).Do().Into(result)
return
}

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
@ -31,7 +32,7 @@ func TestListEmptyPods(t *testing.T) {
Request: testRequest{Method: "GET", Path: testapi.ResourcePath("pods", ns, ""), Query: buildQueryValues(ns, nil)},
Response: Response{StatusCode: 200, Body: &api.PodList{}},
}
podList, err := c.Setup().Pods(ns).List(labels.Everything())
podList, err := c.Setup().Pods(ns).List(labels.Everything(), fields.Everything())
c.Validate(t, podList, err)
}
@ -57,7 +58,7 @@ func TestListPods(t *testing.T) {
},
},
}
receivedPodList, err := c.Setup().Pods(ns).List(labels.Everything())
receivedPodList, err := c.Setup().Pods(ns).List(labels.Everything(), fields.Everything())
c.Validate(t, receivedPodList, err)
}
@ -91,7 +92,7 @@ func TestListPodsLabels(t *testing.T) {
c.Setup()
c.QueryValidator[labelSelectorQueryParamName] = validateLabels
selector := labels.Set{"foo": "bar", "name": "baz"}.AsSelector()
receivedPodList, err := c.Pods(ns).List(selector)
receivedPodList, err := c.Pods(ns).List(selector, fields.Everything())
c.Validate(t, receivedPodList, err)
}

View File

@ -30,7 +30,7 @@ type FakePods struct {
Namespace string
}
func (c *FakePods) List(selector labels.Selector) (*api.PodList, error) {
func (c *FakePods) List(label labels.Selector, field fields.Selector) (*api.PodList, error) {
obj, err := c.Fake.Invokes(FakeAction{Action: "list-pods"}, &api.PodList{})
return obj.(*api.PodList), err
}

View File

@ -652,7 +652,7 @@ func (nc *NodeController) getCloudNodesWithSpec() (*api.NodeList, error) {
func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
// TODO: We don't yet have field selectors from client, see issue #1362.
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}

View File

@ -201,7 +201,7 @@ func FilterActivePods(pods []api.Pod) []*api.Pod {
func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
s := labels.Set(controller.Spec.Selector).AsSelector()
podList, err := rm.kubeClient.Pods(controller.Namespace).List(s)
podList, err := rm.kubeClient.Pods(controller.Namespace).List(s, fields.Everything())
if err != nil {
return err
}

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
@ -479,7 +480,7 @@ func (d *NodeDescriber) Describe(namespace, name string) (string, error) {
}
var pods []*api.Pod
allPods, err := d.Pods(namespace).List(labels.Everything())
allPods, err := d.Pods(namespace).List(labels.Everything(), fields.Everything())
if err != nil {
return "", err
}
@ -613,7 +614,7 @@ func printReplicationControllersByLabels(matchingRCs []api.ReplicationController
}
func getPodStatusForReplicationController(c client.PodInterface, controller *api.ReplicationController) (running, waiting, succeeded, failed int, err error) {
rcPods, err := c.List(labels.SelectorFromSet(controller.Spec.Selector))
rcPods, err := c.List(labels.SelectorFromSet(controller.Spec.Selector), fields.Everything())
if err != nil {
return
}

View File

@ -236,7 +236,7 @@ func deleteReplicationControllers(kubeClient client.Interface, ns string) error
}
func deletePods(kubeClient client.Interface, ns string) error {
items, err := kubeClient.Pods(ns).List(labels.Everything())
items, err := kubeClient.Pods(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}

View File

@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
@ -147,7 +148,7 @@ func (rm *ResourceQuotaManager) syncResourceQuota(quota api.ResourceQuota) (err
pods := &api.PodList{}
if set[api.ResourcePods] || set[api.ResourceMemory] || set[api.ResourceCPU] {
pods, err = rm.kubeClient.Pods(usage.Namespace).List(labels.Everything())
pods, err = rm.kubeClient.Pods(usage.Namespace).List(labels.Everything(), fields.Everything())
if err != nil {
return err
}

View File

@ -85,7 +85,7 @@ func NewEndpointController(client *client.Client) *EndpointController {
e.podStore.Store, e.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Pods(api.NamespaceAll).List(labels.Everything())
return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)

View File

@ -35,14 +35,14 @@ import (
)
// Convenient wrapper around listing pods supporting retries.
func listPods(c *client.Client, namespace string, label labels.Selector) (*api.PodList, error) {
func listPods(c *client.Client, namespace string, label labels.Selector, field fields.Selector) (*api.PodList, error) {
maxRetries := 4
pods, err := c.Pods(namespace).List(label)
pods, err := c.Pods(namespace).List(label, field)
for i := 0; i < maxRetries; i++ {
if err == nil {
return pods, nil
}
pods, err = c.Pods(namespace).List(label)
pods, err = c.Pods(namespace).List(label, field)
}
return pods, err
}
@ -127,7 +127,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) {
By(fmt.Sprintf("Making sure all %d replicas exist", replicas))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
pods, err := listPods(c, ns, label)
pods, err := listPods(c, ns, label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
current = len(pods.Items)
failCount := 5
@ -147,7 +147,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) {
last = current
time.Sleep(5 * time.Second)
pods, err = listPods(c, ns, label)
pods, err = listPods(c, ns, label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
current = len(pods.Items)
}
@ -166,7 +166,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) {
unknown := 0
time.Sleep(10 * time.Second)
currentPods, listErr := listPods(c, ns, label)
currentPods, listErr := listPods(c, ns, label, fields.Everything())
Expect(listErr).NotTo(HaveOccurred())
if len(currentPods.Items) != len(pods.Items) {
Failf("Number of reported pods changed: %d vs %d", len(currentPods.Items), len(pods.Items))

View File

@ -83,7 +83,7 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
// Wait for the Elasticsearch pods to enter the running state.
By("Checking to make sure the Elasticsearch pods are running")
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": "elasticsearch-logging"}))
pods, err := c.Pods(api.NamespaceDefault).List(label)
pods, err := c.Pods(api.NamespaceDefault).List(label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
for _, pod := range pods.Items {
err = waitForPodRunning(c, pod.Name)

View File

@ -78,7 +78,7 @@ var _ = Describe("Events", func() {
expectNoError(waitForPodRunning(c, pod.Name))
By("verifying the pod is in kubernetes")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything())
Expect(len(pods.Items)).To(Equal(1))
By("retrieving the pod")

View File

@ -86,7 +86,7 @@ func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error
return nil, fmt.Errorf("expected to find only one replica for rc %q, found %d", rc.Name, rc.Status.Replicas)
}
expectedRcs[rc.Name] = true
podList, err := c.Pods(api.NamespaceDefault).List(labels.Set(rc.Spec.Selector).AsSelector())
podList, err := c.Pods(api.NamespaceDefault).List(labels.Set(rc.Spec.Selector).AsSelector(), fields.Everything())
if err != nil {
return nil, err
}

View File

@ -174,7 +174,7 @@ var _ = Describe("Pods", func() {
}
By("setting up watch")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything())
if err != nil {
Fail(fmt.Sprintf("Failed to query for pods: %v", err))
}
@ -196,7 +196,7 @@ var _ = Describe("Pods", func() {
}
By("verifying the pod is in kubernetes")
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything())
if err != nil {
Fail(fmt.Sprintf("Failed to query for pods: %v", err))
}
@ -214,7 +214,7 @@ var _ = Describe("Pods", func() {
By("deleting the pod")
podClient.Delete(pod.Name)
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything())
if err != nil {
Fail(fmt.Sprintf("Failed to delete pod: %v", err))
}
@ -286,7 +286,7 @@ var _ = Describe("Pods", func() {
expectNoError(waitForPodRunning(c, pod.Name))
By("verifying the pod is in kubernetes")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything())
Expect(len(pods.Items)).To(Equal(1))
By("retrieving the pod")
@ -309,7 +309,7 @@ var _ = Describe("Pods", func() {
expectNoError(waitForPodRunning(c, pod.Name))
By("verifying the updated pod is in kubernetes")
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), fields.Everything())
Expect(len(pods.Items)).To(Equal(1))
fmt.Println("pod update OK")
})

View File

@ -22,6 +22,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -110,7 +111,7 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
// List the pods, making sure we observe all the replicas.
listTimeout := time.Minute
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
pods, err := c.Pods(ns).List(label)
pods, err := c.Pods(ns).List(label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
t := time.Now()
for {
@ -123,7 +124,7 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
name, replicas, len(pods.Items), time.Since(t).Seconds())
}
time.Sleep(5 * time.Second)
pods, err = c.Pods(ns).List(label)
pods, err = c.Pods(ns).List(label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
}
@ -165,7 +166,7 @@ type responseChecker struct {
func (r responseChecker) checkAllResponses() (done bool, err error) {
successes := 0
currentPods, err := r.c.Pods(r.ns).List(r.label)
currentPods, err := r.c.Pods(r.ns).List(r.label, fields.Everything())
Expect(err).NotTo(HaveOccurred())
for i, pod := range r.pods.Items {
// Check that the replica list remains unchanged, otherwise we have problems.

View File

@ -56,7 +56,7 @@ func TestClient(t *testing.T) {
t.Errorf("expected %#v, got %#v", e, a)
}
pods, err := client.Pods(ns).List(labels.Everything())
pods, err := client.Pods(ns).List(labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -94,7 +94,7 @@ func TestClient(t *testing.T) {
}
// pod is shown, but not scheduled
pods, err = client.Pods(ns).List(labels.Everything())
pods, err = client.Pods(ns).List(labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
@ -111,7 +112,7 @@ func TestApiserverMetrics(t *testing.T) {
// Make a request to the apiserver to ensure there's at least one data point
// for the metrics we're expecting -- otherwise, they won't be exported.
client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
if _, err := client.Pods(api.NamespaceDefault).List(labels.Everything()); err != nil {
if _, err := client.Pods(api.NamespaceDefault).List(labels.Everything(), fields.Everything()); err != nil {
t.Fatalf("unexpected error getting pods: %v", err)
}