diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index 9243f7ce3c..5d0f51f3d4 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -43,6 +43,11 @@ CLOUD_PROVIDER=${CLOUD_PROVIDER:-""} CLOUD_CONFIG=${CLOUD_CONFIG:-""} FEATURE_GATES=${FEATURE_GATES:-"AllAlpha=true"} +# start the cache mutation detector by default so that cache mutators will be found +KUBE_CACHE_MUTATION_DETECTOR="${KUBE_CACHE_MUTATION_DETECTOR:-true}" +export KUBE_CACHE_MUTATION_DETECTOR + + # START_MODE can be 'all', 'kubeletonly', or 'nokubelet' START_MODE=${START_MODE:-"all"} diff --git a/hack/make-rules/test.sh b/hack/make-rules/test.sh index 2431e26858..bce6e29851 100755 --- a/hack/make-rules/test.sh +++ b/hack/make-rules/test.sh @@ -23,6 +23,10 @@ source "${KUBE_ROOT}/hack/lib/init.sh" kube::golang::setup_env +# start the cache mutation detector by default so that cache mutators will be found +KUBE_CACHE_MUTATION_DETECTOR="${KUBE_CACHE_MUTATION_DETECTOR:-true}" +export KUBE_CACHE_MUTATION_DETECTOR + kube::test::find_dirs() { ( cd ${KUBE_ROOT} diff --git a/pkg/client/cache/mutation_detector.go b/pkg/client/cache/mutation_detector.go new file mode 100644 index 0000000000..11d0e6ffa2 --- /dev/null +++ b/pkg/client/cache/mutation_detector.go @@ -0,0 +1,135 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "os" + "reflect" + "strconv" + "sync" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/diff" +) + +var mutationDetectionEnabled = false + +func init() { + mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR")) +} + +type CacheMutationDetector interface { + AddObject(obj interface{}) + Run(stopCh <-chan struct{}) +} + +func NewCacheMutationDetector(name string) CacheMutationDetector { + if !mutationDetectionEnabled { + return dummyMutationDetector{} + } + return &defaultCacheMutationDetector{name: name, period: 1 * time.Second} +} + +type dummyMutationDetector struct{} + +func (dummyMutationDetector) Run(stopCh <-chan struct{}) { +} +func (dummyMutationDetector) AddObject(obj interface{}) { +} + +// defaultCacheMutationDetector gives a way to detect if a cached object has been mutated +// It has a list of cached objects and their copies. I haven't thought of a way +// to see WHO is mutating it, just that it's getting mutated. +type defaultCacheMutationDetector struct { + name string + period time.Duration + + lock sync.Mutex + cachedObjs []cacheObj + + // failureFunc is injectable for unit testing. If you don't have it, the process will panic. + // This panic is intentional, since turning on this detection indicates you want a strong + // failure signal. This failure is effectively a p0 bug and you can't trust process results + // after a mutation anyway. + failureFunc func(message string) +} + +// cacheObj holds the actual object and a copy +type cacheObj struct { + cached interface{} + copied interface{} +} + +func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) { + // we DON'T want protection from panics. If we're running this code, we want to die + go func() { + for { + d.CompareObjects() + + select { + case <-stopCh: + return + case <-time.After(d.period): + } + } + }() +} + +// AddObject makes a deep copy of the object for later comparison. It only works on runtime.Object +// but that covers the vast majority of our cached objects +func (d *defaultCacheMutationDetector) AddObject(obj interface{}) { + if _, ok := obj.(DeletedFinalStateUnknown); ok { + return + } + if _, ok := obj.(runtime.Object); !ok { + return + } + + copiedObj, err := api.Scheme.Copy(obj.(runtime.Object)) + if err != nil { + return + } + + d.lock.Lock() + defer d.lock.Unlock() + d.cachedObjs = append(d.cachedObjs, cacheObj{cached: obj, copied: copiedObj}) +} + +func (d *defaultCacheMutationDetector) CompareObjects() { + d.lock.Lock() + defer d.lock.Unlock() + + altered := false + for i, obj := range d.cachedObjs { + if !reflect.DeepEqual(obj.cached, obj.copied) { + fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectDiff(obj.cached, obj.copied)) + altered = true + } + } + + if altered { + msg := fmt.Sprintf("cache %s modified", d.name) + if d.failureFunc != nil { + d.failureFunc(msg) + return + } + panic(msg) + } +} diff --git a/pkg/client/cache/mutation_detector_test.go b/pkg/client/cache/mutation_detector_test.go new file mode 100644 index 0000000000..3a5d70c1c9 --- /dev/null +++ b/pkg/client/cache/mutation_detector_test.go @@ -0,0 +1,80 @@ +// +build !race + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +func TestMutationDetector(t *testing.T) { + fakeWatch := watch.NewFake() + lw := &testLW{ + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return fakeWatch, nil + }, + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return &api.PodList{}, nil + }, + } + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "anything", + Labels: map[string]string{"check": "foo"}, + }, + } + stopCh := make(chan struct{}) + defer close(stopCh) + addReceived := make(chan bool) + mutationFound := make(chan bool) + + informer := NewSharedInformer(lw, &api.Pod{}, 1*time.Second).(*sharedIndexInformer) + informer.cacheMutationDetector = &defaultCacheMutationDetector{ + name: "name", + period: 1 * time.Second, + failureFunc: func(message string) { + mutationFound <- true + }, + } + informer.AddEventHandler( + ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addReceived <- true + }, + }, + ) + go informer.Run(stopCh) + + fakeWatch.Add(pod) + + select { + case <-addReceived: + } + + pod.Labels["change"] = "true" + + select { + case <-mutationFound: + } + +} diff --git a/pkg/client/cache/shared_informer.go b/pkg/client/cache/shared_informer.go index 976af298e6..65da513a49 100644 --- a/pkg/client/cache/shared_informer.go +++ b/pkg/client/cache/shared_informer.go @@ -68,11 +68,12 @@ func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod ti // be shared amongst all consumers. func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { sharedIndexInformer := &sharedIndexInformer{ - processor: &sharedProcessor{}, - indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), - listerWatcher: lw, - objectType: objType, - fullResyncPeriod: resyncPeriod, + processor: &sharedProcessor{}, + indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), + listerWatcher: lw, + objectType: objType, + fullResyncPeriod: resyncPeriod, + cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), } return sharedIndexInformer } @@ -109,7 +110,8 @@ type sharedIndexInformer struct { indexer Indexer controller *Controller - processor *sharedProcessor + processor *sharedProcessor + cacheMutationDetector CacheMutationDetector // This block is tracked to handle late initialization of the controller listerWatcher ListerWatcher @@ -180,6 +182,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { }() s.stopCh = stopCh + s.cacheMutationDetector.Run(stopCh) s.processor.run(stopCh) s.controller.Run(stopCh) } @@ -273,6 +276,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: + s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err