Merge pull request #16502 from wojtek-t/list_from_memory

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-10-30 19:47:10 -07:00
commit d1345a1f4f
4 changed files with 69 additions and 45 deletions

View File

@ -45,6 +45,11 @@ type CacherConfig struct {
// An underlying storage.Versioner.
Versioner Versioner
// Whether to serve Lists from in-memory cache.
//
// NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE!
ListFromCache bool
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
Type interface{}
@ -99,6 +104,11 @@ type Cacher struct {
// keyFunc is used to get a key in the underyling storage for a given object.
keyFunc func(runtime.Object) (string, error)
// Whether to serve Lists from in-memory cache.
//
// NOTE: DO NOT SET TO TRUE IN PRODUCTION CODE!
ListFromCache bool
}
// Create a new Cacher responsible from service WATCH and LIST requests from its
@ -109,14 +119,15 @@ func NewCacher(config CacherConfig) *Cacher {
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
cacher := &Cacher{
usable: sync.RWMutex{},
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watcherIdx: 0,
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
usable: sync.RWMutex{},
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watcherIdx: 0,
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
ListFromCache: config.ListFromCache,
}
cacher.usable.Lock()
// See startCaching method for why explanation on it.
@ -220,21 +231,19 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error {
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
}
if !c.ListFromCache {
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
}
// ListFromMemory implements list operation (the same signature as List method)
// but it serves the contents from memory.
// Current we cannot use ListFromMemory() instead of List(), because it only
// guarantees eventual consistency (e.g. it's possible for Get called right after
// Create to return not-exist, before the change is propagate).
// TODO: We may consider changing to use ListFromMemory in the future, but this
// requires wider discussion as an "api semantic change".
func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
// Do NOT allow Watch to start when the underlying structures are not propagated.
// To avoid situation when List is proceesed before the underlying
// watchCache is propagated for the first time, we acquire and immediately
// release the 'usable' lock.
// We don't need to hold it all the time, because watchCache is thread-safe
// and it would complicate already very difficult locking pattern.
c.usable.RLock()
defer c.usable.RUnlock()
c.usable.RUnlock()
// List elements from cache, with at least 'resourceVersion'.
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
@ -243,15 +252,15 @@ func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
if err != nil || listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterFunction(key, c.keyFunc, Everything)
filterFunc := filterFunction(key, c.keyFunc, filter)
objs, resourceVersion := c.watchCache.ListWithVersion()
objs, resourceVersion := c.watchCache.WaitUntilFreshAndList(resourceVersion)
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
}
if filter(object) {
if filterFunc(object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
}
}

View File

@ -37,7 +37,6 @@ import (
"k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context"
@ -47,8 +46,9 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher {
prefix := "pods"
config := storage.CacherConfig{
CacheCapacity: 10,
Versioner: etcdstorage.APIObjectVersioner{},
Storage: etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix()),
Versioner: etcdstorage.APIObjectVersioner{},
ListFromCache: true,
Type: &api.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
@ -65,18 +65,7 @@ func makeTestPod(name string) *api.Pod {
}
}
func waitForUpToDateCache(cacher *storage.Cacher, resourceVersion uint64) error {
ready := func() (bool, error) {
result, err := cacher.LastSyncResourceVersion()
if err != nil {
return false, err
}
return result == resourceVersion, nil
}
return wait.Poll(10*time.Millisecond, util.ForeverTestTimeout, ready)
}
func TestListFromMemory(t *testing.T) {
func TestList(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
prefixedKey := etcdtest.AddPrefix("pods")
fakeClient.ExpectNotFoundGet(prefixedKey)
@ -146,12 +135,9 @@ func TestListFromMemory(t *testing.T) {
for _, test := range testCases {
fakeClient.WatchResponse <- test
}
if err := waitForUpToDateCache(cacher, 5); err != nil {
t.Errorf("watch cache didn't propagated correctly: %v", err)
}
result := &api.PodList{}
if err := cacher.ListFromMemory("pods/ns", result); err != nil {
if err := cacher.List(context.TODO(), "pods/ns", 5, storage.Everything, result); err != nil {
t.Errorf("unexpected error: %v", err)
}
if result.ListMeta.ResourceVersion != "5" {

View File

@ -55,6 +55,10 @@ type watchCacheElement struct {
type watchCache struct {
sync.RWMutex
// Condition on which lists are waiting for the fresh enough
// resource version.
cond *sync.Cond
// Maximum size of history window.
capacity int
@ -84,7 +88,7 @@ type watchCache struct {
}
func newWatchCache(capacity int) *watchCache {
return &watchCache{
wc := &watchCache{
capacity: capacity,
cache: make([]watchCacheElement, capacity),
startIndex: 0,
@ -92,6 +96,8 @@ func newWatchCache(capacity int) *watchCache {
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0,
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
}
func (w *watchCache) Add(obj interface{}) error {
@ -169,6 +175,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
}
w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion
w.cond.Broadcast()
return updateFunc(event.Object)
}
@ -188,8 +195,11 @@ func (w *watchCache) List() []interface{} {
return w.store.List()
}
func (w *watchCache) ListWithVersion() ([]interface{}, uint64) {
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) {
w.RLock()
for w.resourceVersion < resourceVersion {
w.cond.Wait()
}
defer w.RUnlock()
return w.store.List(), w.resourceVersion
}
@ -230,6 +240,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
if w.onReplace != nil {
w.onReplace()
}
w.cond.Broadcast()
return nil
}

View File

@ -230,6 +230,24 @@ func TestEvents(t *testing.T) {
}
}
func TestWaitUntilFreshAndList(t *testing.T) {
store := newWatchCache(3)
// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
}()
list, resourceVersion := store.WaitUntilFreshAndList(4)
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if len(list) != 2 {
t.Errorf("unexpected list returned: %#v", list)
}
}
type testLW struct {
ListFunc func() (runtime.Object, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
@ -244,7 +262,7 @@ func TestReflectorForWatchCache(t *testing.T) {
store := newWatchCache(5)
{
_, version := store.ListWithVersion()
_, version := store.WaitUntilFreshAndList(0)
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
@ -264,7 +282,7 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(util.NeverStop)
{
_, version := store.ListWithVersion()
_, version := store.WaitUntilFreshAndList(10)
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}