|
|
|
@ -18,6 +18,9 @@ import (
|
|
|
|
|
"github.com/hashicorp/consul/types"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// permissionDenied is returned when an ACL based rejection happens.
|
|
|
|
|
const permissionDenied = "Permission denied"
|
|
|
|
|
|
|
|
|
|
// Config is the configuration for the State. It is
|
|
|
|
|
// populated during NewLocalAgent from the agent configuration to avoid
|
|
|
|
|
// race conditions with the agent configuration.
|
|
|
|
@ -36,8 +39,7 @@ type ServiceState struct {
|
|
|
|
|
// Service is the local copy of the service record.
|
|
|
|
|
Service *structs.NodeService
|
|
|
|
|
|
|
|
|
|
// Token is the ACL to update or delete the service record on the
|
|
|
|
|
// server.
|
|
|
|
|
// Token is the ACL to update the service record on the server.
|
|
|
|
|
Token string
|
|
|
|
|
|
|
|
|
|
// InSync contains whether the local state of the service record
|
|
|
|
@ -62,8 +64,8 @@ type CheckState struct {
|
|
|
|
|
// Check is the local copy of the health check record.
|
|
|
|
|
Check *structs.HealthCheck
|
|
|
|
|
|
|
|
|
|
// Token is the ACL record to update or delete the health check
|
|
|
|
|
// record on the server.
|
|
|
|
|
// Token is the ACL record to update the health check record
|
|
|
|
|
// on the server.
|
|
|
|
|
Token string
|
|
|
|
|
|
|
|
|
|
// CriticalTime is the last time the health check status went
|
|
|
|
@ -72,8 +74,8 @@ type CheckState struct {
|
|
|
|
|
CriticalTime time.Time
|
|
|
|
|
|
|
|
|
|
// DeferCheck is used to delay the sync of a health check when
|
|
|
|
|
// only the output has changed. This rate limits changes which
|
|
|
|
|
// do not affect the state of the node and/or service.
|
|
|
|
|
// only the status has changed.
|
|
|
|
|
// todo(fs): ^^ this needs double checking...
|
|
|
|
|
DeferCheck *time.Timer
|
|
|
|
|
|
|
|
|
|
// InSync contains whether the local state of the health check
|
|
|
|
@ -105,7 +107,7 @@ func (c *CheckState) CriticalFor() time.Duration {
|
|
|
|
|
return time.Since(c.CriticalTime)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type rpc interface {
|
|
|
|
|
type delegate interface {
|
|
|
|
|
RPC(method string, args interface{}, reply interface{}) error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -114,25 +116,14 @@ type rpc interface {
|
|
|
|
|
// catalog representation
|
|
|
|
|
type State struct {
|
|
|
|
|
sync.RWMutex
|
|
|
|
|
|
|
|
|
|
// Delegate the RPC interface to the consul server or agent.
|
|
|
|
|
//
|
|
|
|
|
// It is set after both the state and the consul server/agent have
|
|
|
|
|
// been created.
|
|
|
|
|
Delegate rpc
|
|
|
|
|
|
|
|
|
|
// TriggerSyncChanges is used to notify the state syncer that a
|
|
|
|
|
// partial sync should be performed.
|
|
|
|
|
//
|
|
|
|
|
// It is set after both the state and the state syncer have been
|
|
|
|
|
// created.
|
|
|
|
|
TriggerSyncChanges func()
|
|
|
|
|
|
|
|
|
|
logger *log.Logger
|
|
|
|
|
|
|
|
|
|
// Config is the agent config
|
|
|
|
|
config Config
|
|
|
|
|
|
|
|
|
|
// delegate is the consul interface to use for keeping in sync
|
|
|
|
|
delegate delegate
|
|
|
|
|
|
|
|
|
|
// nodeInfoInSync tracks whether the server has our correct top-level
|
|
|
|
|
// node information in sync
|
|
|
|
|
nodeInfoInSync bool
|
|
|
|
@ -143,9 +134,13 @@ type State struct {
|
|
|
|
|
// Checks tracks the local checks
|
|
|
|
|
checks map[types.CheckID]*CheckState
|
|
|
|
|
|
|
|
|
|
// metadata tracks the node metadata fields
|
|
|
|
|
// metadata tracks the local metadata fields
|
|
|
|
|
metadata map[string]string
|
|
|
|
|
|
|
|
|
|
// triggerCh is used to inform of a change to local state
|
|
|
|
|
// that requires anti-entropy with the server
|
|
|
|
|
triggerCh chan struct{}
|
|
|
|
|
|
|
|
|
|
// discardCheckOutput stores whether the output of health checks
|
|
|
|
|
// is stored in the raft log.
|
|
|
|
|
discardCheckOutput atomic.Value // bool
|
|
|
|
@ -155,19 +150,33 @@ type State struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewLocalState creates a is used to initialize the local state
|
|
|
|
|
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
|
|
|
|
|
func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
|
|
|
|
|
l := &State{
|
|
|
|
|
config: c,
|
|
|
|
|
logger: lg,
|
|
|
|
|
services: make(map[string]*ServiceState),
|
|
|
|
|
checks: make(map[types.CheckID]*CheckState),
|
|
|
|
|
metadata: make(map[string]string),
|
|
|
|
|
tokens: tokens,
|
|
|
|
|
}
|
|
|
|
|
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
|
|
|
|
|
config: c,
|
|
|
|
|
logger: lg,
|
|
|
|
|
services: make(map[string]*ServiceState),
|
|
|
|
|
checks: make(map[types.CheckID]*CheckState),
|
|
|
|
|
metadata: make(map[string]string),
|
|
|
|
|
triggerCh: triggerCh,
|
|
|
|
|
tokens: tokens,
|
|
|
|
|
}
|
|
|
|
|
l.discardCheckOutput.Store(c.DiscardCheckOutput)
|
|
|
|
|
return l
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *State) SetDelegate(d delegate) {
|
|
|
|
|
l.delegate = d
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// changeMade is used to trigger an anti-entropy run
|
|
|
|
|
func (l *State) changeMade() {
|
|
|
|
|
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
|
|
|
|
|
select {
|
|
|
|
|
case l.triggerCh <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *State) SetDiscardCheckOutput(b bool) {
|
|
|
|
|
l.discardCheckOutput.Store(b)
|
|
|
|
|
}
|
|
|
|
@ -195,12 +204,14 @@ func (l *State) serviceToken(id string) string {
|
|
|
|
|
// AddService is used to add a service entry to the local state.
|
|
|
|
|
// This entry is persistent and the agent will make a best effort to
|
|
|
|
|
// ensure it is registered
|
|
|
|
|
// todo(fs): where is the persistence happening?
|
|
|
|
|
func (l *State) AddService(service *structs.NodeService, token string) error {
|
|
|
|
|
if service == nil {
|
|
|
|
|
return fmt.Errorf("no service")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// use the service name as id if the id was omitted
|
|
|
|
|
// todo(fs): is this for backwards compatibility?
|
|
|
|
|
if service.ID == "" {
|
|
|
|
|
service.ID = service.Service
|
|
|
|
|
}
|
|
|
|
@ -217,7 +228,7 @@ func (l *State) AddServiceState(s *ServiceState) {
|
|
|
|
|
defer l.Unlock()
|
|
|
|
|
|
|
|
|
|
l.services[s.Service.ID] = s
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RemoveService is used to remove a service entry from the local state.
|
|
|
|
@ -236,7 +247,7 @@ func (l *State) RemoveService(id string) error {
|
|
|
|
|
// entry around until it is actually removed.
|
|
|
|
|
s.InSync = false
|
|
|
|
|
s.Deleted = true
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -355,7 +366,7 @@ func (l *State) AddCheckState(c *CheckState) {
|
|
|
|
|
defer l.Unlock()
|
|
|
|
|
|
|
|
|
|
l.checks[c.Check.CheckID] = c
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RemoveCheck is used to remove a health check from the local state.
|
|
|
|
@ -376,7 +387,7 @@ func (l *State) RemoveCheck(id types.CheckID) error {
|
|
|
|
|
// entry around until it is actually removed.
|
|
|
|
|
c.InSync = false
|
|
|
|
|
c.Deleted = true
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -432,7 +443,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
c.InSync = false
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
@ -442,7 +453,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
|
|
|
|
c.Check.Status = status
|
|
|
|
|
c.Check.Output = output
|
|
|
|
|
c.InSync = false
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check returns the locally registered check that the
|
|
|
|
@ -538,12 +549,12 @@ func (l *State) updateSyncState() error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var out1 structs.IndexedNodeServices
|
|
|
|
|
if err := l.Delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
|
|
|
|
|
if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var out2 structs.IndexedHealthChecks
|
|
|
|
|
if err := l.Delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
|
|
|
|
if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -594,7 +605,8 @@ func (l *State) updateSyncState() error {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If the service is already scheduled for removal skip it
|
|
|
|
|
// If the service is scheduled for removal skip it.
|
|
|
|
|
// todo(fs): is this correct?
|
|
|
|
|
if ls.Deleted {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
@ -634,7 +646,8 @@ func (l *State) updateSyncState() error {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If the check is already scheduled for removal skip it.
|
|
|
|
|
// If the check is scheduled for removal skip it.
|
|
|
|
|
// todo(fs): is this correct?
|
|
|
|
|
if lc.Deleted {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
@ -674,13 +687,10 @@ func (l *State) updateSyncState() error {
|
|
|
|
|
func (l *State) SyncFull() error {
|
|
|
|
|
// note that we do not acquire the lock here since the methods
|
|
|
|
|
// we are calling will do that themself.
|
|
|
|
|
//
|
|
|
|
|
// Also note that we don't hold the lock for the entire operation
|
|
|
|
|
// but release it between the two calls. This is not an issue since
|
|
|
|
|
// the algorithm is best-effort to achieve eventual consistency.
|
|
|
|
|
// SyncChanges will sync whatever updateSyncState() has determined
|
|
|
|
|
// needs updating.
|
|
|
|
|
|
|
|
|
|
// todo(fs): is it an issue that we do not hold the lock for the entire time?
|
|
|
|
|
// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
|
|
|
|
|
// todo(fs): was determined in the update step.
|
|
|
|
|
if err := l.updateSyncState(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -754,7 +764,7 @@ func (l *State) LoadMetadata(data map[string]string) error {
|
|
|
|
|
for k, v := range data {
|
|
|
|
|
l.metadata[k] = v
|
|
|
|
|
}
|
|
|
|
|
l.TriggerSyncChanges()
|
|
|
|
|
l.changeMade()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -805,24 +815,19 @@ func (l *State) deleteService(id string) error {
|
|
|
|
|
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
|
|
|
|
}
|
|
|
|
|
var out struct{}
|
|
|
|
|
err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil || strings.Contains(err.Error(), "Unknown service"):
|
|
|
|
|
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
|
|
|
|
if err == nil || strings.Contains(err.Error(), "Unknown service") {
|
|
|
|
|
delete(l.services, id)
|
|
|
|
|
l.logger.Printf("[INFO] agent: Deregistered service %q", id)
|
|
|
|
|
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case acl.IsErrPermissionDenied(err):
|
|
|
|
|
// todo(fs): mark the service to be in sync to prevent excessive retrying before next full sync
|
|
|
|
|
// todo(fs): some backoff strategy might be a better solution
|
|
|
|
|
}
|
|
|
|
|
if acl.IsErrPermissionDenied(err) {
|
|
|
|
|
// todo(fs): why is the service in sync here?
|
|
|
|
|
l.services[id].InSync = true
|
|
|
|
|
l.logger.Printf("[WARN] agent: Service %q deregistration blocked by ACLs", id)
|
|
|
|
|
l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
l.logger.Printf("[WARN] agent: Deregistering service %q failed. %s", id, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// deleteCheck is used to delete a check from the server
|
|
|
|
@ -838,28 +843,20 @@ func (l *State) deleteCheck(id types.CheckID) error {
|
|
|
|
|
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
|
|
|
|
}
|
|
|
|
|
var out struct{}
|
|
|
|
|
err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil || strings.Contains(err.Error(), "Unknown check"):
|
|
|
|
|
c := l.checks[id]
|
|
|
|
|
if c != nil && c.DeferCheck != nil {
|
|
|
|
|
c.DeferCheck.Stop()
|
|
|
|
|
}
|
|
|
|
|
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
|
|
|
|
if err == nil || strings.Contains(err.Error(), "Unknown check") {
|
|
|
|
|
// todo(fs): do we need to stop the deferCheck timer here?
|
|
|
|
|
delete(l.checks, id)
|
|
|
|
|
l.logger.Printf("[INFO] agent: Deregistered check %q", id)
|
|
|
|
|
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case acl.IsErrPermissionDenied(err):
|
|
|
|
|
// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
|
|
|
|
|
// todo(fs): some backoff strategy might be a better solution
|
|
|
|
|
}
|
|
|
|
|
if acl.IsErrPermissionDenied(err) {
|
|
|
|
|
// todo(fs): why is the check in sync here?
|
|
|
|
|
l.checks[id].InSync = true
|
|
|
|
|
l.logger.Printf("[WARN] agent: Check %q deregistration blocked by ACLs", id)
|
|
|
|
|
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
l.logger.Printf("[WARN] agent: Deregistering check %q failed. %s", id, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// syncService is used to sync a service to the server
|
|
|
|
@ -903,9 +900,8 @@ func (l *State) syncService(id string) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var out struct{}
|
|
|
|
|
err := l.Delegate.RPC("Catalog.Register", &req, &out)
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil:
|
|
|
|
|
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
|
|
|
|
if err == nil {
|
|
|
|
|
l.services[id].InSync = true
|
|
|
|
|
// Given how the register API works, this info is also updated
|
|
|
|
|
// every time we sync a service.
|
|
|
|
@ -913,23 +909,20 @@ func (l *State) syncService(id string) error {
|
|
|
|
|
for _, check := range checks {
|
|
|
|
|
l.checks[check.CheckID].InSync = true
|
|
|
|
|
}
|
|
|
|
|
l.logger.Printf("[INFO] agent: Synced service %q", id)
|
|
|
|
|
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case acl.IsErrPermissionDenied(err):
|
|
|
|
|
// todo(fs): mark the service and the checks to be in sync to prevent excessive retrying before next full sync
|
|
|
|
|
// todo(fs): some backoff strategy might be a better solution
|
|
|
|
|
}
|
|
|
|
|
if acl.IsErrPermissionDenied(err) {
|
|
|
|
|
// todo(fs): why are the service and the checks in sync here?
|
|
|
|
|
// todo(fs): why is the node info not in sync here?
|
|
|
|
|
l.services[id].InSync = true
|
|
|
|
|
for _, check := range checks {
|
|
|
|
|
l.checks[check.CheckID].InSync = true
|
|
|
|
|
}
|
|
|
|
|
l.logger.Printf("[WARN] agent: Service %q registration blocked by ACLs", id)
|
|
|
|
|
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
l.logger.Printf("[WARN] agent: Syncing service %q failed. %s", id, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// syncCheck is used to sync a check to the server
|
|
|
|
@ -954,27 +947,22 @@ func (l *State) syncCheck(id types.CheckID) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var out struct{}
|
|
|
|
|
err := l.Delegate.RPC("Catalog.Register", &req, &out)
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil:
|
|
|
|
|
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
|
|
|
|
if err == nil {
|
|
|
|
|
l.checks[id].InSync = true
|
|
|
|
|
// Given how the register API works, this info is also updated
|
|
|
|
|
// every time we sync a check.
|
|
|
|
|
l.nodeInfoInSync = true
|
|
|
|
|
l.logger.Printf("[INFO] agent: Synced check %q", id)
|
|
|
|
|
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case acl.IsErrPermissionDenied(err):
|
|
|
|
|
// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
|
|
|
|
|
// todo(fs): some backoff strategy might be a better solution
|
|
|
|
|
}
|
|
|
|
|
if acl.IsErrPermissionDenied(err) {
|
|
|
|
|
// todo(fs): why is the check in sync here?
|
|
|
|
|
l.checks[id].InSync = true
|
|
|
|
|
l.logger.Printf("[WARN] agent: Check %q registration blocked by ACLs", id)
|
|
|
|
|
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
l.logger.Printf("[WARN] agent: Syncing check %q failed. %s", id, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *State) syncNodeInfo() error {
|
|
|
|
@ -988,22 +976,17 @@ func (l *State) syncNodeInfo() error {
|
|
|
|
|
WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()},
|
|
|
|
|
}
|
|
|
|
|
var out struct{}
|
|
|
|
|
err := l.Delegate.RPC("Catalog.Register", &req, &out)
|
|
|
|
|
switch {
|
|
|
|
|
case err == nil:
|
|
|
|
|
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
|
|
|
|
if err == nil {
|
|
|
|
|
l.nodeInfoInSync = true
|
|
|
|
|
l.logger.Printf("[INFO] agent: Synced node info")
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case acl.IsErrPermissionDenied(err):
|
|
|
|
|
// todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync
|
|
|
|
|
// todo(fs): some backoff strategy might be a better solution
|
|
|
|
|
}
|
|
|
|
|
if acl.IsErrPermissionDenied(err) {
|
|
|
|
|
// todo(fs): why is the node info in sync here?
|
|
|
|
|
l.nodeInfoInSync = true
|
|
|
|
|
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
l.logger.Printf("[WARN] agent: Syncing node info failed. %s", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|