Merge pull request #41918 from ncdc/shared-informers-14-scheduler

Automatic merge from submit-queue (batch tested with PRs 41714, 41510, 42052, 41918, 31515)

Switch scheduler to use generated listers/informers

Where possible, switch the scheduler to use generated listers and
informers. There are still some places where it probably makes more
sense to use one-off reflectors/informers (listing/watching just a
single node, listing/watching scheduled & unscheduled pods using a field
selector).

I think this can wait until master is open for 1.7 pulls, given that we're close to the 1.6 freeze.

After this and #41482 go in, the only code left that references legacylisters will be federation, and 1 bit in a stateful set unit test (which I'll clean up in a follow-up).

@resouer I imagine this will conflict with your equivalence class work, so one of us will be doing some rebasing 😄 

cc @wojtek-t @gmarek  @timothysc @jayunit100 @smarterclayton @deads2k @liggitt @sttts @derekwaynecarr @kubernetes/sig-scheduling-pr-reviews @kubernetes/sig-scalability-pr-reviews
pull/6/head
Kubernetes Submit Queue 2017-02-25 02:17:55 -08:00 committed by GitHub
commit 8e6af485f9
24 changed files with 417 additions and 221 deletions

View File

@ -195,7 +195,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s0", "333", v1.ServiceTypeLoadBalancer),
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s0", "333", v1.ServiceTypeLoadBalancer), nodes},
{Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
{
@ -206,9 +206,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s2", "666", v1.ServiceTypeLoadBalancer),
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s0", "444", v1.ServiceTypeLoadBalancer), nodes},
{newService("s1", "555", v1.ServiceTypeLoadBalancer), nodes},
{newService("s2", "666", v1.ServiceTypeLoadBalancer), nodes},
{Service: newService("s0", "444", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s1", "555", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
{
@ -220,8 +220,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
newService("s4", "123", v1.ServiceTypeClusterIP),
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s1", "888", v1.ServiceTypeLoadBalancer), nodes},
{newService("s3", "999", v1.ServiceTypeLoadBalancer), nodes},
{Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: nodes},
{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
{
@ -231,7 +231,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
nil,
},
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
{newService("s0", "234", v1.ServiceTypeLoadBalancer), nodes},
{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
},
},
}

View File

@ -43,7 +43,7 @@ go_library(
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
"//pkg/capabilities:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/features:go_default_library",
"//pkg/fieldpath:go_default_library",

View File

@ -55,7 +55,7 @@ import (
"k8s.io/kubernetes/pkg/apis/componentconfig"
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/api"
@ -374,21 +374,21 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
dockerExecHandler = &dockertools.NativeExecHandler{}
}
serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
cache.NewReflector(serviceLW, &v1.Service{}, serviceStore, 0).Run()
cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0).Run()
}
serviceLister := &listers.StoreToServiceLister{Indexer: serviceStore}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
cache.NewReflector(nodeLW, &v1.Node{}, nodeStore, 0).Run()
cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0).Run()
}
nodeLister := &listers.StoreToNodeLister{Store: nodeStore}
nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}
nodeLister := corelisters.NewNodeLister(nodeIndexer)
nodeInfo := &predicates.CachedNodeInfo{NodeLister: nodeLister}
// TODO: get the real node object of ourself,
// and use the real node name and UID.
@ -803,7 +803,7 @@ type serviceLister interface {
}
type nodeLister interface {
List() (machines v1.NodeList, err error)
List(labels.Selector) ([]*v1.Node, error)
}
// Kubelet is the main kubelet implementation.

View File

@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -401,26 +402,24 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
}
type testNodeLister struct {
nodes []v1.Node
nodes []*v1.Node
}
type testNodeInfo struct {
nodes []v1.Node
nodes []*v1.Node
}
func (ls testNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
for _, node := range ls.nodes {
if node.Name == id {
return &node, nil
return node, nil
}
}
return nil, fmt.Errorf("Node with name: %s does not exist", id)
}
func (ls testNodeLister) List() (v1.NodeList, error) {
return v1.NodeList{
Items: ls.nodes,
}, nil
func (ls testNodeLister) List(selector labels.Selector) ([]*v1.Node, error) {
return ls.nodes, nil
}
// Tests that we handle port conflicts correctly by setting the failed status in status map.
@ -432,7 +431,7 @@ func TestHandlePortConflicts(t *testing.T) {
testKubelet.fakeCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kl.nodeLister = testNodeLister{nodes: []v1.Node{
kl.nodeLister = testNodeLister{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
Status: v1.NodeStatus{
@ -442,7 +441,7 @@ func TestHandlePortConflicts(t *testing.T) {
},
},
}}
kl.nodeInfo = testNodeInfo{nodes: []v1.Node{
kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
Status: v1.NodeStatus{
@ -487,7 +486,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
testKubelet.fakeCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
kl.nodeLister = testNodeLister{nodes: []v1.Node{
kl.nodeLister = testNodeLister{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: "127.0.0.1"},
Status: v1.NodeStatus{
@ -497,7 +496,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
},
},
}}
kl.nodeInfo = testNodeInfo{nodes: []v1.Node{
kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: "127.0.0.1"},
Status: v1.NodeStatus{
@ -535,7 +534,7 @@ func TestHandleNodeSelector(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
nodes := []v1.Node{
nodes := []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}},
Status: v1.NodeStatus{
@ -576,7 +575,7 @@ func TestHandleMemExceeded(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
nodes := []v1.Node{
nodes := []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI),
@ -1823,7 +1822,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []v1.Node{
kl.nodeLister = testNodeLister{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
Status: v1.NodeStatus{
@ -1833,7 +1832,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
},
},
}}
kl.nodeInfo = testNodeInfo{nodes: []v1.Node{
kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
Status: v1.NodeStatus{

View File

@ -18,6 +18,9 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
"//pkg/client/leaderelection/resourcelock:go_default_library",
"//pkg/util/configz:go_default_library",

View File

@ -21,6 +21,8 @@ import (
"io/ioutil"
"os"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
"k8s.io/apimachinery/pkg/runtime"
@ -69,8 +71,28 @@ func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
}
// createScheduler encapsulates the entire creation of a runnable scheduler.
func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight)
func createScheduler(
s *options.SchedulerServer,
kubecli *clientset.Clientset,
nodeInformer coreinformers.NodeInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
serviceInformer coreinformers.ServiceInformer,
recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(
s.SchedulerName,
kubecli,
nodeInformer,
pvInformer,
pvcInformer,
replicationControllerInformer,
replicaSetInformer,
serviceInformer,
s.HardPodAffinitySymmetricWeight,
)
// Rebuild the configurator with a default Create(...) method.
configurator = &schedulerConfigurator{

View File

@ -14,6 +14,7 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/componentconfig/install:go_default_library",
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
"//pkg/features:go_default_library",

View File

@ -27,6 +27,8 @@ import (
// add the kubernetes feature gates
_ "k8s.io/kubernetes/pkg/features"
// install the componentconfig api so we get its defaulting and conversion functions
_ "k8s.io/kubernetes/pkg/apis/componentconfig/install"
"github.com/spf13/pflag"
)

View File

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/server/healthz"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/util/configz"
@ -67,24 +68,47 @@ func Run(s *options.SchedulerServer) error {
if err != nil {
return fmt.Errorf("unable to create kube client: %v", err)
}
recorder := createRecorder(kubecli, s)
sched, err := createScheduler(s, kubecli, recorder)
informerFactory := informers.NewSharedInformerFactory(kubecli, 0)
sched, err := createScheduler(
s,
kubecli,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
recorder,
)
if err != nil {
return fmt.Errorf("error creating scheduler: %v", err)
}
go startHTTP(s)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %v", err)
}
// TODO: enable other lock types
rl := &resourcelock.EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
@ -97,6 +121,7 @@ func Run(s *options.SchedulerServer) error {
EventRecorder: recorder,
},
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
@ -109,6 +134,7 @@ func Run(s *options.SchedulerServer) error {
},
},
})
panic("unreachable")
}

View File

@ -40,6 +40,7 @@ go_library(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/metrics:go_default_library",

View File

@ -19,12 +19,13 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",

View File

@ -25,12 +25,13 @@ import (
"time"
"github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/legacylisters"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
@ -58,13 +59,22 @@ type PersistentVolumeInfo interface {
GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error)
}
// CachedPersistentVolumeInfo implements PersistentVolumeInfo
type CachedPersistentVolumeInfo struct {
corelisters.PersistentVolumeLister
}
func (c *CachedPersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
return c.Get(pvID)
}
type PersistentVolumeClaimInfo interface {
GetPersistentVolumeClaimInfo(namespace string, name string) (*v1.PersistentVolumeClaim, error)
}
// CachedPersistentVolumeClaimInfo implements PersistentVolumeClaimInfo
type CachedPersistentVolumeClaimInfo struct {
*listers.StoreToPersistentVolumeClaimLister
corelisters.PersistentVolumeClaimLister
}
// GetPersistentVolumeClaimInfo fetches the claim in specified namespace with specified name
@ -73,22 +83,22 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace
}
type CachedNodeInfo struct {
*listers.StoreToNodeLister
corelisters.NodeLister
}
// GetNodeInfo returns cached data for the node 'id'.
func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
node, exists, err := c.Get(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: id}})
node, err := c.Get(id)
if apierrors.IsNotFound(err) {
return nil, fmt.Errorf("node '%v' not found", id)
}
if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}
if !exists {
return nil, fmt.Errorf("node '%v' not found", id)
}
return node.(*v1.Node), nil
return node, nil
}
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file

View File

@ -35,6 +35,7 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
@ -345,8 +346,19 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(client, 0)
if _, err := factory.NewConfigFactory(client, "some-scheduler-name", v1.DefaultHardPodAffinitySymmetricWeight).CreateFromConfig(policy); err != nil {
if _, err := factory.NewConfigFactory(
"some-scheduler-name",
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
).CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue
}

View File

@ -17,10 +17,11 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
@ -56,6 +57,7 @@ go_test(
"//pkg/api/testing:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library",

View File

@ -32,10 +32,11 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
"k8s.io/kubernetes/pkg/controller/informers"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@ -58,32 +59,26 @@ type ConfigFactory struct {
// queue for pods that need scheduling
podQueue *cache.FIFO
// a means to list all known scheduled pods.
scheduledPodLister *listers.StoreToPodLister
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
podLister algorithm.PodLister
// a means to list all nodes
nodeLister *listers.StoreToNodeLister
nodeLister corelisters.NodeLister
// a means to list all PersistentVolumes
pVLister *listers.StoreToPVFetcher
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
pVCLister *listers.StoreToPersistentVolumeClaimLister
pVCLister corelisters.PersistentVolumeClaimLister
// a means to list all services
serviceLister *listers.StoreToServiceLister
serviceLister corelisters.ServiceLister
// a means to list all controllers
controllerLister *listers.StoreToReplicationControllerLister
controllerLister corelisters.ReplicationControllerLister
// a means to list all replicasets
replicaSetLister *listers.StoreToReplicaSetLister
replicaSetLister extensionslisters.ReplicaSetLister
// Close this to stop all reflectors
StopEverything chan struct{}
informerFactory informers.SharedInformerFactory
scheduledPodPopulator cache.Controller
nodePopulator cache.Controller
pvPopulator cache.Controller
pvcPopulator cache.Controller
servicePopulator cache.Controller
controllerPopulator cache.Controller
schedulerCache schedulercache.Cache
@ -102,40 +97,41 @@ type ConfigFactory struct {
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int) scheduler.Configurator {
func NewConfigFactory(
schedulerName string,
client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
serviceInformer coreinformers.ServiceInformer,
hardPodAffinitySymmetricWeight int,
) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
// TODO: pass this in as an argument...
informerFactory := informers.NewSharedInformerFactory(client, nil, 0)
pvcInformer := informerFactory.PersistentVolumeClaims()
c := &ConfigFactory{
client: client,
podLister: schedulerCache,
podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
scheduledPodLister: &listers.StoreToPodLister{},
informerFactory: informerFactory,
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeLister: &listers.StoreToNodeLister{},
pVLister: &listers.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
pVLister: pvInformer.Lister(),
pVCLister: pvcInformer.Lister(),
pvcPopulator: pvcInformer.Informer().GetController(),
serviceLister: &listers.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
controllerLister: &listers.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
replicaSetLister: &listers.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
serviceLister: serviceInformer.Lister(),
controllerLister: replicationControllerInformer.Lister(),
replicaSetLister: replicaSetInformer.Lister(),
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: schedulerName,
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
}
c.podLister = schedulerCache
// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
// ScheduledPodLister is something we provide to plug in functions that
// they may need to call.
c.scheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
var scheduledPodIndexer cache.Indexer
scheduledPodIndexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
c.createAssignedNonTerminatedPodLW(),
&v1.Pod{},
0,
@ -146,48 +142,27 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
c.scheduledPodLister = corelisters.NewPodLister(scheduledPodIndexer)
c.nodeLister.Store, c.nodePopulator = cache.NewInformer(
c.createNodeLW(),
&v1.Node{},
0,
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
0,
)
c.nodeLister = nodeInformer.Lister()
// TODO(harryz) need to fill all the handlers here and below for equivalence cache
c.pVLister.Store, c.pvPopulator = cache.NewInformer(
c.createPersistentVolumeLW(),
&v1.PersistentVolume{},
0,
cache.ResourceEventHandlerFuncs{},
)
c.serviceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
c.createServiceLW(),
&v1.Service{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
c.controllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
c.createControllerLW(),
&v1.ReplicationController{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return c
}
// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
func (c *ConfigFactory) GetNodeStore() cache.Store {
return c.nodeLister.Store
func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister {
return c.nodeLister
}
func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int {
@ -204,8 +179,8 @@ func (f *ConfigFactory) GetClient() clientset.Interface {
}
// GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests.
func (c *ConfigFactory) GetScheduledPodListerIndexer() cache.Indexer {
return c.scheduledPodLister.Indexer
func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
return c.scheduledPodLister
}
// TODO(harryz) need to update all the handlers here and below for equivalence cache
@ -394,7 +369,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()),
NodeLister: &nodePredicateLister{f.nodeLister},
Algorithm: algo,
Binder: &binder{f.client},
PodConditionUpdater: &podConditionUpdater{f.client},
@ -406,6 +381,14 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}, nil
}
type nodePredicateLister struct {
corelisters.NodeLister
}
func (n *nodePredicateLister) List() ([]*v1.Node, error) {
return n.ListWithPredicate(getNodeConditionPredicate())
}
func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
@ -448,10 +431,10 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
ControllerLister: f.controllerLister,
ReplicaSetLister: f.replicaSetLister,
// All fit predicates only need to consider schedulable nodes.
NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.nodeLister},
PVInfo: f.pVLister,
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.pVCLister},
NodeLister: &nodePredicateLister{f.nodeLister},
NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister},
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister},
HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight,
}, nil
}
@ -462,27 +445,6 @@ func (f *ConfigFactory) Run() {
// Begin populating scheduled pods.
go f.scheduledPodPopulator.Run(f.StopEverything)
// Begin populating nodes.
go f.nodePopulator.Run(f.StopEverything)
// Begin populating pv & pvc
go f.pvPopulator.Run(f.StopEverything)
go f.pvcPopulator.Run(f.StopEverything)
// Begin populating services
go f.servicePopulator.Run(f.StopEverything)
// Begin populating controllers
go f.controllerPopulator.Run(f.StopEverything)
// start informers...
f.informerFactory.Start(f.StopEverything)
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.replicaSetLister.Indexer, 0).RunUntil(f.StopEverything)
}
func (f *ConfigFactory) getNextPod() *v1.Pod {
@ -499,7 +461,7 @@ func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool {
return f.schedulerName == pod.Spec.SchedulerName
}
func getNodeConditionPredicate() listers.NodeConditionPredicate {
func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
return func(node *v1.Node) bool {
for i := range node.Status.Conditions {
cond := &node.Status.Conditions[i]
@ -542,39 +504,6 @@ func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatc
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", metav1.NamespaceAll, selector)
}
// createNodeLW returns a cache.ListWatch that gets all changes to nodes.
func (factory *ConfigFactory) createNodeLW() *cache.ListWatch {
// all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups
// the NodeCondition is used to filter out the nodes that are not ready or unschedulable
// the filtered list is used as the super set of nodes to consider for scheduling
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes.
func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumes", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims.
func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumeClaims", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "services", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to controllers.
func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "replicationControllers", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Extensions().RESTClient(), "replicasets", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
return func(pod *v1.Pod, err error) {
if err == core.ErrNoNodesAvailable {

View File

@ -33,6 +33,7 @@ import (
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
@ -49,7 +50,18 @@ func TestCreate(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
factory.Create()
}
@ -67,7 +79,18 @@ func TestCreateFromConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@ -108,7 +131,18 @@ func TestCreateFromEmptyConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
configData = []byte(`{}`)
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
@ -150,7 +184,19 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)
@ -247,9 +293,30 @@ func TestResponsibleForPod(t *testing.T) {
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
// factory of "default-scheduler"
factoryDefaultScheduler := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factoryDefaultScheduler := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
// factory of "foo-scheduler"
factoryFooScheduler := NewConfigFactory(client, "foo-scheduler", v1.DefaultHardPodAffinitySymmetricWeight)
factoryFooScheduler := NewConfigFactory(
"foo-scheduler",
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
// scheduler annotations to be tested
schedulerFitsDefault := "default-scheduler"
schedulerFitsFoo := "foo-scheduler"
@ -305,7 +372,18 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
// defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
// factory of "default-scheduler"
factory := NewConfigFactory(client, v1.DefaultSchedulerName, -1)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
-1,
)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing")
@ -337,7 +415,18 @@ func TestInvalidFactoryArgs(t *testing.T) {
}
for _, test := range testCases {
factory := NewConfigFactory(client, v1.DefaultSchedulerName, test.hardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
test.hardPodAffinitySymmetricWeight,
)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: %s, got nothing", test.expectErr)

View File

@ -24,6 +24,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
@ -69,9 +70,9 @@ type Configurator interface {
ResponsibleForPod(pod *v1.Pod) bool
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeStore() cache.Store
GetNodeLister() corelisters.NodeLister
GetClient() clientset.Interface
GetScheduledPodListerIndexer() cache.Indexer
GetScheduledPodLister() corelisters.PodLister
Run()
Create() (*Config, error)

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@ -239,7 +240,18 @@ func TestSchedulerExtender(t *testing.T) {
}
policy.APIVersion = api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()
schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
schedulerConfigFactory := factory.NewConfigFactory(
v1.DefaultSchedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
@ -247,7 +259,9 @@ func TestSchedulerExtender(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientSet.Core().RESTClient()).Events("")})
scheduler.New(schedulerConfig).Run()
scheduler := scheduler.New(schedulerConfig)
informerFactory.Start(schedulerConfig.StopEverything)
scheduler.Run()
defer close(schedulerConfig.StopEverything)

View File

@ -37,6 +37,8 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
@ -44,7 +46,7 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeStore cache.Store, c clientset.Interface)
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)
type nodeStateManager struct {
makeSchedulable nodeMutationFunc
@ -59,8 +61,19 @@ func TestUnschedulableNodes(t *testing.T) {
defer framework.DeleteTestingNamespace(ns, s, t)
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
schedulerConfigFactory := factory.NewConfigFactory(
v1.DefaultSchedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
@ -68,11 +81,12 @@ func TestUnschedulableNodes(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName})
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")})
informerFactory.Start(schedulerConfig.StopEverything)
scheduler.New(schedulerConfig).Run()
defer close(schedulerConfig.StopEverything)
DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeStore())
DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeLister())
}
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
@ -94,23 +108,23 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
// Wait till the passFunc confirms that the object it expects to see is in the store.
// Used to observe reflected events.
func waitForReflection(t *testing.T, s cache.Store, key string, passFunc func(n interface{}) bool) error {
func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, passFunc func(n interface{}) bool) error {
nodes := []*v1.Node{}
err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
if n, _, err := s.GetByKey(key); err == nil && passFunc(n) {
n, err := nodeLister.Get(key)
switch {
case err == nil && passFunc(n):
return true, nil
} else {
if err != nil {
t.Errorf("Unexpected error: %v", err)
} else {
if n == nil {
case errors.IsNotFound(err):
nodes = append(nodes, nil)
} else {
nodes = append(nodes, n.(*v1.Node))
}
case err != nil:
t.Errorf("Unexpected error: %v", err)
default:
nodes = append(nodes, n)
}
return false, nil
}
})
if err != nil {
t.Logf("Logging consecutive node versions received from store:")
@ -121,7 +135,7 @@ func waitForReflection(t *testing.T, s cache.Store, key string, passFunc func(n
return err
}
func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Namespace, nodeStore cache.Store) {
func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Namespace, nodeLister corelisters.NodeLister) {
// NOTE: This test cannot run in parallel, because it is creating and deleting
// non-namespaced objects (Nodes).
defer cs.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{})
@ -167,12 +181,12 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
nodeModifications := []nodeStateManager{
// Test node.Spec.Unschedulable=true/false
{
makeUnSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) {
makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
n.Spec.Unschedulable = true
if _, err := c.Core().Nodes().Update(n); err != nil {
t.Fatalf("Failed to update node with unschedulable=true: %v", err)
}
err = waitForReflection(t, s, nodeKey, func(node interface{}) bool {
err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
// An unschedulable node should still be present in the store
// Nodes that are unschedulable or that are not ready or
// have their disk full (Node.Spec.Conditions) are excluded
@ -183,12 +197,12 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
t.Fatalf("Failed to observe reflected update for setting unschedulable=true: %v", err)
}
},
makeSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) {
makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
n.Spec.Unschedulable = false
if _, err := c.Core().Nodes().Update(n); err != nil {
t.Fatalf("Failed to update node with unschedulable=false: %v", err)
}
err = waitForReflection(t, s, nodeKey, func(node interface{}) bool {
err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
return node != nil && node.(*v1.Node).Spec.Unschedulable == false
})
if err != nil {
@ -198,7 +212,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
},
// Test node.Status.Conditions=ConditionTrue/Unknown
{
makeUnSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) {
makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
n.Status = v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
@ -208,14 +222,14 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
if _, err = c.Core().Nodes().UpdateStatus(n); err != nil {
t.Fatalf("Failed to update node with bad status condition: %v", err)
}
err = waitForReflection(t, s, nodeKey, func(node interface{}) bool {
err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
return node != nil && node.(*v1.Node).Status.Conditions[0].Status == v1.ConditionUnknown
})
if err != nil {
t.Fatalf("Failed to observe reflected update for status condition update: %v", err)
}
},
makeSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) {
makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
n.Status = v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
@ -225,7 +239,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
if _, err = c.Core().Nodes().UpdateStatus(n); err != nil {
t.Fatalf("Failed to update node with healthy status condition: %v", err)
}
err = waitForReflection(t, s, nodeKey, func(node interface{}) bool {
err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
return node != nil && node.(*v1.Node).Status.Conditions[0].Status == v1.ConditionTrue
})
if err != nil {
@ -242,7 +256,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
}
// Apply the unschedulable modification to the node, and wait for the reflection
mod.makeUnSchedulable(t, unSchedNode, nodeStore, cs)
mod.makeUnSchedulable(t, unSchedNode, nodeLister, cs)
// Create the new pod, note that this needs to happen post unschedulable
// modification or we have a race in the test.
@ -273,7 +287,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names
if err != nil {
t.Fatalf("Failed to get node: %v", err)
}
mod.makeSchedulable(t, schedNode, nodeStore, cs)
mod.makeSchedulable(t, schedNode, nodeLister, cs)
// Wait until the pod is scheduled.
err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(cs, myPod.Namespace, myPod.Name))
@ -329,7 +343,18 @@ func TestMultiScheduler(t *testing.T) {
// non-namespaced objects (Nodes).
defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{})
schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
schedulerConfigFactory := factory.NewConfigFactory(
v1.DefaultSchedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
@ -337,6 +362,7 @@ func TestMultiScheduler(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName})
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")})
informerFactory.Start(schedulerConfig.StopEverything)
scheduler.New(schedulerConfig).Run()
// default-scheduler will be stopped later
@ -399,8 +425,19 @@ func TestMultiScheduler(t *testing.T) {
// 5. create and start a scheduler with name "foo-scheduler"
clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory2 := informers.NewSharedInformerFactory(clientSet, 0)
schedulerConfigFactory2 := factory.NewConfigFactory(clientSet2, "foo-scheduler", v1.DefaultHardPodAffinitySymmetricWeight)
schedulerConfigFactory2 := factory.NewConfigFactory(
"foo-scheduler",
clientSet2,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
schedulerConfig2, err := schedulerConfigFactory2.Create()
if err != nil {
t.Errorf("Couldn't create scheduler config: %v", err)
@ -408,6 +445,7 @@ func TestMultiScheduler(t *testing.T) {
eventBroadcaster2 := record.NewBroadcaster()
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(api.Scheme, clientv1.EventSource{Component: "foo-scheduler"})
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.Core().RESTClient()).Events("")})
informerFactory2.Start(schedulerConfig2.StopEverything)
scheduler.New(schedulerConfig2).Run()
defer close(schedulerConfig2.StopEverything)
@ -490,12 +528,23 @@ func TestAllocatable(t *testing.T) {
// 1. create and start default-scheduler
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
// NOTE: This test cannot run in parallel, because it is creating and deleting
// non-namespaced objects (Nodes).
defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{})
schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
schedulerConfigFactory := factory.NewConfigFactory(
v1.DefaultSchedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
@ -503,6 +552,7 @@ func TestAllocatable(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName})
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")})
informerFactory.Start(schedulerConfig.StopEverything)
scheduler.New(schedulerConfig).Run()
// default-scheduler will be stopped later
defer close(schedulerConfig.StopEverything)

View File

@ -16,6 +16,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
"//plugin/pkg/scheduler/algorithmprovider:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",
@ -43,6 +44,7 @@ go_test(
"//test/utils:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
],
)

View File

@ -20,6 +20,7 @@ import (
"testing"
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
@ -74,7 +75,10 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
podCreator.CreatePods()
for {
scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List()
scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything())
if err != nil {
glog.Fatalf("%v", err)
}
if len(scheduled) >= numScheduledPods {
break
}
@ -89,7 +93,10 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
for {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// TODO: Setup watch on apiserver and wait until all pods scheduled.
scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List()
scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything())
if err != nil {
glog.Fatalf("%v", err)
}
if len(scheduled) >= numScheduledPods+b.N {
break
}

View File

@ -23,6 +23,7 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
@ -209,7 +210,10 @@ func schedulePods(config *testConfig) int32 {
// Bake in time for the first pod scheduling event.
for {
time.Sleep(50 * time.Millisecond)
scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List()
scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything())
if err != nil {
glog.Fatalf("%v", err)
}
// 30,000 pods -> wait till @ least 300 are scheduled to start measuring.
// TODO Find out why sometimes there may be scheduling blips in the beggining.
if len(scheduled) > config.numPods/100 {
@ -224,7 +228,10 @@ func schedulePods(config *testConfig) int32 {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
// TODO: Setup watch on apiserver and wait until all pods scheduled.
scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List()
scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything())
if err != nil {
glog.Fatalf("%v", err)
}
// We will be completed when all pods are done being scheduled.
// return the worst-case-scenario interval that was seen during this time.

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
@ -58,7 +59,19 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
Burst: 5000,
})
schedulerConfigurator = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
schedulerConfigurator = factory.NewConfigFactory(
v1.DefaultSchedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")})
@ -70,11 +83,15 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy
glog.Fatalf("Error creating scheduler: %v", err)
}
stop := make(chan struct{})
informerFactory.Start(stop)
sched.Run()
destroyFunc = func() {
glog.Infof("destroying")
sched.StopEverything()
close(stop)
s.Close()
glog.Infof("destroyed")
}