mirror of https://github.com/k3s-io/k3s
fix up service lister
parent
185a7adf84
commit
16fbb47189
|
@ -55,7 +55,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
|
|||
client: client,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
|
||||
}
|
||||
e.serviceStore.Store, e.serviceController = cache.NewInformer(
|
||||
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return e.client.Core().Services(api.NamespaceAll).List(options)
|
||||
|
@ -73,6 +73,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
|
|||
},
|
||||
DeleteFunc: e.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
e.podStore.Indexer, e.podController = cache.NewIndexerInformer(
|
||||
|
@ -262,7 +263,7 @@ func (e *endpointController) syncService(key string) error {
|
|||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
obj, exists, err := e.serviceStore.Store.GetByKey(key)
|
||||
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
|
||||
if err != nil || !exists {
|
||||
// Delete the corresponding endpoint, as the service has been deleted.
|
||||
// TODO: Please note that this will delete an endpoint when a
|
||||
|
|
|
@ -114,7 +114,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
|
|||
},
|
||||
)
|
||||
|
||||
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = cache.NewInformer(
|
||||
cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return clientset.Core().Services(v1.NamespaceAll).List(options)
|
||||
|
@ -149,6 +149,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
|
|||
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
|
||||
},
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
cc.clientMap[clusterName] = cachedClusterClient
|
||||
go cachedClusterClient.serviceController.Run(wait.NeverStop)
|
||||
|
|
|
@ -63,7 +63,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
|
|||
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
|
||||
return nil
|
||||
}
|
||||
serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key)
|
||||
serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
|
||||
clusterCache.serviceQueue.Add(key)
|
||||
|
|
|
@ -144,7 +144,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
|
|||
queue: workqueue.New(),
|
||||
knownClusterSet: make(sets.String),
|
||||
}
|
||||
s.serviceStore.Store, s.serviceController = cache.NewInformer(
|
||||
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.federationClient.Core().Services(v1.NamespaceAll).List(options)
|
||||
|
@ -165,6 +165,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
|
|||
},
|
||||
DeleteFunc: s.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
s.clusterStore.Store, s.clusterController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
|
@ -816,7 +817,7 @@ func (s *ServiceController) syncService(key string) error {
|
|||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
// obj holds the latest service info from apiserver
|
||||
obj, exists, err := s.serviceStore.Store.GetByKey(key)
|
||||
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
|
||||
s.queue.Add(key)
|
||||
|
|
|
@ -409,47 +409,6 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
|
|||
return
|
||||
}
|
||||
|
||||
// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
|
||||
// The Store must contain (only) Services.
|
||||
type StoreToServiceLister struct {
|
||||
Store
|
||||
}
|
||||
|
||||
func (s *StoreToServiceLister) List() (services api.ServiceList, err error) {
|
||||
for _, m := range s.Store.List() {
|
||||
services.Items = append(services.Items, *(m.(*api.Service)))
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// TODO: Move this back to scheduler as a helper function that takes a Store,
|
||||
// rather than a method of StoreToServiceLister.
|
||||
func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
|
||||
var selector labels.Selector
|
||||
var service api.Service
|
||||
|
||||
for _, m := range s.Store.List() {
|
||||
service = *m.(*api.Service)
|
||||
// consider only services that are in the same namespace as the pod
|
||||
if service.Namespace != pod.Namespace {
|
||||
continue
|
||||
}
|
||||
if service.Spec.Selector == nil {
|
||||
// services with nil selectors match nothing, not everything.
|
||||
continue
|
||||
}
|
||||
selector = labels.Set(service.Spec.Selector).AsSelectorPreValidated()
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
if len(services) == 0 {
|
||||
err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToEndpointsLister makes a Store that lists endpoints.
|
||||
type StoreToEndpointsLister struct {
|
||||
Store
|
||||
|
|
|
@ -141,3 +141,65 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToServiceLister helps list services
|
||||
type StoreToServiceLister struct {
|
||||
Indexer Indexer
|
||||
}
|
||||
|
||||
func (s *StoreToServiceLister) List(selector labels.Selector) (ret []*api.Service, err error) {
|
||||
err = ListAll(s.Indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.Service))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (s *StoreToServiceLister) Services(namespace string) storeServicesNamespacer {
|
||||
return storeServicesNamespacer{s.Indexer, namespace}
|
||||
}
|
||||
|
||||
type storeServicesNamespacer struct {
|
||||
indexer Indexer
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (s storeServicesNamespacer) List(selector labels.Selector) (ret []*api.Service, err error) {
|
||||
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.Service))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (s storeServicesNamespacer) Get(name string) (*api.Service, error) {
|
||||
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.NewNotFound(api.Resource("service"), name)
|
||||
}
|
||||
return obj.(*api.Service), nil
|
||||
}
|
||||
|
||||
// TODO: Move this back to scheduler as a helper function that takes a Store,
|
||||
// rather than a method of StoreToServiceLister.
|
||||
func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
|
||||
allServices, err := s.Services(pod.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range allServices {
|
||||
service := allServices[i]
|
||||
if service.Spec.Selector == nil {
|
||||
// services with nil selectors match nothing, not everything.
|
||||
continue
|
||||
}
|
||||
selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
|
|
@ -754,7 +754,7 @@ func TestStoreToPodLister(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStoreToServiceLister(t *testing.T) {
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
|
||||
store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Spec: api.ServiceSpec{
|
||||
|
|
|
@ -79,7 +79,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
|
|||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
|
||||
}
|
||||
|
||||
e.serviceStore.Store, e.serviceController = cache.NewInformer(
|
||||
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return e.client.Core().Services(api.NamespaceAll).List(options)
|
||||
|
@ -98,6 +98,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
|
|||
},
|
||||
DeleteFunc: e.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
|
@ -334,7 +335,7 @@ func (e *EndpointController) syncService(key string) error {
|
|||
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
obj, exists, err := e.serviceStore.Store.GetByKey(key)
|
||||
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
|
||||
if err != nil || !exists {
|
||||
// Delete the corresponding endpoint, as the service has been deleted.
|
||||
// TODO: Please note that this will delete an endpoint when a
|
||||
|
|
|
@ -110,7 +110,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
|||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
|
||||
})
|
||||
|
@ -174,7 +174,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
|||
endpoints.podStoreSynced = alwaysReady
|
||||
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
|
@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
|||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
|
@ -255,7 +255,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
|||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
|
@ -293,7 +293,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
|
|||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
|
@ -331,7 +331,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
|
|||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
|
@ -373,7 +373,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
|||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
|
@ -414,7 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
|
|||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0)
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
|
@ -435,7 +435,7 @@ func TestSyncEndpointsItems(t *testing.T) {
|
|||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
|
||||
addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
|
@ -478,7 +478,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
|
|||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
|
||||
serviceLabels := map[string]string{"foo": "bar"}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
|
@ -539,7 +539,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
|
|||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
serviceLabels := map[string]string{"baz": "blah"}
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
|
|
|
@ -119,7 +119,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
|||
},
|
||||
workingQueue: workqueue.NewDelayingQueue(),
|
||||
}
|
||||
s.serviceStore.Store, s.serviceController = cache.NewInformer(
|
||||
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
|
||||
|
@ -141,6 +141,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
|||
},
|
||||
DeleteFunc: s.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
if err := s.init(); err != nil {
|
||||
return nil, err
|
||||
|
@ -724,7 +725,7 @@ func (s *ServiceController) syncService(key string) error {
|
|||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
// obj holds the latest service info from apiserver
|
||||
obj, exists, err := s.serviceStore.Store.GetByKey(key)
|
||||
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
|
||||
s.workingQueue.Add(key)
|
||||
|
|
|
@ -27,10 +27,10 @@ import (
|
|||
// FromServices builds environment variables that a container is started with,
|
||||
// which tell the container where to find the services it may need, which are
|
||||
// provided as an argument.
|
||||
func FromServices(services *api.ServiceList) []api.EnvVar {
|
||||
func FromServices(services []*api.Service) []api.EnvVar {
|
||||
var result []api.EnvVar
|
||||
for i := range services.Items {
|
||||
service := &services.Items[i]
|
||||
for i := range services {
|
||||
service := services[i]
|
||||
|
||||
// ignore services where ClusterIP is "None" or empty
|
||||
// the services passed to this method should be pre-filtered
|
||||
|
|
|
@ -25,63 +25,61 @@ import (
|
|||
)
|
||||
|
||||
func TestFromServices(t *testing.T) {
|
||||
sl := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo-bar"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "1.2.3.4",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8080, Protocol: "TCP"},
|
||||
},
|
||||
sl := []*api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo-bar"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "1.2.3.4",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8080, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "abc-123"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "5.6.7.8",
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "u-d-p", Port: 8081, Protocol: "UDP"},
|
||||
{Name: "t-c-p", Port: 8081, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "abc-123"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "5.6.7.8",
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "u-d-p", Port: 8081, Protocol: "UDP"},
|
||||
{Name: "t-c-p", Port: 8081, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "9.8.7.6",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
{Name: "8083", Port: 8083, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "9.8.7.6",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
{Name: "8083", Port: 8083, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "None",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "None",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
vars := envvars.FromServices(&sl)
|
||||
vars := envvars.FromServices(sl)
|
||||
expected := []api.EnvVar{
|
||||
{Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"},
|
||||
{Name: "FOO_BAR_SERVICE_PORT", Value: "8080"},
|
||||
|
|
|
@ -76,6 +76,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/security/apparmor"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
|
@ -370,7 +371,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
|||
dockerExecHandler = &dockertools.NativeExecHandler{}
|
||||
}
|
||||
|
||||
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
if kubeClient != nil {
|
||||
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
|
||||
// than an interface. There is no way to construct a list+watcher using resource name.
|
||||
|
@ -384,7 +385,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
|||
}
|
||||
cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
|
||||
}
|
||||
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
|
||||
serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore}
|
||||
|
||||
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
if kubeClient != nil {
|
||||
|
@ -751,7 +752,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
|||
}
|
||||
|
||||
type serviceLister interface {
|
||||
List() (api.ServiceList, error)
|
||||
List(labels.Selector) ([]*api.Service, error)
|
||||
}
|
||||
|
||||
type nodeLister interface {
|
||||
|
@ -1421,7 +1422,7 @@ var masterServices = sets.NewString("kubernetes")
|
|||
// pod in namespace ns should see.
|
||||
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
|
||||
var (
|
||||
serviceMap = make(map[string]api.Service)
|
||||
serviceMap = make(map[string]*api.Service)
|
||||
m = make(map[string]string)
|
||||
)
|
||||
|
||||
|
@ -1431,15 +1432,16 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
|
|||
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
|
||||
return m, nil
|
||||
}
|
||||
services, err := kl.serviceLister.List()
|
||||
services, err := kl.serviceLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("failed to list services when setting up env vars.")
|
||||
}
|
||||
|
||||
// project the services in namespace ns onto the master services
|
||||
for _, service := range services.Items {
|
||||
for i := range services {
|
||||
service := services[i]
|
||||
// ignore services where ClusterIP is "None" or empty
|
||||
if !api.IsServiceIPSet(&service) {
|
||||
if !api.IsServiceIPSet(service) {
|
||||
continue
|
||||
}
|
||||
serviceName := service.Name
|
||||
|
@ -1459,12 +1461,13 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
services.Items = []api.Service{}
|
||||
for _, service := range serviceMap {
|
||||
services.Items = append(services.Items, service)
|
||||
|
||||
mappedServices := []*api.Service{}
|
||||
for key := range serviceMap {
|
||||
mappedServices = append(mappedServices, serviceMap[key])
|
||||
}
|
||||
|
||||
for _, e := range envvars.FromServices(&services) {
|
||||
for _, e := range envvars.FromServices(mappedServices) {
|
||||
m[e.Name] = e.Value
|
||||
}
|
||||
return m, nil
|
||||
|
|
|
@ -61,6 +61,7 @@ import (
|
|||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/clock"
|
||||
|
@ -896,13 +897,11 @@ func TestDNSConfigurationParams(t *testing.T) {
|
|||
}
|
||||
|
||||
type testServiceLister struct {
|
||||
services []api.Service
|
||||
services []*api.Service
|
||||
}
|
||||
|
||||
func (ls testServiceLister) List() (api.ServiceList, error) {
|
||||
return api.ServiceList{
|
||||
Items: ls.services,
|
||||
}, nil
|
||||
func (ls testServiceLister) List(labels.Selector) ([]*api.Service, error) {
|
||||
return ls.services, nil
|
||||
}
|
||||
|
||||
type testNodeLister struct {
|
||||
|
@ -938,8 +937,8 @@ func (e envs) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
|
|||
|
||||
func (e envs) Less(i, j int) bool { return e[i].Name < e[j].Name }
|
||||
|
||||
func buildService(name, namespace, clusterIP, protocol string, port int) api.Service {
|
||||
return api.Service{
|
||||
func buildService(name, namespace, clusterIP, protocol string, port int) *api.Service {
|
||||
return &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: name, Namespace: namespace},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{{
|
||||
|
@ -952,7 +951,7 @@ func buildService(name, namespace, clusterIP, protocol string, port int) api.Ser
|
|||
}
|
||||
|
||||
func TestMakeEnvironmentVariables(t *testing.T) {
|
||||
services := []api.Service{
|
||||
services := []*api.Service{
|
||||
buildService("kubernetes", api.NamespaceDefault, "1.2.3.1", "TCP", 8081),
|
||||
buildService("test", "test1", "1.2.3.3", "TCP", 8083),
|
||||
buildService("kubernetes", "test2", "1.2.3.4", "TCP", 8084),
|
||||
|
|
|
@ -63,24 +63,25 @@ func (f FakePodLister) List(s labels.Selector) (selected []*api.Pod, err error)
|
|||
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
|
||||
type ServiceLister interface {
|
||||
// Lists all the services
|
||||
List() (api.ServiceList, error)
|
||||
List(labels.Selector) ([]*api.Service, error)
|
||||
// Gets the services for the given pod
|
||||
GetPodServices(*api.Pod) ([]api.Service, error)
|
||||
GetPodServices(*api.Pod) ([]*api.Service, error)
|
||||
}
|
||||
|
||||
// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
|
||||
type FakeServiceLister []api.Service
|
||||
type FakeServiceLister []*api.Service
|
||||
|
||||
// List returns api.ServiceList, the list of all services.
|
||||
func (f FakeServiceLister) List() (api.ServiceList, error) {
|
||||
return api.ServiceList{Items: f}, nil
|
||||
func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// GetPodServices gets the services that have the selector that match the labels on the given pod
|
||||
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
|
||||
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
|
||||
var selector labels.Selector
|
||||
|
||||
for _, service := range f {
|
||||
for i := range f {
|
||||
service := f[i]
|
||||
// consider only services that are in the same namespace as the pod
|
||||
if service.Namespace != pod.Namespace {
|
||||
continue
|
||||
|
|
|
@ -670,7 +670,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
|
|||
// skip looking at other pods in the service if the current pod defines all the required affinity labels
|
||||
if !labelsExist {
|
||||
services, err := s.serviceLister.GetPodServices(pod)
|
||||
if err == nil {
|
||||
if err == nil && len(services) > 0 {
|
||||
// just use the first service and get the other pods within the service
|
||||
// TODO: a separate predicate can be created that tries to handle all services for the pod
|
||||
selector := labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
|
|
|
@ -1209,7 +1209,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
tests := []struct {
|
||||
pod *api.Pod
|
||||
pods []*api.Pod
|
||||
services []api.Service
|
||||
services []*api.Service
|
||||
node *api.Node
|
||||
labels []string
|
||||
fits bool
|
||||
|
@ -1240,7 +1240,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "service pod on same node",
|
||||
|
@ -1249,7 +1249,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "service pod on different node, region match",
|
||||
|
@ -1258,7 +1258,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: false,
|
||||
labels: []string{"region"},
|
||||
test: "service pod on different node, region mismatch",
|
||||
|
@ -1267,7 +1267,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "service in different namespace, region mismatch",
|
||||
|
@ -1276,7 +1276,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns2"}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "pod in different namespace, region mismatch",
|
||||
|
@ -1285,7 +1285,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
fits: false,
|
||||
labels: []string{"region"},
|
||||
test: "service and pod in same namespace, region mismatch",
|
||||
|
@ -1294,7 +1294,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: false,
|
||||
labels: []string{"region", "zone"},
|
||||
test: "service pod on different node, multiple labels, not all match",
|
||||
|
@ -1303,7 +1303,7 @@ func TestServiceAffinity(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node4,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: true,
|
||||
labels: []string{"region", "zone"},
|
||||
test: "service pod on different node, multiple labels, all match",
|
||||
|
|
|
@ -194,7 +194,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
|
|||
// The label to be considered is provided to the struct (ServiceAntiAffinity).
|
||||
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
|
||||
var nsServicePods []*api.Pod
|
||||
if services, err := s.serviceLister.GetPodServices(pod); err == nil {
|
||||
if services, err := s.serviceLister.GetPodServices(pod); err == nil && len(services) > 0 {
|
||||
// just use the first service and get the other pods within the service
|
||||
// TODO: a separate predicate can be created that tries to handle all services for the pod
|
||||
selector := labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
|
|
|
@ -59,7 +59,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
nodes []string
|
||||
rcs []*api.ReplicationController
|
||||
rss []extensions.ReplicaSet
|
||||
services []api.Service
|
||||
services []*api.Service
|
||||
expectedList schedulerapi.HostPriorityList
|
||||
test string
|
||||
}{
|
||||
|
@ -80,7 +80,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 10}},
|
||||
test: "different services",
|
||||
},
|
||||
|
@ -91,7 +91,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "two pods, one service pod",
|
||||
},
|
||||
|
@ -105,7 +105,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "five pods, one service pod in no namespace",
|
||||
},
|
||||
|
@ -118,7 +118,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "four pods, one service pod in default namespace",
|
||||
},
|
||||
|
@ -132,7 +132,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "five pods, one service pod in specific namespace",
|
||||
},
|
||||
|
@ -144,7 +144,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
|
||||
test: "three pods, two service pods on different machines",
|
||||
},
|
||||
|
@ -157,7 +157,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 5}, {Host: "machine2", Score: 0}},
|
||||
test: "four pods, three service pods",
|
||||
},
|
||||
|
@ -169,7 +169,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
test: "service with partial pod label matches",
|
||||
},
|
||||
|
@ -181,8 +181,6 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
// "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to
|
||||
|
@ -198,7 +196,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
|
||||
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
|
@ -212,8 +210,6 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
// Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above.
|
||||
|
@ -228,7 +224,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
|
||||
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
|
@ -350,7 +346,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||
nodes []string
|
||||
rcs []*api.ReplicationController
|
||||
rss []extensions.ReplicaSet
|
||||
services []api.Service
|
||||
services []*api.Service
|
||||
expectedList schedulerapi.HostPriorityList
|
||||
test string
|
||||
}{
|
||||
|
@ -382,7 +378,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||
{
|
||||
pod: buildPod("", labels1, nil),
|
||||
pods: []*api.Pod{buildPod(nodeMachine1Zone1, labels2, nil)},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 10},
|
||||
{Host: nodeMachine1Zone2, Score: 10},
|
||||
|
@ -399,7 +395,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||
buildPod(nodeMachine1Zone1, labels2, nil),
|
||||
buildPod(nodeMachine1Zone2, labels1, nil),
|
||||
},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 10},
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Already have pod on machine
|
||||
|
@ -419,7 +415,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||
buildPod(nodeMachine1Zone3, labels2, nil),
|
||||
buildPod(nodeMachine2Zone3, labels1, nil),
|
||||
},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 10},
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
|
||||
|
@ -438,7 +434,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||
buildPod(nodeMachine2Zone2, labels2, nil),
|
||||
buildPod(nodeMachine1Zone3, labels1, nil),
|
||||
},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 0}, // Pod on node
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
|
||||
|
@ -457,7 +453,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
|||
buildPod(nodeMachine1Zone3, labels1, nil),
|
||||
buildPod(nodeMachine2Zone2, labels2, nil),
|
||||
},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 0}, // Pod on node
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
|
||||
|
@ -552,7 +548,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
pod *api.Pod
|
||||
pods []*api.Pod
|
||||
nodes map[string]map[string]string
|
||||
services []api.Service
|
||||
services []*api.Service
|
||||
expectedList schedulerapi.HostPriorityList
|
||||
test string
|
||||
}{
|
||||
|
@ -577,7 +573,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10},
|
||||
{Host: "machine21", Score: 10}, {Host: "machine22", Score: 10},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -591,7 +587,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10},
|
||||
{Host: "machine21", Score: 0}, {Host: "machine22", Score: 0},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -605,7 +601,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 5}, {Host: "machine12", Score: 5},
|
||||
{Host: "machine21", Score: 5}, {Host: "machine22", Score: 5},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -620,7 +616,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 0}, {Host: "machine12", Score: 0},
|
||||
{Host: "machine21", Score: 10}, {Host: "machine22", Score: 10},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -635,7 +631,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 6}, {Host: "machine12", Score: 6},
|
||||
{Host: "machine21", Score: 3}, {Host: "machine22", Score: 3},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -649,7 +645,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 3}, {Host: "machine12", Score: 3},
|
||||
{Host: "machine21", Score: 6}, {Host: "machine22", Score: 6},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -664,7 +660,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
|||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 7}, {Host: "machine12", Score: 7},
|
||||
{Host: "machine21", Score: 5}, {Host: "machine22", Score: 5},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
|
|
@ -108,7 +108,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
|
|||
NodeLister: &cache.StoreToNodeLister{},
|
||||
PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
||||
ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
||||
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
schedulerCache: schedulerCache,
|
||||
|
@ -401,7 +401,7 @@ func (f *ConfigFactory) Run() {
|
|||
// Watch and cache all service objects. Scheduler needs to find all pods
|
||||
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
|
||||
// Cache this locally.
|
||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)
|
||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
|
||||
|
||||
// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
|
||||
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
|
||||
|
|
Loading…
Reference in New Issue