consul/internal/controller/manager.go

package controller

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// Manager is responsible for scheduling the execution of controllers.
type Manager struct {
	client pbresource.ResourceServiceClient
	logger hclog.Logger

	raftLeader atomic.Bool

	// mu guards running, controllers, and leaseChans.
	mu          sync.Mutex
	running     bool
	controllers []Controller
	leaseChans  []chan struct{}
}

// NewManager creates a Manager. logger will be used by the Manager, and as the
// base logger for controllers when one is not specified using WithLogger.
func NewManager(client pbresource.ResourceServiceClient, logger hclog.Logger) *Manager {
	return &Manager{
		client: client,
		logger: logger,
	}
}

// Register the given controller to be executed by the Manager. Cannot be called
// once the Manager is running.
func (m *Manager) Register(ctrl Controller) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		panic("cannot register additional controllers after calling Run")
	}

	if ctrl.reconciler == nil {
		panic(fmt.Sprintf("cannot register controller without a reconciler %s", ctrl))
	}

	m.controllers = append(m.controllers, ctrl)
}

// Run the Manager and start executing controllers until the given context is
// canceled. Cannot be called more than once.
func (m *Manager) Run(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		panic("cannot call Run more than once")
	}
	m.running = true

	for _, desc := range m.controllers {
		logger := desc.logger
		if logger == nil {
			logger = m.logger.With("managed_type", resource.ToGVK(desc.managedType))
		}

		runner := &controllerRunner{
			ctrl:   desc,
			client: m.client,
			logger: logger,
		}
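		// Each controller gets its own supervisor goroutine, which starts and
		// stops the runner as the controller's lease is acquired and lost.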
		go newSupervisor(runner.run, m.newLeaseLocked(desc)).run(ctx)
	}
}

// SetRaftLeader notifies the Manager of Raft leadership changes. Controllers
// are currently only executed on the Raft leader, so calling this method will
// cause the Manager to spin them up/down accordingly.
func (m *Manager) SetRaftLeader(leader bool) {
	m.raftLeader.Store(leader)

	m.mu.Lock()
	defer m.mu.Unlock()

	for _, ch := range m.leaseChans {
		select {
		case ch <- struct{}{}:
		default:
			// Do not block if nothing is receiving on ch (because the supervisor
			// is busy doing something else). ch has a buffer of 1, so we'll never
			// miss the notification that something has changed and the lease needs
			// to be re-evaluated.
		}
	}
}

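// newLeaseLocked creates the Lease that gates execution of the given
// controller: controllers placed with PlacementEachServer run unconditionally,
// while all others run only while this server is the Raft leader. Callers must
// hold mu.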
func (m *Manager) newLeaseLocked(ctrl Controller) Lease {
	if ctrl.placement == PlacementEachServer {
		return eternalLease{}
	}

	ch := make(chan struct{}, 1)
	m.leaseChans = append(m.leaseChans, ch)
	return &raftLease{m: m, ch: ch}
}
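
For orientation, here is a minimal sketch of how a caller in the same package might drive the Manager. It is not part of manager.go: the Controller value is assumed to be built elsewhere with the package's builder API (this file only references WithLogger), and the function and parameter names below are illustrative only.

package controller

import (
	"context"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/proto-public/pbresource"
)

// runManagerSketch wires up the Manager lifecycle: register controllers, start
// their supervisors, and forward leadership changes so raft leases can be
// re-evaluated.
func runManagerSketch(ctx context.Context, client pbresource.ResourceServiceClient, ctrl Controller) {
	mgr := NewManager(client, hclog.Default())

	// ctrl must carry a reconciler, otherwise Register panics. Registration
	// has to happen before Run.
	mgr.Register(ctrl)

	// Starts one supervisor per registered controller. Runners only execute
	// while their lease is held: on the Raft leader, unless the controller was
	// built with PlacementEachServer.
	mgr.Run(ctx)

	// Called by leadership-election code whenever leadership changes; this
	// nudges every raft lease to re-evaluate itself.
	mgr.SetRaftLeader(true)
}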