Merge pull request #69317 from wgliang/feature/move-cachecompare

[scheduler cleanup phase 1]: Move CacheComparer to pkg/scheduler/inte…
pull/58/head
k8s-ci-robot 2018-10-10 20:24:46 -07:00 committed by GitHub
commit 256fdb93c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 30 deletions

View File

@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cache_comparer.go",
"factory.go",
"plugins.go",
"signal.go",
@ -22,10 +21,10 @@ go_library(
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/validation:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/comparer:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
@ -57,7 +56,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"cache_comparer_test.go",
"factory_test.go",
"plugins_test.go",
],
@ -78,7 +76,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -60,6 +60,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachecomparer "k8s.io/kubernetes/pkg/scheduler/internal/cache/comparer"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -305,12 +306,12 @@ func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
}
// Setup cache comparer
comparer := &cacheComparer{
podLister: args.PodInformer.Lister(),
nodeLister: args.NodeInformer.Lister(),
cache: c.schedulerCache,
podQueue: c.podQueue,
}
comparer := cachecomparer.New(
args.NodeInformer.Lister(),
args.PodInformer.Lister(),
c.schedulerCache,
c.podQueue,
)
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)

View File

@ -55,6 +55,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/scheduler/internal/cache/comparer:all-srcs",
"//pkg/scheduler/internal/cache/fake:all-srcs",
],
tags = ["automanaged"],

View File

@ -0,0 +1,42 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["comparer.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache/comparer",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = [
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["comparer_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
package comparer
import (
"sort"
@ -29,31 +29,47 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
type cacheComparer struct {
nodeLister corelisters.NodeLister
podLister corelisters.PodLister
cache schedulerinternalcache.Cache
podQueue internalqueue.SchedulingQueue
compareStrategy
// CacheComparer is an implementation of the Scheduler's cache comparer.
type CacheComparer struct {
NodeLister corelisters.NodeLister
PodLister corelisters.PodLister
Cache schedulerinternalcache.Cache
PodQueue internalqueue.SchedulingQueue
}
func (c *cacheComparer) Compare() error {
// New creates a CacheComparer.
func New(
nodeLister corelisters.NodeLister,
podLister corelisters.PodLister,
cache schedulerinternalcache.Cache,
podQueue internalqueue.SchedulingQueue,
) *CacheComparer {
return &CacheComparer{
NodeLister: nodeLister,
PodLister: podLister,
Cache: cache,
PodQueue: podQueue,
}
}
// Compare compares the nodes and pods of NodeLister with Cache.Snapshot.
func (c *CacheComparer) Compare() error {
glog.V(3).Info("cache comparer started")
defer glog.V(3).Info("cache comparer finished")
nodes, err := c.nodeLister.List(labels.Everything())
nodes, err := c.NodeLister.List(labels.Everything())
if err != nil {
return err
}
pods, err := c.podLister.List(labels.Everything())
pods, err := c.PodLister.List(labels.Everything())
if err != nil {
return err
}
snapshot := c.cache.Snapshot()
snapshot := c.Cache.Snapshot()
waitingPods := c.podQueue.WaitingPods()
waitingPods := c.PodQueue.WaitingPods()
if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 {
glog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant)
@ -66,10 +82,8 @@ func (c *cacheComparer) Compare() error {
return nil
}
type compareStrategy struct {
}
func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
// CompareNodes compares actual nodes with cached nodes.
func (c *CacheComparer) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, node := range nodes {
actual = append(actual, node.Name)
@ -83,7 +97,8 @@ func (c compareStrategy) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*sc
return compareStrings(actual, cached)
}
func (c compareStrategy) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
// ComparePods compares actual pods with cached pods.
func (c *CacheComparer) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, pod := range pods {
actual = append(actual, string(pod.UID))

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
package comparer
import (
"reflect"
@ -64,7 +64,7 @@ func TestCompareNodes(t *testing.T) {
}
func testCompareNodes(actual, cached, missing, redundant []string, t *testing.T) {
compare := compareStrategy{}
compare := CacheComparer{}
nodes := []*v1.Node{}
for _, nodeName := range actual {
node := &v1.Node{}
@ -155,7 +155,7 @@ func TestComparePods(t *testing.T) {
}
func testComparePods(actual, cached, queued, missing, redundant []string, t *testing.T) {
compare := compareStrategy{}
compare := CacheComparer{}
pods := []*v1.Pod{}
for _, uid := range actual {
pod := &v1.Pod{}