add ReplicaSet support in scheduler

pull/6/head
mqliang 2016-02-16 00:13:38 +08:00
parent 15893a48d9
commit 0aab44a00d
8 changed files with 167 additions and 12 deletions

View File

@ -20,6 +20,8 @@ import (
"fmt" "fmt"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
) )
@ -140,3 +142,56 @@ func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api
return return
} }
// ReplicaSetLister interface represents anything that can produce a list of ReplicaSet; the list is consumed by a scheduler.
type ReplicaSetLister interface {
// Lists all the replicasets
List() ([]extensions.ReplicaSet, error)
// Gets the replicasets for the given pod
GetPodReplicaSets(*api.Pod) ([]extensions.ReplicaSet, error)
}
// EmptyReplicaSetLister implements ReplicaSetLister on []extensions.ReplicaSet returning empty data
type EmptyReplicaSetLister struct{}
// List returns nil
func (f EmptyReplicaSetLister) List() ([]extensions.ReplicaSet, error) {
return nil, nil
}
// GetPodReplicaSets returns nil
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
return nil, nil
}
// FakeReplicaSetLister implements ControllerLister on []extensions.ReplicaSet for test purposes.
type FakeReplicaSetLister []extensions.ReplicaSet
// List returns []extensions.ReplicaSet, the list of all ReplicaSets.
func (f FakeReplicaSetLister) List() ([]extensions.ReplicaSet, error) {
return f, nil
}
// GetPodReplicaSets gets the ReplicaSets that have the selector that match the labels on the given pod
func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
var selector labels.Selector
for _, rs := range f {
if rs.Namespace != pod.Namespace {
continue
}
selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return
}
if selector.Matches(labels.Set(pod.Labels)) {
rss = append(rss, rs)
}
}
if len(rss) == 0 {
err = fmt.Errorf("Could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
@ -141,7 +142,7 @@ func TestZeroRequest(t *testing.T) {
// This should match the configuration in defaultPriorities() in // This should match the configuration in defaultPriorities() in
// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production. // to test what's actually in production.
[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}}, []algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{}), algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})), Weight: 1}},
algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{}) algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)

View File

@ -38,13 +38,15 @@ type SelectorSpread struct {
podLister algorithm.PodLister podLister algorithm.PodLister
serviceLister algorithm.ServiceLister serviceLister algorithm.ServiceLister
controllerLister algorithm.ControllerLister controllerLister algorithm.ControllerLister
replicaSetLister algorithm.ReplicaSetLister
} }
func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction { func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister) algorithm.PriorityFunction {
selectorSpread := &SelectorSpread{ selectorSpread := &SelectorSpread{
podLister: podLister, podLister: podLister,
serviceLister: serviceLister, serviceLister: serviceLister,
controllerLister: controllerLister, controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
} }
return selectorSpread.CalculateSpreadPriority return selectorSpread.CalculateSpreadPriority
} }
@ -86,10 +88,18 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector)) selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
} }
} }
controllers, err := s.controllerLister.GetPodControllers(pod) rcs, err := s.controllerLister.GetPodControllers(pod)
if err == nil { if err == nil {
for _, controller := range controllers { for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(controller.Spec.Selector)) selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
}
}
rss, err := s.replicaSetLister.GetPodReplicaSets(pod)
if err == nil {
for _, rs := range rss {
if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
} }
} }

View File

@ -22,7 +22,9 @@ import (
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned" wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@ -48,6 +50,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
pods []*api.Pod pods []*api.Pod
nodes []string nodes []string
rcs []api.ReplicationController rcs []api.ReplicationController
rss []extensions.ReplicaSet
services []api.Service services []api.Service
expectedList schedulerapi.HostPriorityList expectedList schedulerapi.HostPriorityList
test string test string
@ -177,6 +180,20 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "service with partial pod label matches with service and replication controller", test: "service with partial pod label matches with service and replication controller",
}, },
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "service with partial pod label matches with service and replication controller",
},
{ {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}}, pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}},
pods: []*api.Pod{ pods: []*api.Pod{
@ -191,6 +208,20 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "disjoined service and replication controller should be treated equally", test: "disjoined service and replication controller should be treated equally",
}, },
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "disjoined service and replication controller should be treated equally",
},
{ {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{ pods: []*api.Pod{
@ -204,6 +235,19 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "Replication controller with partial pod label matches", test: "Replication controller with partial pod label matches",
}, },
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "Replication controller with partial pod label matches",
},
{ {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{ pods: []*api.Pod{
@ -216,11 +260,24 @@ func TestSelectorSpreadPriority(t *testing.T) {
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "Replication controller with partial pod label matches", test: "Replication controller with partial pod label matches",
}, },
{
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "Replication controller with partial pod label matches",
},
} }
for _, test := range tests { for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeNodeList(test.nodes))) list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -273,6 +330,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
pods []*api.Pod pods []*api.Pod
nodes []string nodes []string
rcs []api.ReplicationController rcs []api.ReplicationController
rss []extensions.ReplicaSet
services []api.Service services []api.Service
expectedList schedulerapi.HostPriorityList expectedList schedulerapi.HostPriorityList
test string test string
@ -420,7 +478,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
for _, test := range tests { for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes))) list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes)))
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)

View File

@ -20,10 +20,14 @@ import (
"reflect" "reflect"
"testing" "testing"
"k8s.io/kubernetes/pkg/api/testapi"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory" "k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"net/http/httptest"
) )
func TestCompatibility_v1_Scheduler(t *testing.T) { func TestCompatibility_v1_Scheduler(t *testing.T) {
@ -100,7 +104,18 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
if !reflect.DeepEqual(policy, tc.ExpectedPolicy) { if !reflect.DeepEqual(policy, tc.ExpectedPolicy) {
t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy) t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy)
} }
if _, err := factory.NewConfigFactory(nil, "some-scheduler-name").CreateFromConfig(policy); err != nil {
handler := utiltesting.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
// TODO: Uncomment when fix #19254
// defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
if _, err := factory.NewConfigFactory(client, "some-scheduler-name").CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err) t.Errorf("%s: Error constructing: %v", v, err)
continue continue
} }

View File

@ -68,7 +68,7 @@ func init() {
"ServiceSpreadingPriority", "ServiceSpreadingPriority",
factory.PriorityConfigFactory{ factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{}) return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{})
}, },
Weight: 1, Weight: 1,
}, },
@ -141,7 +141,7 @@ func defaultPriorities() sets.String {
"SelectorSpreadPriority", "SelectorSpreadPriority",
factory.PriorityConfigFactory{ factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister) return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister, args.ReplicaSetLister)
}, },
Weight: 1, Weight: 1,
}, },

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/api/validation" "k8s.io/kubernetes/plugin/pkg/scheduler/api/validation"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/extensions"
) )
const ( const (
@ -66,6 +67,8 @@ type ConfigFactory struct {
ServiceLister *cache.StoreToServiceLister ServiceLister *cache.StoreToServiceLister
// a means to list all controllers // a means to list all controllers
ControllerLister *cache.StoreToReplicationControllerLister ControllerLister *cache.StoreToReplicationControllerLister
// a means to list all replicasets
ReplicaSetLister *cache.StoreToReplicaSetLister
// Close this to stop all reflectors // Close this to stop all reflectors
StopEverything chan struct{} StopEverything chan struct{}
@ -91,6 +94,7 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}), StopEverything: make(chan struct{}),
SchedulerName: schedulerName, SchedulerName: schedulerName,
} }
@ -188,6 +192,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
PodLister: f.PodLister, PodLister: f.PodLister,
ServiceLister: f.ServiceLister, ServiceLister: f.ServiceLister,
ControllerLister: f.ControllerLister, ControllerLister: f.ControllerLister,
ReplicaSetLister: f.ReplicaSetLister,
// All fit predicates only need to consider schedulable nodes. // All fit predicates only need to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: &predicates.CachedNodeInfo{f.NodeLister}, NodeInfo: &predicates.CachedNodeInfo{f.NodeLister},
@ -220,15 +225,20 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything) cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything)
// Watch and cache all service objects. Scheduler needs to find all pods // Watch and cache all service objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers, so that it can spread them correctly. // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally. // Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything) cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)
// Watch and cache all ReplicationController objects. Scheduler needs to find all pods // Watch and cache all ReplicationController objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers, so that it can spread them correctly. // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally. // Cache this locally.
cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything) cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything)
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything)
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r) algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)
@ -332,6 +342,11 @@ func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie("")) return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie(""))
} }
// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.Client.ExtensionsClient, "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie(""))
}
func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
return func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) {
if err == scheduler.ErrNoNodesAvailable { if err == scheduler.ErrNoNodesAvailable {

View File

@ -36,6 +36,7 @@ type PluginFactoryArgs struct {
PodLister algorithm.PodLister PodLister algorithm.PodLister
ServiceLister algorithm.ServiceLister ServiceLister algorithm.ServiceLister
ControllerLister algorithm.ControllerLister ControllerLister algorithm.ControllerLister
ReplicaSetLister algorithm.ReplicaSetLister
NodeLister algorithm.NodeLister NodeLister algorithm.NodeLister
NodeInfo predicates.NodeInfo NodeInfo predicates.NodeInfo
PVInfo predicates.PersistentVolumeInfo PVInfo predicates.PersistentVolumeInfo