2015-03-30 18:17:16 +00:00
|
|
|
/*
|
2015-05-01 16:19:44 +00:00
|
|
|
Copyright 2014 The Kubernetes Authors All rights reserved.
|
2015-03-30 18:17:16 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
|
|
|
|
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
|
|
|
|
// the indexing feature in addition to not being thread safe.
|
|
|
|
//
|
|
|
|
// The guarantees of thread safety provided by List/Get are only valid if the caller
|
|
|
|
// treats returned items as read-only. For example, a pointer inserted in the store
|
|
|
|
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
|
|
|
|
// on the same key and modify the pointer in a non-thread-safe way. Also note that
|
|
|
|
// modifying objects stored by the indexers (if any) will *not* automatically lead
|
|
|
|
// to a re-index. So it's not a good idea to directly modify the objects returned by
|
|
|
|
// Get/List, in general.
|
|
|
|
type ThreadSafeStore interface {
|
|
|
|
Add(key string, obj interface{})
|
|
|
|
Update(key string, obj interface{})
|
|
|
|
Delete(key string)
|
|
|
|
Get(key string) (item interface{}, exists bool)
|
|
|
|
List() []interface{}
|
|
|
|
ListKeys() []string
|
|
|
|
Replace(map[string]interface{})
|
|
|
|
Index(indexName string, obj interface{}) ([]interface{}, error)
|
2015-07-23 14:00:04 +00:00
|
|
|
ListIndexFuncValues(name string) []string
|
2015-03-30 18:17:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// threadSafeMap implements ThreadSafeStore
|
|
|
|
type threadSafeMap struct {
|
|
|
|
lock sync.RWMutex
|
|
|
|
items map[string]interface{}
|
|
|
|
|
|
|
|
// indexers maps a name to an IndexFunc
|
|
|
|
indexers Indexers
|
|
|
|
// indices maps a name to an Index
|
|
|
|
indices Indices
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *threadSafeMap) Add(key string, obj interface{}) {
|
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
oldObject := c.items[key]
|
|
|
|
c.items[key] = obj
|
|
|
|
c.updateIndices(oldObject, obj, key)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *threadSafeMap) Update(key string, obj interface{}) {
|
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
oldObject := c.items[key]
|
|
|
|
c.items[key] = obj
|
|
|
|
c.updateIndices(oldObject, obj, key)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *threadSafeMap) Delete(key string) {
|
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
if obj, exists := c.items[key]; exists {
|
|
|
|
c.deleteFromIndices(obj, key)
|
|
|
|
delete(c.items, key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
|
|
|
|
c.lock.RLock()
|
|
|
|
defer c.lock.RUnlock()
|
|
|
|
item, exists = c.items[key]
|
|
|
|
return item, exists
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *threadSafeMap) List() []interface{} {
|
|
|
|
c.lock.RLock()
|
|
|
|
defer c.lock.RUnlock()
|
|
|
|
list := make([]interface{}, 0, len(c.items))
|
|
|
|
for _, item := range c.items {
|
|
|
|
list = append(list, item)
|
|
|
|
}
|
|
|
|
return list
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListKeys returns a list of all the keys of the objects currently
|
|
|
|
// in the threadSafeMap.
|
|
|
|
func (c *threadSafeMap) ListKeys() []string {
|
|
|
|
c.lock.RLock()
|
|
|
|
defer c.lock.RUnlock()
|
|
|
|
list := make([]string, 0, len(c.items))
|
|
|
|
for key := range c.items {
|
|
|
|
list = append(list, key)
|
|
|
|
}
|
|
|
|
return list
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *threadSafeMap) Replace(items map[string]interface{}) {
|
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
c.items = items
|
|
|
|
|
|
|
|
// rebuild any index
|
|
|
|
c.indices = Indices{}
|
|
|
|
for key, item := range c.items {
|
|
|
|
c.updateIndices(nil, item, key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Index returns a list of items that match on the index function
|
|
|
|
// Index is thread-safe so long as you treat all items as immutable
|
|
|
|
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
|
|
|
|
c.lock.RLock()
|
|
|
|
defer c.lock.RUnlock()
|
|
|
|
|
|
|
|
indexFunc := c.indexers[indexName]
|
|
|
|
if indexFunc == nil {
|
|
|
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
|
|
|
}
|
|
|
|
|
|
|
|
indexKey, err := indexFunc(obj)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
index := c.indices[indexName]
|
|
|
|
set := index[indexKey]
|
|
|
|
list := make([]interface{}, 0, set.Len())
|
|
|
|
for _, key := range set.List() {
|
|
|
|
list = append(list, c.items[key])
|
|
|
|
}
|
|
|
|
return list, nil
|
|
|
|
}
|
|
|
|
|
2015-07-23 14:00:04 +00:00
|
|
|
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
|
|
|
|
index := c.indices[indexName]
|
|
|
|
names := make([]string, 0, len(index))
|
|
|
|
for key := range index {
|
|
|
|
names = append(names, key)
|
|
|
|
}
|
|
|
|
return names
|
|
|
|
}
|
|
|
|
|
2015-03-30 18:17:16 +00:00
|
|
|
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
|
|
|
|
// updateIndices must be called from a function that already has a lock on the cache
|
|
|
|
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
|
|
|
|
// if we got an old object, we need to remove it before we add it again
|
|
|
|
if oldObj != nil {
|
|
|
|
c.deleteFromIndices(oldObj, key)
|
|
|
|
}
|
|
|
|
for name, indexFunc := range c.indexers {
|
|
|
|
indexValue, err := indexFunc(newObj)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
index := c.indices[name]
|
|
|
|
if index == nil {
|
|
|
|
index = Index{}
|
|
|
|
c.indices[name] = index
|
|
|
|
}
|
|
|
|
set := index[indexValue]
|
|
|
|
if set == nil {
|
|
|
|
set = util.StringSet{}
|
|
|
|
index[indexValue] = set
|
|
|
|
}
|
|
|
|
set.Insert(key)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// deleteFromIndices removes the object from each of the managed indexes
|
|
|
|
// it is intended to be called from a function that already has a lock on the cache
|
|
|
|
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
|
|
|
|
for name, indexFunc := range c.indexers {
|
|
|
|
indexValue, err := indexFunc(obj)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
index := c.indices[name]
|
|
|
|
if index != nil {
|
|
|
|
set := index[indexValue]
|
|
|
|
if set != nil {
|
|
|
|
set.Delete(key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
|
|
|
|
return &threadSafeMap{
|
|
|
|
items: map[string]interface{}{},
|
|
|
|
indexers: indexers,
|
|
|
|
indices: Indices{},
|
|
|
|
}
|
|
|
|
}
|