Merge pull request #68968 from wgliang/scheduler2/cleanup-cache

[scheduler cleanup phase 1]:Move scheduler cache interface and implementation to pkg/scheduler/in…
k8s-ci-robot 2018-10-06 07:01:57 -07:00 committed by GitHub
commit c63462ada0
26 changed files with 470 additions and 336 deletions
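The bulk of the diff is mechanical: the cache implementation (cache.go, interface.go, node_tree.go) moves to pkg/scheduler/internal/cache, while NodeInfo and its helpers stay in the public pkg/scheduler/cache package and grow exported accessors. Callers swap imports roughly as in this sketch; the constructor signature is taken from the call sites in the hunks below, and the wrapper function is hypothetical:

// Sketch only: pkg/scheduler/internal/cache is an internal package, so real
// callers live under pkg/scheduler (factory.go below is the canonical one).
package example

import (
	"time"

	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// Before this PR the same constructor was imported as
//   schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
// and called as schedulercache.New(...).
func newSchedulerCache(stop <-chan struct{}) schedulerinternalcache.Cache {
	return schedulerinternalcache.New(30*time.Second, stop)
}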


@@ -71,6 +71,7 @@
"k8s.io/kubernetes/pkg/scheduler/algorithm",
"k8s.io/kubernetes/pkg/scheduler/api",
"k8s.io/kubernetes/pkg/scheduler/cache",
+"k8s.io/kubernetes/pkg/scheduler/internal/cache",
"k8s.io/kubernetes/pkg/scheduler/util",
"k8s.io/kubernetes/pkg/security/apparmor",
"k8s.io/kubernetes/pkg/serviceaccount",


@@ -122,6 +122,7 @@
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util",
"k8s.io/kubernetes/pkg/scheduler/api",
"k8s.io/kubernetes/pkg/scheduler/cache",
+"k8s.io/kubernetes/pkg/scheduler/internal/cache",
"k8s.io/kubernetes/pkg/scheduler/util",
"k8s.io/kubernetes/pkg/scheduler/volumebinder",
"k8s.io/kubernetes/pkg/security/apparmor",


@@ -13,9 +13,9 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
-"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
@@ -42,8 +42,8 @@ go_test(
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
-"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -77,6 +77,7 @@ filegroup(
"//pkg/scheduler/cache:all-srcs",
"//pkg/scheduler/core:all-srcs",
"//pkg/scheduler/factory:all-srcs",
+"//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs",
"//pkg/scheduler/metrics:all-srcs",
"//pkg/scheduler/testing:all-srcs",


@@ -19,6 +19,7 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",


@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
+schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
// NodeFieldSelectorKeys is a map that: the key are node field selector keys; the values are
@@ -98,7 +99,7 @@ type PodLister interface {
List(labels.Selector) ([]*v1.Pod, error)
// This is similar to "List()", but the returned slice does not
// contain pods that don't pass `podFilter`.
-FilteredList(podFilter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
+FilteredList(podFilter schedulerinternalcache.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
}
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
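The only surface change here is which package PodFilter comes from. A caller-side sketch, assuming PodFilter keeps the func(*v1.Pod) bool shape it had before the move; the filter and helper below are invented for illustration:

// Illustrative only; code importing the internal cache package must live
// under pkg/scheduler.
package example

import (
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"

	"k8s.io/kubernetes/pkg/scheduler/algorithm"
)

// runningOnly matches the PodFilter shape (assumed func(*v1.Pod) bool),
// so it converts implicitly when passed to FilteredList.
func runningOnly(pod *v1.Pod) bool {
	return pod.Status.Phase == v1.PodRunning
}

// listRunning shows the caller side of the updated FilteredList signature.
func listRunning(lister algorithm.PodLister) ([]*v1.Pod, error) {
	return lister.FilteredList(runningOnly, labels.Everything())
}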


@@ -3,26 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
-"cache.go",
-"interface.go",
"node_info.go",
-"node_tree.go",
"util.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/cache",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
-"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
-"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
-"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@@ -30,24 +22,17 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
-"cache_test.go",
"node_info_test.go",
-"node_tree_test.go",
"util_test.go",
],
embed = [":go_default_library"],
deps = [
-"//pkg/features:go_default_library",
-"//pkg/kubelet/apis:go_default_library",
-"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)


@@ -36,6 +36,14 @@ var (
generation int64
)
+// ImageStateSummary provides summarized information about the state of an image.
+type ImageStateSummary struct {
+// Size of the image
+Size int64
+// Used to track how many nodes have this image
+NumNodes int
+}
+
// NodeInfo is node level aggregated information.
type NodeInfo struct {
// Overall node information.
@@ -66,7 +74,7 @@ type NodeInfo struct {
// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
// scheduling cycle.
// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
-TransientInfo *transientSchedulerInfo
+TransientInfo *TransientSchedulerInfo
// Cached conditions of node for faster lookup.
memoryPressureCondition v1.ConditionStatus
@@ -99,28 +107,28 @@ type nodeTransientInfo struct {
RequestedVolumes int
}
-// transientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
+// TransientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and
// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization
// on node etc.
// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure
// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle.
-type transientSchedulerInfo struct {
+type TransientSchedulerInfo struct {
TransientLock sync.Mutex
// NodeTransInfo holds the information related to nodeTransientInformation. NodeName is the key here.
TransNodeInfo nodeTransientInfo
}
-// newTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
-func newTransientSchedulerInfo() *transientSchedulerInfo {
-tsi := &transientSchedulerInfo{
+// NewTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
+func NewTransientSchedulerInfo() *TransientSchedulerInfo {
+tsi := &TransientSchedulerInfo{
TransNodeInfo: initializeNodeTransientInfo(),
}
return tsi
}
-// resetTransientSchedulerInfo resets the transientSchedulerInfo.
-func (transientSchedInfo *transientSchedulerInfo) resetTransientSchedulerInfo() {
+// ResetTransientSchedulerInfo resets the TransientSchedulerInfo.
+func (transientSchedInfo *TransientSchedulerInfo) ResetTransientSchedulerInfo() {
transientSchedInfo.TransientLock.Lock()
defer transientSchedInfo.TransientLock.Unlock()
// Reset TransientNodeInfo.
@@ -259,7 +267,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
allocatableResource: &Resource{},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
generation: nextGeneration(),
usedPorts: make(util.HostPortInfo),
imageStates: make(map[string]*ImageStateSummary),
@@ -286,6 +294,11 @@ func (n *NodeInfo) Pods() []*v1.Pod {
return n.pods
}
+// SetPods sets all pods scheduled (including assumed to be) on this node.
+func (n *NodeInfo) SetPods(pods []*v1.Pod) {
+n.pods = pods
+}
+
// UsedPorts returns used ports on this node.
func (n *NodeInfo) UsedPorts() util.HostPortInfo {
if n == nil {
@@ -294,6 +307,11 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo {
return n.usedPorts
}
+// SetUsedPorts sets the used ports on this node.
+func (n *NodeInfo) SetUsedPorts(newUsedPorts util.HostPortInfo) {
+n.usedPorts = newUsedPorts
+}
+
// ImageStates returns the state information of all images.
func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
if n == nil {
@@ -302,6 +320,11 @@ func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
return n.imageStates
}
+// SetImageStates sets the state information of all images.
+func (n *NodeInfo) SetImageStates(newImageStates map[string]*ImageStateSummary) {
+n.imageStates = newImageStates
+}
+
// PodsWithAffinity return all pods with (anti)affinity constraints on this node.
func (n *NodeInfo) PodsWithAffinity() []*v1.Pod {
if n == nil {
@@ -326,6 +349,11 @@ func (n *NodeInfo) Taints() ([]v1.Taint, error) {
return n.taints, n.taintsErr
}
+// SetTaints sets the taints list on this node.
+func (n *NodeInfo) SetTaints(newTaints []v1.Taint) {
+n.taints = newTaints
+}
+
// MemoryPressureCondition returns the memory pressure condition status on this node.
func (n *NodeInfo) MemoryPressureCondition() v1.ConditionStatus {
if n == nil {
@@ -358,6 +386,11 @@ func (n *NodeInfo) RequestedResource() Resource {
return *n.requestedResource
}
+// SetRequestedResource sets the aggregated resource request of pods on this node.
+func (n *NodeInfo) SetRequestedResource(newResource *Resource) {
+n.requestedResource = newResource
+}
+
// NonZeroRequest returns aggregated nonzero resource request of pods on this node.
func (n *NodeInfo) NonZeroRequest() Resource {
if n == nil {
@@ -366,6 +399,11 @@ func (n *NodeInfo) NonZeroRequest() Resource {
return *n.nonzeroRequest
}
+// SetNonZeroRequest sets the aggregated nonzero resource request of pods on this node.
+func (n *NodeInfo) SetNonZeroRequest(newResource *Resource) {
+n.nonzeroRequest = newResource
+}
+
// AllocatableResource returns allocatable resources on a given node.
func (n *NodeInfo) AllocatableResource() Resource {
if n == nil {
@@ -380,6 +418,19 @@ func (n *NodeInfo) SetAllocatableResource(allocatableResource *Resource) {
n.generation = nextGeneration()
}
+// GetGeneration returns the generation on this node.
+func (n *NodeInfo) GetGeneration() int64 {
+if n == nil {
+return 0
+}
+return n.generation
+}
+
+// SetGeneration sets the generation on this node. This is for testing only.
+func (n *NodeInfo) SetGeneration(newGeneration int64) {
+n.generation = newGeneration
+}
+
// Clone returns a copy of this node.
func (n *NodeInfo) Clone() *NodeInfo {
clone := &NodeInfo{
@@ -464,20 +515,20 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
}
// Consume ports when pods added.
-n.updateUsedPorts(pod, true)
+n.UpdateUsedPorts(pod, true)
n.generation = nextGeneration()
}
// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
-k1, err := getPodKey(pod)
+k1, err := GetPodKey(pod)
if err != nil {
return err
}
for i := range n.podsWithAffinity {
-k2, err := getPodKey(n.podsWithAffinity[i])
+k2, err := GetPodKey(n.podsWithAffinity[i])
if err != nil {
glog.Errorf("Cannot get pod key, err: %v", err)
continue
@@ -490,7 +541,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
}
}
for i := range n.pods {
-k2, err := getPodKey(n.pods[i])
+k2, err := GetPodKey(n.pods[i])
if err != nil {
glog.Errorf("Cannot get pod key, err: %v", err)
continue
@@ -515,7 +566,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
n.nonzeroRequest.Memory -= non0Mem
// Release ports when remove Pods.
-n.updateUsedPorts(pod, false)
+n.UpdateUsedPorts(pod, false)
n.generation = nextGeneration()
@@ -539,7 +590,8 @@ func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64)
return
}
-func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
+// UpdateUsedPorts updates the UsedPorts of NodeInfo.
+func (n *NodeInfo) UpdateUsedPorts(pod *v1.Pod, add bool) {
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
for k := range container.Ports {
@@ -573,7 +625,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
// We ignore other conditions.
}
}
-n.TransientInfo = newTransientSchedulerInfo()
+n.TransientInfo = NewTransientSchedulerInfo()
n.generation = nextGeneration()
return nil
}
@@ -614,9 +666,9 @@ func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
continue
}
// If pod is on the given node, add it to 'filtered' only if it is present in nodeInfo.
-podKey, _ := getPodKey(p)
+podKey, _ := GetPodKey(p)
for _, np := range n.Pods() {
-npodkey, _ := getPodKey(np)
+npodkey, _ := GetPodKey(np)
if npodkey == podKey {
filtered = append(filtered, p)
break
@@ -626,8 +678,8 @@ func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
return filtered
}
-// getPodKey returns the string key of a pod.
-func getPodKey(pod *v1.Pod) (string, error) {
+// GetPodKey returns the string key of a pod.
+func GetPodKey(pod *v1.Pod) (string, error) {
uid := string(pod.UID)
if len(uid) == 0 {
return "", errors.New("Cannot get cache key for pod with empty UID")


@@ -19,6 +19,7 @@ package cache
import (
"fmt"
"reflect"
+"strings"
"testing"
"k8s.io/api/core/v1"
@@ -240,6 +241,43 @@ func TestSetMaxResource(t *testing.T) {
}
}
+type testingMode interface {
+Fatalf(format string, args ...interface{})
+}
+
+func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod {
+req := v1.ResourceList{}
+if cpu != "" {
+req = v1.ResourceList{
+v1.ResourceCPU: resource.MustParse(cpu),
+v1.ResourceMemory: resource.MustParse(mem),
+}
+if extended != "" {
+parts := strings.Split(extended, ":")
+if len(parts) != 2 {
+t.Fatalf("Invalid extended resource string: \"%s\"", extended)
+}
+req[v1.ResourceName(parts[0])] = resource.MustParse(parts[1])
+}
+}
+return &v1.Pod{
+ObjectMeta: metav1.ObjectMeta{
+UID: types.UID(objName),
+Namespace: "node_info_cache_test",
+Name: objName,
+},
+Spec: v1.PodSpec{
+Containers: []v1.Container{{
+Resources: v1.ResourceRequirements{
+Requests: req,
+},
+Ports: ports,
+}},
+NodeName: nodeName,
+},
+}
+}
+
func TestNewNodeInfo(t *testing.T) {
nodeName := "test-node"
pods := []*v1.Pod{
@@ -262,7 +300,7 @@ func TestNewNodeInfo(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@@ -351,7 +389,7 @@ func TestNodeInfoClone(t *testing.T) {
nodeInfo: &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@@ -421,7 +459,7 @@ func TestNodeInfoClone(t *testing.T) {
expected: &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@@ -580,7 +618,7 @@ func TestNodeInfoAddPod(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@@ -699,7 +737,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 2,
usedPorts: util.HostPortInfo{
@@ -816,7 +854,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
AllowedPodNumber: 0,
ScalarResources: map[v1.ResourceName]int64(nil),
},
-TransientInfo: newTransientSchedulerInfo(),
+TransientInfo: NewTransientSchedulerInfo(),
allocatableResource: &Resource{},
generation: 3,
usedPorts: util.HostPortInfo{
@@ -865,7 +903,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
err := ni.RemovePod(test.pod)
if err != nil {
if test.errExpected {
-expectedErrorMsg := fmt.Errorf("no corresponding pod %s in pods of node %s", test.pod.Name, ni.node.Name)
+expectedErrorMsg := fmt.Errorf("no corresponding pod %s in pods of node %s", test.pod.Name, ni.Node().Name)
if expectedErrorMsg == err {
t.Errorf("expected error: %v, got: %v", expectedErrorMsg, err)
}
@@ -887,10 +925,10 @@ func TestNodeInfoRemovePod(t *testing.T) {
func fakeNodeInfo(pods ...*v1.Pod) *NodeInfo {
ni := NewNodeInfo(pods...)
-ni.node = &v1.Node{
+ni.SetNode(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
},
-}
+})
return ni
}
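For orientation, a hypothetical call to the makeBasePod helper added above, placed inside this test package; the test name and argument values are illustrative, chosen to mirror the fixtures used elsewhere in these tests:

// In a *_test.go file of this package, where makeBasePod and the v1 import
// already exist.
func TestMakeBasePodSketch(t *testing.T) {
	ports := []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}

	// nodeName, objName, cpu, mem, extended resource ("name:quantity"), ports.
	pod := makeBasePod(t, "test-node", "pod-1", "100m", "500", "example.com/foo:3", ports)

	if pod.Spec.NodeName != "test-node" {
		t.Fatalf("unexpected node name %q", pod.Spec.NodeName)
	}
}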


@@ -1,39 +1,4 @@
-package(default_visibility = ["//visibility:public"])
-load(
-"@io_bazel_rules_go//go:def.bzl",
-"go_library",
-"go_test",
-)
-go_test(
-name = "go_default_test",
-srcs = [
-"extender_test.go",
-"generic_scheduler_test.go",
-],
-embed = [":go_default_library"],
-deps = [
-"//pkg/scheduler/algorithm:go_default_library",
-"//pkg/scheduler/algorithm/predicates:go_default_library",
-"//pkg/scheduler/algorithm/priorities:go_default_library",
-"//pkg/scheduler/algorithm/priorities/util:go_default_library",
-"//pkg/scheduler/api:go_default_library",
-"//pkg/scheduler/cache:go_default_library",
-"//pkg/scheduler/core/equivalence:go_default_library",
-"//pkg/scheduler/internal/queue:go_default_library",
-"//pkg/scheduler/testing:go_default_library",
-"//pkg/scheduler/util:go_default_library",
-"//staging/src/k8s.io/api/apps/v1:go_default_library",
-"//staging/src/k8s.io/api/core/v1:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
-],
-)
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -42,12 +7,14 @@ go_library(
"generic_scheduler.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/core",
+visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
@@ -67,6 +34,36 @@ go_library(
],
)
+go_test(
+name = "go_default_test",
+srcs = [
+"extender_test.go",
+"generic_scheduler_test.go",
+],
+embed = [":go_default_library"],
+deps = [
+"//pkg/scheduler/algorithm:go_default_library",
+"//pkg/scheduler/algorithm/predicates:go_default_library",
+"//pkg/scheduler/algorithm/priorities:go_default_library",
+"//pkg/scheduler/algorithm/priorities/util:go_default_library",
+"//pkg/scheduler/api:go_default_library",
+"//pkg/scheduler/cache:go_default_library",
+"//pkg/scheduler/core/equivalence:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
+"//pkg/scheduler/internal/queue:go_default_library",
+"//pkg/scheduler/testing:go_default_library",
+"//pkg/scheduler/util:go_default_library",
+"//staging/src/k8s.io/api/apps/v1:go_default_library",
+"//staging/src/k8s.io/api/core/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+],
+)
+
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
@@ -81,4 +78,5 @@ filegroup(
"//pkg/scheduler/core/equivalence:all-srcs",
],
tags = ["automanaged"],
+visibility = ["//visibility:public"],
)


@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
+schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
@@ -498,7 +499,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
-cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}


@@ -41,6 +41,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
+schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@@ -95,7 +96,7 @@ func (f *FitError) Error() string {
}
type genericScheduler struct {
-cache schedulercache.Cache
+cache schedulerinternalcache.Cache
equivalenceCache *equivalence.Cache
schedulingQueue internalqueue.SchedulingQueue
predicates map[string]algorithm.FitPredicate
@@ -1139,7 +1140,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
-cache schedulercache.Cache,
+cache schedulerinternalcache.Cache,
eCache *equivalence.Cache,
podQueue internalqueue.SchedulingQueue,
predicates map[string]algorithm.FitPredicate,


@@ -41,6 +41,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
+schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)
@@ -433,7 +434,7 @@ func TestGenericScheduler(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
-cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
for _, pod := range test.pods {
cache.AddPod(pod)
}
@@ -475,7 +476,7 @@ func TestGenericScheduler(t *testing.T) {
// makeScheduler makes a simple genericScheduler for testing.
func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Node) *genericScheduler {
algorithmpredicates.SetPredicatesOrdering(order)
-cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
}
@@ -1382,7 +1383,7 @@ func TestPreempt(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
stop := make(chan struct{})
-cache := schedulercache.New(time.Duration(0), stop)
+cache := schedulerinternalcache.New(time.Duration(0), stop)
for _, pod := range test.pods {
cache.AddPod(pod)
}
@@ -1460,7 +1461,7 @@ func TestPreempt(t *testing.T) {
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
-schedulercache.Cache
+schedulerinternalcache.Cache
cycleStart, cacheInvalidated chan struct{}
once sync.Once
}
@@ -1498,7 +1499,7 @@ func TestCacheInvalidationRace(t *testing.T) {
}
// Set up the mock cache.
-cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
cache.AddNode(testNode)
mockCache := &syncingMockCache{
@@ -1586,7 +1587,7 @@ func TestCacheInvalidationRace2(t *testing.T) {
}
// Set up the mock cache.
-cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
cache.AddNode(testNode)
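The syncingMockCache above shows the intended extension point: a test double embeds the internal Cache interface and overrides only the methods it cares about. A hedged sketch of the same pattern, assuming it sits in this same test package so both cache imports are in scope; countingCache and its counter are invented for illustration:

// countingCache is an illustrative test double, not part of the PR: it embeds
// the internal Cache interface, counts UpdateNodeNameToInfoMap calls, and
// delegates everything else to the wrapped implementation.
type countingCache struct {
	schedulerinternalcache.Cache
	updates int
}

func (c *countingCache) UpdateNodeNameToInfoMap(m map[string]*schedulercache.NodeInfo) error {
	c.updates++
	return c.Cache.UpdateNodeNameToInfoMap(m)
}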


@@ -1,10 +1,4 @@
-package(default_visibility = ["//visibility:public"])
-load(
-"@io_bazel_rules_go//go:def.bzl",
-"go_library",
-"go_test",
-)
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -16,6 +10,7 @@ go_library(
"signal_windows.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/factory",
+visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core/helper:go_default_library",
@@ -30,6 +25,7 @@ go_library(
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
+"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
@@ -103,4 +99,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
+visibility = ["//visibility:public"],
)


@@ -25,15 +25,15 @@ import (
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
+schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
type cacheComparer struct {
nodeLister corelisters.NodeLister
podLister corelisters.PodLister
-cache schedulercache.Cache
+cache schedulerinternalcache.Cache
podQueue internalqueue.SchedulingQueue
-compareStrategy
}


@@ -57,9 +57,9 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/api/validation"
-schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
+schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@@ -111,7 +111,7 @@ type configFactory struct {
scheduledPodsHasSynced cache.InformerSynced
-schedulerCache schedulercache.Cache
+schedulerCache schedulerinternalcache.Cache
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's "spec.schedulerName".
@@ -166,7 +166,7 @@ type ConfigFactoryArgs struct {
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
stopEverything := make(chan struct{})
-schedulerCache := schedulercache.New(30*time.Second, stopEverything)
+schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister

pkg/scheduler/internal/cache/BUILD (new file)

@@ -0,0 +1,59 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+name = "go_default_library",
+srcs = [
+"cache.go",
+"interface.go",
+"node_tree.go",
+],
+importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache",
+visibility = ["//visibility:public"],
+deps = [
+"//pkg/features:go_default_library",
+"//pkg/scheduler/cache:go_default_library",
+"//pkg/util/node:go_default_library",
+"//staging/src/k8s.io/api/core/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+"//vendor/github.com/golang/glog:go_default_library",
+],
+)
+
+go_test(
+name = "go_default_test",
+srcs = [
+"cache_test.go",
+"node_tree_test.go",
+],
+embed = [":go_default_library"],
+deps = [
+"//pkg/features:go_default_library",
+"//pkg/kubelet/apis:go_default_library",
+"//pkg/scheduler/algorithm/priorities/util:go_default_library",
+"//pkg/scheduler/cache:go_default_library",
+"//pkg/scheduler/util:go_default_library",
+"//staging/src/k8s.io/api/core/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+],
+)
+
+filegroup(
+name = "package-srcs",
+srcs = glob(["**"]),
+tags = ["automanaged"],
+visibility = ["//visibility:private"],
+)
+
+filegroup(
+name = "all-srcs",
+srcs = [":package-srcs"],
+tags = ["automanaged"],
+visibility = ["//visibility:public"],
+)


@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
+schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"github.com/golang/glog"
)
@@ -57,7 +58,7 @@ type schedulerCache struct {
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
-nodes map[string]*NodeInfo
+nodes map[string]*schedulercache.NodeInfo
nodeTree *NodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
@@ -78,17 +79,9 @@ type imageState struct {
nodes sets.String
}
-// ImageStateSummary provides summarized information about the state of an image.
-type ImageStateSummary struct {
-// Size of the image
-Size int64
-// Used to track how many nodes have this image
-NumNodes int
-}
-
// createImageStateSummary returns a summarizing snapshot of the given image's state.
-func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary {
-return &ImageStateSummary{
+func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulercache.ImageStateSummary {
+return &schedulercache.ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
@@ -100,7 +93,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
period: period,
stop: stop,
-nodes: make(map[string]*NodeInfo),
+nodes: make(map[string]*schedulercache.NodeInfo),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
@@ -108,13 +101,13 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
}
}
-// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact,
+// Snapshot takes a snapshot of the current schedulerinternalcache. The method has performance impact,
// and should be only used in non-critical path.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.RLock()
defer cache.mu.RUnlock()
-nodes := make(map[string]*NodeInfo)
+nodes := make(map[string]*schedulercache.NodeInfo)
for k, v := range cache.nodes {
nodes[k] = v.Clone()
}
@@ -130,16 +123,16 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
}
}
-func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
+func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulercache.NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
// Transient scheduler info is reset here.
-info.TransientInfo.resetTransientSchedulerInfo()
+info.TransientInfo.ResetTransientSchedulerInfo()
}
-if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
+if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() {
nodeNameToInfo[name] = info.Clone()
}
}
@@ -164,11 +157,11 @@ func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.S
// pre-allocating capacity.
maxSize := 0
for _, info := range cache.nodes {
-maxSize += len(info.pods)
+maxSize += len(info.Pods())
}
pods := make([]*v1.Pod, 0, maxSize)
for _, info := range cache.nodes {
-for _, pod := range info.pods {
+for _, pod := range info.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
}
@@ -178,7 +171,7 @@ func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.S
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
@@ -204,7 +197,7 @@ func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
// finishBinding exists to make tests determinitistic by injecting now as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
@@ -223,7 +216,7 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
}
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
@@ -255,7 +248,7 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
-n = NewNodeInfo()
+n = schedulercache.NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.AddPod(pod)
@@ -276,14 +269,14 @@ func (cache *schedulerCache) removePod(pod *v1.Pod) error {
if err := n.RemovePod(pod); err != nil {
return err
}
-if len(n.pods) == 0 && n.node == nil {
+if len(n.Pods()) == 0 && n.Node() == nil {
delete(cache.nodes, pod.Spec.NodeName)
}
return nil
}
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
@@ -318,7 +311,7 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
}
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
-key, err := getPodKey(oldPod)
+key, err := schedulercache.GetPodKey(oldPod)
if err != nil {
return err
}
@@ -346,7 +339,7 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
}
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
@@ -375,7 +368,7 @@ func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
}
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return false, err
}
@@ -391,7 +384,7 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
}
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
-key, err := getPodKey(pod)
+key, err := schedulercache.GetPodKey(pod)
if err != nil {
return nil, err
}
@@ -413,10 +406,10 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
n, ok := cache.nodes[node.Name]
if !ok {
-n = NewNodeInfo()
+n = schedulercache.NewNodeInfo()
cache.nodes[node.Name] = n
} else {
-cache.removeNodeImageStates(n.node)
+cache.removeNodeImageStates(n.Node())
}
cache.nodeTree.AddNode(node)
@@ -430,10 +423,10 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
n, ok := cache.nodes[newNode.Name]
if !ok {
-n = NewNodeInfo()
+n = schedulercache.NewNodeInfo()
cache.nodes[newNode.Name] = n
} else {
-cache.removeNodeImageStates(n.node)
+cache.removeNodeImageStates(n.Node())
}
cache.nodeTree.UpdateNode(oldNode, newNode)
@@ -453,7 +446,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
-if len(n.pods) == 0 && n.node == nil {
+if len(n.Pods()) == 0 && n.Node() == nil {
delete(cache.nodes, node.Name)
}
@@ -464,8 +457,8 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
-func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) {
-newSum := make(map[string]*ImageStateSummary)
+func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) {
+newSum := make(map[string]*schedulercache.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
@@ -486,7 +479,7 @@ func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInf
}
}
}
-nodeInfo.imageStates = newSum
+nodeInfo.SetImageStates(newSum)
}
// removeNodeImageStates removes the given node record from image entries having the node
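Putting the pieces together: the internal package owns the implementation and expiration logic, while data flows out through the public schedulercache.NodeInfo type. A minimal hedged sketch of that flow; the function and node name are illustrative, the TTL and API calls are taken from the surrounding hunks:

// Illustrative only; this would have to live under pkg/scheduler because the
// cache package is internal.
package example

import (
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"

	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

func snapshotNodes() (map[string]*schedulercache.NodeInfo, error) {
	// New starts the expiration loop; the 30s TTL matches the factory default above.
	cache := schedulerinternalcache.New(30*time.Second, wait.NeverStop)

	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
	if err := cache.AddNode(node); err != nil {
		return nil, err
	}

	// The scheduler keeps its own map of public NodeInfo objects and lets the
	// internal cache refresh it, comparing generations via GetGeneration.
	nodeNameToInfo := make(map[string]*schedulercache.NodeInfo)
	if err := cache.UpdateNodeNameToInfoMap(nodeNameToInfo); err != nil {
		return nil, err
	}
	return nodeNameToInfo, nil
}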


@@ -31,13 +31,18 @@
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
+schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
+"k8s.io/kubernetes/pkg/scheduler/util"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
-func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *NodeInfo) {
+func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *schedulercache.NodeInfo) {
// Ignore generation field.
if actual != nil {
-actual.generation = 0
+actual.SetGeneration(0)
+}
+if expected != nil {
+expected.SetGeneration(0)
}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected)
@@ -70,6 +75,21 @@ func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo {
return res
}
+func newNodeInfo(requestedResource *schedulercache.Resource,
+nonzeroRequest *schedulercache.Resource,
+pods []*v1.Pod,
+usedPorts util.HostPortInfo,
+imageStates map[string]*schedulercache.ImageStateSummary,
+) *schedulercache.NodeInfo {
+nodeInfo := schedulercache.NewNodeInfo(pods...)
+nodeInfo.SetRequestedResource(requestedResource)
+nodeInfo.SetNonZeroRequest(nonzeroRequest)
+nodeInfo.SetUsedPorts(usedPorts)
+nodeInfo.SetImageStates(imageStates)
+return nodeInfo
+}
+
// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// on node level.
func TestAssumePodScheduled(t *testing.T) {
@@ -89,111 +109,99 @@ func TestAssumePodScheduled(t *testing.T) {
tests := []struct {
pods []*v1.Pod
-wNodeInfo *NodeInfo
+wNodeInfo *schedulercache.NodeInfo
}{{
pods: []*v1.Pod{testPods[0]},
-wNodeInfo: &NodeInfo{
-requestedResource: &Resource{
+wNodeInfo: newNodeInfo(
+&schedulercache.Resource{
MilliCPU: 100,
Memory: 500,
},
-nonzeroRequest: &Resource{
+&schedulercache.Resource{
MilliCPU: 100,
Memory: 500,
},
-TransientInfo: newTransientSchedulerInfo(),
-allocatableResource: &Resource{},
-pods: []*v1.Pod{testPods[0]},
-usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-imageStates: make(map[string]*ImageStateSummary),
-},
+[]*v1.Pod{testPods[0]},
+newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
+make(map[string]*schedulercache.ImageStateSummary),
+),
}, {
pods: []*v1.Pod{testPods[1], testPods[2]},
-wNodeInfo: &NodeInfo{
-requestedResource: &Resource{
+wNodeInfo: newNodeInfo(
+&schedulercache.Resource{
MilliCPU: 300,
Memory: 1524,
},
-nonzeroRequest: &Resource{
+&schedulercache.Resource{
MilliCPU: 300,
Memory: 1524,
},
-TransientInfo: newTransientSchedulerInfo(),
-allocatableResource: &Resource{},
-pods: []*v1.Pod{testPods[1], testPods[2]},
-usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
-imageStates: make(map[string]*ImageStateSummary),
-},
+[]*v1.Pod{testPods[1], testPods[2]},
+newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
+make(map[string]*schedulercache.ImageStateSummary),
+),
}, { // test non-zero request
pods: []*v1.Pod{testPods[3]},
-wNodeInfo: &NodeInfo{
-requestedResource: &Resource{
+wNodeInfo: newNodeInfo(
+&schedulercache.Resource{
MilliCPU: 0,
Memory: 0,
},
-nonzeroRequest: &Resource{
+&schedulercache.Resource{
MilliCPU: priorityutil.DefaultMilliCPURequest,
Memory: priorityutil.DefaultMemoryRequest,
},
-TransientInfo: newTransientSchedulerInfo(),
-allocatableResource: &Resource{},
-pods: []*v1.Pod{testPods[3]},
-usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-imageStates: make(map[string]*ImageStateSummary),
-},
+[]*v1.Pod{testPods[3]},
+newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
+make(map[string]*schedulercache.ImageStateSummary),
+),
}, {
pods: []*v1.Pod{testPods[4]},
-wNodeInfo: &NodeInfo{
-requestedResource: &Resource{
+wNodeInfo: newNodeInfo(
+&schedulercache.Resource{
MilliCPU: 100,
Memory: 500,
ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 3},
},
-nonzeroRequest: &Resource{
+&schedulercache.Resource{
MilliCPU: 100,
Memory: 500,
},
-TransientInfo: newTransientSchedulerInfo(),
-allocatableResource: &Resource{},
-pods: []*v1.Pod{testPods[4]},
-usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-imageStates: make(map[string]*ImageStateSummary),
-},
+[]*v1.Pod{testPods[4]},
+newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
+make(map[string]*schedulercache.ImageStateSummary),
+),
}, {
pods: []*v1.Pod{testPods[4], testPods[5]},
-wNodeInfo: &NodeInfo{
-requestedResource: &Resource{
+wNodeInfo: newNodeInfo(
+&schedulercache.Resource{
MilliCPU: 300,
Memory: 1524,
ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 8},
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 300, MilliCPU: 300,
Memory: 1524, Memory: 1524,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[4], testPods[5]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
pods: []*v1.Pod{testPods[4], testPods[5]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}, { }, {
pods: []*v1.Pod{testPods[6]}, pods: []*v1.Pod{testPods[6]},
wNodeInfo: &NodeInfo{ wNodeInfo: newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[6]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().build(),
pods: []*v1.Pod{testPods[6]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}, },
} }
@ -246,7 +254,7 @@ func TestExpirePod(t *testing.T) {
pods []*testExpirePodStruct pods []*testExpirePodStruct
cleanupTime time.Time cleanupTime time.Time
wNodeInfo *NodeInfo wNodeInfo *schedulercache.NodeInfo
}{{ // assumed pod would expire }{{ // assumed pod would expire
pods: []*testExpirePodStruct{ pods: []*testExpirePodStruct{
{pod: testPods[0], assumedTime: now}, {pod: testPods[0], assumedTime: now},
@ -259,21 +267,19 @@ func TestExpirePod(t *testing.T) {
{pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)}, {pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)},
}, },
cleanupTime: now.Add(2 * ttl), cleanupTime: now.Add(2 * ttl),
wNodeInfo: &NodeInfo{ wNodeInfo: newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 1024, Memory: 1024,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 1024, Memory: 1024,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[1]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
pods: []*v1.Pod{testPods[1]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}} }}
for i, tt := range tests { for i, tt := range tests {
@ -308,25 +314,23 @@ func TestAddPodWillConfirm(t *testing.T) {
podsToAssume []*v1.Pod podsToAssume []*v1.Pod
podsToAdd []*v1.Pod podsToAdd []*v1.Pod
wNodeInfo *NodeInfo wNodeInfo *schedulercache.NodeInfo
}{{ // two pods were assumed at the same time. But the first one is called Add() and gets confirmed. }{{ // two pods were assumed at the same time. But the first one is called Add() and gets confirmed.
podsToAssume: []*v1.Pod{testPods[0], testPods[1]}, podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
podsToAdd: []*v1.Pod{testPods[0]}, podsToAdd: []*v1.Pod{testPods[0]},
wNodeInfo: &NodeInfo{ wNodeInfo: newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[0]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
pods: []*v1.Pod{testPods[0]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}} }}
for i, tt := range tests { for i, tt := range tests {
@ -402,28 +406,26 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
podsToAdd []*v1.Pod podsToAdd []*v1.Pod
podsToUpdate [][]*v1.Pod podsToUpdate [][]*v1.Pod
wNodeInfo map[string]*NodeInfo wNodeInfo map[string]*schedulercache.NodeInfo
}{{ }{{
podsToAssume: []*v1.Pod{assumedPod.DeepCopy()}, podsToAssume: []*v1.Pod{assumedPod.DeepCopy()},
podsToAdd: []*v1.Pod{addedPod.DeepCopy()}, podsToAdd: []*v1.Pod{addedPod.DeepCopy()},
podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}}, podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}},
wNodeInfo: map[string]*NodeInfo{ wNodeInfo: map[string]*schedulercache.NodeInfo{
"assumed-node": nil, "assumed-node": nil,
"actual-node": { "actual-node": newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{updatedPod.DeepCopy()},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
pods: []*v1.Pod{updatedPod.DeepCopy()}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}, },
}} }}
@ -462,24 +464,22 @@ func TestAddPodAfterExpiration(t *testing.T) {
tests := []struct { tests := []struct {
pod *v1.Pod pod *v1.Pod
wNodeInfo *NodeInfo wNodeInfo *schedulercache.NodeInfo
}{{ }{{
pod: basePod, pod: basePod,
wNodeInfo: &NodeInfo{ wNodeInfo: newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{basePod},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
pods: []*v1.Pod{basePod}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}} }}
now := time.Now() now := time.Now()
@ -517,39 +517,35 @@ func TestUpdatePod(t *testing.T) {
podsToAdd []*v1.Pod podsToAdd []*v1.Pod
podsToUpdate []*v1.Pod podsToUpdate []*v1.Pod
wNodeInfo []*NodeInfo wNodeInfo []*schedulercache.NodeInfo
}{{ // add a pod and then update it twice }{{ // add a pod and then update it twice
podsToAdd: []*v1.Pod{testPods[0]}, podsToAdd: []*v1.Pod{testPods[0]},
podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]}, podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
wNodeInfo: []*NodeInfo{{ wNodeInfo: []*schedulercache.NodeInfo{newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 1024, Memory: 1024,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 1024, Memory: 1024,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[1]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
pods: []*v1.Pod{testPods[1]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), ), newNodeInfo(
imageStates: make(map[string]*ImageStateSummary), &schedulercache.Resource{
}, {
requestedResource: &Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[0]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
pods: []*v1.Pod{testPods[0]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), )},
imageStates: make(map[string]*ImageStateSummary),
}},
}} }}
for _, tt := range tests { for _, tt := range tests {
@ -648,40 +644,36 @@ func TestExpireAddUpdatePod(t *testing.T) {
podsToAdd []*v1.Pod podsToAdd []*v1.Pod
podsToUpdate []*v1.Pod podsToUpdate []*v1.Pod
wNodeInfo []*NodeInfo wNodeInfo []*schedulercache.NodeInfo
}{{ // Pod is assumed, expired, and added. Then it would be updated twice. }{{ // Pod is assumed, expired, and added. Then it would be updated twice.
podsToAssume: []*v1.Pod{testPods[0]}, podsToAssume: []*v1.Pod{testPods[0]},
podsToAdd: []*v1.Pod{testPods[0]}, podsToAdd: []*v1.Pod{testPods[0]},
podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]}, podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
wNodeInfo: []*NodeInfo{{ wNodeInfo: []*schedulercache.NodeInfo{newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 1024, Memory: 1024,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 200, MilliCPU: 200,
Memory: 1024, Memory: 1024,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[1]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
pods: []*v1.Pod{testPods[1]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), ), newNodeInfo(
imageStates: make(map[string]*ImageStateSummary), &schedulercache.Resource{
}, {
requestedResource: &Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{testPods[0]},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
pods: []*v1.Pod{testPods[0]}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), )},
imageStates: make(map[string]*ImageStateSummary),
}},
}} }}
now := time.Now() now := time.Now()
@ -742,24 +734,22 @@ func TestEphemeralStorageResource(t *testing.T) {
podE := makePodWithEphemeralStorage(nodeName, "500") podE := makePodWithEphemeralStorage(nodeName, "500")
tests := []struct { tests := []struct {
pod *v1.Pod pod *v1.Pod
wNodeInfo *NodeInfo wNodeInfo *schedulercache.NodeInfo
}{ }{
{ {
pod: podE, pod: podE,
wNodeInfo: &NodeInfo{ wNodeInfo: newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
EphemeralStorage: 500, EphemeralStorage: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: priorityutil.DefaultMilliCPURequest, MilliCPU: priorityutil.DefaultMilliCPURequest,
Memory: priorityutil.DefaultMemoryRequest, Memory: priorityutil.DefaultMemoryRequest,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{podE},
allocatableResource: &Resource{}, schedutil.HostPortInfo{},
pods: []*v1.Pod{podE}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: schedutil.HostPortInfo{}, ),
imageStates: make(map[string]*ImageStateSummary),
},
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -789,24 +779,22 @@ func TestRemovePod(t *testing.T) {
basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}) basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
tests := []struct { tests := []struct {
pod *v1.Pod pod *v1.Pod
wNodeInfo *NodeInfo wNodeInfo *schedulercache.NodeInfo
}{{ }{{
pod: basePod, pod: basePod,
wNodeInfo: &NodeInfo{ wNodeInfo: newNodeInfo(
requestedResource: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
nonzeroRequest: &Resource{ &schedulercache.Resource{
MilliCPU: 100, MilliCPU: 100,
Memory: 500, Memory: 500,
}, },
TransientInfo: newTransientSchedulerInfo(), []*v1.Pod{basePod},
allocatableResource: &Resource{}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
pods: []*v1.Pod{basePod}, make(map[string]*schedulercache.ImageStateSummary),
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), ),
imageStates: make(map[string]*ImageStateSummary),
},
}} }}
for i, tt := range tests { for i, tt := range tests {
@ -885,7 +873,7 @@ func TestForgetPod(t *testing.T) {
// getResourceRequest returns the resource request of all containers in Pods; // getResourceRequest returns the resource request of all containers in Pods;
// excluding initContainers. // excluding initContainers.
func getResourceRequest(pod *v1.Pod) v1.ResourceList { func getResourceRequest(pod *v1.Pod) v1.ResourceList {
result := &Resource{} result := &schedulercache.Resource{}
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Requests) result.Add(container.Resources.Requests)
} }
@ -894,22 +882,30 @@ func getResourceRequest(pod *v1.Pod) v1.ResourceList {
} }
// buildNodeInfo creates a NodeInfo by simulating node operations in cache. // buildNodeInfo creates a NodeInfo by simulating node operations in cache.
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *NodeInfo { func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *schedulercache.NodeInfo {
expected := NewNodeInfo() expected := schedulercache.NewNodeInfo()
// Simulate SetNode. // Simulate SetNode.
expected.node = node expected.SetNode(node)
expected.allocatableResource = NewResource(node.Status.Allocatable)
expected.taints = node.Spec.Taints expected.SetAllocatableResource(schedulercache.NewResource(node.Status.Allocatable))
expected.generation++ expected.SetTaints(node.Spec.Taints)
expected.SetGeneration(expected.GetGeneration() + 1)
for _, pod := range pods { for _, pod := range pods {
// Simulate AddPod // Simulate AddPod
expected.pods = append(expected.pods, pod) pods := append(expected.Pods(), pod)
expected.requestedResource.Add(getResourceRequest(pod)) expected.SetPods(pods)
expected.nonzeroRequest.Add(getResourceRequest(pod)) requestedResource := expected.RequestedResource()
expected.updateUsedPorts(pod, true) newRequestedResource := &requestedResource
expected.generation++ newRequestedResource.Add(getResourceRequest(pod))
expected.SetRequestedResource(newRequestedResource)
nonZeroRequest := expected.NonZeroRequest()
newNonZeroRequest := &nonZeroRequest
newNonZeroRequest.Add(getResourceRequest(pod))
expected.SetNonZeroRequest(newNonZeroRequest)
expected.UpdateUsedPorts(pod, true)
expected.SetGeneration(expected.GetGeneration() + 1)
} }
return expected return expected
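buildNodeInfo above can no longer touch requestedResource or nonzeroRequest directly, so it copies the value returned by the getter, mutates the copy, and writes it back through the setter. A toy illustration of that copy-modify-set round trip follows; the types are hypothetical stand-ins, not the real schedulercache.Resource.

package main

import "fmt"

// resource is a toy stand-in for schedulercache.Resource.
type resource struct {
    milliCPU int64
    memory   int64
}

type info struct {
    requested resource
}

// RequestedResource returns a copy, which is why callers mutate a local
// value and hand it back through the setter, as buildNodeInfo does above.
func (i *info) RequestedResource() resource { return i.requested }

func (i *info) SetRequestedResource(r *resource) { i.requested = *r }

func main() {
    ni := &info{}

    // Copy, modify, set: the same dance as in the diff.
    req := ni.RequestedResource()
    req.milliCPU += 100
    req.memory += 500
    ni.SetRequestedResource(&req)

    fmt.Printf("%+v\n", ni.RequestedResource())
}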
@ -1060,42 +1056,46 @@ func TestNodeOperators(t *testing.T) {
// Case 1: the node was added into cache successfully. // Case 1: the node was added into cache successfully.
got, found := cache.nodes[node.Name] got, found := cache.nodes[node.Name]
if !found { if !found {
t.Errorf("Failed to find node %v in schedulercache.", node.Name) t.Errorf("Failed to find node %v in schedulerinternalcache.", node.Name)
} }
if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name { if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name {
t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name) t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
} }
// Generations are globally unique. We check in our unit tests that they are incremented correctly. // Generations are globally unique. We check in our unit tests that they are incremented correctly.
expected.generation = got.generation expected.SetGeneration(got.GetGeneration())
if !reflect.DeepEqual(got, expected) { if !reflect.DeepEqual(got, expected) {
t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected) t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
} }
// Case 2: dump cached nodes successfully. // Case 2: dump cached nodes successfully.
cachedNodes := map[string]*NodeInfo{} cachedNodes := map[string]*schedulercache.NodeInfo{}
cache.UpdateNodeNameToInfoMap(cachedNodes) cache.UpdateNodeNameToInfoMap(cachedNodes)
newNode, found := cachedNodes[node.Name] newNode, found := cachedNodes[node.Name]
if !found || len(cachedNodes) != 1 { if !found || len(cachedNodes) != 1 {
t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes) t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
} }
expected.generation = newNode.generation expected.SetGeneration(newNode.GetGeneration())
if !reflect.DeepEqual(newNode, expected) { if !reflect.DeepEqual(newNode, expected) {
t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected) t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
} }
// Case 3: update node attribute successfully. // Case 3: update node attribute successfully.
node.Status.Allocatable[v1.ResourceMemory] = mem50m node.Status.Allocatable[v1.ResourceMemory] = mem50m
expected.allocatableResource.Memory = mem50m.Value() allocatableResource := expected.AllocatableResource()
newAllocatableResource := &allocatableResource
newAllocatableResource.Memory = mem50m.Value()
expected.SetAllocatableResource(newAllocatableResource)
cache.UpdateNode(nil, node) cache.UpdateNode(nil, node)
got, found = cache.nodes[node.Name] got, found = cache.nodes[node.Name]
if !found { if !found {
t.Errorf("Failed to find node %v in schedulercache after UpdateNode.", node.Name) t.Errorf("Failed to find node %v in schedulercache after UpdateNode.", node.Name)
} }
if got.generation <= expected.generation { if got.GetGeneration() <= expected.GetGeneration() {
t.Errorf("generation is not incremented. got: %v, expected: %v", got.generation, expected.generation) t.Errorf("Generation is not incremented. got: %v, expected: %v", got.GetGeneration(), expected.GetGeneration())
} }
expected.generation = got.generation expected.SetGeneration(got.GetGeneration())
if !reflect.DeepEqual(got, expected) { if !reflect.DeepEqual(got, expected) {
t.Errorf("Failed to update node in schedulercache:\n got: %+v \nexpected: %+v", got, expected) t.Errorf("Failed to update node in schedulercache:\n got: %+v \nexpected: %+v", got, expected)
@ -1132,7 +1132,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b) cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
cachedNodes := map[string]*NodeInfo{} cachedNodes := map[string]*schedulercache.NodeInfo{}
cache.UpdateNodeNameToInfoMap(cachedNodes) cache.UpdateNodeNameToInfoMap(cachedNodes)
} }
} }


@ -19,6 +19,7 @@ package cache
import ( import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
) )
// PodFilter is a function to filter a pod. It returns true if the pod passes the filter and false otherwise. // PodFilter is a function to filter a pod. It returns true if the pod passes the filter and false otherwise.
@ -99,7 +100,7 @@ type Cache interface {
// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache. // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be) // The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node. // on this node.
UpdateNodeNameToInfoMap(infoMap map[string]*NodeInfo) error UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error
// List lists all cached pods (including assumed ones). // List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error) List(labels.Selector) ([]*v1.Pod, error)
@ -117,5 +118,5 @@ type Cache interface {
// Snapshot is a snapshot of cache state // Snapshot is a snapshot of cache state
type Snapshot struct { type Snapshot struct {
AssumedPods map[string]bool AssumedPods map[string]bool
Nodes map[string]*NodeInfo Nodes map[string]*schedulercache.NodeInfo
} }
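The UpdateNodeNameToInfoMap contract above has the caller own the map and the cache refresh it in place on each scheduling cycle. Here is a small self-contained sketch of that calling pattern; nodeInfoDumper and toyCache are invented stand-ins for the real Cache and NodeInfo types.

package main

import "fmt"

// nodeInfo is a toy stand-in for schedulercache.NodeInfo.
type nodeInfo struct {
    pods int
}

// nodeInfoDumper captures only the UpdateNodeNameToInfoMap part of the Cache
// interface shown above.
type nodeInfoDumper interface {
    UpdateNodeNameToInfoMap(infoMap map[string]*nodeInfo) error
}

// toyCache is a hypothetical in-memory implementation for this sketch.
type toyCache struct {
    nodes map[string]*nodeInfo
}

func (c *toyCache) UpdateNodeNameToInfoMap(infoMap map[string]*nodeInfo) error {
    for name, info := range c.nodes {
        infoMap[name] = info
    }
    return nil
}

func main() {
    var cache nodeInfoDumper = &toyCache{nodes: map[string]*nodeInfo{
        "node-a": {pods: 3},
        "node-b": {pods: 1},
    }}

    // The caller reuses one map; the cache fills it in on every call.
    infoMap := map[string]*nodeInfo{}
    if err := cache.UpdateNodeNameToInfoMap(infoMap); err != nil {
        fmt.Println("update failed:", err)
        return
    }
    for name, info := range infoMap {
        fmt.Printf("%s: %d pod(s)\n", name, info.pods)
    }
}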


@ -59,6 +59,7 @@ func (na *nodeArray) next() (nodeName string, exhausted bool) {
return nodeName, false return nodeName, false
} }
// newNodeTree creates a NodeTree from nodes.
func newNodeTree(nodes []*v1.Node) *NodeTree { func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{ nt := &NodeTree{
tree: make(map[string]*nodeArray), tree: make(map[string]*nodeArray),


@ -31,9 +31,9 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence" "k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
@ -75,7 +75,7 @@ func (sched *Scheduler) StopEverything() {
} }
// Cache returns the cache in the scheduler so tests can check the data in the scheduler. // Cache returns the cache in the scheduler so tests can check the data in the scheduler.
func (sched *Scheduler) Cache() schedulercache.Cache { func (sched *Scheduler) Cache() schedulerinternalcache.Cache {
return sched.config.SchedulerCache return sched.config.SchedulerCache
} }
@ -110,7 +110,7 @@ type Configurator interface {
type Config struct { type Config struct {
// It is expected that changes made via SchedulerCache will be observed // It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm. // by NodeLister and Algorithm.
SchedulerCache schedulercache.Cache SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after // Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod // successfully binding a pod
Ecache *equivalence.Cache Ecache *equivalence.Cache
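With Config.SchedulerCache now typed as schedulerinternalcache.Cache, callers construct the cache from the internal package, as the tests further down do with New(ttl, stop) and AddNode. A sketch of that construction is below; it assumes the file sits somewhere under pkg/scheduler (Go only permits the internal import from there), and the test name is made up.

package scheduler

import (
    "testing"
    "time"

    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// TestInternalCacheConstruction sketches the new wiring: the cache comes from
// the internal package and could then be assigned to Config.SchedulerCache.
func TestInternalCacheConstruction(t *testing.T) {
    stop := make(chan struct{})
    defer close(stop)

    scache := schedulerinternalcache.New(30*time.Second, stop)
    node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
    if err := scache.AddNode(&node); err != nil {
        t.Fatalf("AddNode failed: %v", err)
    }
}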


@ -38,8 +38,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/volumebinder" "k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
@ -265,7 +265,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(100*time.Millisecond, stop) scache := schedulerinternalcache.New(100*time.Millisecond, stop)
pod := podWithPort("pod.Name", "", 8080) pod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node) scache.AddNode(&node)
@ -323,7 +323,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(10*time.Minute, stop) scache := schedulerinternalcache.New(10*time.Minute, stop)
firstPod := podWithPort("pod.Name", "", 8080) firstPod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node) scache.AddNode(&node)
@ -410,7 +410,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
} { } {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(test.CacheTTL, stop) scache := schedulerinternalcache.New(test.CacheTTL, stop)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node) scache.AddNode(&node)
@ -446,7 +446,7 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
// queuedPodStore: pods queued before processing. // queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods. // cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache,
nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) { nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil) scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil)
@ -478,7 +478,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(10*time.Minute, stop) scache := schedulerinternalcache.New(10*time.Minute, stop)
// Design the baseline for the pods, and we will make nodes that don't fit it later. // Design the baseline for the pods, and we will make nodes that don't fit it later.
var cpu = int64(4) var cpu = int64(4)
@ -549,7 +549,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing. // queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods. // scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
scache, scache,
nil, nil,
@ -600,7 +600,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
return sched, bindingChan, errChan return sched, bindingChan, errChan
} }
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
scache, scache,
nil, nil,
@ -656,7 +656,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode}) nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode})
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
queuedPodStore.Add(podWithID("foo", "")) queuedPodStore.Add(podWithID("foo", ""))
scache := schedulercache.New(10*time.Minute, stop) scache := schedulerinternalcache.New(10*time.Minute, stop)
scache.AddNode(&testNode) scache.AddNode(&testNode)
predicateMap := map[string]algorithm.FitPredicate{ predicateMap := map[string]algorithm.FitPredicate{


@ -20,6 +20,7 @@ go_library(
"//pkg/apis/core/install:go_default_library", "//pkg/apis/core/install:go_default_library",
"//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",


@ -20,6 +20,7 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
) )
// FakeCache is used for testing // FakeCache is used for testing
@ -82,14 +83,14 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N
func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil } func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
// FilteredList is a fake method for testing. // FilteredList is a fake method for testing.
func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { func (f *FakeCache) FilteredList(filter schedulerinternalcache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
return nil, nil return nil, nil
} }
// Snapshot is a fake method for testing // Snapshot is a fake method for testing
func (f *FakeCache) Snapshot() *schedulercache.Snapshot { func (f *FakeCache) Snapshot() *schedulerinternalcache.Snapshot {
return &schedulercache.Snapshot{} return &schedulerinternalcache.Snapshot{}
} }
// NodeTree is a fake method for testing. // NodeTree is a fake method for testing.
func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil } func (f *FakeCache) NodeTree() *schedulerinternalcache.NodeTree { return nil }
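Since FakeCache must keep satisfying the Cache interface that now lives in the internal package, a compile-time assertion is a cheap guard against signature drift. The snippet below is a sketch, assuming it sits next to FakeCache in pkg/scheduler/testing; the package clause and the absence of an existing assertion are assumptions.

package testing

import (
    schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// Compile-time check: if any FakeCache method stops matching the relocated
// Cache interface, the package fails to build rather than failing at runtime.
var _ schedulerinternalcache.Cache = &FakeCache{}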


@ -26,7 +26,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
) )
var _ algorithm.NodeLister = &FakeNodeLister{} var _ algorithm.NodeLister = &FakeNodeLister{}
@ -55,7 +55,7 @@ func (f FakePodLister) List(s labels.Selector) (selected []*v1.Pod, err error) {
} }
// FilteredList returns pods matching a pod filter and a label selector. // FilteredList returns pods matching a pod filter and a label selector.
func (f FakePodLister) FilteredList(podFilter schedulercache.PodFilter, s labels.Selector) (selected []*v1.Pod, err error) { func (f FakePodLister) FilteredList(podFilter schedulerinternalcache.PodFilter, s labels.Selector) (selected []*v1.Pod, err error) {
for _, pod := range f { for _, pod := range f {
if podFilter(pod) && s.Matches(labels.Set(pod.Labels)) { if podFilter(pod) && s.Matches(labels.Set(pod.Labels)) {
selected = append(selected, pod) selected = append(selected, pod)
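FilteredList combines a PodFilter with a label selector, as the loop above shows. The usage sketch below is hypothetical: the helper name and enclosing package are invented, and it assumes a location under pkg/scheduler so the internal import is allowed.

package scheduler

import (
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
    schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)

// listRunningFrontends filters the fake lister's pods twice over: the
// PodFilter keeps only running pods, the selector keeps only app=frontend.
func listRunningFrontends(lister schedulertesting.FakePodLister) ([]*v1.Pod, error) {
    var onlyRunning schedulerinternalcache.PodFilter = func(p *v1.Pod) bool {
        return p.Status.Phase == v1.PodRunning
    }
    sel := labels.SelectorFromSet(labels.Set{"app": "frontend"})
    return lister.FilteredList(onlyRunning, sel)
}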