k3s/vendor/github.com/rancher/lasso/pkg/controller/controller.go

294 lines
6.6 KiB
Go

package controller
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/rancher/lasso/pkg/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const maxTimeout2min = 2 * time.Minute
type Handler interface {
OnChange(key string, obj runtime.Object) error
}
type HandlerFunc func(key string, obj runtime.Object) error
func (h HandlerFunc) OnChange(key string, obj runtime.Object) error {
return h(key, obj)
}
type Controller interface {
Enqueue(namespace, name string)
EnqueueAfter(namespace, name string, delay time.Duration)
EnqueueKey(key string)
Informer() cache.SharedIndexInformer
Start(ctx context.Context, workers int) error
}
type controller struct {
startLock sync.Mutex
name string
workqueue workqueue.RateLimitingInterface
rateLimiter workqueue.RateLimiter
informer cache.SharedIndexInformer
handler Handler
gvk schema.GroupVersionKind
startKeys []startKey
started bool
startCache func(context.Context) error
}
type startKey struct {
key string
after time.Duration
}
type Options struct {
RateLimiter workqueue.RateLimiter
}
func New(name string, informer cache.SharedIndexInformer, startCache func(context.Context) error, handler Handler, opts *Options) Controller {
opts = applyDefaultOptions(opts)
controller := &controller{
name: name,
handler: handler,
informer: informer,
rateLimiter: opts.RateLimiter,
startCache: startCache,
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
func applyDefaultOptions(opts *Options) *Options {
var newOpts Options
if opts != nil {
newOpts = *opts
}
if newOpts.RateLimiter == nil {
newOpts.RateLimiter = workqueue.NewMaxOfRateLimiter(
workqueue.NewItemFastSlowRateLimiter(time.Millisecond, maxTimeout2min, 30),
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
)
}
return &newOpts
}
func (c *controller) Informer() cache.SharedIndexInformer {
return c.informer
}
func (c *controller) GroupVersionKind() schema.GroupVersionKind {
return c.gvk
}
func (c *controller) run(workers int, stopCh <-chan struct{}) {
c.startLock.Lock()
// we have to defer queue creation until we have a stopCh available because a workqueue
// will create a goroutine under the hood. It we instantiate a workqueue we must have
// a mechanism to Shutdown it down. Without the stopCh we don't know when to shutdown
// the queue and release the goroutine
c.workqueue = workqueue.NewNamedRateLimitingQueue(c.rateLimiter, c.name)
for _, start := range c.startKeys {
if start.after == 0 {
c.workqueue.Add(start.key)
} else {
c.workqueue.AddAfter(start.key, start.after)
}
}
c.startKeys = nil
c.startLock.Unlock()
defer utilruntime.HandleCrash()
defer func() {
c.workqueue.ShutDown()
}()
// Start the informer factories to begin populating the informer caches
log.Infof("Starting %s controller", c.name)
// Launch two workers to process Foo resources
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
log.Infof("Shutting down %s workers", c.name)
}
func (c *controller) Start(ctx context.Context, workers int) error {
c.startLock.Lock()
defer c.startLock.Unlock()
if c.started {
return nil
}
if err := c.startCache(ctx); err != nil {
return err
}
if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
go c.run(workers, ctx.Done())
c.started = true
return nil
}
func (c *controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
if err := c.processSingleItem(obj); err != nil {
if !strings.Contains(err.Error(), "please apply your changes to the latest version and try again") {
log.Errorf("%v", err)
}
return true
}
return true
}
func (c *controller) processSingleItem(obj interface{}) error {
var (
key string
ok bool
)
defer c.workqueue.Done(obj)
if key, ok = obj.(string); !ok {
c.workqueue.Forget(obj)
log.Errorf("expected string in workqueue but got %#v", obj)
return nil
}
if err := c.syncHandler(key); err != nil {
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.workqueue.Forget(obj)
return nil
}
func (c *controller) syncHandler(key string) error {
obj, exists, err := c.informer.GetStore().GetByKey(key)
if err != nil {
return err
}
if !exists {
return c.handler.OnChange(key, nil)
}
return c.handler.OnChange(key, obj.(runtime.Object))
}
func (c *controller) EnqueueKey(key string) {
c.startLock.Lock()
defer c.startLock.Unlock()
if c.workqueue == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.Add(key)
}
}
func (c *controller) Enqueue(namespace, name string) {
key := keyFunc(namespace, name)
c.startLock.Lock()
defer c.startLock.Unlock()
if c.workqueue == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.AddRateLimited(key)
}
}
func (c *controller) EnqueueAfter(namespace, name string, duration time.Duration) {
key := keyFunc(namespace, name)
c.startLock.Lock()
defer c.startLock.Unlock()
if c.workqueue == nil {
c.startKeys = append(c.startKeys, startKey{key: key, after: duration})
} else {
c.workqueue.AddAfter(key, duration)
}
}
func keyFunc(namespace, name string) string {
if namespace == "" {
return name
}
return namespace + "/" + name
}
func (c *controller) enqueue(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
log.Errorf("%v", err)
return
}
c.startLock.Lock()
if c.workqueue == nil {
c.startKeys = append(c.startKeys, startKey{key: key})
} else {
c.workqueue.Add(key)
}
c.startLock.Unlock()
}
func (c *controller) handleObject(obj interface{}) {
if _, ok := obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("error decoding object, invalid type")
return
}
newObj, ok := tombstone.Obj.(metav1.Object)
if !ok {
log.Errorf("error decoding object tombstone, invalid type")
return
}
obj = newObj
}
c.enqueue(obj)
}