/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package watch import ( "sync" "sync/atomic" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) func newTicketer() *ticketer { return &ticketer{ cond: sync.NewCond(&sync.Mutex{}), } } type ticketer struct { counter uint64 cond *sync.Cond current uint64 } func (t *ticketer) GetTicket() uint64 { // -1 to start from 0 return atomic.AddUint64(&t.counter, 1) - 1 } func (t *ticketer) WaitForTicket(ticket uint64, f func()) { t.cond.L.Lock() defer t.cond.L.Unlock() for ticket != t.current { t.cond.Wait() } f() t.current++ t.cond.Broadcast() } // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { ch := make(chan watch.Event) w := watch.NewProxyWatcher(ch) t := newTicketer() indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { go t.WaitForTicket(t.GetTicket(), func() { select { case ch <- watch.Event{ Type: watch.Added, Object: obj.(runtime.Object), }: case <-w.StopChan(): } }) }, UpdateFunc: func(old, new interface{}) { go t.WaitForTicket(t.GetTicket(), func() { select { case ch <- watch.Event{ Type: watch.Modified, Object: new.(runtime.Object), }: case <-w.StopChan(): } }) }, DeleteFunc: func(obj interface{}) { go t.WaitForTicket(t.GetTicket(), func() { staleObj, stale := obj.(cache.DeletedFinalStateUnknown) if stale { // We have no means of passing the additional information down using watch API based on watch.Event // but the caller can filter such objects by checking if metadata.deletionTimestamp is set obj = staleObj } select { case ch <- watch.Event{ Type: watch.Deleted, Object: obj.(runtime.Object), }: case <-w.StopChan(): } }) }, }, cache.Indexers{}) go func() { informer.Run(w.StopChan()) }() return indexer, informer, w }