Controller Supervision (#17016)

pull/17122/head
Dan Upton 2023-04-25 12:52:35 +01:00 committed by GitHub
parent ba4a314772
commit b9c485dcb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 498 additions and 54 deletions

View File

@ -68,6 +68,7 @@ import (
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
@ -435,6 +436,10 @@ type Server struct {
// with the Resource Service in-process (i.e. not via the network) without auth.
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient
// controllerManager schedules the execution of controllers.
controllerManager *controller.Manager
// handles metrics reporting to HashiCorp
reportingManager *reporting.ReportingManager
}
@ -500,6 +505,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
incomingRPCLimiter: incomingRPCLimiter,
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
typeRegistry: resource.NewRegistry(),
controllerManager: controller.NewManager(logger.Named(logging.ControllerRuntime)),
}
incomingRPCLimiter.Register(s)
@ -824,8 +830,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
}
if s.config.DevMode {
demo.Register(s.typeRegistry)
demo.RegisterTypes(s.typeRegistry)
demo.RegisterControllers(s.controllerManager)
}
go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
return s, nil
}
@ -1951,6 +1959,7 @@ func (s *Server) trackLeaderChanges() {
s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr))
s.peeringBackend.SetLeaderAddress(string(leaderObs.LeaderAddr))
s.raftStorageBackend.LeaderChanged()
s.controllerManager.SetRaftLeader(s.IsLeader())
// Trigger sending an update to HCP status
s.hcpManager.SendUpdate()

View File

@ -22,7 +22,7 @@ func TestDelete_InputValidation(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
testCases := map[string]func(*pbresource.DeleteRequest){
"no id": func(req *pbresource.DeleteRequest) { req.Id = nil },
@ -101,7 +101,7 @@ func TestDelete_ACLs(t *testing.T) {
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(tc.authz, nil)
server.ACLResolver = mockACLResolver
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -122,8 +122,7 @@ func TestDelete_Success(t *testing.T) {
for desc, tc := range deleteTestCases() {
t.Run(desc, func(t *testing.T) {
server, client, ctx := testDeps(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -150,7 +149,7 @@ func TestDelete_NotFound(t *testing.T) {
for desc, tc := range deleteTestCases() {
t.Run(desc, func(t *testing.T) {
server, client, ctx := testDeps(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -165,7 +164,7 @@ func TestDelete_VersionMismatch(t *testing.T) {
t.Parallel()
server, client, ctx := testDeps(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
rsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})

View File

@ -40,7 +40,7 @@ func TestList_Empty(t *testing.T) {
for desc, tc := range listTestCases() {
t.Run(desc, func(t *testing.T) {
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
@ -58,7 +58,7 @@ func TestList_Many(t *testing.T) {
for desc, tc := range listTestCases() {
t.Run(desc, func(t *testing.T) {
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
resources := make([]*pbresource.Resource, 10)
@ -89,7 +89,7 @@ func TestList_GroupVersionMismatch(t *testing.T) {
for desc, tc := range listTestCases() {
t.Run(desc, func(t *testing.T) {
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
artist, err := demo.GenerateV2Artist()
@ -116,7 +116,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) {
mockBackend := NewMockBackend(t)
server := testServer(t)
server.Backend = mockBackend
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -133,7 +133,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) {
}
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestList_ACL_ListDenied(t *testing.T) {
t.Parallel()
@ -146,7 +146,7 @@ func TestList_ACL_ListDenied(t *testing.T) {
require.Contains(t, err.Error(), "lacks permission 'key:list'")
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) {
t.Parallel()
@ -160,7 +160,7 @@ func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) {
require.Empty(t, rsp.Resources)
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
t.Parallel()
@ -184,7 +184,7 @@ func roundTripList(t *testing.T, authz acl.Authorizer) (*pbresource.Resource, *p
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(authz, nil)
server.ACLResolver = mockACLResolver
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)

View File

@ -25,7 +25,7 @@ func TestRead_InputValidation(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
testCases := map[string]func(*pbresource.ReadRequest){
"no id": func(req *pbresource.ReadRequest) { req.Id = nil },
@ -79,7 +79,7 @@ func TestRead_ResourceNotFound(t *testing.T) {
t.Run(desc, func(t *testing.T) {
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
artist, err := demo.GenerateV2Artist()
@ -98,7 +98,7 @@ func TestRead_GroupVersionMismatch(t *testing.T) {
t.Run(desc, func(t *testing.T) {
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
artist, err := demo.GenerateV2Artist()
@ -123,7 +123,7 @@ func TestRead_Success(t *testing.T) {
t.Run(desc, func(t *testing.T) {
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
artist, err := demo.GenerateV2Artist()
@ -146,7 +146,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) {
server := testServer(t)
mockBackend := NewMockBackend(t)
server.Backend = mockBackend
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -162,7 +162,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) {
}
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestRead_ACLs(t *testing.T) {
type testCase struct {
authz resolver.Result
@ -188,7 +188,7 @@ func TestRead_ACLs(t *testing.T) {
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(tc.authz, nil)
server.ACLResolver = mockACLResolver
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)

View File

@ -46,7 +46,7 @@ func TestWatchList_GroupVersionMatches(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
ctx := context.Background()
// create a watch
@ -90,7 +90,7 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) {
t.Parallel()
server := testServer(t)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
client := testClient(t, server)
ctx := context.Background()
@ -123,7 +123,7 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) {
mustGetNoResource(t, rspCh)
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestWatchList_ACL_ListDenied(t *testing.T) {
t.Parallel()
@ -137,7 +137,7 @@ func TestWatchList_ACL_ListDenied(t *testing.T) {
require.Contains(t, err.Error(), "lacks permission 'key:list'")
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) {
t.Parallel()
@ -152,7 +152,7 @@ func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) {
mustGetNoResource(t, rspCh)
}
// N.B. Uses key ACLs for now. See demo.Register()
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
func TestWatchList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
t.Parallel()
@ -177,7 +177,7 @@ func roundTripACL(t *testing.T, authz acl.Authorizer) (<-chan resourceOrError, *
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(authz, nil)
server.ACLResolver = mockACLResolver
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)

View File

@ -69,7 +69,7 @@ func TestWriteStatus_InputValidation(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
testCases := map[string]func(*pbresource.WriteStatusRequest){
"no id": func(req *pbresource.WriteStatusRequest) { req.Id = nil },
@ -113,7 +113,7 @@ func TestWriteStatus_Success(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -148,7 +148,7 @@ func TestWriteStatus_CASFailure(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -183,7 +183,7 @@ func TestWriteStatus_TypeNotFound(t *testing.T) {
func TestWriteStatus_ResourceNotFound(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -198,7 +198,7 @@ func TestWriteStatus_ResourceNotFound(t *testing.T) {
func TestWriteStatus_WrongUid(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -219,7 +219,7 @@ func TestWriteStatus_NonCASUpdate_Retry(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)

View File

@ -26,7 +26,7 @@ func TestWrite_InputValidation(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
testCases := map[string]func(*pbresource.WriteRequest){
"no resource": func(req *pbresource.WriteRequest) { req.Resource = nil },
@ -79,7 +79,7 @@ func TestWrite_OwnerValidation(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
type testCase struct {
modReqFn func(req *pbresource.WriteRequest)
@ -183,7 +183,7 @@ func TestWrite_ACLs(t *testing.T) {
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
Return(tc.authz, nil)
server.ACLResolver = mockACLResolver
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -198,7 +198,7 @@ func TestWrite_ACLs(t *testing.T) {
func TestWrite_Mutate(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -224,7 +224,7 @@ func TestWrite_ResourceCreation_Success(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -240,7 +240,7 @@ func TestWrite_CASUpdate_Success(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -262,7 +262,7 @@ func TestWrite_ResourceCreation_StatusProvided(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -281,7 +281,7 @@ func TestWrite_CASUpdate_Failure(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -302,7 +302,7 @@ func TestWrite_Update_WrongUid(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -323,7 +323,7 @@ func TestWrite_Update_StatusModified(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -353,7 +353,7 @@ func TestWrite_Update_NilStatus(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -377,7 +377,7 @@ func TestWrite_Update_NoUid(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -396,7 +396,7 @@ func TestWrite_NonCASUpdate_Success(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -417,7 +417,7 @@ func TestWrite_NonCASUpdate_Retry(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
@ -467,7 +467,7 @@ func TestWrite_Owner_Immutable(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
demo.Register(server.Registry)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)

View File

@ -0,0 +1,20 @@
package controller
import (
"github.com/hashicorp/consul/proto-public/pbresource"
)
// ForType begins building a Controller for the given resource type.
func ForType(managedType *pbresource.Type) Controller {
return Controller{managedType: managedType}
}
// Controller runs a reconciliation loop to respond to changes in resources and
// their dependencies. It is heavily inspired by Kubernetes' controller pattern:
// https://kubernetes.io/docs/concepts/architecture/controller/
//
// Use the builder methods in this package (starting with ForType) to construct
// a controller, and then pass it to a Manager to be executed.
type Controller struct {
managedType *pbresource.Type
}

View File

@ -0,0 +1,22 @@
package controller
import (
"context"
"github.com/hashicorp/go-hclog"
)
// controllerRunner contains the actual implementation of running a controller
// including creating watches, calling the reconciler, handling retries, etc.
type controllerRunner struct {
ctrl Controller
logger hclog.Logger
}
func (c *controllerRunner) run(ctx context.Context) error {
c.logger.Debug("controller running")
defer c.logger.Debug("controller stopping")
<-ctx.Done()
return ctx.Err()
}

View File

@ -0,0 +1,24 @@
package controller
// Lease is used to ensure controllers are run as singletons (i.e. one leader-
// elected instance per cluster).
//
// Currently, this is just an abstraction over Raft leadership. In the future,
// we'll build a backend-agnostic leasing system into the Resource Service which
// will allow us to balance controllers between many servers.
type Lease interface {
// Held returns whether we are the current lease-holders.
Held() bool
// Changed returns a channel on which you can receive notifications whenever
// the lease is acquired or lost.
Changed() <-chan struct{}
}
type raftLease struct {
m *Manager
ch <-chan struct{}
}
func (l *raftLease) Held() bool { return l.m.raftLeader.Load() }
func (l *raftLease) Changed() <-chan struct{} { return l.ch }

View File

@ -0,0 +1,89 @@
package controller
import (
"context"
"sync"
"sync/atomic"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/internal/resource"
)
// Manager is responsible for scheduling the execution of controllers.
type Manager struct {
logger hclog.Logger
raftLeader atomic.Bool
mu sync.Mutex
running bool
controllers []Controller
leaseChans []chan struct{}
}
// NewManager creates a Manager. logger will be used by the Manager, and as the
// base logger for controllers when one is not specified using WithLogger.
func NewManager(logger hclog.Logger) *Manager {
return &Manager{logger: logger}
}
// Register the given controller to be executed by the Manager. Cannot be called
// once the Manager is running.
func (m *Manager) Register(ctrl Controller) {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
panic("cannot register additional controllers after calling Run")
}
m.controllers = append(m.controllers, ctrl)
}
// Run the Manager and start executing controllers until the given context is
// canceled. Cannot be called more than once.
func (m *Manager) Run(ctx context.Context) {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
panic("cannot call Run more than once")
}
m.running = true
for _, desc := range m.controllers {
runner := &controllerRunner{
ctrl: desc,
logger: m.logger.With("managed_type", resource.ToGVK(desc.managedType)),
}
go newSupervisor(runner.run, m.newLeaseLocked()).run(ctx)
}
}
// SetRaftLeader notifies the Manager of Raft leadership changes. Controllers
// are currently only executed on the Raft leader, so calling this method will
// cause the Manager to spin them up/down accordingly.
func (m *Manager) SetRaftLeader(leader bool) {
m.raftLeader.Store(leader)
m.mu.Lock()
defer m.mu.Unlock()
for _, ch := range m.leaseChans {
select {
case ch <- struct{}{}:
default:
// Do not block if there's nothing receiving on ch (because the supervisor is
// busy doing something else). Note that ch has a buffer of 1, so we'll never
// miss the notification that something has changed so we need to re-evaluate
// the lease.
}
}
}
func (m *Manager) newLeaseLocked() Lease {
ch := make(chan struct{}, 1)
m.leaseChans = append(m.leaseChans, ch)
return &raftLease{m: m, ch: ch}
}

View File

@ -0,0 +1,140 @@
package controller
import (
"context"
"time"
"github.com/hashicorp/consul/lib/retry"
)
// flapThreshold is the minimum amount of time between restarts for us *not* to
// consider a controller to be stuck in a crash-loop.
const flapThreshold = 2 * time.Second
// supervisor keeps a task running, restarting it on-error, for as long as the
// given lease is held. When the lease is lost, the context given to the task
// will be canceled. If the task persistently fails (i.e. the controller is in
// a crash-loop) supervisor will use exponential backoff to delay restarts.
type supervisor struct {
task task
lease Lease
running bool
startedAt time.Time
errCh chan error
cancelTask context.CancelFunc
backoff *retry.Waiter
backoffUntil time.Time
backoffTimerCh <-chan time.Time
}
func newSupervisor(task task, lease Lease) *supervisor {
return &supervisor{
task: task,
lease: lease,
errCh: make(chan error),
backoff: &retry.Waiter{
MinFailures: 1,
MinWait: 500 * time.Millisecond,
MaxWait: time.Minute,
Jitter: retry.NewJitter(25),
},
}
}
type task func(context.Context) error
func (s *supervisor) run(ctx context.Context) {
for {
if s.shouldStart() {
s.startTask(ctx)
} else if s.shouldStop() {
s.stopTask()
}
select {
// Outer context canceled.
case <-ctx.Done():
if s.cancelTask != nil {
s.cancelTask()
}
return
// Task stopped running.
case err := <-s.errCh:
stopBackoffTimer := s.handleError(err)
if stopBackoffTimer != nil {
defer stopBackoffTimer()
}
// Unblock when the lease is acquired/lost, or the backoff timer fires.
case <-s.lease.Changed():
case <-s.backoffTimerCh:
}
}
}
func (s *supervisor) shouldStart() bool {
if s.running {
return false
}
if !s.lease.Held() {
return false
}
if time.Now().Before(s.backoffUntil) {
return false
}
return true
}
func (s *supervisor) startTask(ctx context.Context) {
if s.cancelTask != nil {
s.cancelTask()
}
taskCtx, cancelTask := context.WithCancel(ctx)
s.cancelTask = cancelTask
s.startedAt = time.Now()
s.running = true
go func() {
err := s.task(taskCtx)
select {
case s.errCh <- err:
case <-ctx.Done():
}
}()
}
func (s *supervisor) shouldStop() bool {
return s.running && !s.lease.Held()
}
func (s *supervisor) stopTask() {
s.cancelTask()
s.backoff.Reset()
s.running = false
}
func (s *supervisor) handleError(err error) func() bool {
s.running = false
if time.Since(s.startedAt) > flapThreshold {
s.backoff.Reset()
s.backoffUntil = time.Time{}
} else {
delay := s.backoff.WaitDuration()
s.backoffUntil = time.Now().Add(delay)
timer := time.NewTimer(delay)
s.backoffTimerCh = timer.C
return timer.Stop
}
return nil
}

View File

@ -0,0 +1,118 @@
package controller
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
)
func TestSupervise(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
runCh := make(chan struct{})
stopCh := make(chan struct{})
errCh := make(chan error)
task := func(taskCtx context.Context) error {
runCh <- struct{}{}
select {
case err := <-errCh:
return err
case <-taskCtx.Done():
stopCh <- struct{}{}
return taskCtx.Err()
}
}
lease := newTestLease()
go newSupervisor(task, lease).run(ctx)
select {
case <-runCh:
t.Fatal("task should not be running before lease is held")
case <-time.After(500 * time.Millisecond):
}
lease.acquired()
select {
case <-runCh:
case <-time.After(500 * time.Millisecond):
t.Fatal("task not running after lease is acquired")
}
select {
case <-stopCh:
t.Fatal("task should not have stopped before lease is lost")
case <-time.After(500 * time.Millisecond):
}
lease.lost()
select {
case <-stopCh:
case <-time.After(500 * time.Millisecond):
t.Fatal("task still running after lease was lost")
}
select {
case <-runCh:
t.Fatal("task should not be run again before lease is re-acquired")
case <-time.After(500 * time.Millisecond):
}
lease.acquired()
select {
case <-runCh:
case <-time.After(500 * time.Millisecond):
t.Fatal("task not running after lease is re-acquired")
}
errCh <- errors.New("KABOOM")
select {
case <-runCh:
case <-time.After(2 * time.Second):
t.Fatal("task was not restarted")
}
cancel()
select {
case <-stopCh:
case <-time.After(500 * time.Millisecond):
t.Fatal("task still running after parent context was canceled")
}
}
func newTestLease() *testLease {
return &testLease{ch: make(chan struct{}, 1)}
}
type testLease struct {
held atomic.Bool
ch chan struct{}
}
func (l *testLease) Held() bool { return l.held.Load() }
func (l *testLease) Changed() <-chan struct{} { return l.ch }
func (l *testLease) acquired() { l.setHeld(true) }
func (l *testLease) lost() { l.setHeld(false) }
func (l *testLease) setHeld(held bool) {
l.held.Store(held)
select {
case l.ch <- struct{}{}:
default:
}
}

View File

@ -0,0 +1,13 @@
package demo
import "github.com/hashicorp/consul/internal/controller"
// RegisterControllers registers controllers for the demo types. Should only be
// called in dev mode.
func RegisterControllers(mgr *controller.Manager) {
mgr.Register(artistController())
}
func artistController() controller.Controller {
return controller.ForType(TypeV2Artist)
}

View File

@ -66,12 +66,12 @@ const (
ArtistV2ListPolicy = `key_prefix "resource/" { policy = "list" }`
)
// Register demo types. Should only be called in tests and dev mode.
// acls are optional.
// RegisterTypes registers the demo types. Should only be called in tests and
// dev mode.
//
// TODO(spatel): We're standing-in key ACLs for demo resources until our ACL
// system can be more modularly extended (or support generic resource permissions).
func Register(r resource.Registry) {
func RegisterTypes(r resource.Registry) {
readACL := func(authz acl.Authorizer, id *pbresource.ID) error {
key := fmt.Sprintf("resource/%s/%s", resource.ToGVK(id.Type), id.Name)
return authz.ToAllowAuthorizer().KeyReadAllowed(key, &acl.AuthorizerContext{})

View File

@ -104,8 +104,8 @@ func (w *Waiter) Failures() int {
// such as when the context is canceled. This makes it suitable for
// long-running routines that do not get re-initialized, such as replication.
func (w *Waiter) Wait(ctx context.Context) error {
w.failures++
timer := time.NewTimer(w.delay())
delay := w.WaitDuration()
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
@ -115,6 +115,15 @@ func (w *Waiter) Wait(ctx context.Context) error {
}
}
// WaitDuration increases the number of failures by one, and returns the
// duration the caller must wait for. This is an alternative to the Wait
// method for cases where you want to handle the timer yourself (e.g. as
// part of a larger select statement).
func (w *Waiter) WaitDuration() time.Duration {
w.failures++
return w.delay()
}
// NextWait returns the period the next call to Wait with block for assuming
// it's context is not cancelled. It's useful for informing a user how long
// it will be before the next attempt is made.

View File

@ -20,6 +20,7 @@ const (
Consul string = "consul"
ConsulClient string = "client"
ConsulServer string = "server"
ControllerRuntime string = "controller-runtime"
Coordinate string = "coordinate"
DNS string = "dns"
Envoy string = "envoy"