You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
consul/internal/controller/api.go

261 lines
7.3 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
package controller
import (
"context"
"fmt"
"strings"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// ForType starts a Controller builder whose managed resource type is
// managedType. Chain the With* builder methods on the returned value to finish
// configuring it.
func ForType(managedType *pbresource.Type) Controller {
	var ctrl Controller
	ctrl.managedType = managedType
	return ctrl
}
// WithReconciler returns a copy of the controller that uses the given
// reconciler. It panics when reconciler is nil.
func (c Controller) WithReconciler(reconciler Reconciler) Controller {
	if reconciler == nil {
		panic("reconciler must not be nil")
	}

	// The receiver is already a copy; the explicit local just makes it obvious
	// that the caller's value is left untouched.
	updated := c
	updated.reconciler = reconciler
	return updated
}
// WithWatch registers an additional watch on watchedType. Whenever a watched
// resource changes, mapper is called to work out which resources must be
// reconciled as a result. It panics when either argument is nil.
func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMapper) Controller {
	// Arguments are validated in declaration order, so a nil watchedType is
	// reported before a nil mapper.
	switch {
	case watchedType == nil:
		panic("watchedType must not be nil")
	case mapper == nil:
		panic("mapper must not be nil")
	}

	c.watches = append(c.watches, watch{watchedType: watchedType, mapper: mapper})
	return c
}
// WithCustomWatch registers a watch on a custom event source. mapper is the
// custom mapper that translates the events produced by source for the
// controller. It panics when either argument is nil.
func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller {
	if source == nil {
		panic("source must not be nil")
	}
	if mapper == nil {
		panic("mapper must not be nil")
	}

	registration := customWatch{source: source, mapper: mapper}
	c.customWatches = append(c.customWatches, registration)
	return c
}
// WithLogger returns a copy of the controller that logs through logger. It
// panics when logger is nil.
func (c Controller) WithLogger(logger hclog.Logger) Controller {
	if logger != nil {
		c.logger = logger
		return c
	}
	panic("logger must not be nil")
}
// WithBackoff sets the base and maximum backoff used by the controller's retry
// rate limiter. The values are stored as given; a zero value leaves the
// corresponding default in effect (see backoff).
func (c Controller) WithBackoff(base, max time.Duration) Controller {
	c.baseBackoff, c.maxBackoff = base, max
	return c
}
// WithPlacement overrides where, and how many, replicas of the controller run.
// The default placement (a single leader-elected instance per cluster) suits
// the majority of controllers, so overriding it should rarely be necessary.
func (c Controller) WithPlacement(placement Placement) Controller {
	updated := c
	updated.placement = placement
	return updated
}
// String renders a human-readable summary of the controller for debugging: its
// managed type, the types registered through WithWatch, the effective backoff
// settings and the placement. Custom watches are not included.
func (c Controller) String() string {
	quoted := make([]string, 0, len(c.watches))
	for _, w := range c.watches {
		quoted = append(quoted, fmt.Sprintf("%q", resource.ToGVK(w.watchedType)))
	}

	// Report the effective values, i.e. with defaults filled in.
	baseBackoff, maxBackoff := c.backoff()

	managed := resource.ToGVK(c.managedType)
	watched := strings.Join(quoted, ", ")

	return fmt.Sprintf(
		"<Controller managed_type=%q, watched_types=[%s], backoff=<base=%q, max=%q>, placement=%q>",
		managed, watched, baseBackoff, maxBackoff, c.placement,
	)
}
// backoff returns the effective (base, max) backoff durations, substituting
// the defaults below for any value that is still zero.
func (c Controller) backoff() (time.Duration, time.Duration) {
	const (
		defaultBase = 5 * time.Millisecond
		defaultMax  = 1000 * time.Second
	)

	lower, upper := c.baseBackoff, c.maxBackoff
	if lower == 0 {
		lower = defaultBase
	}
	if upper == 0 {
		upper = defaultMax
	}
	return lower, upper
}
// Controller runs a reconciliation loop to respond to changes in resources and
// their dependencies. It is heavily inspired by Kubernetes' controller pattern:
// https://kubernetes.io/docs/concepts/architecture/controller/
//
// Use the builder methods in this package (starting with ForType) to construct
// a controller, and then pass it to a Manager to be executed.
//
// Every builder method has a value receiver and returns the modified copy, so
// the result of each call must be kept by the caller.
type Controller struct {
// managedType is the resource type passed to ForType.
managedType *pbresource.Type
// reconciler is set by WithReconciler, which rejects nil.
reconciler Reconciler
// logger is set by WithLogger, which rejects nil.
logger hclog.Logger
// watches holds one entry per WithWatch call.
watches []watch
// customWatches holds one entry per WithCustomWatch call.
customWatches []customWatch
// baseBackoff and maxBackoff are set by WithBackoff. A zero value means
// "use the default" (5ms and 1000s respectively, see backoff).
baseBackoff time.Duration
maxBackoff time.Duration
// placement is set by WithPlacement. Its zero value is PlacementSingleton.
placement Placement
}
// watch pairs a watched resource type with the DependencyMapper that was
// registered for it through WithWatch.
type watch struct {
// watchedType is the resource type being watched.
watchedType *pbresource.Type
// mapper determines which resources must be reconciled when a watched
// resource changes.
mapper DependencyMapper
}
// Source is a generic producer of events, for cases where the events do not
// originate from resources stored by the resource API.
type Source struct {
	// Source is the channel events are received from.
	Source <-chan Event
}

// Watch receives custom events from the source and hands each one to add so
// that it ends up on the event queue. It blocks until ctx is done or the
// source channel is closed, and returns nil in both cases.
func (s *Source) Watch(ctx context.Context, add func(e Event)) error {
	done := ctx.Done()
	for {
		select {
		case <-done:
			return nil
		case evt, open := <-s.Source:
			if open {
				add(evt)
				continue
			}
			// The producer closed the channel: nothing more will arrive.
			return nil
		}
	}
}
// Event describes something that happened in the system which the API can
// choose to respond to.
type Event struct {
	// Obj is the queue item carried by the event.
	Obj queue.ItemType
}

// Key returns the string used to de-duplicate items in the queue. It is the key
// of the wrapped Obj.
func (e Event) Key() string {
	item := e.Obj
	return item.Key()
}
// customWatch represents a Watch on a custom Event source and a Mapper to map
// said Events into Requests that the controller can respond to. Instances are
// created by WithCustomWatch.
type customWatch struct {
// source is the custom event source being watched.
source *Source
// mapper is the custom mapper registered alongside source.
mapper CustomDependencyMapper
}
// Request asks the controller to reconcile the resource with the given ID.
type Request struct {
	// ID identifies the resource that needs to be reconciled.
	ID *pbresource.ID
}

// Key satisfies the queue.ItemType interface. The returned string is used to
// de-duplicate requests in the queue and is built from the ID's tenancy
// (partition, peer name, namespace), name and uid; the resource type is not
// part of the key. ID and its Tenancy are read without nil checks, so both
// must be set.
func (r Request) Key() string {
	id := r.ID
	tenancy := id.Tenancy
	return fmt.Sprintf(
		"part=%q,peer=%q,ns=%q,name=%q,uid=%q",
		tenancy.Partition, tenancy.PeerName, tenancy.Namespace,
		id.Name, id.Uid,
	)
}
// Runtime contains the dependencies required by reconcilers. It is passed by
// value to every Reconcile call.
type Runtime struct {
// Client is a resource service client.
// NOTE(review): presumably what reconcilers use to read and write
// resources — confirm against the controller runner.
Client pbresource.ResourceServiceClient
// Logger is the logger made available to the reconciler.
Logger hclog.Logger
}
// Reconciler implements the business logic of a controller. It is attached to
// a Controller with WithReconciler.
type Reconciler interface {
// Reconcile the resource identified by req.ID.
//
// rt carries the dependencies available to the reconciler. Returning a
// RequeueAfterError (see RequeueAfter and RequeueNow) overrides the
// controller's exponential backoff for this request.
Reconcile(ctx context.Context, rt Runtime, req Request) error
}
// Placement controls where, and how many, replicas of the controller run.
type Placement int

const (
	// PlacementSingleton runs exactly one leader-elected instance of the
	// controller in the cluster at any time. It is the default (zero value)
	// and fits most use-cases.
	PlacementSingleton Placement = iota

	// PlacementEachServer runs a replica of the controller on every server in
	// the cluster. It is useful when the controller has to apply some
	// configuration resource to the local server whenever it changes (e.g.
	// rate-limit configuration). Generally, controllers in this placement mode
	// should not modify resources.
	PlacementEachServer
)

// String satisfies the fmt.Stringer interface. It panics when p is not one of
// the placements declared above.
func (p Placement) String() string {
	if p == PlacementSingleton {
		return "singleton"
	}
	if p == PlacementEachServer {
		return "each-server"
	}
	panic(fmt.Sprintf("unknown placement %d", p))
}
// RequeueAfterError is an error that allows a Reconciler to override the
// exponential backoff behavior of the Controller. Rather than applying the
// backoff algorithm, returning a RequeueAfterError will cause the Controller
// to reschedule the Request after the given delay.
//
// The underlying value is the delay itself, so it converts directly to and
// from time.Duration.
type RequeueAfterError time.Duration

// Error implements the error interface. The value is a delay rather than a
// point in time, so the message reads "requeue after <duration>".
func (r RequeueAfterError) Error() string {
	return fmt.Sprintf("requeue after %s", time.Duration(r))
}
// RequeueAfter builds a RequeueAfterError carrying the given duration and
// returns it as an error.
func RequeueAfter(after time.Duration) error {
	requeue := RequeueAfterError(after)
	return requeue
}
// RequeueNow builds a RequeueAfterError with a zero delay, which reschedules
// the Request immediately.
func RequeueNow() error {
	const noDelay RequeueAfterError = 0
	return noDelay
}