mirror of https://github.com/k3s-io/k3s
Merge pull request #6546 from lavalamp/fix
Add to controller framework; use in scheduler
commit 66d55e0959

@@ -924,6 +924,7 @@ func runSchedulerNoPhantomPodsTest(client *client.Client) {
	}

	// Delete a pod to free up room.
	glog.Infof("Deleting pod %v", bar.Name)
	err = client.Pods(api.NamespaceDefault).Delete(bar.Name)
	if err != nil {
		glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err)

@@ -59,11 +59,18 @@ func (k KeyError) Error() string {
	return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}

// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
// the object but not the object itself.
type ExplicitKey string

// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
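
Editorial sketch, not part of the commit: how the key convention above behaves for a namespaced object and for an ExplicitKey.

    package main

    import (
    	"fmt"

    	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
    	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
    )

    func main() {
    	// Namespaced objects key as <namespace>/<name>.
    	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "web-1"}}
    	key, err := cache.MetaNamespaceKeyFunc(pod)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(key) // default/web-1

    	// When only the key survives (e.g. after a deletion), wrap it in
    	// ExplicitKey and it passes through unchanged.
    	key, _ = cache.MetaNamespaceKeyFunc(cache.ExplicitKey("default/web-1"))
    	fmt.Println(key) // default/web-1
    }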

@@ -74,8 +74,9 @@ func New(c *Config) *Controller {

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run does not block.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer util.HandleCrash()
	cache.NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,

@@ -83,7 +84,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
		c.config.FullResyncPeriod,
	).RunUntil(stopCh)

	go util.Until(c.processLoop, time.Second, stopCh)
	util.Until(c.processLoop, time.Second, stopCh)
}

// processLoop drains the work queue.
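
A minimal sketch of the calling pattern implied by the new doc comment (the cfg literal is elided here; the Example() change later in this commit does the same thing): since Run now blocks until stopCh is closed, the caller owns both the goroutine and the shutdown.

    // Sketch: start Run in a goroutine and close stop to shut it down.
    stop := make(chan struct{})
    go framework.New(cfg).Run(stop)
    // ... later, to stop processing:
    close(stop)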

@@ -102,3 +103,126 @@ func (c *Controller) processLoop() {
		}
	}
}

// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//    last known state of the object-- it is possible that several changes
//    were combined together, so you can't use this to see every single
//    change. OnUpdate is also called when a re-list happens, and it will
//    get called even if nothing changed. This is useful for periodically
//    evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//    it will get an object of type cache.DeletedFinalStateUnknown.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}

// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc != nil {
		r.DeleteFunc(obj)
	}
}
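
A small editorial sketch (not part of this commit): because ResourceEventHandlerFuncs treats a nil func as a no-op, a handler can subscribe to only the events it cares about.

    // Sketch: only deletions are handled; adds and updates are no-ops
    // because AddFunc and UpdateFunc are left nil.
    var h framework.ResourceEventHandler = framework.ResourceEventHandlerFuncs{
    	DeleteFunc: func(obj interface{}) {
    		fmt.Printf("observed deletion: %v\n", obj)
    	},
    }
    h.OnAdd(&api.Pod{}) // no-op: AddFunc is nil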

// DeletionHandlingMetaNamespaceKeyFunc checks for
// cache.DeletedFinalStateUnknown objects before calling
// cache.MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return cache.MetaNamespaceKeyFunc(obj)
}
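
A sketch of why this wrapper matters (editorial; the tombstone's Obj field is assumed to match the cache package's DeletedFinalStateUnknown): when a delete arrives as a tombstone because the watch missed the final state, the plain key func would fail, while this one recovers the key.

    // Sketch: the tombstone carries the key even though the object is gone.
    tombstone := cache.DeletedFinalStateUnknown{Key: "default/web-1", Obj: nil}
    key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(tombstone)
    // key == "default/web-1", err == nil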

// NewInformer returns a cache.Store and a controller for populating the store
// while also providing event notifications. You should only use the returned
// cache.Store for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.
//
// Parameters:
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//
func NewInformer(
	lw cache.ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (cache.Store, *Controller) {
	// This will hold the client state, as we know it.
	clientState := cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, clientState)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(cache.Deltas) {
				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						h.OnUpdate(old, d.Object)
					} else {
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						h.OnAdd(d.Object)
					}
				case cache.Deleted:
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return clientState, New(cfg)
}
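
A usage sketch (editorial; podListWatcher is an assumed cache.ListerWatcher whose construction is elided — the tests below exercise the real paths via a fake source):

    // Sketch: wire NewInformer to a pod source and log adds. The returned
    // store is used only for Get/List, per the comment above.
    store, controller := framework.NewInformer(
    	podListWatcher,
    	&api.Pod{},
    	30*time.Second, // resync: OnUpdate fires at least this often
    	framework.ResourceEventHandlerFuncs{
    		AddFunc: func(obj interface{}) {
    			glog.Infof("pod added: %v", obj.(*api.Pod).Name)
    		},
    	},
    )
    stop := make(chan struct{})
    go controller.Run(stop)
    _ = store // read-only from here on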

@@ -18,15 +18,18 @@ package framework_test

import (
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"
	// "testing"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"

	"github.com/google/gofuzz"
)

func Example() {

@@ -34,7 +37,7 @@ func Example() {
	source := framework.NewFakeControllerSource()

	// This will hold the downstream state, as we know it.
	downstream := cache.NewStore(cache.MetaNamespaceKeyFunc)
	downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)

	// This will hold incoming changes. Note how we pass downstream in as a
	// KeyLister, that way resync operations will result in the correct set

@@ -91,7 +94,7 @@ func Example() {

	// Create the controller and run it until we close stop.
	stop := make(chan struct{})
	framework.New(cfg).Run(stop)
	go framework.New(cfg).Run(stop)

	// Let's add a few objects to the source.
	for _, name := range []string{"a-hello", "b-controller", "c-framework"} {

@@ -113,3 +116,265 @@ func Example() {
	// b-controller
	// c-framework
}

func ExampleInformer() {
	// source simulates an apiserver object endpoint.
	source := framework.NewFakeControllerSource()

	// Let's do threadsafe output to get predictable test results.
	outputSetLock := sync.Mutex{}
	outputSet := util.StringSet{}

	// Make a controller that immediately deletes anything added to it, and
	// logs anything deleted.
	_, controller := framework.NewInformer(
		source,
		&api.Pod{},
		time.Millisecond*100,
		framework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				source.Delete(obj.(runtime.Object))
			},
			DeleteFunc: func(obj interface{}) {
				key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
				if err != nil {
					key = "oops something went wrong with the key"
				}

				// Record some output when items are deleted.
				outputSetLock.Lock()
				defer outputSetLock.Unlock()
				outputSet.Insert(key)
			},
		},
	)

	// Run the controller until we close stop.
	stop := make(chan struct{})
	go controller.Run(stop)

	// Let's add a few objects to the source.
	for _, name := range []string{"a-hello", "b-controller", "c-framework"} {
		// Note that these pods are not valid-- the fake source doesn't
		// call validation or perform any other checking.
		source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
	}

	// Let's wait for the controller to process the things we just added.
	time.Sleep(500 * time.Millisecond)
	close(stop)

	outputSetLock.Lock()
	for _, key := range outputSet.List() {
		fmt.Println(key)
	}
	// Output:
	// a-hello
	// b-controller
	// c-framework
}

func TestHammerController(t *testing.T) {
	// This test executes a bunch of requests through the fake source and
	// controller framework to make sure there are no locking/threading
	// errors. If something goes wrong, the test should hang forever or
	// trigger the race detector.

	// source simulates an apiserver object endpoint.
	source := framework.NewFakeControllerSource()

	// Let's do threadsafe output to get predictable test results.
	outputSetLock := sync.Mutex{}
	// map of key to operations done on the key
	outputSet := map[string][]string{}

	recordFunc := func(eventType string, obj interface{}) {
		key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
		if err != nil {
			t.Errorf("something wrong with key: %v", err)
			key = "oops something went wrong with the key"
		}

		// Record the event type observed for this key.
		outputSetLock.Lock()
		defer outputSetLock.Unlock()
		outputSet[key] = append(outputSet[key], eventType)
	}

	// Make a controller which just logs all the changes it gets.
	_, controller := framework.NewInformer(
		source,
		&api.Pod{},
		time.Millisecond*100,
		framework.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { recordFunc("add", obj) },
			UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
			DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
		},
	)

	// Run the controller until we close stop.
	stop := make(chan struct{})
	go controller.Run(stop)

	wg := sync.WaitGroup{}
	const threads = 3
	wg.Add(threads)
	for i := 0; i < threads; i++ {
		go func() {
			defer wg.Done()
			// Let's add a few objects to the source.
			currentNames := util.StringSet{}
			rs := rand.NewSource(rand.Int63())
			f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
			r := rand.New(rs) // Mustn't use r and f concurrently!
			for i := 0; i < 100; i++ {
				var name string
				var isNew bool
				if currentNames.Len() == 0 || r.Intn(3) == 1 {
					f.Fuzz(&name)
					isNew = true
				} else {
					l := currentNames.List()
					name = l[r.Intn(len(l))]
				}

				pod := &api.Pod{}
				f.Fuzz(pod)
				pod.ObjectMeta.Name = name
				pod.ObjectMeta.Namespace = "default"
				// Add, update, or delete randomly.
				// Note that these pods are not valid-- the fake source doesn't
				// call validation or perform any other checking.
				if isNew {
					currentNames.Insert(name)
					source.Add(pod)
					continue
				}
				switch r.Intn(2) {
				case 0:
					currentNames.Insert(name)
					source.Modify(pod)
				case 1:
					currentNames.Delete(name)
					source.Delete(pod)
				}
			}
		}()
	}
	wg.Wait()

	// Let's wait for the controller to finish processing the things we just added.
	time.Sleep(100 * time.Millisecond)
	close(stop)

	outputSetLock.Lock()
	t.Logf("got: %#v", outputSet)
}

func TestUpdate(t *testing.T) {
	// This test is going to exercise the various paths that result in a
	// call to update.

	// source simulates an apiserver object endpoint.
	source := framework.NewFakeControllerSource()

	const (
		FROM       = "from"
		ADD_MISSED = "missed the add event"
		TO         = "to"
	)

	// These are the transitions we expect to see; because this is
	// asynchronous, there are a lot of valid possibilities.
	type pair struct{ from, to string }
	allowedTransitions := map[pair]bool{
		pair{FROM, TO}:         true,
		pair{FROM, ADD_MISSED}: true,
		pair{ADD_MISSED, TO}:   true,

		// Because a resync can happen when we've already observed one
		// of the above but before the item is deleted.
		pair{TO, TO}: true,
		// Because a resync could happen before we observe an update.
		pair{FROM, FROM}: true,
	}

	var testDoneWG sync.WaitGroup

	// Make a controller that verifies each observed update against the
	// allowed transitions, then deletes the object; each deletion signals
	// that one test case has finished.
	_, controller := framework.NewInformer(
		source,
		&api.Pod{},
		time.Millisecond*1,
		framework.ResourceEventHandlerFuncs{
			UpdateFunc: func(oldObj, newObj interface{}) {
				o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
				from, to := o.Labels["check"], n.Labels["check"]
				if !allowedTransitions[pair{from, to}] {
					t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
				}
				source.Delete(n)
			},
			DeleteFunc: func(obj interface{}) {
				testDoneWG.Done()
			},
		},
	)

	// Run the controller until we close stop.
	stop := make(chan struct{})
	go controller.Run(stop)

	pod := func(name, check string) *api.Pod {
		return &api.Pod{
			ObjectMeta: api.ObjectMeta{
				Name:   name,
				Labels: map[string]string{"check": check},
			},
		}
	}

	tests := []func(string){
		func(name string) {
			name = "a-" + name
			source.Add(pod(name, FROM))
			source.Modify(pod(name, TO))
		},
		func(name string) {
			name = "b-" + name
			source.Add(pod(name, FROM))
			source.ModifyDropWatch(pod(name, TO))
		},
		func(name string) {
			name = "c-" + name
			source.AddDropWatch(pod(name, FROM))
			source.Modify(pod(name, ADD_MISSED))
			source.Modify(pod(name, TO))
		},
		func(name string) {
			name = "d-" + name
			source.Add(pod(name, FROM))
		},
	}

	// run every test a few times, in parallel
	const threads = 3
	var wg sync.WaitGroup
	wg.Add(threads * len(tests))
	testDoneWG.Add(threads * len(tests))
	for i := 0; i < threads; i++ {
		for j, f := range tests {
			go func(name string, f func(string)) {
				defer wg.Done()
				f(name)
			}(fmt.Sprintf("%v-%v", i, j), f)
		}
	}
	wg.Wait()

	// Let's wait for the controller to process the things we just added.
	testDoneWG.Wait()
	close(stop)
}

@@ -18,10 +18,12 @@ package framework

import (
	"errors"
	"math/rand"
	"strconv"
	"sync"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

@@ -51,26 +53,49 @@ type nnu struct {
// Add adds an object to the set and sends an add event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Add(obj runtime.Object) {
	f.change(watch.Event{watch.Added, obj})
	f.Change(watch.Event{watch.Added, obj}, 1)
}

// Modify updates an object in the set and sends a modified event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Modify(obj runtime.Object) {
	f.change(watch.Event{watch.Modified, obj})
	f.Change(watch.Event{watch.Modified, obj}, 1)
}

// Delete deletes an object from the set and sends a delete event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Delete(lastValue runtime.Object) {
	f.change(watch.Event{watch.Deleted, lastValue})
	f.Change(watch.Event{watch.Deleted, lastValue}, 1)
}

// AddDropWatch adds an object to the set but forgets to send an add event to
// watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) {
	f.Change(watch.Event{watch.Added, obj}, 0)
}

// ModifyDropWatch updates an object in the set but forgets to send a modify
// event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) {
	f.Change(watch.Event{watch.Modified, obj}, 0)
}

// DeleteDropWatch deletes an object from the set but forgets to send a delete
// event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) {
	f.Change(watch.Event{watch.Deleted, lastValue}, 0)
}

func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu {
	return nnu{meta.Namespace, meta.Name, meta.UID}
}

func (f *FakeControllerSource) change(e watch.Event) {
// Change records the given event (setting the object's resource version) and
// sends a watch event with the specified probability.
func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
	f.lock.Lock()
	defer f.lock.Unlock()


@@ -89,7 +114,10 @@ func (f *FakeControllerSource) change(e watch.Event) {
	case watch.Deleted:
		delete(f.items, key)
	}
	f.broadcaster.Action(e.Type, e.Object)

	if rand.Float64() < watchProbability {
		f.broadcaster.Action(e.Type, e.Object)
	}
}

// List returns a list object, with its resource version set.
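
Since Change is now exported with a probability, a test can also simulate a flaky watch channel rather than the all-or-nothing DropWatch variants; an editorial sketch (source and pod as in the tests above):

    // Sketch: record the modification, but deliver the watch event only
    // about half the time; a relist has to reconcile the misses.
    source.Change(watch.Event{watch.Modified, pod}, 0.5)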

@@ -98,8 +126,15 @@ func (f *FakeControllerSource) List() (runtime.Object, error) {
	defer f.lock.RUnlock()
	list := make([]runtime.Object, 0, len(f.items))
	for _, obj := range f.items {
		// TODO: should copy obj first
		list = append(list, obj)
		// Must make a copy to allow clients to modify the object.
		// Otherwise, if they make a change and write it back, they
		// will inadvertently change our canonical copy (in
		// addition to racing with other clients).
		objCopy, err := conversion.DeepCopy(obj)
		if err != nil {
			return nil, err
		}
		list = append(list, objCopy.(runtime.Object))
	}
	listObj := &api.List{}
	if err := runtime.SetList(listObj, list); err != nil {

@@ -124,7 +159,20 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error) {
		return nil, err
	}
	if rc < len(f.changes) {
		return f.broadcaster.WatchWithPrefix(f.changes[rc:]), nil
		changes := []watch.Event{}
		for _, c := range f.changes[rc:] {
			// Must make a copy to allow clients to modify the
			// object. Otherwise, if they make a change and write
			// it back, they will inadvertently change our
			// canonical copy (in addition to racing with other
			// clients).
			objCopy, err := conversion.DeepCopy(c.Object)
			if err != nil {
				return nil, err
			}
			changes = append(changes, watch.Event{c.Type, objCopy.(runtime.Object)})
		}
		return f.broadcaster.WatchWithPrefix(changes), nil
	} else if rc > len(f.changes) {
		return nil, errors.New("resource version in the future not supported by this fake")
	}
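
An editorial sketch of what the copying buys a consumer (runtime.ExtractList is assumed to be available in this tree for unpacking the list): mutating what List hands back no longer corrupts the fake's canonical state.

    // Sketch: scribbling on a listed object only changes the copy.
    source := framework.NewFakeControllerSource()
    source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "a"}})
    listObj, err := source.List()
    if err != nil {
    	panic(err)
    }
    items, _ := runtime.ExtractList(listObj)
    items[0].(*api.Pod).Name = "scribbled" // safe: the fake still holds "a"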

@@ -17,22 +17,27 @@ limitations under the License.
package conversion

import (
	"bytes"
	"encoding/gob"
	"reflect"
)

var deepCopier = NewConverter()

// DeepCopy makes a deep copy of source. Won't work for any private fields!
// For nil slices, will return 0-length slices. These are equivalent in
// basically every way except for the way that reflect.DeepEqual checks.
func DeepCopy(source interface{}) (interface{}, error) {
	src := reflect.ValueOf(source)
	v := reflect.New(src.Type()).Elem()
	s := &scope{
		converter: deepCopier,
	}
	if err := deepCopier.convert(src, v, s); err != nil {
	v := reflect.New(reflect.TypeOf(source))

	buff := &bytes.Buffer{}
	enc := gob.NewEncoder(buff)
	dec := gob.NewDecoder(buff)
	err := enc.Encode(source)
	if err != nil {
		return nil, err
	}
	return v.Interface(), nil
	err = dec.Decode(v.Interface())
	if err != nil {
		return nil, err
	}
	return v.Elem().Interface(), nil
}
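
A quick sketch of the caveat the doc comment warns about (editorial; the widget type is hypothetical): gob round-trips exported fields but silently drops unexported ones.

    type widget struct {
    	Name  string
    	state int // unexported: gob silently drops it
    }

    orig := widget{Name: "w", state: 7}
    copied, err := conversion.DeepCopy(orig)
    if err != nil {
    	panic(err)
    }
    w := copied.(widget)
    // w.Name == "w", but w.state == 0: "Won't work for any private fields!"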

@@ -26,6 +26,7 @@ import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
	algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"

@@ -51,7 +52,11 @@ type ConfigFactory struct {
	// a means to list all services
	ServiceLister *cache.StoreToServiceLister

	modeler scheduler.SystemModeler
	// Close this to stop all reflectors
	StopEverything chan struct{}

	scheduledPodPopulator *framework.Controller
	modeler               scheduler.SystemModeler
}

// Initializes the factory.

@@ -59,13 +64,40 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
	c := &ConfigFactory{
		Client:             client,
		PodQueue:           cache.NewFIFO(cache.MetaNamespaceKeyFunc),
		ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
		ScheduledPodLister: &cache.StoreToPodLister{},
		NodeLister:         &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
		ServiceLister:      &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
		StopEverything:     make(chan struct{}),
	}
	modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
	c.modeler = modeler
	c.PodLister = modeler.PodLister()

	// On add/delete to the scheduled pods, remove from the assumed pods.
	// We construct this here instead of in CreateFromKeys because
	// ScheduledPodLister is something we provide to plug-in functions that
	// may need to call it.
	c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer(
		c.createAssignedPodLW(),
		&api.Pod{},
		0,
		framework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				if pod, ok := obj.(*api.Pod); ok {
					c.modeler.ForgetPod(pod)
				}
			},
			DeleteFunc: func(obj interface{}) {
				switch t := obj.(type) {
				case *api.Pod:
					c.modeler.ForgetPod(t)
				case cache.DeletedFinalStateUnknown:
					c.modeler.ForgetPodByKey(t.Key)
				}
			},
		},
	)

	return c
}

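
An editorial sketch of the lifecycle this sets up (predicateKeys and priorityKeys are hypothetical inputs; package qualifiers elided): the populator built above is started in CreateFromKeys, and everything the factory launches selects on StopEverything, so teardown is a single close.

    f := NewConfigFactory(client)
    config, err := f.CreateFromKeys(predicateKeys, priorityKeys)
    if err != nil {
    	glog.Fatalf("couldn't create scheduler config: %v", err)
    }
    scheduler.New(config).Run()
    // On teardown, one close stops the populator, reflectors, and pollers:
    close(f.StopEverything)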

@@ -109,21 +141,6 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler.Config, error) {
	return f.CreateFromKeys(predicateKeys, priorityKeys)
}

// ReflectorDeletionHook passes all operations through to Store, but calls
// OnDelete in a goroutine if there is a deletion.
type ReflectorDeletionHook struct {
	cache.Store
	OnDelete func(obj interface{})
}

func (r ReflectorDeletionHook) Delete(obj interface{}) error {
	go func() {
		defer util.HandleCrash()
		r.OnDelete(obj)
	}()
	return r.Store.Delete(obj)
}

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
	glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

@@ -144,39 +161,25 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
	}

	// Watch and queue pods that need scheduling.
	cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run()
	cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)

	// Pass through all events to the scheduled pod store, but on a deletion,
	// also remove from the assumed pods.
	assumedPodDeleter := ReflectorDeletionHook{
		Store: f.ScheduledPodLister.Store,
		OnDelete: func(obj interface{}) {
			if pod, ok := obj.(*api.Pod); ok {
				f.modeler.LockedAction(func() {
					f.modeler.ForgetPod(pod)
				})
			}
		},
	}

	// Watch and cache all running pods. Scheduler needs to find all pods
	// so it knows where it's safe to place a pod. Cache this locally.
	cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run()
	// Begin populating scheduled pods.
	go f.scheduledPodPopulator.Run(f.StopEverything)

	// Watch minions.
	// Minions may be listed frequently, so provide a local up-to-date cache.
	if false {
		// Disable this code until minions support watches. Note when this code is enabled,
		// we need to make sure minion ListWatcher has proper FieldSelector.
		cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run()
		cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 10*time.Second).RunUntil(f.StopEverything)
	} else {
		cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run()
		cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).RunUntil(f.StopEverything)
	}

	// Watch and cache all service objects. Scheduler needs to find all pods
	// created by the same service, so that it can spread them correctly.
	// Cache this locally.
	cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).Run()
	cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)

	r := rand.New(rand.NewSource(time.Now().UnixNano()))


@@ -200,7 +203,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
			glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
			return pod
		},
		Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
		Error:          f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
		StopEverything: f.StopEverything,
	}, nil
}


@@ -57,8 +57,9 @@ func (a *actionLocker) LockedAction(do func()) {

// FakeModeler implements the SystemModeler interface.
type FakeModeler struct {
	AssumePodFunc func(pod *api.Pod)
	ForgetPodFunc func(pod *api.Pod)
	AssumePodFunc      func(pod *api.Pod)
	ForgetPodFunc      func(pod *api.Pod)
	ForgetPodByKeyFunc func(key string)
	actionLocker
}


@@ -76,6 +77,13 @@ func (f *FakeModeler) ForgetPod(pod *api.Pod) {
	}
}

// ForgetPodByKey calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPodByKey(key string) {
	if f.ForgetPodByKeyFunc != nil {
		f.ForgetPodByKeyFunc(key)
	}
}

// SimpleModeler implements the SystemModeler interface with a timed pod cache.
type SimpleModeler struct {
	queuedPods ExtendedPodLister

@@ -110,6 +118,10 @@ func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
	s.assumedPods.Delete(pod)
}

func (s *SimpleModeler) ForgetPodByKey(key string) {
	s.assumedPods.Delete(cache.ExplicitKey(key))
}

// Extract names for readable logging.
func podNames(pods []api.Pod) []string {
	out := make([]string, len(pods))

@@ -51,6 +51,7 @@ type SystemModeler interface {
	// show the absence of the given pod if the pod is in the scheduled
	// pods list!)
	ForgetPod(pod *api.Pod)
	ForgetPodByKey(key string)

	// For serializing calls to Assume/ForgetPod: imagine you want to add
	// a pod iff a bind succeeds, but also remove a pod if it is deleted.

@@ -85,6 +86,9 @@ type Config struct {

	// Recorder is the EventRecorder to use
	Recorder record.EventRecorder

	// Close this to shut down the scheduler.
	StopEverything chan struct{}
}

// New returns a new scheduler.

@@ -98,7 +102,7 @@ func New(c *Config) *Scheduler {

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
	go util.Forever(s.scheduleOne, 0)
	go util.Until(s.scheduleOne, 0, s.config.StopEverything)
}

func (s *Scheduler) scheduleOne() {
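
util.Until is what makes the scheduler stoppable: it behaves like util.Forever (run the function repeatedly with the given period between runs) but returns once the channel closes. An editorial sketch of the pattern:

    stop := make(chan struct{})
    go util.Until(func() {
    	glog.V(4).Infof("one scheduling iteration")
    }, time.Second, stop)
    // ...
    close(stop) // the Until loop observes this and returns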