Merge pull request #5270 from lavalamp/fix7

Controller framework
pull/6/head
Derek Carr 2015-04-07 16:58:09 -04:00
commit 6eb54e73e0
8 changed files with 427 additions and 21 deletions

View File

@ -37,6 +37,7 @@ type Store interface {
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
@ -196,6 +197,18 @@ func (c *cache) List() []interface{} {
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the cache.
func (c *cache) ListKeys() []string {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]string, 0, len(c.items))
for key := range c.items {
list = append(list, key)
}
return list
}
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {

View File

@ -24,7 +24,7 @@ package cache
// in one call to PushFunc, but sometimes PushFunc may be called twice with the same values.
// PushFunc should be thread safe.
type UndeltaStore struct {
ActualStore Store
Store
PushFunc func([]interface{})
}
@ -43,47 +43,41 @@ var _ Store = &UndeltaStore{}
// 5 Store.List() -> [a,b]
func (u *UndeltaStore) Add(obj interface{}) error {
if err := u.ActualStore.Add(obj); err != nil {
if err := u.Store.Add(obj); err != nil {
return err
}
u.PushFunc(u.ActualStore.List())
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) Update(obj interface{}) error {
if err := u.ActualStore.Update(obj); err != nil {
if err := u.Store.Update(obj); err != nil {
return err
}
u.PushFunc(u.ActualStore.List())
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) Delete(obj interface{}) error {
if err := u.ActualStore.Delete(obj); err != nil {
if err := u.Store.Delete(obj); err != nil {
return err
}
u.PushFunc(u.ActualStore.List())
u.PushFunc(u.Store.List())
return nil
}
func (u *UndeltaStore) List() []interface{} {
return u.ActualStore.List()
}
func (u *UndeltaStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
return u.ActualStore.Get(obj)
}
func (u *UndeltaStore) GetByKey(key string) (item interface{}, exists bool, err error) {
return u.ActualStore.GetByKey(key)
}
func (u *UndeltaStore) Replace(list []interface{}) error {
if err := u.ActualStore.Replace(list); err != nil {
if err := u.Store.Replace(list); err != nil {
return err
}
u.PushFunc(u.ActualStore.List())
u.PushFunc(u.Store.List())
return nil
}
// NewUndeltaStore returns an UndeltaStore implemented with a Store.
func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore {
return &UndeltaStore{
ActualStore: NewStore(keyFunc),
Store: NewStore(keyFunc),
PushFunc: pushFunc,
}
}

View File

@ -0,0 +1,104 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Config contains all the settings for a Controller.
type Config struct {
// The queue for your objects; either a cache.FIFO or
// a cache.DeltaFIFO. Your Process() function should accept
// the output of this Oueue's Pop() method.
cache.Queue
// Something that can list and watch your objects.
cache.ListerWatcher
// Something that can process your objects.
Process ProcessFunc
// The type of your objects.
ObjectType runtime.Object
// Reprocess everything at least this often.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by cache.FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
FullResyncPeriod time.Duration
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter.
RetryOnError bool
}
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework.
type Controller struct {
config Config
}
// New makes a new Controller from the given Config.
func New(c *Config) *Controller {
ctlr := &Controller{
config: *c,
}
return ctlr
}
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run does not block.
func (c *Controller) Run(stopCh <-chan struct{}) {
cache.NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
).RunUntil(stopCh)
go util.Until(c.processLoop, time.Second, stopCh)
}
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
func (c *Controller) processLoop() {
for {
obj := c.config.Queue.Pop()
err := c.config.Process(obj)
if err != nil {
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework_test
import (
"fmt"
"sync"
"time"
// "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func Example() {
// source simulates an apiserver object endpoint.
source := framework.NewFakeControllerSource()
// This will hold the downstream state, as we know it.
downstream := cache.NewStore(cache.MetaNamespaceKeyFunc)
// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)
// Let's do threadsafe output to get predictable test results.
outputSetLock := sync.Mutex{}
outputSet := util.StringSet{}
cfg := &framework.Config{
Queue: fifo,
ListerWatcher: source,
ObjectType: &api.Pod{},
FullResyncPeriod: time.Millisecond * 100,
RetryOnError: false,
// Let's implement a simple controller that just deletes
// everything that comes in.
Process: func(obj interface{}) error {
// Obj is from the Pop method of the Queue we make above.
newest := obj.(cache.Deltas).Newest()
if newest.Type != cache.Deleted {
// Update our downstream store.
err := downstream.Add(newest.Object)
if err != nil {
return err
}
source.Delete(newest.Object.(runtime.Object))
} else {
// Update our downstream store.
err := downstream.Delete(newest.Object)
if err != nil {
return err
}
// fifo's KeyOf is easiest, because it handles
// DeletedFinalStateUnknown markers.
key, err := fifo.KeyOf(newest.Object)
if err != nil {
return err
}
// Record some output.
outputSetLock.Lock()
defer outputSetLock.Unlock()
outputSet.Insert(key)
}
return nil
},
}
// Create the controller and run it until we close stop.
stop := make(chan struct{})
framework.New(cfg).Run(stop)
// Let's add a few objects to the source.
for _, name := range []string{"a-hello", "b-controller", "c-framework"} {
// Note that these pods are not valid-- the fake source doesn't
// call validation or anything.
source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
}
// Let's wait for the controller to process the things we just added.
time.Sleep(500 * time.Millisecond)
close(stop)
outputSetLock.Lock()
for _, key := range outputSet.List() {
fmt.Println(key)
}
// Output:
// a-hello
// b-controller
// c-framework
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package framework implements all the grunt work involved in running a simple controller.
package framework

View File

@ -0,0 +1,132 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"errors"
"strconv"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func NewFakeControllerSource() *FakeControllerSource {
return &FakeControllerSource{
items: map[nnu]runtime.Object{},
broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}
}
// FakeControllerSource implements listing/watching for testing.
type FakeControllerSource struct {
lock sync.RWMutex
items map[nnu]runtime.Object
changes []watch.Event // one change per resourceVersion
broadcaster *watch.Broadcaster
}
// namespace, name, uid to be used as a key.
type nnu struct {
namespace, name string
uid types.UID
}
// Add adds an object to the set and sends an add event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Add(obj runtime.Object) {
f.change(watch.Event{watch.Added, obj})
}
// Modify updates an object in the set and sends a modified event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Modify(obj runtime.Object) {
f.change(watch.Event{watch.Modified, obj})
}
// Delete deletes an object from the set and sends a delete event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Delete(lastValue runtime.Object) {
f.change(watch.Event{watch.Deleted, lastValue})
}
func (f *FakeControllerSource) key(meta *api.ObjectMeta) nnu {
return nnu{meta.Namespace, meta.Name, meta.UID}
}
func (f *FakeControllerSource) change(e watch.Event) {
f.lock.Lock()
defer f.lock.Unlock()
objMeta, err := api.ObjectMetaFor(e.Object)
if err != nil {
panic(err) // this is test code only
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
f.changes = append(f.changes, e)
key := f.key(objMeta)
switch e.Type {
case watch.Added, watch.Modified:
f.items[key] = e.Object
case watch.Deleted:
delete(f.items, key)
}
f.broadcaster.Action(e.Type, e.Object)
}
// List returns a list object, with its resource version set.
func (f *FakeControllerSource) List() (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]runtime.Object, 0, len(f.items))
for _, obj := range f.items {
// TODO: should copy obj first
list = append(list, obj)
}
listObj := &api.List{}
if err := runtime.SetList(listObj, list); err != nil {
return nil, err
}
objMeta, err := api.ListMetaFor(listObj)
if err != nil {
return nil, err
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
return listObj, nil
}
// Watch returns a watch, which will be pre-populated with all changes
// after resourceVersion.
func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error) {
f.lock.RLock()
defer f.lock.RUnlock()
rc, err := strconv.Atoi(resourceVersion)
if err != nil {
return nil, err
}
if rc < len(f.changes) {
return f.broadcaster.WatchWithPrefix(f.changes[rc:]), nil
} else if rc > len(f.changes) {
return nil, errors.New("resource version in the future not supported by this fake")
}
return f.broadcaster.Watch(), nil
}

View File

@ -88,10 +88,14 @@ func logError(err error) {
}
// Forever loops forever running f every period. Catches any panics, and keeps going.
// Deprecated. Please use Until and pass NeverStop as the stopCh.
func Forever(f func(), period time.Duration) {
Until(f, period, nil)
}
// NeverStop may be passed to Until to make it never stop.
var NeverStop <-chan struct{} = make(chan struct{})
// Until loops until stop channel is closed, running f every period.
// Catches any panics, and keeps going. f may not be invoked if
// stop channel is already closed.

View File

@ -89,6 +89,32 @@ func (m *Broadcaster) Watch() Interface {
return w
}
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
// queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
// The returned watch will have a queue length that is at least large enough to accomodate
// all of the items in queuedEvents.
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
length := m.watchQueueLength
if n := len(queuedEvents) + 1; n > length {
length = n
}
w := &broadcasterWatcher{
result: make(chan Event, length),
stopped: make(chan struct{}),
id: id,
m: m,
}
m.watchers[id] = w
for _, e := range queuedEvents {
w.result <- e
}
return w
}
// stopWatching stops the given watcher and removes it from the list.
func (m *Broadcaster) stopWatching(id int64) {
m.lock.Lock()