mirror of https://github.com/k3s-io/k3s
Allow indexers to be added after an informer start
Both SharedIndexInformer and threadSafeMap were changed to allow AddIndexers to be called after a start or items are in the cache. While a new Indexer is being added handling deltas is blocked in the informer. When a new Indexer is added to a cache with existing items all indices are recalculated. One point to note is that adding a new indexer on a started informer will case all indexes to be rebuilt, but it will not trigger an updateNotification. This is done because it is impractical to assume any existing ResourceEventHandler would have knowledge of a yet to be added index. Any ResourceEventHandler that would need to consume this new index should be added after the new Indexer is added.k3s-v1.13.4
parent
754bcac0fc
commit
e4ae499aa2
|
@ -260,7 +260,8 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
|
|||
defer s.startedLock.Unlock()
|
||||
|
||||
if s.started {
|
||||
return fmt.Errorf("informer has already started")
|
||||
s.blockDeltas.Lock()
|
||||
defer s.blockDeltas.Unlock()
|
||||
}
|
||||
|
||||
return s.indexer.AddIndexers(indexers)
|
||||
|
|
|
@ -125,6 +125,11 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
|
|||
c.items = items
|
||||
|
||||
// rebuild any index
|
||||
c.rebuildIndices()
|
||||
}
|
||||
|
||||
// rebuildIndices rebuilds all indices for the current set c.items. Assumes that c.lock is held by caller
|
||||
func (c *threadSafeMap) rebuildIndices() {
|
||||
c.indices = Indices{}
|
||||
for key, item := range c.items {
|
||||
c.updateIndices(nil, item, key)
|
||||
|
@ -222,10 +227,6 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
|||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if len(c.items) > 0 {
|
||||
return fmt.Errorf("cannot add indexers to running index")
|
||||
}
|
||||
|
||||
oldKeys := sets.StringKeySet(c.indexers)
|
||||
newKeys := sets.StringKeySet(newIndexers)
|
||||
|
||||
|
@ -236,6 +237,11 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
|||
for k, v := range newIndexers {
|
||||
c.indexers[k] = v
|
||||
}
|
||||
|
||||
if len(c.items) > 0 {
|
||||
c.rebuildIndices()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/sets"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAddIndexerAfterAdd(t *testing.T) {
|
||||
store := NewThreadSafeStore(Indexers{}, Indices{})
|
||||
|
||||
// Add first indexer
|
||||
err := store.AddIndexers(Indexers{
|
||||
"first": func(obj interface{}) ([]string, error) {
|
||||
value := obj.(string)
|
||||
return []string{
|
||||
value,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("failed to add first indexer")
|
||||
}
|
||||
|
||||
// Add some data to index
|
||||
store.Add("keya", "value")
|
||||
store.Add("keyb", "value")
|
||||
|
||||
// Assert
|
||||
indexKeys, _ := store.IndexKeys("first", "value")
|
||||
expected := sets.NewString("keya", "keyb")
|
||||
actual := sets.NewString(indexKeys...)
|
||||
if !actual.Equal(expected) {
|
||||
t.Errorf("expected %v does not match actual %v", expected, actual)
|
||||
}
|
||||
|
||||
// Add same indexer, which should fail
|
||||
err = store.AddIndexers(Indexers{
|
||||
"first": func(interface{}) ([]string, error) {
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
if err == nil {
|
||||
t.Errorf("Add same index should have failed")
|
||||
}
|
||||
|
||||
// Add new indexer
|
||||
err = store.AddIndexers(Indexers{
|
||||
"second": func(obj interface{}) ([]string, error) {
|
||||
v := obj.(string)
|
||||
return []string{
|
||||
v +"2",
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("failed to add second indexer")
|
||||
}
|
||||
|
||||
// Assert indexers was added
|
||||
if _, ok := store.GetIndexers()["first"]; !ok {
|
||||
t.Errorf("missing indexer first")
|
||||
}
|
||||
if _, ok := store.GetIndexers()["second"]; !ok {
|
||||
t.Errorf("missing indexer second")
|
||||
}
|
||||
|
||||
// Assert existing data is re-indexed
|
||||
indexKeys, _ = store.IndexKeys("first", "value")
|
||||
expected = sets.NewString("keya", "keyb")
|
||||
actual = sets.NewString(indexKeys...)
|
||||
if !actual.Equal(expected) {
|
||||
t.Errorf("expected %v does not match actual %v", expected, actual)
|
||||
}
|
||||
indexKeys, _ = store.IndexKeys("second", "value2")
|
||||
expected = sets.NewString("keya", "keyb")
|
||||
actual = sets.NewString(indexKeys...)
|
||||
if !actual.Equal(expected) {
|
||||
t.Errorf("expected %v does not match actual %v", expected, actual)
|
||||
}
|
||||
|
||||
// Add more data
|
||||
store.Add("keyc", "value")
|
||||
store.Add("keyd", "value")
|
||||
|
||||
// Assert new data is indexed
|
||||
indexKeys, _ = store.IndexKeys("first", "value")
|
||||
expected = sets.NewString("keya", "keyb", "keyc", "keyd")
|
||||
actual = sets.NewString(indexKeys...)
|
||||
if !actual.Equal(expected) {
|
||||
t.Errorf("expected %v does not match actual %v", expected, actual)
|
||||
}
|
||||
indexKeys, _ = store.IndexKeys("second", "value2")
|
||||
expected = sets.NewString("keya", "keyb", "keyc", "keyd")
|
||||
actual = sets.NewString(indexKeys...)
|
||||
if !actual.Equal(expected) {
|
||||
t.Errorf("expected %v does not match actual %v", expected, actual)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue