From 3fe17b93cfee71d989bd8f67600ddabac39468f8 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 27 Feb 2015 16:49:51 -0800 Subject: [PATCH 1/3] Begin a controller framework. --- pkg/controller/framework/controller.go | 104 ++++++++++++++ pkg/controller/framework/controller_test.go | 115 +++++++++++++++ pkg/controller/framework/doc.go | 18 +++ .../framework/fake_controller_source.go | 132 ++++++++++++++++++ pkg/util/util.go | 4 + pkg/watch/mux.go | 26 ++++ 6 files changed, 399 insertions(+) create mode 100644 pkg/controller/framework/controller.go create mode 100644 pkg/controller/framework/controller_test.go create mode 100644 pkg/controller/framework/doc.go create mode 100644 pkg/controller/framework/fake_controller_source.go diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go new file mode 100644 index 0000000000..1868e0361e --- /dev/null +++ b/pkg/controller/framework/controller.go @@ -0,0 +1,104 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Config contains all the settings for a Controller. +type Config struct { + // The queue for your objects; either a cache.FIFO or + // a cache.DeltaFIFO. Your Process() function should accept + // the output of this Oueue's Pop() method. + cache.Queue + + // Something that can list and watch your objects. + cache.ListerWatcher + + // Something that can process your objects. + Process ProcessFunc + + // The type of your objects. + ObjectType runtime.Object + + // Reprocess everything at least this often. + // Note that if it takes longer for you to clear the queue than this + // period, you will end up processing items in the order determined + // by cache.FIFO.Replace(). Currently, this is random. If this is a + // problem, we can change that replacement policy to append new + // things to the end of the queue instead of replacing the entire + // queue. + FullResyncPeriod time.Duration + + // If true, when Process() returns an error, re-enqueue the object. + // TODO: add interface to let you inject a delay/backoff or drop + // the object completely if desired. Pass the object in + // question to this interface as a parameter. + RetryOnError bool +} + +// ProcessFunc processes a single object. +type ProcessFunc func(obj interface{}) error + +// Controller is a generic controller framework. +type Controller struct { + config Config +} + +// New makes a new Controller from the given Config. +func New(c *Config) *Controller { + ctlr := &Controller{ + config: *c, + } + return ctlr +} + +// Run begins processing items, and will continue until a value is sent down stopCh. +// It's an error to call Run more than once. +// Run does not block. +func (c *Controller) Run(stopCh <-chan struct{}) { + cache.NewReflector( + c.config.ListerWatcher, + c.config.ObjectType, + c.config.Queue, + c.config.FullResyncPeriod, + ).RunUntil(stopCh) + + go util.Until(c.processLoop, time.Second, stopCh) +} + +// processLoop drains the work queue. +// TODO: Consider doing the processing in parallel. This will require a little thought +// to make sure that we don't end up processing the same object multiple times +// concurrently. +func (c *Controller) processLoop() { + for { + obj := c.config.Queue.Pop() + err := c.config.Process(obj) + if err != nil { + if c.config.RetryOnError { + // This is the safe way to re-enqueue. + c.config.Queue.AddIfNotPresent(obj) + } + } + } +} diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go new file mode 100644 index 0000000000..1ec94ea71e --- /dev/null +++ b/pkg/controller/framework/controller_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework_test + +import ( + "fmt" + "sync" + "time" + // "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func Example() { + // source simulates an apiserver object endpoint. + source := framework.NewFakeControllerSource() + + // This will hold the downstream state, as we know it. + downstream := cache.NewStore(cache.MetaNamespaceKeyFunc) + + // This will hold incoming changes. Note how we pass downstream in as a + // KeyLister, that way resync operations will result in the correct set + // of update/delete deltas. + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream) + + // Let's do threadsafe output to get predictable test results. + outputSetLock := sync.Mutex{} + outputSet := util.StringSet{} + + cfg := &framework.Config{ + Queue: fifo, + ListerWatcher: source, + ObjectType: &api.Pod{}, + FullResyncPeriod: time.Millisecond * 100, + RetryOnError: false, + + // Let's implement a simple controller that just deletes + // everything that comes in. + Process: func(obj interface{}) error { + // Obj is from the Pop method of the Queue we make above. + newest := obj.(cache.Deltas).Newest() + + if newest.Type != cache.Deleted { + // Update our downstream store. + err := downstream.Add(newest.Object) + if err != nil { + return err + } + + source.Delete(newest.Object.(runtime.Object)) + } else { + // Update our downstream store. + err := downstream.Delete(newest.Object) + if err != nil { + return err + } + + // fifo's KeyOf is easiest, because it handles + // DeletedFinalStateUnknown markers. + key, err := fifo.KeyOf(newest.Object) + if err != nil { + return err + } + + // Record some output. + outputSetLock.Lock() + defer outputSetLock.Unlock() + outputSet.Insert(key) + } + return nil + }, + } + + // Create the controller and run it until we close stop. + stop := make(chan struct{}) + framework.New(cfg).Run(stop) + + // Let's add a few objects to the source. + for _, name := range []string{"a-hello", "b-controller", "c-framework"} { + // Note that these pods are not valid-- the fake source doesn't + // call validation or anything. + source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}}) + } + + // Let's wait for the controller to process the things we just added. + time.Sleep(500 * time.Millisecond) + close(stop) + + outputSetLock.Lock() + for _, key := range outputSet.List() { + fmt.Println(key) + } + // Output: + // a-hello + // b-controller + // c-framework +} diff --git a/pkg/controller/framework/doc.go b/pkg/controller/framework/doc.go new file mode 100644 index 0000000000..bbf9df798c --- /dev/null +++ b/pkg/controller/framework/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package framework implements all the grunt work involved in running a simple controller. +package framework diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go new file mode 100644 index 0000000000..d7b863ad39 --- /dev/null +++ b/pkg/controller/framework/fake_controller_source.go @@ -0,0 +1,132 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "errors" + "strconv" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func NewFakeControllerSource() *FakeControllerSource { + return &FakeControllerSource{ + items: map[nnu]runtime.Object{}, + broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), + } +} + +// FakeControllerSource implements listing/watching for testing. +type FakeControllerSource struct { + lock sync.RWMutex + items map[nnu]runtime.Object + changes []watch.Event // one change per resourceVersion + broadcaster *watch.Broadcaster +} + +// namespace, name, uid to be used as a key. +type nnu struct { + namespace, name string + uid types.UID +} + +// Add adds an object to the set and sends an add event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) Add(obj runtime.Object) { + f.change(watch.Event{watch.Added, obj}) +} + +// Modify updates an object in the set and sends a modified event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) Modify(obj runtime.Object) { + f.change(watch.Event{watch.Modified, obj}) +} + +// Delete deletes an object from the set and sends a delete event to watchers. +// obj's ResourceVersion is set. +func (f *FakeControllerSource) Delete(lastValue runtime.Object) { + f.change(watch.Event{watch.Deleted, lastValue}) +} + +func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu { + return nnu{meta.Namespace, meta.Name, meta.UID} +} + +func (f *FakeControllerSource) change(e watch.Event) { + f.lock.Lock() + defer f.lock.Unlock() + + objMeta, err := api.ObjectMetaFor(e.Object) + if err != nil { + panic(err) // this is test code only + } + + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + f.changes = append(f.changes, e) + key := f.key(objMeta) + switch e.Type { + case watch.Added, watch.Modified: + f.items[key] = e.Object + case watch.Deleted: + delete(f.items, key) + } + f.broadcaster.Action(e.Type, e.Object) +} + +// List returns a list object, with its resource version set. +func (f *FakeControllerSource) List() (runtime.Object, error) { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]runtime.Object, 0, len(f.items)) + for _, obj := range f.items { + // TODO: should copy obj first + list = append(list, obj) + } + listObj := &api.List{} + if err := runtime.SetList(listObj, list); err != nil { + return nil, err + } + objMeta, err := api.ListMetaFor(listObj) + if err != nil { + return nil, err + } + resourceVersion := len(f.changes) + objMeta.ResourceVersion = strconv.Itoa(resourceVersion) + return listObj, nil +} + +// Watch returns a watch, which will be pre-populated with all changes +// after resourceVersion. +func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error) { + f.lock.RLock() + defer f.lock.RUnlock() + rc, err := strconv.Atoi(resourceVersion) + if err != nil { + return nil, err + } + if rc < len(f.changes) { + return f.broadcaster.WatchWithPrefix(f.changes[rc:]), nil + } else if rc > len(f.changes) { + return nil, errors.New("resource version in the future not supported by this fake") + } + return f.broadcaster.Watch(), nil +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 281ca27f57..66c0a347b6 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -88,10 +88,14 @@ func logError(err error) { } // Forever loops forever running f every period. Catches any panics, and keeps going. +// Deprecated. Please use Until and pass NeverStop as the stopCh. func Forever(f func(), period time.Duration) { Until(f, period, nil) } +// NeverStop may be passed to Until to make it never stop. +var NeverStop <-chan struct{} = make(chan struct{}) + // Until loops until stop channel is closed, running f every period. // Catches any panics, and keeps going. f may not be invoked if // stop channel is already closed. diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index ed5c09e512..1789a49d07 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -89,6 +89,32 @@ func (m *Broadcaster) Watch() Interface { return w } +// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends +// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster. +// The returned watch will have a queue length that is at least large enough to accomodate +// all of the items in queuedEvents. +func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface { + m.lock.Lock() + defer m.lock.Unlock() + id := m.nextWatcher + m.nextWatcher++ + length := m.watchQueueLength + if n := len(queuedEvents) + 1; n > length { + length = n + } + w := &broadcasterWatcher{ + result: make(chan Event, length), + stopped: make(chan struct{}), + id: id, + m: m, + } + m.watchers[id] = w + for _, e := range queuedEvents { + w.result <- e + } + return w +} + // stopWatching stops the given watcher and removes it from the list. func (m *Broadcaster) stopWatching(id int64) { m.lock.Lock() From 8ee9ee99208e2bb6f1b5c805432ddfa14d138ed8 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 2 Apr 2015 15:26:39 -0700 Subject: [PATCH 2/3] add ListKeys method to Store --- pkg/client/cache/store.go | 13 +++++++++++++ pkg/client/cache/undelta_store.go | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 47d70971c0..22e9ffd2ad 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -37,6 +37,7 @@ type Store interface { Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} + ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) @@ -195,6 +196,18 @@ func (c *cache) List() []interface{} { return list } +// ListKeys returns a list of all the keys of the objects currently +// in the cache. +func (c *cache) ListKeys() []string { + c.lock.RLock() + defer c.lock.RUnlock() + list := make([]string, 0, len(c.items)) + for key := range c.items { + list = append(list, key) + } + return list +} + // Index returns a list of items that match on the index function // Index is thread-safe so long as you treat all items as immutable func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) { diff --git a/pkg/client/cache/undelta_store.go b/pkg/client/cache/undelta_store.go index 70adda90c3..1c4c39ecb7 100644 --- a/pkg/client/cache/undelta_store.go +++ b/pkg/client/cache/undelta_store.go @@ -49,6 +49,7 @@ func (u *UndeltaStore) Add(obj interface{}) error { u.PushFunc(u.ActualStore.List()) return nil } + func (u *UndeltaStore) Update(obj interface{}) error { if err := u.ActualStore.Update(obj); err != nil { return err @@ -56,6 +57,7 @@ func (u *UndeltaStore) Update(obj interface{}) error { u.PushFunc(u.ActualStore.List()) return nil } + func (u *UndeltaStore) Delete(obj interface{}) error { if err := u.ActualStore.Delete(obj); err != nil { return err @@ -63,15 +65,23 @@ func (u *UndeltaStore) Delete(obj interface{}) error { u.PushFunc(u.ActualStore.List()) return nil } + func (u *UndeltaStore) List() []interface{} { return u.ActualStore.List() } + +func (u *UndeltaStore) ListKeys() []string { + return u.ActualStore.ListKeys() +} + func (u *UndeltaStore) Get(obj interface{}) (item interface{}, exists bool, err error) { return u.ActualStore.Get(obj) } + func (u *UndeltaStore) GetByKey(key string) (item interface{}, exists bool, err error) { return u.ActualStore.GetByKey(key) } + func (u *UndeltaStore) Replace(list []interface{}) error { if err := u.ActualStore.Replace(list); err != nil { return err From cc5ef8c5ad033badbe2bc5bff763801d7b60857d Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 2 Apr 2015 15:31:19 -0700 Subject: [PATCH 3/3] make undelta store use go's implementatio inheritance mechanism --- pkg/client/cache/undelta_store.go | 40 ++++++++++--------------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/pkg/client/cache/undelta_store.go b/pkg/client/cache/undelta_store.go index 1c4c39ecb7..5e0724de0b 100644 --- a/pkg/client/cache/undelta_store.go +++ b/pkg/client/cache/undelta_store.go @@ -24,8 +24,8 @@ package cache // in one call to PushFunc, but sometimes PushFunc may be called twice with the same values. // PushFunc should be thread safe. type UndeltaStore struct { - ActualStore Store - PushFunc func([]interface{}) + Store + PushFunc func([]interface{}) } // Assert that it implements the Store interface. @@ -43,57 +43,41 @@ var _ Store = &UndeltaStore{} // 5 Store.List() -> [a,b] func (u *UndeltaStore) Add(obj interface{}) error { - if err := u.ActualStore.Add(obj); err != nil { + if err := u.Store.Add(obj); err != nil { return err } - u.PushFunc(u.ActualStore.List()) + u.PushFunc(u.Store.List()) return nil } func (u *UndeltaStore) Update(obj interface{}) error { - if err := u.ActualStore.Update(obj); err != nil { + if err := u.Store.Update(obj); err != nil { return err } - u.PushFunc(u.ActualStore.List()) + u.PushFunc(u.Store.List()) return nil } func (u *UndeltaStore) Delete(obj interface{}) error { - if err := u.ActualStore.Delete(obj); err != nil { + if err := u.Store.Delete(obj); err != nil { return err } - u.PushFunc(u.ActualStore.List()) + u.PushFunc(u.Store.List()) return nil } -func (u *UndeltaStore) List() []interface{} { - return u.ActualStore.List() -} - -func (u *UndeltaStore) ListKeys() []string { - return u.ActualStore.ListKeys() -} - -func (u *UndeltaStore) Get(obj interface{}) (item interface{}, exists bool, err error) { - return u.ActualStore.Get(obj) -} - -func (u *UndeltaStore) GetByKey(key string) (item interface{}, exists bool, err error) { - return u.ActualStore.GetByKey(key) -} - func (u *UndeltaStore) Replace(list []interface{}) error { - if err := u.ActualStore.Replace(list); err != nil { + if err := u.Store.Replace(list); err != nil { return err } - u.PushFunc(u.ActualStore.List()) + u.PushFunc(u.Store.List()) return nil } // NewUndeltaStore returns an UndeltaStore implemented with a Store. func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore { return &UndeltaStore{ - ActualStore: NewStore(keyFunc), - PushFunc: pushFunc, + Store: NewStore(keyFunc), + PushFunc: pushFunc, } }