change factorization of listers to make them easier to add

pull/6/head
deads2k 2016-09-16 12:53:03 -04:00
parent 0dc72b7522
commit 1bf17eb4e9
5 changed files with 129 additions and 98 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
@ -31,104 +32,66 @@ import (
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
) )
// TODO: generate these classes and methods for all resources of interest using type AppendFunc func(interface{})
// a script. Can use "go generate" once 1.4 is supported by all users.
// StoreToPodLister makes a Store have the List method of the client.PodInterface func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
// The Store must contain (only) Pods. for _, m := range store.List() {
// metadata, err := meta.Accessor(m)
// Example: if err != nil {
// s := cache.NewStore() return err
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} }
// r := cache.NewReflector(lw, &api.Pod{}, s).Run() if selector.Matches(labels.Set(metadata.GetLabels())) {
// l := StoreToPodLister{s} appendFn(m)
// l.List()
type StoreToPodLister struct {
Indexer
}
// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
// We explicitly don't return api.PodList, to avoid expensive allocations, which
// in most cases are unnecessary.
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
for _, m := range s.Indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
} }
} }
return pods, nil return nil
} }
// Pods is taking baby steps to be more like the api in pkg/client func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { if namespace == api.NamespaceAll {
return storePodsNamespacer{s.Indexer, namespace} for _, m := range indexer.List() {
} metadata, err := meta.Accessor(m)
if err != nil {
type storePodsNamespacer struct { return err
indexer Indexer }
namespace string if selector.Matches(labels.Set(metadata.GetLabels())) {
} appendFn(m)
// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
// We explicitly don't return api.PodList, to avoid expensive allocations, which
// in most cases are unnecessary.
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
} }
} }
return pods, nil return nil
} }
key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} items, err := indexer.Index(NamespaceIndex, api.ObjectMeta{Namespace: namespace})
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil { if err != nil {
// Ignore error; do slow search without index. // Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err) glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() { for _, m := range indexer.List() {
pod := m.(*api.Pod) metadata, err := meta.Accessor(m)
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) { if err != nil {
pods = append(pods, pod) return err
} }
if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}
} }
return pods, nil return nil
} }
for _, m := range items { for _, m := range items {
pod := m.(*api.Pod) metadata, err := meta.Accessor(m)
if selector.Matches(labels.Set(pod.Labels)) { if err != nil {
pods = append(pods, pod) return err
}
if selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
} }
} }
return pods, nil
return nil
} }
func (s storePodsNamespacer) Get(name string) (*api.Pod, error) { // TODO: generate these classes and methods for all resources of interest using
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) // a script. Can use "go generate" once 1.4 is supported by all users.
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("pod"), name)
}
return obj.(*api.Pod), nil
}
// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
_, exists, err := s.Indexer.Get(pod)
if err != nil {
return false, err
}
return exists, nil
}
// NodeConditionPredicate is a function that indicates whether the given node's conditions meet // NodeConditionPredicate is a function that indicates whether the given node's conditions meet
// some set of criteria defined by the function. // some set of criteria defined by the function.

73
pkg/client/cache/listers_core.go vendored Normal file
View File

@ -0,0 +1,73 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/labels"
)
// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.
// StoreToPodLister makes a Store have the List method of the client.PodInterface
// The Store must contain (only) Pods.
//
// Example:
// s := cache.NewStore()
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
// r := cache.NewReflector(lw, &api.Pod{}, s).Run()
// l := StoreToPodLister{s}
// l.List()
type StoreToPodLister struct {
Indexer Indexer
}
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
})
return pods, err
}
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
return storePodsNamespacer{Indexer: s.Indexer, namespace: namespace}
}
type storePodsNamespacer struct {
Indexer Indexer
namespace string
}
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
})
return pods, err
}
func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("pod"), name)
}
return obj.(*api.Pod), nil
}

View File

@ -20,6 +20,7 @@ import (
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
@ -700,8 +701,9 @@ func TestStoreToPodLister(t *testing.T) {
for _, id := range ids { for _, id := range ids {
store.Add(&api.Pod{ store.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: id, Namespace: "other",
Labels: map[string]string{"name": id}, Name: id,
Labels: map[string]string{"name": id},
}, },
}) })
} }
@ -739,20 +741,13 @@ func TestStoreToPodLister(t *testing.T) {
continue continue
} }
exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: id}}) _, err = spl.Pods("other").Get(id)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if !exists {
t.Errorf("exists returned false for %v", id)
}
} }
exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "qux"}}) if _, err := spl.Pods("").Get("qux"); !apierrors.IsNotFound(err) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if exists {
t.Error("Unexpected pod exists") t.Error("Unexpected pod exists")
} }
} }

View File

@ -251,7 +251,7 @@ func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
node := newNode("too-much-mem", nil) node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m") node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec, Spec: podSpec,
}) })
ds := newDaemonSet("foo") ds := newDaemonSet("foo")
@ -266,7 +266,7 @@ func TestSufficentCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) {
node := newNode("too-much-mem", nil) node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m") node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec, Spec: podSpec,
Status: api.PodStatus{Phase: api.PodSucceeded}, Status: api.PodStatus{Phase: api.PodSucceeded},
}) })
@ -283,7 +283,7 @@ func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) {
node := newNode("not-too-much-mem", nil) node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m") node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec, Spec: podSpec,
}) })
ds := newDaemonSet("foo") ds := newDaemonSet("foo")
@ -299,7 +299,7 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
node := newNode("not-too-much-mem", nil) node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m") node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec, Spec: podSpec,
}) })
ds := newDaemonSet("foo") ds := newDaemonSet("foo")
@ -323,7 +323,7 @@ func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) {
manager, podControl := newTestController() manager, podControl := newTestController()
node := newNode("port-conflict", nil) node := newNode("port-conflict", nil)
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec, Spec: podSpec,
}) })
@ -349,7 +349,7 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
manager, podControl := newTestController() manager, podControl := newTestController()
node := newNode("port-conflict", nil) node := newNode("port-conflict", nil)
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Labels: simpleDaemonSetLabel, Labels: simpleDaemonSetLabel,
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
@ -383,7 +383,7 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
manager, podControl := newTestController() manager, podControl := newTestController()
node := newNode("no-port-conflict", nil) node := newNode("no-port-conflict", nil)
manager.nodeStore.Add(node) manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec1, Spec: podSpec1,
}) })
ds := newDaemonSet("foo") ds := newDaemonSet("foo")
@ -399,7 +399,7 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
manager, podControl := newTestController() manager, podControl := newTestController()
manager.nodeStore.Store.Add(newNode("node1", nil)) manager.nodeStore.Store.Add(newNode("node1", nil))
// Create pod not controlled by a daemonset. // Create pod not controlled by a daemonset.
manager.podStore.Add(&api.Pod{ manager.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"bang": "boom"}, Labels: map[string]string{"bang": "boom"},
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,

View File

@ -779,7 +779,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// has exactly one expectation at the end, to verify that we // has exactly one expectation at the end, to verify that we
// don't double delete. // don't double delete.
for i := range podsToDelete[1:] { for i := range podsToDelete[1:] {
manager.podStore.Delete(podsToDelete[i]) manager.podStore.Indexer.Delete(podsToDelete[i])
manager.deletePod(podsToDelete[i]) manager.deletePod(podsToDelete[i])
} }
podExp, exists, err := manager.expectations.GetExpectations(rsKey) podExp, exists, err := manager.expectations.GetExpectations(rsKey)