Enable look-up by secondary index in cache

pull/6/head
derekwaynecarr 2015-02-04 15:47:47 -05:00
parent 25659cf1b3
commit 7a2d63048d
2 changed files with 144 additions and 0 deletions

View File

@ -21,6 +21,7 @@ import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
// Store is a generic object storage interface. Reflector knows how to watch a server // Store is a generic object storage interface. Reflector knows how to watch a server
@ -59,12 +60,35 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
return meta.Namespace() + "/" + meta.Name(), nil return meta.Namespace() + "/" + meta.Name(), nil
} }
// Index is a generic object storage interface that lets you list objects by their Index
type Index interface {
Store
Index(obj interface{}) ([]interface{}, error)
}
// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) (string, error)
// MetaNamespaceIndexFunc is a convenient default IndexFun which knows how to index
// an object by its namespace.
func MetaNamespaceIndexFunc(obj interface{}) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
return meta.Namespace(), nil
}
type cache struct { type cache struct {
lock sync.RWMutex lock sync.RWMutex
items map[string]interface{} items map[string]interface{}
// keyFunc is used to make the key for objects stored in and retrieved from items, and // keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic. // should be deterministic.
keyFunc KeyFunc keyFunc KeyFunc
// indexFunc is used to make the index value for objects stored in an retrieved from index
indexFunc IndexFunc
// maps the indexFunc value for an object to a set whose keys are keys in items
index map[string]util.StringSet
} }
// Add inserts an item into the cache. // Add inserts an item into the cache.
@ -76,6 +100,53 @@ func (c *cache) Add(obj interface{}) error {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.items[key] = obj c.items[key] = obj
c.updateIndex(obj)
return nil
}
// updateIndex adds or modifies an object in the index
// it is intended to be called from a function that already has a lock on the cache
func (c *cache) updateIndex(obj interface{}) error {
if c.indexFunc == nil {
return nil
}
key, err := c.keyFunc(obj)
if err != nil {
return err
}
indexValue, err := c.indexFunc(obj)
if err != nil {
return err
}
set := c.index[indexValue]
if set == nil {
set = util.StringSet{}
c.index[indexValue] = set
}
set.Insert(key)
return nil
}
// deleteFromIndex removes an entry from the index
// it is intended to be called from a function that already has a lock on the cache
func (c *cache) deleteFromIndex(obj interface{}) error {
if c.indexFunc == nil {
return nil
}
key, err := c.keyFunc(obj)
if err != nil {
return err
}
indexValue, err := c.indexFunc(obj)
if err != nil {
return err
}
set := c.index[indexValue]
if set == nil {
set = util.StringSet{}
c.index[indexValue] = set
}
set.Delete(key)
return nil return nil
} }
@ -88,6 +159,7 @@ func (c *cache) Update(obj interface{}) error {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.items[key] = obj c.items[key] = obj
c.updateIndex(obj)
return nil return nil
} }
@ -100,6 +172,7 @@ func (c *cache) Delete(obj interface{}) error {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
delete(c.items, key) delete(c.items, key)
c.deleteFromIndex(obj)
return nil return nil
} }
@ -115,6 +188,24 @@ func (c *cache) List() []interface{} {
return list return list
} }
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
indexKey, err := c.indexFunc(obj)
if err != nil {
return nil, err
}
set := c.index[indexKey]
list := make([]interface{}, 0, set.Len())
for _, key := range set.List() {
list = append(list, c.items[key])
}
return list, nil
}
// Get returns the requested item, or sets exists=false. // Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable. // Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
@ -150,6 +241,13 @@ func (c *cache) Replace(list []interface{}) error {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.items = items c.items = items
// rebuild any index
c.index = map[string]util.StringSet{}
for _, item := range c.items {
c.updateIndex(item)
}
return nil return nil
} }
@ -157,3 +255,8 @@ func (c *cache) Replace(list []interface{}) error {
func NewStore(keyFunc KeyFunc) Store { func NewStore(keyFunc KeyFunc) Store {
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc} return &cache{items: map[string]interface{}{}, keyFunc: keyFunc}
} }
// NewIndex returns an Index implemented simply with a map and a lock.
func NewIndex(keyFunc KeyFunc, indexFunc IndexFunc) Index {
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexFunc: indexFunc, index: map[string]util.StringSet{}}
}

View File

@ -86,10 +86,47 @@ func doTestStore(t *testing.T, store Store) {
} }
} }
// Test public interface
func doTestIndex(t *testing.T, index Index) {
mkObj := func(id string, val string) testStoreObject {
return testStoreObject{id: id, val: val}
}
// Test Index
expected := map[string]util.StringSet{}
expected["b"] = util.NewStringSet("a", "c")
expected["f"] = util.NewStringSet("e")
expected["h"] = util.NewStringSet("g")
index.Add(mkObj("a", "b"))
index.Add(mkObj("c", "b"))
index.Add(mkObj("e", "f"))
index.Add(mkObj("g", "h"))
{
for k, v := range expected {
found := util.StringSet{}
indexResults, err := index.Index(mkObj("", k))
if err != nil {
t.Errorf("Unexpected error %v", err)
}
for _, item := range indexResults {
found.Insert(item.(testStoreObject).id)
}
items := v.List()
if !found.HasAll(items...) {
t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List())
}
}
}
}
func testStoreKeyFunc(obj interface{}) (string, error) { func testStoreKeyFunc(obj interface{}) (string, error) {
return obj.(testStoreObject).id, nil return obj.(testStoreObject).id, nil
} }
func testStoreIndexFunc(obj interface{}) (string, error) {
return obj.(testStoreObject).val, nil
}
type testStoreObject struct { type testStoreObject struct {
id string id string
val string val string
@ -107,3 +144,7 @@ func TestUndeltaStore(t *testing.T) {
nop := func([]interface{}) {} nop := func([]interface{}) {}
doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc)) doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc))
} }
func TestIndex(t *testing.T) {
doTestIndex(t, NewIndex(testStoreKeyFunc, testStoreIndexFunc))
}