You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/internal/controller/manager.go

112 lines
2.7 KiB

[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package controller
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// Manager is responsible for scheduling the execution of controllers.
type Manager struct {
client pbresource.ResourceServiceClient
logger hclog.Logger
raftLeader atomic.Bool
mu sync.Mutex
running bool
controllers []Controller
leaseChans []chan struct{}
}
// NewManager creates a Manager. logger will be used by the Manager, and as the
// base logger for controllers when one is not specified using WithLogger.
func NewManager(client pbresource.ResourceServiceClient, logger hclog.Logger) *Manager {
return &Manager{
client: client,
logger: logger,
}
}
// Register the given controller to be executed by the Manager. Cannot be called
// once the Manager is running.
func (m *Manager) Register(ctrl Controller) {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
panic("cannot register additional controllers after calling Run")
}
if ctrl.reconciler == nil {
panic(fmt.Sprintf("cannot register controller without a reconciler %s", ctrl))
}
m.controllers = append(m.controllers, ctrl)
}
// Run the Manager and start executing controllers until the given context is
// canceled. Cannot be called more than once.
func (m *Manager) Run(ctx context.Context) {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
panic("cannot call Run more than once")
}
m.running = true
for _, desc := range m.controllers {
logger := desc.logger
if logger == nil {
logger = m.logger.With("managed_type", desc.managedType.Kind)
}
runner := &controllerRunner{
ctrl: desc,
client: m.client,
logger: logger,
}
go newSupervisor(runner.run, m.newLeaseLocked(desc)).run(ctx)
}
}
// SetRaftLeader notifies the Manager of Raft leadership changes. Controllers
// are currently only executed on the Raft leader, so calling this method will
// cause the Manager to spin them up/down accordingly.
func (m *Manager) SetRaftLeader(leader bool) {
m.raftLeader.Store(leader)
m.mu.Lock()
defer m.mu.Unlock()
for _, ch := range m.leaseChans {
select {
case ch <- struct{}{}:
default:
// Do not block if there's nothing receiving on ch (because the supervisor is
// busy doing something else). Note that ch has a buffer of 1, so we'll never
// miss the notification that something has changed so we need to re-evaluate
// the lease.
}
}
}
func (m *Manager) newLeaseLocked(ctrl Controller) Lease {
if ctrl.placement == PlacementEachServer {
return eternalLease{}
}
ch := make(chan struct{}, 1)
m.leaseChans = append(m.leaseChans, ch)
return &raftLease{m: m, ch: ch}
}