// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package controller

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/go-hclog"
	"golang.org/x/sync/errgroup"

	"github.com/hashicorp/consul/agent/consul/controller/queue"
	"github.com/hashicorp/consul/internal/controller/cache"
	"github.com/hashicorp/consul/internal/protoutil"
	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// Runtime contains the dependencies required by reconcilers.
type Runtime struct {
	Client pbresource.ResourceServiceClient
	Logger hclog.Logger
	Cache  cache.ReadOnlyCache
}

// controllerRunner contains the actual implementation of running a controller,
// including creating watches, calling the reconciler, handling retries, etc.
type controllerRunner struct {
	ctrl *Controller

	// watchClient will be used by the controller infrastructure internally to
	// perform watches and maintain the cache. On servers, this client will use
	// the in-memory gRPC clients which DO NOT cause cloning of data returned by
	// the resource service. This is desirable so that our cache doesn't incur
	// the overhead of duplicating all resources that are watched. Generally,
	// dependency mappers and reconcilers should not be given this client so
	// that they remain free to modify the data they are returned.
	watchClient pbresource.ResourceServiceClient

	// runtimeClient will be used by dependency mappers and reconcilers to
	// access the resource service. On servers, this client will use the
	// in-memory gRPC client wrapped with the cloning client to force cloning of
	// protobuf messages as they pass through the client. This is desirable so
	// that controllers and their mappers are free to modify the data returned
	// to them without having to think about the fact that the data should be
	// immutable, as it is shared with the controller's cache as well as the
	// resource service itself.
	runtimeClient pbresource.ResourceServiceClient

	logger hclog.Logger
	cache  cache.Cache
}

func newControllerRunner(c *Controller, client pbresource.ResourceServiceClient, defaultLogger hclog.Logger) *controllerRunner {
	return &controllerRunner{
		ctrl:          c,
		watchClient:   client,
		runtimeClient: pbresource.NewCloningResourceServiceClient(client),
		logger:        c.buildLogger(defaultLogger),
		// Do not build the cache here. Building/setting it when the controller
		// runs means that a restarted controller automatically invalidates its
		// previous cache.
	}
}

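// run executes the controller. It (re)builds the cache, starts a watch for the
// managed type and for every dependency and custom watch, runs the dependency
// mappers and the reconciler against their work queues, and blocks until the
// context is cancelled or one of those goroutines returns an error.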
func (c *controllerRunner) run(ctx context.Context) error {
	c.logger.Debug("controller running")
	defer c.logger.Debug("controller stopping")

	c.cache = c.ctrl.buildCache()
	defer func() {
		// once no longer running we should nil out the cache
		// so that we don't hold pointers to resources which may
		// become out of date in the future.
		c.cache = nil
	}()

	if c.ctrl.startCb != nil {
		c.ctrl.startCb(ctx, c.runtime(c.logger))
	}

	if c.ctrl.stopCb != nil {
		defer c.ctrl.stopCb(ctx, c.runtime(c.logger))
	}

	group, groupCtx := errgroup.WithContext(ctx)
	recQueue := runQueue[Request](groupCtx, c.ctrl)

	// Managed Type Events → Reconciliation Queue
	group.Go(func() error {
		return c.watch(groupCtx, c.ctrl.managedTypeWatch.watchedType, func(res *pbresource.Resource) {
			recQueue.Add(Request{ID: res.Id})
		})
	})

	for _, w := range c.ctrl.watches {
		mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
		watcher := w

		// Watched Type Events → Mapper Queue
		group.Go(func() error {
			return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
				mapQueue.Add(mapperRequest{res: res})
			})
		})

		// Mapper Queue → Mapper → Reconciliation Queue
		group.Go(func() error {
			return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
				return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
			})
		})
	}

	for _, cw := range c.ctrl.customWatches {
		customMapQueue := runQueue[Event](groupCtx, c.ctrl)
		watcher := cw

		// Custom Events → Mapper Queue
		group.Go(func() error {
			return watcher.source.Watch(groupCtx, func(e Event) {
				customMapQueue.Add(e)
			})
		})

		// Mapper Queue → Mapper → Reconciliation Queue
		group.Go(func() error {
			return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
				return watcher.mapper(ctx, runtime, itemType.(Event))
			})
		})
	}

	// Reconciliation Queue → Reconciler
	group.Go(func() error {
		return c.runReconciler(groupCtx, recQueue)
	})

	return group.Wait()
}

func runQueue[T queue.ItemType](ctx context.Context, ctrl *Controller) queue.WorkQueue[T] {
	base, max := ctrl.backoff()
	return queue.RunWorkQueue[T](ctx, base, max)
}

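// watch opens a wildcard-tenancy WatchList stream for the given type using the
// non-cloning watch client, applies each event to the controller's cache, and
// then hands a cloned copy of the resource to the add callback (typically a
// work queue) so that downstream consumers can safely mutate it.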
func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
	wl, err := c.watchClient.WatchList(ctx, &pbresource.WatchListRequest{
		Type: typ,
		Tenancy: &pbresource.Tenancy{
			Partition: storage.Wildcard,
			PeerName:  storage.Wildcard,
			Namespace: storage.Wildcard,
		},
	})
	if err != nil {
		c.logger.Error("failed to create watch", "error", err)
		return err
	}

	for {
		event, err := wl.Recv()
		if err != nil {
			c.logger.Warn("error received from watch", "error", err)
			return err
		}

		// Keep the cache up to date. The main reason to do this here is to
		// ensure that any mapper/reconciliation queue deduping won't hide
		// events from being observed and from updating the cache state.
		// Therefore we should do this before any queueing.
		switch event.Operation {
		case pbresource.WatchEvent_OPERATION_UPSERT:
			c.cache.Insert(event.Resource)
		case pbresource.WatchEvent_OPERATION_DELETE:
			c.cache.Delete(event.Resource)
		}

		// Before adding the resource into the queue we must clone it. While we
		// don't want the cache to hold duplicate copies of all the data, we do
		// want downstream consumers like dependency mappers and controllers to
		// be able to freely modify the data they are given. Therefore we clone
		// the resource here to prevent any accidental mutation of data held by
		// the cache (and presumably by the resource service, assuming that the
		// regular client we were given is the in-memory variant).
		add(protoutil.Clone(event.Resource))
	}
}

func (c *controllerRunner) runMapper(
	ctx context.Context,
	w *watch,
	from queue.WorkQueue[mapperRequest],
	to queue.WorkQueue[Request],
	mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
	logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))

	for {
		item, shutdown := from.Get()
		if shutdown {
			return nil
		}

		if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
			from.AddRateLimited(item)
			from.Done(item)
			continue
		}

		from.Forget(item)
		from.Done(item)
	}
}

func (c *controllerRunner) runCustomMapper(
	ctx context.Context,
	cw customWatch,
	from queue.WorkQueue[Event],
	to queue.WorkQueue[Request],
	mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
	logger := c.logger.With("watched_event", cw.source)

	for {
		item, shutdown := from.Get()
		if shutdown {
			return nil
		}

		if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
			from.AddRateLimited(item)
			from.Done(item)
			continue
		}

		from.Forget(item)
		from.Done(item)
	}
}

func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
	var reqs []Request
	if err := c.handlePanic(func() error {
		var err error
		reqs, err = mapper(ctx, c.runtime(logger.With("map-request-key", item.Key())), item)
		return err
	}); err != nil {
		return err
	}

	for _, r := range reqs {
		if !resource.EqualType(r.ID.Type, c.ctrl.managedTypeWatch.watchedType) {
			logger.Error("dependency mapper returned request for a resource of the wrong type",
				"type_expected", resource.ToGVK(c.ctrl.managedTypeWatch.watchedType),
				"type_got", resource.ToGVK(r.ID.Type),
			)
			continue
		}

		to.Add(r)
	}
	return nil
}

func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
	for {
		req, shutdown := queue.Get()
		if shutdown {
			return nil
		}

		c.logger.Trace("handling request", "request", req)
		err := c.handlePanic(func() error {
			return c.ctrl.reconciler.Reconcile(ctx, c.runtime(c.logger.With("resource-id", req.ID.String())), req)
		})
		if err == nil {
			queue.Forget(req)
		} else {
			var requeueAfter RequeueAfterError
			if errors.As(err, &requeueAfter) {
				queue.Forget(req)
				queue.AddAfter(req, time.Duration(requeueAfter))
			} else {
				queue.AddRateLimited(req)
			}
		}

		queue.Done(req)
	}
}

func (c *controllerRunner) handlePanic(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			stack := hclog.Stacktrace()
			c.logger.Error("controller panic",
				"panic", r,
				"stack", stack,
			)
			err = fmt.Errorf("panic [recovered]: %v", r)
			return
		}
	}()

	return fn()
}

func (c *controllerRunner) runtime(logger hclog.Logger) Runtime {
	return Runtime{
		// Dependency mappers and controllers are always given the cloning client
		// so that they do not have to care about mutating values that they read
		// through the client.
		Client: c.runtimeClient,
		Logger: logger,
		// Ensure that resources queried via the cache get cloned so that the
		// dependency mapper or reconciler is free to modify them.
		Cache: cache.NewCloningReadOnlyCache(c.cache),
	}
}

type mapperRequest struct{ res *pbresource.Resource }

// Key satisfies the queue.ItemType interface. It returns a string which will
// be used to de-duplicate requests in the queue.
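// As a purely illustrative example (hypothetical type and names), a mapped
// resource might yield a key such as:
//
//	type="demo.v1.Artist",part="default",peer="local",ns="default",name="blur",uid="..."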
func (i mapperRequest) Key() string {
	return fmt.Sprintf(
		"type=%q,part=%q,peer=%q,ns=%q,name=%q,uid=%q",
		resource.ToGVK(i.res.Id.Type),
		i.res.Id.Tenancy.Partition,
		i.res.Id.Tenancy.PeerName,
		i.res.Id.Tenancy.Namespace,
		i.res.Id.Name,
		i.res.Id.Uid,
	)
}