2014-08-03 07:00:42 +00:00
|
|
|
/*
|
2015-05-01 16:19:44 +00:00
|
|
|
Copyright 2014 The Kubernetes Authors All rights reserved.
|
2014-08-03 07:00:42 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
2015-03-05 21:38:31 +00:00
|
|
|
// Queue is exactly like a Store, but has a Pop() method too.
|
|
|
|
type Queue interface {
|
|
|
|
Store
|
2015-03-09 21:48:59 +00:00
|
|
|
|
2015-03-05 21:38:31 +00:00
|
|
|
// Pop blocks until it has something to return.
|
|
|
|
Pop() interface{}
|
2015-03-09 21:48:59 +00:00
|
|
|
|
|
|
|
// AddIfNotPresent adds a value previously
|
|
|
|
// returned by Pop back into the queue as long
|
|
|
|
// as nothing else (presumably more recent)
|
|
|
|
// has since been added.
|
|
|
|
AddIfNotPresent(interface{}) error
|
2016-02-03 03:03:31 +00:00
|
|
|
|
|
|
|
// Return true if the first batch of items has been popped
|
|
|
|
HasSynced() bool
|
2015-03-05 21:38:31 +00:00
|
|
|
}
|
|
|
|
|
2014-08-05 20:53:52 +00:00
|
|
|
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
|
2014-08-03 22:36:36 +00:00
|
|
|
// FIFO order processing. If multiple adds/updates of a single item happen while
|
|
|
|
// an item is in the queue before it has been processed, it will only be
|
|
|
|
// processed once, and when it is processed, the most recent version will be
|
|
|
|
// processed. This can't be done with a channel.
|
2015-03-05 21:38:31 +00:00
|
|
|
//
|
|
|
|
// FIFO solves this use case:
|
|
|
|
// * You want to process every object (exactly) once.
|
|
|
|
// * You want to process the most recent version of the object when you process it.
|
|
|
|
// * You do not want to process deleted objects, they should be removed from the queue.
|
|
|
|
// * You do not want to periodically reprocess objects.
|
|
|
|
// Compare with DeltaFIFO for other use cases.
|
2014-08-03 07:00:42 +00:00
|
|
|
type FIFO struct {
|
2014-10-21 14:39:58 +00:00
|
|
|
lock sync.RWMutex
|
|
|
|
cond sync.Cond
|
|
|
|
// We depend on the property that items in the set are in the queue and vice versa.
|
2014-08-03 07:00:42 +00:00
|
|
|
items map[string]interface{}
|
|
|
|
queue []string
|
2016-02-03 03:03:31 +00:00
|
|
|
|
|
|
|
// populated is true if the first batch of items inserted by Replace() has been populated
|
|
|
|
// or Delete/Add/Update was called first.
|
|
|
|
populated bool
|
|
|
|
// initialPopulationCount is the number of items inserted by the first call of Replace()
|
|
|
|
initialPopulationCount int
|
|
|
|
|
2015-01-26 21:44:53 +00:00
|
|
|
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
|
|
|
// should be deterministic.
|
|
|
|
keyFunc KeyFunc
|
2014-08-03 07:00:42 +00:00
|
|
|
}
|
|
|
|
|
2015-03-05 21:38:31 +00:00
|
|
|
var (
|
|
|
|
_ = Queue(&FIFO{}) // FIFO is a Queue
|
|
|
|
)
|
|
|
|
|
2016-02-03 03:03:31 +00:00
|
|
|
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
|
|
|
|
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
|
|
|
func (f *FIFO) HasSynced() bool {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
return f.populated && f.initialPopulationCount == 0
|
|
|
|
}
|
|
|
|
|
2014-10-21 14:39:58 +00:00
|
|
|
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
|
|
|
// if it doesn't already exist in the set.
|
2015-01-26 21:44:53 +00:00
|
|
|
func (f *FIFO) Add(obj interface{}) error {
|
|
|
|
id, err := f.keyFunc(obj)
|
|
|
|
if err != nil {
|
2015-03-26 20:28:05 +00:00
|
|
|
return KeyError{obj, err}
|
2015-01-26 21:44:53 +00:00
|
|
|
}
|
2014-08-03 07:00:42 +00:00
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
2016-02-03 03:03:31 +00:00
|
|
|
f.populated = true
|
2014-10-21 14:39:58 +00:00
|
|
|
if _, exists := f.items[id]; !exists {
|
|
|
|
f.queue = append(f.queue, id)
|
|
|
|
}
|
2014-08-18 21:47:20 +00:00
|
|
|
f.items[id] = obj
|
2014-08-03 07:00:42 +00:00
|
|
|
f.cond.Broadcast()
|
2015-01-26 21:44:53 +00:00
|
|
|
return nil
|
2014-08-03 07:00:42 +00:00
|
|
|
}
|
|
|
|
|
2015-02-19 21:58:37 +00:00
|
|
|
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
|
|
|
|
// present in the set, it is neither enqueued nor added to the set.
|
|
|
|
//
|
|
|
|
// This is useful in a single producer/consumer scenario so that the consumer can
|
|
|
|
// safely retry items without contending with the producer and potentially enqueueing
|
|
|
|
// stale items.
|
|
|
|
func (f *FIFO) AddIfNotPresent(obj interface{}) error {
|
|
|
|
id, err := f.keyFunc(obj)
|
|
|
|
if err != nil {
|
2015-03-26 20:28:05 +00:00
|
|
|
return KeyError{obj, err}
|
2015-02-19 21:58:37 +00:00
|
|
|
}
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
2016-02-03 03:03:31 +00:00
|
|
|
f.populated = true
|
2015-02-19 21:58:37 +00:00
|
|
|
if _, exists := f.items[id]; exists {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
f.queue = append(f.queue, id)
|
|
|
|
f.items[id] = obj
|
|
|
|
f.cond.Broadcast()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-10-21 14:39:58 +00:00
|
|
|
// Update is the same as Add in this implementation.
|
2015-01-26 21:44:53 +00:00
|
|
|
func (f *FIFO) Update(obj interface{}) error {
|
|
|
|
return f.Add(obj)
|
2014-08-03 07:00:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Delete removes an item. It doesn't add it to the queue, because
|
|
|
|
// this implementation assumes the consumer only cares about the objects,
|
|
|
|
// not the order in which they were created/added.
|
2015-01-26 21:44:53 +00:00
|
|
|
func (f *FIFO) Delete(obj interface{}) error {
|
|
|
|
id, err := f.keyFunc(obj)
|
|
|
|
if err != nil {
|
2015-03-26 20:28:05 +00:00
|
|
|
return KeyError{obj, err}
|
2015-01-26 21:44:53 +00:00
|
|
|
}
|
2014-08-03 07:00:42 +00:00
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
2016-02-03 03:03:31 +00:00
|
|
|
f.populated = true
|
2014-08-18 21:47:20 +00:00
|
|
|
delete(f.items, id)
|
2015-01-26 21:44:53 +00:00
|
|
|
return err
|
2014-08-03 07:00:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// List returns a list of all the items.
|
|
|
|
func (f *FIFO) List() []interface{} {
|
|
|
|
f.lock.RLock()
|
|
|
|
defer f.lock.RUnlock()
|
|
|
|
list := make([]interface{}, 0, len(f.items))
|
|
|
|
for _, item := range f.items {
|
|
|
|
list = append(list, item)
|
|
|
|
}
|
|
|
|
return list
|
|
|
|
}
|
|
|
|
|
2015-03-09 21:48:59 +00:00
|
|
|
// ListKeys returns a list of all the keys of the objects currently
|
|
|
|
// in the FIFO.
|
|
|
|
func (f *FIFO) ListKeys() []string {
|
|
|
|
f.lock.RLock()
|
|
|
|
defer f.lock.RUnlock()
|
|
|
|
list := make([]string, 0, len(f.items))
|
|
|
|
for key := range f.items {
|
|
|
|
list = append(list, key)
|
|
|
|
}
|
|
|
|
return list
|
|
|
|
}
|
|
|
|
|
2014-08-03 07:00:42 +00:00
|
|
|
// Get returns the requested item, or sets exists=false.
|
2015-01-26 21:44:53 +00:00
|
|
|
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
2015-02-01 19:55:45 +00:00
|
|
|
key, err := f.keyFunc(obj)
|
2015-01-26 21:44:53 +00:00
|
|
|
if err != nil {
|
2015-03-26 20:28:05 +00:00
|
|
|
return nil, false, KeyError{obj, err}
|
2015-01-26 21:44:53 +00:00
|
|
|
}
|
2015-02-01 19:55:45 +00:00
|
|
|
return f.GetByKey(key)
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetByKey returns the requested item, or sets exists=false.
|
|
|
|
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
|
2014-08-03 07:00:42 +00:00
|
|
|
f.lock.RLock()
|
|
|
|
defer f.lock.RUnlock()
|
2015-02-01 19:55:45 +00:00
|
|
|
item, exists = f.items[key]
|
2015-01-26 21:44:53 +00:00
|
|
|
return item, exists, nil
|
2014-08-03 07:00:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Pop waits until an item is ready and returns it. If multiple items are
|
|
|
|
// ready, they are returned in the order in which they were added/updated.
|
|
|
|
// The item is removed from the queue (and the store) before it is returned,
|
2015-08-08 21:29:57 +00:00
|
|
|
// so if you don't successfully process it, you need to add it back with
|
2015-03-05 21:38:31 +00:00
|
|
|
// AddIfNotPresent().
|
2014-08-03 07:00:42 +00:00
|
|
|
func (f *FIFO) Pop() interface{} {
|
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
|
|
|
for {
|
|
|
|
for len(f.queue) == 0 {
|
|
|
|
f.cond.Wait()
|
|
|
|
}
|
|
|
|
id := f.queue[0]
|
|
|
|
f.queue = f.queue[1:]
|
2016-02-03 03:03:31 +00:00
|
|
|
if f.initialPopulationCount > 0 {
|
|
|
|
f.initialPopulationCount--
|
|
|
|
}
|
2014-08-03 07:00:42 +00:00
|
|
|
item, ok := f.items[id]
|
|
|
|
if !ok {
|
|
|
|
// Item may have been deleted subsequently.
|
|
|
|
continue
|
|
|
|
}
|
2014-08-03 22:36:36 +00:00
|
|
|
delete(f.items, id)
|
2014-08-03 07:00:42 +00:00
|
|
|
return item
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-09-16 01:10:10 +00:00
|
|
|
// Replace will delete the contents of 'f', using instead the given map.
|
2015-04-09 01:00:48 +00:00
|
|
|
// 'f' takes ownership of the map, you should not reference the map again
|
2014-09-16 01:10:10 +00:00
|
|
|
// after calling this function. f's queue is reset, too; upon return, it
|
|
|
|
// will contain the items in the map, in no particular order.
|
2015-08-18 08:34:27 +00:00
|
|
|
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
|
2015-01-26 21:44:53 +00:00
|
|
|
items := map[string]interface{}{}
|
|
|
|
for _, item := range list {
|
|
|
|
key, err := f.keyFunc(item)
|
|
|
|
if err != nil {
|
2015-03-26 20:28:05 +00:00
|
|
|
return KeyError{item, err}
|
2015-01-26 21:44:53 +00:00
|
|
|
}
|
|
|
|
items[key] = item
|
|
|
|
}
|
|
|
|
|
2014-09-16 01:10:10 +00:00
|
|
|
f.lock.Lock()
|
|
|
|
defer f.lock.Unlock()
|
2016-02-03 03:03:31 +00:00
|
|
|
|
|
|
|
if !f.populated {
|
|
|
|
f.populated = true
|
|
|
|
f.initialPopulationCount = len(items)
|
|
|
|
}
|
|
|
|
|
2015-01-26 21:44:53 +00:00
|
|
|
f.items = items
|
2014-09-16 01:10:10 +00:00
|
|
|
f.queue = f.queue[:0]
|
2015-01-26 21:44:53 +00:00
|
|
|
for id := range items {
|
2014-09-16 01:10:10 +00:00
|
|
|
f.queue = append(f.queue, id)
|
|
|
|
}
|
|
|
|
if len(f.queue) > 0 {
|
|
|
|
f.cond.Broadcast()
|
|
|
|
}
|
2015-01-26 21:44:53 +00:00
|
|
|
return nil
|
2014-09-16 01:10:10 +00:00
|
|
|
}
|
|
|
|
|
2014-08-03 22:36:36 +00:00
|
|
|
// NewFIFO returns a Store which can be used to queue up items to
|
2014-08-03 07:00:42 +00:00
|
|
|
// process.
|
2015-01-26 21:44:53 +00:00
|
|
|
func NewFIFO(keyFunc KeyFunc) *FIFO {
|
2014-08-03 07:00:42 +00:00
|
|
|
f := &FIFO{
|
2015-01-26 21:44:53 +00:00
|
|
|
items: map[string]interface{}{},
|
|
|
|
queue: []string{},
|
|
|
|
keyFunc: keyFunc,
|
2014-08-03 07:00:42 +00:00
|
|
|
}
|
|
|
|
f.cond.L = &f.lock
|
|
|
|
return f
|
|
|
|
}
|