From 187e2e01c971771ab4729eed06726ee8ada66acd Mon Sep 17 00:00:00 2001 From: Guoliang Wang Date: Sat, 22 Sep 2018 08:30:18 +0800 Subject: [PATCH] Move scheduler cache interface and implementation to pkg/scheduler/internal/cache --- cmd/kubeadm/.import-restrictions | 1 + pkg/kubectl/.import-restrictions | 1 + pkg/scheduler/BUILD | 5 +- pkg/scheduler/algorithm/BUILD | 1 + pkg/scheduler/algorithm/types.go | 3 +- pkg/scheduler/cache/BUILD | 15 - pkg/scheduler/cache/node_info.go | 92 ++++- pkg/scheduler/cache/node_info_test.go | 56 ++- pkg/scheduler/core/BUILD | 70 ++-- pkg/scheduler/core/extender_test.go | 3 +- pkg/scheduler/core/generic_scheduler.go | 5 +- pkg/scheduler/core/generic_scheduler_test.go | 13 +- pkg/scheduler/factory/BUILD | 11 +- pkg/scheduler/factory/cache_comparer.go | 4 +- pkg/scheduler/factory/factory.go | 6 +- pkg/scheduler/internal/cache/BUILD | 59 +++ pkg/scheduler/{ => internal}/cache/cache.go | 67 ++-- .../{ => internal}/cache/cache_test.go | 350 +++++++++--------- .../{ => internal}/cache/interface.go | 5 +- .../{ => internal}/cache/node_tree.go | 1 + .../{ => internal}/cache/node_tree_test.go | 0 pkg/scheduler/scheduler.go | 6 +- pkg/scheduler/scheduler_test.go | 18 +- pkg/scheduler/testing/BUILD | 1 + pkg/scheduler/testing/fake_cache.go | 9 +- pkg/scheduler/testing/fake_lister.go | 4 +- 26 files changed, 470 insertions(+), 336 deletions(-) create mode 100644 pkg/scheduler/internal/cache/BUILD rename pkg/scheduler/{ => internal}/cache/cache.go (90%) rename pkg/scheduler/{ => internal}/cache/cache_test.go (80%) rename pkg/scheduler/{ => internal}/cache/interface.go (96%) rename pkg/scheduler/{ => internal}/cache/node_tree.go (99%) rename pkg/scheduler/{ => internal}/cache/node_tree_test.go (100%) diff --git a/cmd/kubeadm/.import-restrictions b/cmd/kubeadm/.import-restrictions index d6c7b8b48b..44489b8895 100644 --- a/cmd/kubeadm/.import-restrictions +++ b/cmd/kubeadm/.import-restrictions @@ -71,6 +71,7 @@ "k8s.io/kubernetes/pkg/scheduler/algorithm", "k8s.io/kubernetes/pkg/scheduler/api", "k8s.io/kubernetes/pkg/scheduler/cache", + "k8s.io/kubernetes/pkg/scheduler/internal/cache", "k8s.io/kubernetes/pkg/scheduler/util", "k8s.io/kubernetes/pkg/security/apparmor", "k8s.io/kubernetes/pkg/serviceaccount", diff --git a/pkg/kubectl/.import-restrictions b/pkg/kubectl/.import-restrictions index 79f6b5a938..6cc204640d 100644 --- a/pkg/kubectl/.import-restrictions +++ b/pkg/kubectl/.import-restrictions @@ -122,6 +122,7 @@ "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util", "k8s.io/kubernetes/pkg/scheduler/api", "k8s.io/kubernetes/pkg/scheduler/cache", + "k8s.io/kubernetes/pkg/scheduler/internal/cache", "k8s.io/kubernetes/pkg/scheduler/util", "k8s.io/kubernetes/pkg/scheduler/volumebinder", "k8s.io/kubernetes/pkg/security/apparmor", diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 2d849fef88..e8a336120a 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -13,9 +13,9 @@ go_library( "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", - "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core/equivalence:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", @@ -42,8 +42,8 @@ go_test( "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", - "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/core:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -77,6 +77,7 @@ filegroup( "//pkg/scheduler/cache:all-srcs", "//pkg/scheduler/core:all-srcs", "//pkg/scheduler/factory:all-srcs", + "//pkg/scheduler/internal/cache:all-srcs", "//pkg/scheduler/internal/queue:all-srcs", "//pkg/scheduler/metrics:all-srcs", "//pkg/scheduler/testing:all-srcs", diff --git a/pkg/scheduler/algorithm/BUILD b/pkg/scheduler/algorithm/BUILD index 3af3c59a4d..96bf026af0 100644 --- a/pkg/scheduler/algorithm/BUILD +++ b/pkg/scheduler/algorithm/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/apis/core:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", diff --git a/pkg/scheduler/algorithm/types.go b/pkg/scheduler/algorithm/types.go index 0ad777e916..16a0ce4f8a 100644 --- a/pkg/scheduler/algorithm/types.go +++ b/pkg/scheduler/algorithm/types.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/labels" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) // NodeFieldSelectorKeys is a map that: the key are node field selector keys; the values are @@ -98,7 +99,7 @@ type PodLister interface { List(labels.Selector) ([]*v1.Pod, error) // This is similar to "List()", but the returned slice does not // contain pods that don't pass `podFilter`. - FilteredList(podFilter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) + FilteredList(podFilter schedulerinternalcache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) } // ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. diff --git a/pkg/scheduler/cache/BUILD b/pkg/scheduler/cache/BUILD index 68f5f39c7d..84d3e6ca9b 100644 --- a/pkg/scheduler/cache/BUILD +++ b/pkg/scheduler/cache/BUILD @@ -3,26 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ - "cache.go", - "interface.go", "node_info.go", - "node_tree.go", "util.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/cache", visibility = ["//visibility:public"], deps = [ "//pkg/apis/core/v1/helper:go_default_library", - "//pkg/features:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/util:go_default_library", - "//pkg/util/node:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) @@ -30,24 +22,17 @@ go_library( go_test( name = "go_default_test", srcs = [ - "cache_test.go", "node_info_test.go", - "node_tree_test.go", "util_test.go", ], embed = [":go_default_library"], deps = [ - "//pkg/features:go_default_library", - "//pkg/kubelet/apis:go_default_library", - "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/pkg/scheduler/cache/node_info.go b/pkg/scheduler/cache/node_info.go index c6b5f96cd8..8bc5923c61 100644 --- a/pkg/scheduler/cache/node_info.go +++ b/pkg/scheduler/cache/node_info.go @@ -36,6 +36,14 @@ var ( generation int64 ) +// ImageStateSummary provides summarized information about the state of an image. +type ImageStateSummary struct { + // Size of the image + Size int64 + // Used to track how many nodes have this image + NumNodes int +} + // NodeInfo is node level aggregated information. type NodeInfo struct { // Overall node information. @@ -66,7 +74,7 @@ type NodeInfo struct { // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of // scheduling cycle. // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities. - TransientInfo *transientSchedulerInfo + TransientInfo *TransientSchedulerInfo // Cached conditions of node for faster lookup. memoryPressureCondition v1.ConditionStatus @@ -99,28 +107,28 @@ type nodeTransientInfo struct { RequestedVolumes int } -// transientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle. +// TransientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle. // It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and // priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization // on node etc. // IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure // only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle. -type transientSchedulerInfo struct { +type TransientSchedulerInfo struct { TransientLock sync.Mutex // NodeTransInfo holds the information related to nodeTransientInformation. NodeName is the key here. TransNodeInfo nodeTransientInfo } -// newTransientSchedulerInfo returns a new scheduler transient structure with initialized values. -func newTransientSchedulerInfo() *transientSchedulerInfo { - tsi := &transientSchedulerInfo{ +// NewTransientSchedulerInfo returns a new scheduler transient structure with initialized values. +func NewTransientSchedulerInfo() *TransientSchedulerInfo { + tsi := &TransientSchedulerInfo{ TransNodeInfo: initializeNodeTransientInfo(), } return tsi } -// resetTransientSchedulerInfo resets the transientSchedulerInfo. -func (transientSchedInfo *transientSchedulerInfo) resetTransientSchedulerInfo() { +// ResetTransientSchedulerInfo resets the TransientSchedulerInfo. +func (transientSchedInfo *TransientSchedulerInfo) ResetTransientSchedulerInfo() { transientSchedInfo.TransientLock.Lock() defer transientSchedInfo.TransientLock.Unlock() // Reset TransientNodeInfo. @@ -259,7 +267,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { requestedResource: &Resource{}, nonzeroRequest: &Resource{}, allocatableResource: &Resource{}, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), generation: nextGeneration(), usedPorts: make(util.HostPortInfo), imageStates: make(map[string]*ImageStateSummary), @@ -286,6 +294,11 @@ func (n *NodeInfo) Pods() []*v1.Pod { return n.pods } +// SetPods sets all pods scheduled (including assumed to be) on this node. +func (n *NodeInfo) SetPods(pods []*v1.Pod) { + n.pods = pods +} + // UsedPorts returns used ports on this node. func (n *NodeInfo) UsedPorts() util.HostPortInfo { if n == nil { @@ -294,6 +307,11 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo { return n.usedPorts } +// SetUsedPorts sets the used ports on this node. +func (n *NodeInfo) SetUsedPorts(newUsedPorts util.HostPortInfo) { + n.usedPorts = newUsedPorts +} + // ImageStates returns the state information of all images. func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary { if n == nil { @@ -302,6 +320,11 @@ func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary { return n.imageStates } +// SetImageStates sets the state information of all images. +func (n *NodeInfo) SetImageStates(newImageStates map[string]*ImageStateSummary) { + n.imageStates = newImageStates +} + // PodsWithAffinity return all pods with (anti)affinity constraints on this node. func (n *NodeInfo) PodsWithAffinity() []*v1.Pod { if n == nil { @@ -326,6 +349,11 @@ func (n *NodeInfo) Taints() ([]v1.Taint, error) { return n.taints, n.taintsErr } +// SetTaints sets the taints list on this node. +func (n *NodeInfo) SetTaints(newTaints []v1.Taint) { + n.taints = newTaints +} + // MemoryPressureCondition returns the memory pressure condition status on this node. func (n *NodeInfo) MemoryPressureCondition() v1.ConditionStatus { if n == nil { @@ -358,6 +386,11 @@ func (n *NodeInfo) RequestedResource() Resource { return *n.requestedResource } +// SetRequestedResource sets the aggregated resource request of pods on this node. +func (n *NodeInfo) SetRequestedResource(newResource *Resource) { + n.requestedResource = newResource +} + // NonZeroRequest returns aggregated nonzero resource request of pods on this node. func (n *NodeInfo) NonZeroRequest() Resource { if n == nil { @@ -366,6 +399,11 @@ func (n *NodeInfo) NonZeroRequest() Resource { return *n.nonzeroRequest } +// SetNonZeroRequest sets the aggregated nonzero resource request of pods on this node. +func (n *NodeInfo) SetNonZeroRequest(newResource *Resource) { + n.nonzeroRequest = newResource +} + // AllocatableResource returns allocatable resources on a given node. func (n *NodeInfo) AllocatableResource() Resource { if n == nil { @@ -380,6 +418,19 @@ func (n *NodeInfo) SetAllocatableResource(allocatableResource *Resource) { n.generation = nextGeneration() } +// GetGeneration returns the generation on this node. +func (n *NodeInfo) GetGeneration() int64 { + if n == nil { + return 0 + } + return n.generation +} + +// SetGeneration sets the generation on this node. This is for testing only. +func (n *NodeInfo) SetGeneration(newGeneration int64) { + n.generation = newGeneration +} + // Clone returns a copy of this node. func (n *NodeInfo) Clone() *NodeInfo { clone := &NodeInfo{ @@ -464,20 +515,20 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) { } // Consume ports when pods added. - n.updateUsedPorts(pod, true) + n.UpdateUsedPorts(pod, true) n.generation = nextGeneration() } // RemovePod subtracts pod information from this NodeInfo. func (n *NodeInfo) RemovePod(pod *v1.Pod) error { - k1, err := getPodKey(pod) + k1, err := GetPodKey(pod) if err != nil { return err } for i := range n.podsWithAffinity { - k2, err := getPodKey(n.podsWithAffinity[i]) + k2, err := GetPodKey(n.podsWithAffinity[i]) if err != nil { glog.Errorf("Cannot get pod key, err: %v", err) continue @@ -490,7 +541,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error { } } for i := range n.pods { - k2, err := getPodKey(n.pods[i]) + k2, err := GetPodKey(n.pods[i]) if err != nil { glog.Errorf("Cannot get pod key, err: %v", err) continue @@ -515,7 +566,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error { n.nonzeroRequest.Memory -= non0Mem // Release ports when remove Pods. - n.updateUsedPorts(pod, false) + n.UpdateUsedPorts(pod, false) n.generation = nextGeneration() @@ -539,7 +590,8 @@ func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64) return } -func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) { +// UpdateUsedPorts updates the UsedPorts of NodeInfo. +func (n *NodeInfo) UpdateUsedPorts(pod *v1.Pod, add bool) { for j := range pod.Spec.Containers { container := &pod.Spec.Containers[j] for k := range container.Ports { @@ -573,7 +625,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error { // We ignore other conditions. } } - n.TransientInfo = newTransientSchedulerInfo() + n.TransientInfo = NewTransientSchedulerInfo() n.generation = nextGeneration() return nil } @@ -614,9 +666,9 @@ func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod { continue } // If pod is on the given node, add it to 'filtered' only if it is present in nodeInfo. - podKey, _ := getPodKey(p) + podKey, _ := GetPodKey(p) for _, np := range n.Pods() { - npodkey, _ := getPodKey(np) + npodkey, _ := GetPodKey(np) if npodkey == podKey { filtered = append(filtered, p) break @@ -626,8 +678,8 @@ func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod { return filtered } -// getPodKey returns the string key of a pod. -func getPodKey(pod *v1.Pod) (string, error) { +// GetPodKey returns the string key of a pod. +func GetPodKey(pod *v1.Pod) (string, error) { uid := string(pod.UID) if len(uid) == 0 { return "", errors.New("Cannot get cache key for pod with empty UID") diff --git a/pkg/scheduler/cache/node_info_test.go b/pkg/scheduler/cache/node_info_test.go index 0bfbbb19a5..e45537e3a2 100644 --- a/pkg/scheduler/cache/node_info_test.go +++ b/pkg/scheduler/cache/node_info_test.go @@ -19,6 +19,7 @@ package cache import ( "fmt" "reflect" + "strings" "testing" "k8s.io/api/core/v1" @@ -240,6 +241,43 @@ func TestSetMaxResource(t *testing.T) { } } +type testingMode interface { + Fatalf(format string, args ...interface{}) +} + +func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod { + req := v1.ResourceList{} + if cpu != "" { + req = v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(cpu), + v1.ResourceMemory: resource.MustParse(mem), + } + if extended != "" { + parts := strings.Split(extended, ":") + if len(parts) != 2 { + t.Fatalf("Invalid extended resource string: \"%s\"", extended) + } + req[v1.ResourceName(parts[0])] = resource.MustParse(parts[1]) + } + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(objName), + Namespace: "node_info_cache_test", + Name: objName, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Resources: v1.ResourceRequirements{ + Requests: req, + }, + Ports: ports, + }}, + NodeName: nodeName, + }, + } +} + func TestNewNodeInfo(t *testing.T) { nodeName := "test-node" pods := []*v1.Pod{ @@ -262,7 +300,7 @@ func TestNewNodeInfo(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -351,7 +389,7 @@ func TestNodeInfoClone(t *testing.T) { nodeInfo: &NodeInfo{ requestedResource: &Resource{}, nonzeroRequest: &Resource{}, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -421,7 +459,7 @@ func TestNodeInfoClone(t *testing.T) { expected: &NodeInfo{ requestedResource: &Resource{}, nonzeroRequest: &Resource{}, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -580,7 +618,7 @@ func TestNodeInfoAddPod(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -699,7 +737,7 @@ func TestNodeInfoRemovePod(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 2, usedPorts: util.HostPortInfo{ @@ -816,7 +854,7 @@ func TestNodeInfoRemovePod(t *testing.T) { AllowedPodNumber: 0, ScalarResources: map[v1.ResourceName]int64(nil), }, - TransientInfo: newTransientSchedulerInfo(), + TransientInfo: NewTransientSchedulerInfo(), allocatableResource: &Resource{}, generation: 3, usedPorts: util.HostPortInfo{ @@ -865,7 +903,7 @@ func TestNodeInfoRemovePod(t *testing.T) { err := ni.RemovePod(test.pod) if err != nil { if test.errExpected { - expectedErrorMsg := fmt.Errorf("no corresponding pod %s in pods of node %s", test.pod.Name, ni.node.Name) + expectedErrorMsg := fmt.Errorf("no corresponding pod %s in pods of node %s", test.pod.Name, ni.Node().Name) if expectedErrorMsg == err { t.Errorf("expected error: %v, got: %v", expectedErrorMsg, err) } @@ -887,10 +925,10 @@ func TestNodeInfoRemovePod(t *testing.T) { func fakeNodeInfo(pods ...*v1.Pod) *NodeInfo { ni := NewNodeInfo(pods...) - ni.node = &v1.Node{ + ni.SetNode(&v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "test-node", }, - } + }) return ni } diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index 160ceff5bc..1415c43027 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -1,39 +1,4 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_test( - name = "go_default_test", - srcs = [ - "extender_test.go", - "generic_scheduler_test.go", - ], - embed = [":go_default_library"], - deps = [ - "//pkg/scheduler/algorithm:go_default_library", - "//pkg/scheduler/algorithm/predicates:go_default_library", - "//pkg/scheduler/algorithm/priorities:go_default_library", - "//pkg/scheduler/algorithm/priorities/util:go_default_library", - "//pkg/scheduler/api:go_default_library", - "//pkg/scheduler/cache:go_default_library", - "//pkg/scheduler/core/equivalence:go_default_library", - "//pkg/scheduler/internal/queue:go_default_library", - "//pkg/scheduler/testing:go_default_library", - "//pkg/scheduler/util:go_default_library", - "//staging/src/k8s.io/api/apps/v1:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - ], -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -42,12 +7,14 @@ go_library( "generic_scheduler.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/core", + visibility = ["//visibility:public"], deps = [ "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/core/equivalence:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", @@ -67,6 +34,36 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = [ + "extender_test.go", + "generic_scheduler_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/algorithm:go_default_library", + "//pkg/scheduler/algorithm/predicates:go_default_library", + "//pkg/scheduler/algorithm/priorities:go_default_library", + "//pkg/scheduler/algorithm/priorities/util:go_default_library", + "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/core/equivalence:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", + "//pkg/scheduler/internal/queue:go_default_library", + "//pkg/scheduler/testing:go_default_library", + "//pkg/scheduler/util:go_default_library", + "//staging/src/k8s.io/api/apps/v1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -81,4 +78,5 @@ filegroup( "//pkg/scheduler/core/equivalence:all-srcs", ], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 43687da793..c2b93eafe5 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/util" @@ -498,7 +499,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) } - cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) for _, name := range test.nodes { cache.AddNode(createNode(name)) } diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 57984abe26..dbadf0147b 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -41,6 +41,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core/equivalence" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -95,7 +96,7 @@ func (f *FitError) Error() string { } type genericScheduler struct { - cache schedulercache.Cache + cache schedulerinternalcache.Cache equivalenceCache *equivalence.Cache schedulingQueue internalqueue.SchedulingQueue predicates map[string]algorithm.FitPredicate @@ -1139,7 +1140,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla // NewGenericScheduler creates a genericScheduler object. func NewGenericScheduler( - cache schedulercache.Cache, + cache schedulerinternalcache.Cache, eCache *equivalence.Cache, podQueue internalqueue.SchedulingQueue, predicates map[string]algorithm.FitPredicate, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index b94150633d..eae28ac6e9 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -41,6 +41,7 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core/equivalence" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -433,7 +434,7 @@ func TestGenericScheduler(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) for _, pod := range test.pods { cache.AddPod(pod) } @@ -475,7 +476,7 @@ func TestGenericScheduler(t *testing.T) { // makeScheduler makes a simple genericScheduler for testing. func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Node) *genericScheduler { algorithmpredicates.SetPredicatesOrdering(order) - cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) for _, n := range nodes { cache.AddNode(n) } @@ -1382,7 +1383,7 @@ func TestPreempt(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { stop := make(chan struct{}) - cache := schedulercache.New(time.Duration(0), stop) + cache := schedulerinternalcache.New(time.Duration(0), stop) for _, pod := range test.pods { cache.AddPod(pod) } @@ -1460,7 +1461,7 @@ func TestPreempt(t *testing.T) { // syncingMockCache delegates method calls to an actual Cache, // but calls to UpdateNodeNameToInfoMap synchronize with the test. type syncingMockCache struct { - schedulercache.Cache + schedulerinternalcache.Cache cycleStart, cacheInvalidated chan struct{} once sync.Once } @@ -1498,7 +1499,7 @@ func TestCacheInvalidationRace(t *testing.T) { } // Set up the mock cache. - cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}} cache.AddNode(testNode) mockCache := &syncingMockCache{ @@ -1586,7 +1587,7 @@ func TestCacheInvalidationRace2(t *testing.T) { } // Set up the mock cache. - cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}} cache.AddNode(testNode) diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 2accd76653..e4d2c4183f 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -1,10 +1,4 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -16,6 +10,7 @@ go_library( "signal_windows.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/factory", + visibility = ["//visibility:public"], deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/apis/core/helper:go_default_library", @@ -30,6 +25,7 @@ go_library( "//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core/equivalence:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", @@ -103,4 +99,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/scheduler/factory/cache_comparer.go b/pkg/scheduler/factory/cache_comparer.go index aee5c958bd..966a93049a 100644 --- a/pkg/scheduler/factory/cache_comparer.go +++ b/pkg/scheduler/factory/cache_comparer.go @@ -25,15 +25,15 @@ import ( "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" ) type cacheComparer struct { nodeLister corelisters.NodeLister podLister corelisters.PodLister - cache schedulercache.Cache + cache schedulerinternalcache.Cache podQueue internalqueue.SchedulingQueue - compareStrategy } diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index e0cf384cf2..8952a30595 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -57,9 +57,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/api/validation" - schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core/equivalence" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" @@ -111,7 +111,7 @@ type configFactory struct { scheduledPodsHasSynced cache.InformerSynced - schedulerCache schedulercache.Cache + schedulerCache schedulerinternalcache.Cache // SchedulerName of a scheduler is used to select which pods will be // processed by this scheduler, based on pods's "spec.schedulerName". @@ -166,7 +166,7 @@ type ConfigFactoryArgs struct { // return the interface. func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator { stopEverything := make(chan struct{}) - schedulerCache := schedulercache.New(30*time.Second, stopEverything) + schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) // storageClassInformer is only enabled through VolumeScheduling feature gate var storageClassLister storagelisters.StorageClassLister diff --git a/pkg/scheduler/internal/cache/BUILD b/pkg/scheduler/internal/cache/BUILD new file mode 100644 index 0000000000..a573dfabb9 --- /dev/null +++ b/pkg/scheduler/internal/cache/BUILD @@ -0,0 +1,59 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "cache.go", + "interface.go", + "node_tree.go", + ], + importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache", + visibility = ["//visibility:public"], + deps = [ + "//pkg/features:go_default_library", + "//pkg/scheduler/cache:go_default_library", + "//pkg/util/node:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "cache_test.go", + "node_tree_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//pkg/features:go_default_library", + "//pkg/kubelet/apis:go_default_library", + "//pkg/scheduler/algorithm/priorities/util:go_default_library", + "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/internal/cache/cache.go similarity index 90% rename from pkg/scheduler/cache/cache.go rename to pkg/scheduler/internal/cache/cache.go index 439b93596c..16f7b25799 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "github.com/golang/glog" ) @@ -57,7 +58,7 @@ type schedulerCache struct { assumedPods map[string]bool // a map from pod key to podState. podStates map[string]*podState - nodes map[string]*NodeInfo + nodes map[string]*schedulercache.NodeInfo nodeTree *NodeTree // A map from image name to its imageState. imageStates map[string]*imageState @@ -78,17 +79,9 @@ type imageState struct { nodes sets.String } -// ImageStateSummary provides summarized information about the state of an image. -type ImageStateSummary struct { - // Size of the image - Size int64 - // Used to track how many nodes have this image - NumNodes int -} - // createImageStateSummary returns a summarizing snapshot of the given image's state. -func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary { - return &ImageStateSummary{ +func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulercache.ImageStateSummary { + return &schedulercache.ImageStateSummary{ Size: state.size, NumNodes: len(state.nodes), } @@ -100,7 +93,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul period: period, stop: stop, - nodes: make(map[string]*NodeInfo), + nodes: make(map[string]*schedulercache.NodeInfo), nodeTree: newNodeTree(nil), assumedPods: make(map[string]bool), podStates: make(map[string]*podState), @@ -108,13 +101,13 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } -// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact, +// Snapshot takes a snapshot of the current schedulerinternalcache. The method has performance impact, // and should be only used in non-critical path. func (cache *schedulerCache) Snapshot() *Snapshot { cache.mu.RLock() defer cache.mu.RUnlock() - nodes := make(map[string]*NodeInfo) + nodes := make(map[string]*schedulercache.NodeInfo) for k, v := range cache.nodes { nodes[k] = v.Clone() } @@ -130,16 +123,16 @@ func (cache *schedulerCache) Snapshot() *Snapshot { } } -func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error { +func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulercache.NodeInfo) error { cache.mu.Lock() defer cache.mu.Unlock() for name, info := range cache.nodes { if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil { // Transient scheduler info is reset here. - info.TransientInfo.resetTransientSchedulerInfo() + info.TransientInfo.ResetTransientSchedulerInfo() } - if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation { + if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() { nodeNameToInfo[name] = info.Clone() } } @@ -164,11 +157,11 @@ func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.S // pre-allocating capacity. maxSize := 0 for _, info := range cache.nodes { - maxSize += len(info.pods) + maxSize += len(info.Pods()) } pods := make([]*v1.Pod, 0, maxSize) for _, info := range cache.nodes { - for _, pod := range info.pods { + for _, pod := range info.Pods() { if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) { pods = append(pods, pod) } @@ -178,7 +171,7 @@ func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.S } func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return err } @@ -204,7 +197,7 @@ func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error { // finishBinding exists to make tests determinitistic by injecting now as an argument func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return err } @@ -223,7 +216,7 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { } func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return err } @@ -255,7 +248,7 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { func (cache *schedulerCache) addPod(pod *v1.Pod) { n, ok := cache.nodes[pod.Spec.NodeName] if !ok { - n = NewNodeInfo() + n = schedulercache.NewNodeInfo() cache.nodes[pod.Spec.NodeName] = n } n.AddPod(pod) @@ -276,14 +269,14 @@ func (cache *schedulerCache) removePod(pod *v1.Pod) error { if err := n.RemovePod(pod); err != nil { return err } - if len(n.pods) == 0 && n.node == nil { + if len(n.Pods()) == 0 && n.Node() == nil { delete(cache.nodes, pod.Spec.NodeName) } return nil } func (cache *schedulerCache) AddPod(pod *v1.Pod) error { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return err } @@ -318,7 +311,7 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error { } func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { - key, err := getPodKey(oldPod) + key, err := schedulercache.GetPodKey(oldPod) if err != nil { return err } @@ -346,7 +339,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { } func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return err } @@ -375,7 +368,7 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { } func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return false, err } @@ -391,7 +384,7 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) { } func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { - key, err := getPodKey(pod) + key, err := schedulercache.GetPodKey(pod) if err != nil { return nil, err } @@ -413,10 +406,10 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error { n, ok := cache.nodes[node.Name] if !ok { - n = NewNodeInfo() + n = schedulercache.NewNodeInfo() cache.nodes[node.Name] = n } else { - cache.removeNodeImageStates(n.node) + cache.removeNodeImageStates(n.Node()) } cache.nodeTree.AddNode(node) @@ -430,10 +423,10 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { n, ok := cache.nodes[newNode.Name] if !ok { - n = NewNodeInfo() + n = schedulercache.NewNodeInfo() cache.nodes[newNode.Name] = n } else { - cache.removeNodeImageStates(n.node) + cache.removeNodeImageStates(n.Node()) } cache.nodeTree.UpdateNode(oldNode, newNode) @@ -453,7 +446,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { // We can't do it unconditionally, because notifications about pods are delivered // in a different watch, and thus can potentially be observed later, even though // they happened before node removal. - if len(n.pods) == 0 && n.node == nil { + if len(n.Pods()) == 0 && n.Node() == nil { delete(cache.nodes, node.Name) } @@ -464,8 +457,8 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { // addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in // scheduler cache. This function assumes the lock to scheduler cache has been acquired. -func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) { - newSum := make(map[string]*ImageStateSummary) +func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) { + newSum := make(map[string]*schedulercache.ImageStateSummary) for _, image := range node.Status.Images { for _, name := range image.Names { @@ -486,7 +479,7 @@ func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInf } } } - nodeInfo.imageStates = newSum + nodeInfo.SetImageStates(newSum) } // removeNodeImageStates removes the given node record from image entries having the node diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go similarity index 80% rename from pkg/scheduler/cache/cache_test.go rename to pkg/scheduler/internal/cache/cache_test.go index f78aa915de..858a3ec710 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -31,13 +31,18 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) -func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *NodeInfo) { +func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *schedulercache.NodeInfo) { // Ignore generation field. if actual != nil { - actual.generation = 0 + actual.SetGeneration(0) + } + if expected != nil { + expected.SetGeneration(0) } if !reflect.DeepEqual(actual, expected) { t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected) @@ -70,6 +75,21 @@ func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo { return res } +func newNodeInfo(requestedResource *schedulercache.Resource, + nonzeroRequest *schedulercache.Resource, + pods []*v1.Pod, + usedPorts util.HostPortInfo, + imageStates map[string]*schedulercache.ImageStateSummary, +) *schedulercache.NodeInfo { + nodeInfo := schedulercache.NewNodeInfo(pods...) + nodeInfo.SetRequestedResource(requestedResource) + nodeInfo.SetNonZeroRequest(nonzeroRequest) + nodeInfo.SetUsedPorts(usedPorts) + nodeInfo.SetImageStates(imageStates) + + return nodeInfo +} + // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated // on node level. func TestAssumePodScheduled(t *testing.T) { @@ -89,111 +109,99 @@ func TestAssumePodScheduled(t *testing.T) { tests := []struct { pods []*v1.Pod - wNodeInfo *NodeInfo + wNodeInfo *schedulercache.NodeInfo }{{ pods: []*v1.Pod{testPods[0]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[0]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[0]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, { pods: []*v1.Pod{testPods[1], testPods[2]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 300, Memory: 1524, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 300, Memory: 1524, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[1], testPods[2]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[1], testPods[2]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, { // test non-zero request pods: []*v1.Pod{testPods[3]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 0, Memory: 0, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: priorityutil.DefaultMilliCPURequest, Memory: priorityutil.DefaultMemoryRequest, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[3]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[3]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, { pods: []*v1.Pod{testPods[4]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 3}, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[4]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[4]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, { pods: []*v1.Pod{testPods[4], testPods[5]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 300, Memory: 1524, ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 8}, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 300, Memory: 1524, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[4], testPods[5]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[4], testPods[5]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, { pods: []*v1.Pod{testPods[6]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[6]}, - usedPorts: newHostPortInfoBuilder().build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[6]}, + newHostPortInfoBuilder().build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, } @@ -246,7 +254,7 @@ func TestExpirePod(t *testing.T) { pods []*testExpirePodStruct cleanupTime time.Time - wNodeInfo *NodeInfo + wNodeInfo *schedulercache.NodeInfo }{{ // assumed pod would expires pods: []*testExpirePodStruct{ {pod: testPods[0], assumedTime: now}, @@ -259,21 +267,19 @@ func TestExpirePod(t *testing.T) { {pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)}, }, cleanupTime: now.Add(2 * ttl), - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 200, Memory: 1024, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 200, Memory: 1024, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[1]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[1]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }} for i, tt := range tests { @@ -308,25 +314,23 @@ func TestAddPodWillConfirm(t *testing.T) { podsToAssume []*v1.Pod podsToAdd []*v1.Pod - wNodeInfo *NodeInfo + wNodeInfo *schedulercache.NodeInfo }{{ // two pod were assumed at same time. But first one is called Add() and gets confirmed. podsToAssume: []*v1.Pod{testPods[0], testPods[1]}, podsToAdd: []*v1.Pod{testPods[0]}, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[0]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{testPods[0]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }} for i, tt := range tests { @@ -402,28 +406,26 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { podsToAdd []*v1.Pod podsToUpdate [][]*v1.Pod - wNodeInfo map[string]*NodeInfo + wNodeInfo map[string]*schedulercache.NodeInfo }{{ podsToAssume: []*v1.Pod{assumedPod.DeepCopy()}, podsToAdd: []*v1.Pod{addedPod.DeepCopy()}, podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}}, - wNodeInfo: map[string]*NodeInfo{ + wNodeInfo: map[string]*schedulercache.NodeInfo{ "assumed-node": nil, - "actual-node": { - requestedResource: &Resource{ + "actual-node": newNodeInfo( + &schedulercache.Resource{ MilliCPU: 200, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 200, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{updatedPod.DeepCopy()}, - usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{updatedPod.DeepCopy()}, + newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }, }} @@ -462,24 +464,22 @@ func TestAddPodAfterExpiration(t *testing.T) { tests := []struct { pod *v1.Pod - wNodeInfo *NodeInfo + wNodeInfo *schedulercache.NodeInfo }{{ pod: basePod, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{basePod}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{basePod}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }} now := time.Now() @@ -517,39 +517,35 @@ func TestUpdatePod(t *testing.T) { podsToAdd []*v1.Pod podsToUpdate []*v1.Pod - wNodeInfo []*NodeInfo + wNodeInfo []*schedulercache.NodeInfo }{{ // add a pod and then update it twice podsToAdd: []*v1.Pod{testPods[0]}, podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]}, - wNodeInfo: []*NodeInfo{{ - requestedResource: &Resource{ + wNodeInfo: []*schedulercache.NodeInfo{newNodeInfo( + &schedulercache.Resource{ MilliCPU: 200, Memory: 1024, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 200, Memory: 1024, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[1]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageStates: make(map[string]*ImageStateSummary), - }, { - requestedResource: &Resource{ + []*v1.Pod{testPods[1]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[0]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }}, + []*v1.Pod{testPods[0]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + )}, }} for _, tt := range tests { @@ -648,40 +644,36 @@ func TestExpireAddUpdatePod(t *testing.T) { podsToAdd []*v1.Pod podsToUpdate []*v1.Pod - wNodeInfo []*NodeInfo + wNodeInfo []*schedulercache.NodeInfo }{{ // Pod is assumed, expired, and added. Then it would be updated twice. podsToAssume: []*v1.Pod{testPods[0]}, podsToAdd: []*v1.Pod{testPods[0]}, podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]}, - wNodeInfo: []*NodeInfo{{ - requestedResource: &Resource{ + wNodeInfo: []*schedulercache.NodeInfo{newNodeInfo( + &schedulercache.Resource{ MilliCPU: 200, Memory: 1024, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 200, Memory: 1024, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[1]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageStates: make(map[string]*ImageStateSummary), - }, { - requestedResource: &Resource{ + []*v1.Pod{testPods[1]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{testPods[0]}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }}, + []*v1.Pod{testPods[0]}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + )}, }} now := time.Now() @@ -742,24 +734,22 @@ func TestEphemeralStorageResource(t *testing.T) { podE := makePodWithEphemeralStorage(nodeName, "500") tests := []struct { pod *v1.Pod - wNodeInfo *NodeInfo + wNodeInfo *schedulercache.NodeInfo }{ { pod: podE, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ EphemeralStorage: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: priorityutil.DefaultMilliCPURequest, Memory: priorityutil.DefaultMemoryRequest, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{podE}, - usedPorts: schedutil.HostPortInfo{}, - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{podE}, + schedutil.HostPortInfo{}, + make(map[string]*schedulercache.ImageStateSummary), + ), }, } for i, tt := range tests { @@ -789,24 +779,22 @@ func TestRemovePod(t *testing.T) { basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}) tests := []struct { pod *v1.Pod - wNodeInfo *NodeInfo + wNodeInfo *schedulercache.NodeInfo }{{ pod: basePod, - wNodeInfo: &NodeInfo{ - requestedResource: &Resource{ + wNodeInfo: newNodeInfo( + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - nonzeroRequest: &Resource{ + &schedulercache.Resource{ MilliCPU: 100, Memory: 500, }, - TransientInfo: newTransientSchedulerInfo(), - allocatableResource: &Resource{}, - pods: []*v1.Pod{basePod}, - usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageStates: make(map[string]*ImageStateSummary), - }, + []*v1.Pod{basePod}, + newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), + make(map[string]*schedulercache.ImageStateSummary), + ), }} for i, tt := range tests { @@ -885,7 +873,7 @@ func TestForgetPod(t *testing.T) { // getResourceRequest returns the resource request of all containers in Pods; // excuding initContainers. func getResourceRequest(pod *v1.Pod) v1.ResourceList { - result := &Resource{} + result := &schedulercache.Resource{} for _, container := range pod.Spec.Containers { result.Add(container.Resources.Requests) } @@ -894,22 +882,30 @@ func getResourceRequest(pod *v1.Pod) v1.ResourceList { } // buildNodeInfo creates a NodeInfo by simulating node operations in cache. -func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *NodeInfo { - expected := NewNodeInfo() +func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *schedulercache.NodeInfo { + expected := schedulercache.NewNodeInfo() // Simulate SetNode. - expected.node = node - expected.allocatableResource = NewResource(node.Status.Allocatable) - expected.taints = node.Spec.Taints - expected.generation++ + expected.SetNode(node) + + expected.SetAllocatableResource(schedulercache.NewResource(node.Status.Allocatable)) + expected.SetTaints(node.Spec.Taints) + expected.SetGeneration(expected.GetGeneration() + 1) for _, pod := range pods { // Simulate AddPod - expected.pods = append(expected.pods, pod) - expected.requestedResource.Add(getResourceRequest(pod)) - expected.nonzeroRequest.Add(getResourceRequest(pod)) - expected.updateUsedPorts(pod, true) - expected.generation++ + pods := append(expected.Pods(), pod) + expected.SetPods(pods) + requestedResource := expected.RequestedResource() + newRequestedResource := &requestedResource + newRequestedResource.Add(getResourceRequest(pod)) + expected.SetRequestedResource(newRequestedResource) + nonZeroRequest := expected.NonZeroRequest() + newNonZeroRequest := &nonZeroRequest + newNonZeroRequest.Add(getResourceRequest(pod)) + expected.SetNonZeroRequest(newNonZeroRequest) + expected.UpdateUsedPorts(pod, true) + expected.SetGeneration(expected.GetGeneration() + 1) } return expected @@ -1060,42 +1056,46 @@ func TestNodeOperators(t *testing.T) { // Case 1: the node was added into cache successfully. got, found := cache.nodes[node.Name] if !found { - t.Errorf("Failed to find node %v in schedulercache.", node.Name) + t.Errorf("Failed to find node %v in schedulerinternalcache.", node.Name) } if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name { t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name) } // Generations are globally unique. We check in our unit tests that they are incremented correctly. - expected.generation = got.generation + expected.SetGeneration(got.GetGeneration()) if !reflect.DeepEqual(got, expected) { t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected) } // Case 2: dump cached nodes successfully. - cachedNodes := map[string]*NodeInfo{} + cachedNodes := map[string]*schedulercache.NodeInfo{} cache.UpdateNodeNameToInfoMap(cachedNodes) newNode, found := cachedNodes[node.Name] if !found || len(cachedNodes) != 1 { t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes) } - expected.generation = newNode.generation + expected.SetGeneration(newNode.GetGeneration()) if !reflect.DeepEqual(newNode, expected) { t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected) } // Case 3: update node attribute successfully. node.Status.Allocatable[v1.ResourceMemory] = mem50m - expected.allocatableResource.Memory = mem50m.Value() + allocatableResource := expected.AllocatableResource() + newAllocatableResource := &allocatableResource + newAllocatableResource.Memory = mem50m.Value() + expected.SetAllocatableResource(newAllocatableResource) + cache.UpdateNode(nil, node) got, found = cache.nodes[node.Name] if !found { t.Errorf("Failed to find node %v in schedulercache after UpdateNode.", node.Name) } - if got.generation <= expected.generation { - t.Errorf("generation is not incremented. got: %v, expected: %v", got.generation, expected.generation) + if got.GetGeneration() <= expected.GetGeneration() { + t.Errorf("Generation is not incremented. got: %v, expected: %v", got.GetGeneration(), expected.GetGeneration()) } - expected.generation = got.generation + expected.SetGeneration(got.GetGeneration()) if !reflect.DeepEqual(got, expected) { t.Errorf("Failed to update node in schedulercache:\n got: %+v \nexpected: %+v", got, expected) @@ -1132,7 +1132,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() for n := 0; n < b.N; n++ { - cachedNodes := map[string]*NodeInfo{} + cachedNodes := map[string]*schedulercache.NodeInfo{} cache.UpdateNodeNameToInfoMap(cachedNodes) } } diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/internal/cache/interface.go similarity index 96% rename from pkg/scheduler/cache/interface.go rename to pkg/scheduler/internal/cache/interface.go index 14aa485f89..878c2aa074 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -19,6 +19,7 @@ package cache import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) // PodFilter is a function to filter a pod. If pod passed return true else return false. @@ -99,7 +100,7 @@ type Cache interface { // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache. // The node info contains aggregated information of pods scheduled (including assumed to be) // on this node. - UpdateNodeNameToInfoMap(infoMap map[string]*NodeInfo) error + UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error // List lists all cached pods (including assumed ones). List(labels.Selector) ([]*v1.Pod, error) @@ -117,5 +118,5 @@ type Cache interface { // Snapshot is a snapshot of cache state type Snapshot struct { AssumedPods map[string]bool - Nodes map[string]*NodeInfo + Nodes map[string]*schedulercache.NodeInfo } diff --git a/pkg/scheduler/cache/node_tree.go b/pkg/scheduler/internal/cache/node_tree.go similarity index 99% rename from pkg/scheduler/cache/node_tree.go rename to pkg/scheduler/internal/cache/node_tree.go index 4a8e08d22a..5dbd49da81 100644 --- a/pkg/scheduler/cache/node_tree.go +++ b/pkg/scheduler/internal/cache/node_tree.go @@ -59,6 +59,7 @@ func (na *nodeArray) next() (nodeName string, exhausted bool) { return nodeName, false } +// newNodeTree creates a NodeTree from nodes. func newNodeTree(nodes []*v1.Node) *NodeTree { nt := &NodeTree{ tree: make(map[string]*nodeArray), diff --git a/pkg/scheduler/cache/node_tree_test.go b/pkg/scheduler/internal/cache/node_tree_test.go similarity index 100% rename from pkg/scheduler/cache/node_tree_test.go rename to pkg/scheduler/internal/cache/node_tree_test.go diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 165c7ba815..51250217e3 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -31,9 +31,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core/equivalence" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -75,7 +75,7 @@ func (sched *Scheduler) StopEverything() { } // Cache returns the cache in scheduler for test to check the data in scheduler. -func (sched *Scheduler) Cache() schedulercache.Cache { +func (sched *Scheduler) Cache() schedulerinternalcache.Cache { return sched.config.SchedulerCache } @@ -110,7 +110,7 @@ type Configurator interface { type Config struct { // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. - SchedulerCache schedulercache.Cache + SchedulerCache schedulerinternalcache.Cache // Ecache is used for optimistically invalid affected cache items after // successfully binding a pod Ecache *equivalence.Cache diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index a5158bb371..3bd608b9cd 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -38,8 +38,8 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/api" - schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "k8s.io/kubernetes/pkg/scheduler/core" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -265,7 +265,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { stop := make(chan struct{}) defer close(stop) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := schedulercache.New(100*time.Millisecond, stop) + scache := schedulerinternalcache.New(100*time.Millisecond, stop) pod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) @@ -323,7 +323,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { stop := make(chan struct{}) defer close(stop) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := schedulercache.New(10*time.Minute, stop) + scache := schedulerinternalcache.New(10*time.Minute, stop) firstPod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) @@ -410,7 +410,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := schedulercache.New(test.CacheTTL, stop) + scache := schedulerinternalcache.New(test.CacheTTL, stop) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) @@ -446,7 +446,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. -func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, +func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) { scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil) @@ -478,7 +478,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { stop := make(chan struct{}) defer close(stop) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := schedulercache.New(10*time.Minute, stop) + scache := schedulerinternalcache.New(10*time.Minute, stop) // Design the baseline for the pods, and we will make nodes that dont fit it later. var cpu = int64(4) @@ -549,7 +549,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, nil, @@ -600,7 +600,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. return sched, bindingChan, errChan } -func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { +func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { algo := core.NewGenericScheduler( scache, nil, @@ -656,7 +656,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode}) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore.Add(podWithID("foo", "")) - scache := schedulercache.New(10*time.Minute, stop) + scache := schedulerinternalcache.New(10*time.Minute, stop) scache.AddNode(&testNode) predicateMap := map[string]algorithm.FitPredicate{ diff --git a/pkg/scheduler/testing/BUILD b/pkg/scheduler/testing/BUILD index d1fa65c61d..a5b353f2e3 100644 --- a/pkg/scheduler/testing/BUILD +++ b/pkg/scheduler/testing/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/apis/core/install:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/cache:go_default_library", + "//pkg/scheduler/internal/cache:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go index c7a4d1bfde..91a1402a1d 100644 --- a/pkg/scheduler/testing/fake_cache.go +++ b/pkg/scheduler/testing/fake_cache.go @@ -20,6 +20,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) // FakeCache is used for testing @@ -82,14 +83,14 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil } // FilteredList is a fake method for testing. -func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { +func (f *FakeCache) FilteredList(filter schedulerinternalcache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { return nil, nil } // Snapshot is a fake method for testing -func (f *FakeCache) Snapshot() *schedulercache.Snapshot { - return &schedulercache.Snapshot{} +func (f *FakeCache) Snapshot() *schedulerinternalcache.Snapshot { + return &schedulerinternalcache.Snapshot{} } // NodeTree is a fake method for testing. -func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil } +func (f *FakeCache) NodeTree() *schedulerinternalcache.NodeTree { return nil } diff --git a/pkg/scheduler/testing/fake_lister.go b/pkg/scheduler/testing/fake_lister.go index 8468aa25b6..5fb3652597 100644 --- a/pkg/scheduler/testing/fake_lister.go +++ b/pkg/scheduler/testing/fake_lister.go @@ -26,7 +26,7 @@ import ( "k8s.io/apimachinery/pkg/labels" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/kubernetes/pkg/scheduler/algorithm" - schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" + schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) var _ algorithm.NodeLister = &FakeNodeLister{} @@ -55,7 +55,7 @@ func (f FakePodLister) List(s labels.Selector) (selected []*v1.Pod, err error) { } // FilteredList returns pods matching a pod filter and a label selector. -func (f FakePodLister) FilteredList(podFilter schedulercache.PodFilter, s labels.Selector) (selected []*v1.Pod, err error) { +func (f FakePodLister) FilteredList(podFilter schedulerinternalcache.PodFilter, s labels.Selector) (selected []*v1.Pod, err error) { for _, pod := range f { if podFilter(pod) && s.Matches(labels.Set(pod.Labels)) { selected = append(selected, pod)