diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index c768c29760..849e39cef1 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" @@ -31,104 +32,66 @@ import ( "k8s.io/kubernetes/pkg/labels" ) -// TODO: generate these classes and methods for all resources of interest using -// a script. Can use "go generate" once 1.4 is supported by all users. +type AppendFunc func(interface{}) -// StoreToPodLister makes a Store have the List method of the client.PodInterface -// The Store must contain (only) Pods. -// -// Example: -// s := cache.NewStore() -// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} -// r := cache.NewReflector(lw, &api.Pod{}, s).Run() -// l := StoreToPodLister{s} -// l.List() -type StoreToPodLister struct { - Indexer -} - -// Please note that selector is filtering among the pods that have gotten into -// the store; there may have been some filtering that already happened before -// that. -// We explicitly don't return api.PodList, to avoid expensive allocations, which -// in most cases are unnecessary. -func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { - for _, m := range s.Indexer.List() { - pod := m.(*api.Pod) - if selector.Matches(labels.Set(pod.Labels)) { - pods = append(pods, pod) +func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error { + for _, m := range store.List() { + metadata, err := meta.Accessor(m) + if err != nil { + return err + } + if selector.Matches(labels.Set(metadata.GetLabels())) { + appendFn(m) } } - return pods, nil + return nil } -// Pods is taking baby steps to be more like the api in pkg/client -func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { - return storePodsNamespacer{s.Indexer, namespace} -} - -type storePodsNamespacer struct { - indexer Indexer - namespace string -} - -// Please note that selector is filtering among the pods that have gotten into -// the store; there may have been some filtering that already happened before -// that. -// We explicitly don't return api.PodList, to avoid expensive allocations, which -// in most cases are unnecessary. -func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) { - if s.namespace == api.NamespaceAll { - for _, m := range s.indexer.List() { - pod := m.(*api.Pod) - if selector.Matches(labels.Set(pod.Labels)) { - pods = append(pods, pod) +func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error { + if namespace == api.NamespaceAll { + for _, m := range indexer.List() { + metadata, err := meta.Accessor(m) + if err != nil { + return err + } + if selector.Matches(labels.Set(metadata.GetLabels())) { + appendFn(m) } } - return pods, nil + return nil } - key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} - items, err := s.indexer.Index(NamespaceIndex, key) + items, err := indexer.Index(NamespaceIndex, api.ObjectMeta{Namespace: namespace}) if err != nil { // Ignore error; do slow search without index. glog.Warningf("can not retrieve list of objects using index : %v", err) - for _, m := range s.indexer.List() { - pod := m.(*api.Pod) - if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) { - pods = append(pods, pod) + for _, m := range indexer.List() { + metadata, err := meta.Accessor(m) + if err != nil { + return err } + if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) { + appendFn(m) + } + } - return pods, nil + return nil } for _, m := range items { - pod := m.(*api.Pod) - if selector.Matches(labels.Set(pod.Labels)) { - pods = append(pods, pod) + metadata, err := meta.Accessor(m) + if err != nil { + return err + } + if selector.Matches(labels.Set(metadata.GetLabels())) { + appendFn(m) } } - return pods, nil + + return nil } -func (s storePodsNamespacer) Get(name string) (*api.Pod, error) { - obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) - if err != nil { - return nil, err - } - if !exists { - return nil, errors.NewNotFound(api.Resource("pod"), name) - } - return obj.(*api.Pod), nil -} - -// Exists returns true if a pod matching the namespace/name of the given pod exists in the store. -func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) { - _, exists, err := s.Indexer.Get(pod) - if err != nil { - return false, err - } - return exists, nil -} +// TODO: generate these classes and methods for all resources of interest using +// a script. Can use "go generate" once 1.4 is supported by all users. // NodeConditionPredicate is a function that indicates whether the given node's conditions meet // some set of criteria defined by the function. diff --git a/pkg/client/cache/listers_core.go b/pkg/client/cache/listers_core.go new file mode 100644 index 0000000000..851c8834a1 --- /dev/null +++ b/pkg/client/cache/listers_core.go @@ -0,0 +1,73 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/labels" +) + +// TODO: generate these classes and methods for all resources of interest using +// a script. Can use "go generate" once 1.4 is supported by all users. + +// StoreToPodLister makes a Store have the List method of the client.PodInterface +// The Store must contain (only) Pods. +// +// Example: +// s := cache.NewStore() +// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} +// r := cache.NewReflector(lw, &api.Pod{}, s).Run() +// l := StoreToPodLister{s} +// l.List() +type StoreToPodLister struct { + Indexer Indexer +} + +func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + pods = append(pods, m.(*api.Pod)) + }) + return pods, err +} + +func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { + return storePodsNamespacer{Indexer: s.Indexer, namespace: namespace} +} + +type storePodsNamespacer struct { + Indexer Indexer + namespace string +} + +func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) { + err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { + pods = append(pods, m.(*api.Pod)) + }) + return pods, err +} + +func (s storePodsNamespacer) Get(name string) (*api.Pod, error) { + obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("pod"), name) + } + return obj.(*api.Pod), nil +} diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index f592dc89c9..0c91e5ac21 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -20,6 +20,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" @@ -700,8 +701,9 @@ func TestStoreToPodLister(t *testing.T) { for _, id := range ids { store.Add(&api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: id, - Labels: map[string]string{"name": id}, + Namespace: "other", + Name: id, + Labels: map[string]string{"name": id}, }, }) } @@ -739,20 +741,13 @@ func TestStoreToPodLister(t *testing.T) { continue } - exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: id}}) + _, err = spl.Pods("other").Get(id) if err != nil { t.Errorf("unexpected error: %v", err) } - if !exists { - t.Errorf("exists returned false for %v", id) - } } - exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "qux"}}) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if exists { + if _, err := spl.Pods("").Get("qux"); !apierrors.IsNotFound(err) { t.Error("Unexpected pod exists") } } diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index d7e5a0fe8e..61cbee19b9 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -251,7 +251,7 @@ func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) { node := newNode("too-much-mem", nil) node.Status.Allocatable = allocatableResources("100M", "200m") manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ Spec: podSpec, }) ds := newDaemonSet("foo") @@ -266,7 +266,7 @@ func TestSufficentCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) { node := newNode("too-much-mem", nil) node.Status.Allocatable = allocatableResources("100M", "200m") manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ Spec: podSpec, Status: api.PodStatus{Phase: api.PodSucceeded}, }) @@ -283,7 +283,7 @@ func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) { node := newNode("not-too-much-mem", nil) node.Status.Allocatable = allocatableResources("200M", "200m") manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ Spec: podSpec, }) ds := newDaemonSet("foo") @@ -299,7 +299,7 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) { node := newNode("not-too-much-mem", nil) node.Status.Allocatable = allocatableResources("200M", "200m") manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ Spec: podSpec, }) ds := newDaemonSet("foo") @@ -323,7 +323,7 @@ func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) { manager, podControl := newTestController() node := newNode("port-conflict", nil) manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ Spec: podSpec, }) @@ -349,7 +349,7 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) { manager, podControl := newTestController() node := newNode("port-conflict", nil) manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ ObjectMeta: api.ObjectMeta{ Labels: simpleDaemonSetLabel, Namespace: api.NamespaceDefault, @@ -383,7 +383,7 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) { manager, podControl := newTestController() node := newNode("no-port-conflict", nil) manager.nodeStore.Add(node) - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ Spec: podSpec1, }) ds := newDaemonSet("foo") @@ -399,7 +399,7 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { manager, podControl := newTestController() manager.nodeStore.Store.Add(newNode("node1", nil)) // Create pod not controlled by a daemonset. - manager.podStore.Add(&api.Pod{ + manager.podStore.Indexer.Add(&api.Pod{ ObjectMeta: api.ObjectMeta{ Labels: map[string]string{"bang": "boom"}, Namespace: api.NamespaceDefault, diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 8c7a96435c..fd7efd6cf3 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -779,7 +779,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // has exactly one expectation at the end, to verify that we // don't double delete. for i := range podsToDelete[1:] { - manager.podStore.Delete(podsToDelete[i]) + manager.podStore.Indexer.Delete(podsToDelete[i]) manager.deletePod(podsToDelete[i]) } podExp, exists, err := manager.expectations.GetExpectations(rsKey)