@ -21,12 +21,6 @@ import (
// permissionDenied is returned when an ACL based rejection happens.
const permissionDenied = "Permission denied"
// syncStatus is used to represent the difference between
// the local and remote state, and if action needs to be taken
type syncStatus struct {
inSync bool // Is this in sync with the server
}
// Config is the configuration for the State. It is
// populated during NewLocalAgent from the agent configuration to avoid
// race conditions with the agent configuration.
@ -40,6 +34,62 @@ type Config struct {
TaggedAddresses map [ string ] string
}
// ServiceState describes the state of a service record.
type ServiceState struct {
// Service is the local copy of the service record.
Service * structs . NodeService
// Token is the ACL to update the service record on the server.
Token string
// InSync contains whether the local state of the service record
// is in sync with the remote state on the server.
InSync bool
// Deleted is true when the service record has been marked as deleted
// but has not been removed on the server yet.
Deleted bool
}
// CheckState describes the state of a health check record.
type CheckState struct {
// Check is the local copy of the health check record.
Check * structs . HealthCheck
// Token is the ACL record to update the health check record
// on the server.
Token string
// CriticalTime is the last time the health check status went
// from non-critical to critical. When the health check is not
// in critical state the value is the zero value.
CriticalTime time . Time
// DeferCheck is used to delay the sync of a health check when
// only the status has changed.
// todo(fs): ^^ this needs double checking...
DeferCheck * time . Timer
// InSync contains whether the local state of the health check
// record is in sync with the remote state on the server.
InSync bool
// Deleted is true when the health check record has been marked as
// deleted but has not been removed on the server yet.
Deleted bool
}
// Critical returns true when the health check is in critical state.
func ( c * CheckState ) Critical ( ) bool {
return ! c . CriticalTime . IsZero ( )
}
// CriticalFor returns the amount of time the service has been in critical
// state. Its value is undefined when the service is not in critical state.
func ( c * CheckState ) CriticalFor ( ) time . Duration {
return time . Since ( c . CriticalTime )
}
type delegate interface {
RPC ( method string , args interface { } , reply interface { } ) error
}
@ -62,18 +112,10 @@ type State struct {
nodeInfoInSync bool
// Services tracks the local services
services map [ string ] * structs . NodeService
serviceStatus map [ string ] syncStatus
serviceTokens map [ string ] string
services map [ string ] * ServiceState
// Checks tracks the local checks
checks map [ types . CheckID ] * structs . HealthCheck
checkStatus map [ types . CheckID ] syncStatus
checkTokens map [ types . CheckID ] string
checkCriticalTime map [ types . CheckID ] time . Time
// Used to track checks that are being deferred
deferCheck map [ types . CheckID ] * time . Timer
checks map [ types . CheckID ] * CheckState
// metadata tracks the local metadata fields
metadata map [ string ] string
@ -86,25 +128,20 @@ type State struct {
// is stored in the raft log.
discardCheckOutput atomic . Value // bool
// tokens contains the ACL tokens
tokens * token . Store
}
// NewLocalState creates a is used to initialize the local state
func NewState ( c Config , lg * log . Logger , tokens * token . Store , triggerCh chan struct { } ) * State {
l := & State {
config : c ,
logger : lg ,
services : make ( map [ string ] * structs . NodeService ) ,
serviceStatus : make ( map [ string ] syncStatus ) ,
serviceTokens : make ( map [ string ] string ) ,
checks : make ( map [ types . CheckID ] * structs . HealthCheck ) ,
checkStatus : make ( map [ types . CheckID ] syncStatus ) ,
checkTokens : make ( map [ types . CheckID ] string ) ,
checkCriticalTime : make ( map [ types . CheckID ] time . Time ) ,
deferCheck : make ( map [ types . CheckID ] * time . Timer ) ,
metadata : make ( map [ string ] string ) ,
triggerCh : triggerCh ,
tokens : tokens ,
config : c ,
logger : lg ,
services : make ( map [ string ] * ServiceState ) ,
checks : make ( map [ types . CheckID ] * CheckState ) ,
metadata : make ( map [ string ] string ) ,
triggerCh : triggerCh ,
tokens : tokens ,
}
l . discardCheckOutput . Store ( c . DiscardCheckOutput )
return l
@ -137,7 +174,10 @@ func (l *State) ServiceToken(id string) string {
// serviceToken returns an ACL token associated with a service.
func ( l * State ) serviceToken ( id string ) string {
token := l . serviceTokens [ id ]
var token string
if s := l . services [ id ] ; s != nil {
token = s . Token
}
if token == "" {
token = l . tokens . UserToken ( )
}
@ -147,37 +187,48 @@ func (l *State) serviceToken(id string) string {
// AddService is used to add a service entry to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func ( l * State ) AddService ( service * structs . NodeService , token string ) {
// Assign the ID if none given
if service . ID == "" && service . Service != "" {
service . ID = service . Service
}
// todo(fs): where is the persistence happening?
func ( l * State ) AddService ( service * structs . NodeService , token string ) error {
l . Lock ( )
defer l . Unlock ( )
l . services [ service . ID ] = service
l . serviceStatus [ service . ID ] = syncStatus { }
l . serviceTokens [ service . ID ] = token
if service == nil {
return fmt . Errorf ( "no service" )
}
// use the service name as id if the id was omitted
// todo(fs): is this for backwards compatibility?
if service . ID == "" {
service . ID = service . Service
}
l . services [ service . ID ] = & ServiceState {
Service : service ,
Token : token ,
}
l . changeMade ( )
return nil
}
// RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered
func ( l * State ) RemoveService ( serviceID string ) error {
// The agent will make a best effort to ensure it is deregistered .
func ( l * State ) RemoveService ( id string ) error {
l . Lock ( )
defer l . Unlock ( )
if _ , ok := l . services [ serviceID ] ; ok {
delete ( l . services , serviceID )
// Leave the service token around, if any, until we successfully
// delete the service.
l . serviceStatus [ serviceID ] = syncStatus { inSync : false }
l . changeMade ( )
} else {
return fmt . Errorf ( "Service does not exist" )
s := l . services [ id ]
if s == nil || s . Deleted {
return fmt . Errorf ( "Service %q does not exist" , id )
}
// To remove the service on the server we need the token.
// Therefore, we mark the service as deleted and keep the
// entry around until it is actually removed.
s . InSync = false
s . Deleted = true
l . changeMade ( )
return nil
}
@ -186,20 +237,28 @@ func (l *State) RemoveService(serviceID string) error {
func ( l * State ) Service ( id string ) * structs . NodeService {
l . RLock ( )
defer l . RUnlock ( )
return l . services [ id ]
s := l . services [ id ]
if s == nil || s . Deleted {
return nil
}
return s . Service
}
// Services returns the locally registered services that the
// agent is aware of and are being kept in sync with the server
func ( l * State ) Services ( ) map [ string ] * structs . NodeService {
services := make ( map [ string ] * structs . NodeService )
l . RLock ( )
defer l . RUnlock ( )
for name , serv := range l . services {
services [ name ] = serv
m := make ( map [ string ] * structs . NodeService )
for id , s := range l . services {
if s . Deleted {
continue
}
m [ id ] = s . Service
}
return services
return m
}
// CheckToken is used to return the configured health check token for a
@ -211,8 +270,12 @@ func (l *State) CheckToken(checkID types.CheckID) string {
}
// checkToken returns an ACL token associated with a check.
func ( l * State ) checkToken ( checkID types . CheckID ) string {
token := l . checkTokens [ checkID ]
func ( l * State ) checkToken ( id types . CheckID ) string {
var token string
c := l . checks [ id ]
if c != nil {
token = c . Token
}
if token == "" {
token = l . tokens . UserToken ( )
}
@ -226,8 +289,9 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
l . Lock ( )
defer l . Unlock ( )
// Set the node name
check . Node = l . config . NodeName
if check == nil {
return fmt . Errorf ( "no check" )
}
if l . discardCheckOutput . Load ( ) . ( bool ) {
check . Output = ""
@ -236,38 +300,51 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
// if there is a serviceID associated with the check, make sure it exists before adding it
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
if check . ServiceID != "" && l . services [ check . ServiceID ] == nil {
return fmt . Errorf ( " ServiceID %q does not exist" , check . ServiceID )
return fmt . Errorf ( " Check %q refers to non-existent service %q does not exist", check . CheckID , check . ServiceID )
}
l . checks [ check . CheckID ] = check
l . checkStatus [ check . CheckID ] = syncStatus { }
l . checkTokens [ check . CheckID ] = token
delete ( l . checkCriticalTime , check . CheckID )
// hard-set the node name
check . Node = l . config . NodeName
l . checks [ check . CheckID ] = & CheckState {
Check : check ,
Token : token ,
}
l . changeMade ( )
return nil
}
// RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered
func ( l * State ) RemoveCheck ( checkID types . CheckID ) {
// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well.
// todo(fs): Check code that calls this to handle the error.
func ( l * State ) RemoveCheck ( id types . CheckID ) error {
l . Lock ( )
defer l . Unlock ( )
delete ( l . checks , checkID )
// Leave the check token around, if any, until we successfully delete
// the check.
delete ( l . checkCriticalTime , checkID )
l . checkStatus [ checkID ] = syncStatus { inSync : false }
c := l . checks [ id ]
if c == nil || c . Deleted {
return fmt . Errorf ( "Check %q does not exist" , id )
}
// To remove the check on the server we need the token.
// Therefore, we mark the service as deleted and keep the
// entry around until it is actually removed.
c . InSync = false
c . Deleted = true
l . changeMade ( )
return nil
}
// UpdateCheck is used to update the status of a check
func ( l * State ) UpdateCheck ( checkID types . CheckID , status , output string ) {
func ( l * State ) UpdateCheck ( id types . CheckID , status , output string ) {
l . Lock ( )
defer l . Unlock ( )
c heck, ok := l . checks [ checkID ]
if ! ok {
c := l . checks [ id ]
if c == nil || c . Deleted {
return
}
@ -278,16 +355,15 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
// Update the critical time tracking (this doesn't cause a server updates
// so we can always keep this up to date).
if status == api . HealthCritical {
_ , wasCritical := l . checkCriticalTime [ checkID ]
if ! wasCritical {
l . checkCriticalTime [ checkID ] = time . Now ( )
if ! c . Critical ( ) {
c . CriticalTime = time . Now ( )
}
} else {
delete ( l . checkCriticalTime , checkID )
c . CriticalTime = time . Time { }
}
// Do nothing if update is idempotent
if c heck. Status == status && c heck. Output == output {
if c . C heck. Status == status && c . C heck. Output == output {
return
}
@ -295,28 +371,34 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
// frequent updates of output. Instead, we update the output internally,
// and periodically do a write-back to the servers. If there is a status
// change we do the write immediately.
if l . config . CheckUpdateInterval > 0 && check . Status == status {
check . Output = output
if _ , ok := l . deferCheck [ checkID ] ; ! ok {
intv := time . Duration ( uint64 ( l . config . CheckUpdateInterval ) / 2 ) + lib . RandomStagger ( l . config . CheckUpdateInterval )
deferSync := time . AfterFunc ( intv , func ( ) {
if l . config . CheckUpdateInterval > 0 && c . Check . Status == status {
c . Check . Output = output
if c . DeferCheck == nil {
d := l . config . CheckUpdateInterval
intv := time . Duration ( uint64 ( d ) / 2 ) + lib . RandomStagger ( d )
c . DeferCheck = time . AfterFunc ( intv , func ( ) {
l . Lock ( )
if _ , ok := l . checkStatus [ checkID ] ; ok {
l . checkStatus [ checkID ] = syncStatus { inSync : false }
l . changeMade ( )
defer l . Unlock ( )
c := l . checks [ id ]
if c == nil {
return
}
c . DeferCheck = nil
if c . Deleted {
return
}
delete ( l . deferCheck , checkID )
l . Unlock ( )
c . InSync = false
l . changeMade ( )
} )
l . deferCheck [ checkID ] = deferSync
}
return
}
// Update status and mark out of sync
c heck. Status = status
c heck. Output = output
l. checkStatus [ checkID ] = syncStatus { inSync : false }
c . C heck. Status = status
c . C heck. Output = output
c. InSync = false
l . changeMade ( )
}
@ -325,7 +407,12 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
func ( l * State ) Check ( id types . CheckID ) * structs . HealthCheck {
l . RLock ( )
defer l . RUnlock ( )
return l . checks [ id ]
c := l . checks [ id ]
if c == nil || c . Deleted {
return nil
}
return c . Check
}
// Checks returns the locally registered checks that the
@ -334,78 +421,83 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
l . RLock ( )
defer l . RUnlock ( )
checks := make ( map [ types . CheckID ] * structs . HealthCheck )
m := make ( map [ types . CheckID ] * structs . HealthCheck )
for id , c := range l . checks {
if c . Deleted {
continue
}
c2 := new ( structs . HealthCheck )
* c2 = * c
checks [ id ] = c2
* c2 = * c . Check
m [ id ] = c2
}
return checks
}
// CriticalCheck is used to return the duration a check has been critical along
// with its associated health check.
type CriticalCheck struct {
CriticalFor time . Duration
Check * structs . HealthCheck
return m
}
// CriticalChecks returns locally registered health checks that the agent is
// aware of and are being kept in sync with the server, and that are in a
// critical state. This also returns information about how long each check has
// been critical.
func ( l * State ) CriticalChecks ( ) map [ types . CheckID ] CriticalCheck {
checks := make ( map [ types . CheckID ] CriticalCheck )
// CriticalCheckStates returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server
func ( l * State ) CriticalCheckStates ( ) map [ types . CheckID ] * CheckState {
l . RLock ( )
defer l . RUnlock ( )
now := time . Now ( )
for checkID , criticalTime := range l . checkCriticalTime {
checks [ checkID ] = CriticalCheck {
CriticalFor : now . Sub ( criticalTime ) ,
Check : l . checks [ checkID ] ,
m := make ( map [ types . CheckID ] * CheckState )
for id , c := range l . checks {
if c . Deleted || ! c . Critical ( ) {
continue
}
m [ id ] = c
}
return checks
return m
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func ( l * State ) Metadata ( ) map [ string ] string {
metadata := make ( map [ string ] string )
l . RLock ( )
defer l . RUnlock ( )
for k ey , v alue := range l . metadata {
m etadata [ k ey ] = v alue
m := make ( map [ string ] string )
for k , v := range l . metadata {
m [ k ] = v
}
return m etadata
return m
}
// UpdateSyncState does a read of the server state, and updates
// the local sync status as appropriate
func ( l * State ) UpdateSyncState ( ) error {
// 1. get all checks and services from the master
req := structs . NodeSpecificRequest {
Datacenter : l . config . Datacenter ,
Node : l . config . NodeName ,
QueryOptions : structs . QueryOptions { Token : l . tokens . AgentToken ( ) } ,
}
var out1 structs . IndexedNodeServices
var out2 structs . IndexedHealthChecks
if e := l . delegate . RPC ( "Catalog.NodeServices" , & req , & out1 ) ; e != nil {
return e
if err := l . delegate . RPC ( "Catalog.NodeServices" , & req , & out1 ) ; err != nil {
return err
}
var out2 structs . IndexedHealthChecks
if err := l . delegate . RPC ( "Health.NodeChecks" , & req , & out2 ) ; err != nil {
return err
}
checks := out2 . HealthChecks
// 2. create useful data structures for traversal
remoteServices := make ( map [ string ] * structs . NodeService )
if out1 . NodeServices != nil {
remoteServices = out1 . NodeServices . Services
}
remoteChecks := make ( map [ types . CheckID ] * structs . HealthCheck , len ( out2 . HealthChecks ) )
for _ , rc := range out2 . HealthChecks {
remoteChecks [ rc . CheckID ] = rc
}
// 3. perform sync
l . Lock ( )
defer l . Unlock ( )
// Check the node info
// sync node info
if out1 . NodeServices == nil || out1 . NodeServices . Node == nil ||
out1 . NodeServices . Node . ID != l . config . NodeID ||
! reflect . DeepEqual ( out1 . NodeServices . Node . TaggedAddresses , l . config . TaggedAddresses ) ||
@ -413,99 +505,103 @@ func (l *State) UpdateSyncState() error {
l . nodeInfoInSync = false
}
// Check all our services
services := make ( map [ string ] * structs . NodeService )
if out1 . NodeServices != nil {
services = out1 . NodeServices . Services
}
// sync services
for id := range l . services {
// If the local service doesn't exist remotely, then sync it
if _, ok := services [ id ] ; ! ok {
l. serviceStatus [ id ] = syncStatus { inSync : false }
// sync local services that do not exist remotely
for id , s := range l . services {
if remoteServices [ id ] == nil {
s. InSync = false
}
}
for id , service := range s ervices {
for id , rs := range remoteS ervices {
// If we don't have the service locally, deregister it
existing, ok := l . services [ id ]
if ! ok {
// The consul service is created automatically , and does
ls := l . services [ id ]
if ls == nil {
// The consul service is created automatically and does
// not need to be deregistered.
if id == structs . ConsulServiceID {
continue
}
l . serviceStatus [ id ] = syncStatus { inSync : false }
l . services [ id ] = & ServiceState { Deleted : true }
continue
}
// If the service is scheduled for removal skip it.
// todo(fs): is this correct?
if ls . Deleted {
continue
}
// If our definition is different, we need to update it. Make a
// copy so that we don't retain a pointer to any actual state
// store info for in-memory RPCs.
if existing . EnableTagOverride {
existing . Tags = make ( [ ] string , len ( service . Tags ) )
copy ( existing. Tags , service . Tags )
if ls. Service . EnableTagOverride {
ls. Service . Tags = make ( [ ] string , len ( rs . Tags ) )
copy ( ls. Service . Tags , rs . Tags )
}
equal := existing . IsSame ( service )
l . serviceStatus [ id ] = syncStatus { inSync : equal }
ls . InSync = ls . Service . IsSame ( rs )
}
// Index the remote health checks to improve efficiency
checkIndex := make ( map [ types . CheckID ] * structs . HealthCheck , len ( checks ) )
for _ , check := range checks {
checkIndex [ check . CheckID ] = check
}
// sync checks
// Sync any check which doesn't exist on the remote side
for id := range l . checks {
if _, ok := checkIndex [ id ] ; ! ok {
l. checkStatus [ id ] = syncStatus { inSync : false }
// sync local checks which do not exist remotely
for id , c := range l . checks {
if remoteChecks [ id ] == nil {
c. InSync = false
}
}
for _ , check := range checks {
for id , rc := range remoteChecks {
lc := l . checks [ id ]
// If we don't have the check locally, deregister it
id := check . CheckID
existing , ok := l . checks [ id ]
if ! ok {
// The Serf check is created automatically, and does not
if lc == nil {
// The Serf check is created automatically and does not
// need to be deregistered.
if id == structs . SerfCheckID {
l . logger . Printf ( "Skipping remote check %q since it is managed automatically" , id )
continue
}
l . checkStatus [ id ] = syncStatus { inSync : false }
l . checks [ id ] = & CheckState { Deleted : true }
continue
}
// If the check is scheduled for removal skip it.
// todo(fs): is this correct?
if lc . Deleted {
continue
}
// If our definition is different, we need to update it
var equal bool
if l . config . CheckUpdateInterval == 0 {
equal = existing . IsSame ( check )
} else {
// Copy the existing check before potentially modifying
// it before the compare operation.
eCopy := existing . Clone ( )
// Copy the server's check before modifying, otherwise
// in-memory RPCs will have side effects.
cCopy := check . Clone ( )
// If there's a defer timer active then we've got a
// potentially spammy check so we don't sync the output
// during this sweep since the timer will mark the check
// out of sync for us. Otherwise, it is safe to sync the
// output now. This is especially important for checks
// that don't change state after they are created, in
// which case we'd never see their output synced back ever.
if _ , ok := l . deferCheck [ id ] ; ok {
eCopy . Output = ""
cCopy . Output = ""
}
equal = eCopy . IsSame ( cCopy )
lc . InSync = lc . Check . IsSame ( rc )
continue
}
// Update the status
l . checkStatus [ id ] = syncStatus { inSync : equal }
// Copy the existing check before potentially modifying
// it before the compare operation.
lcCopy := lc . Check . Clone ( )
// Copy the server's check before modifying, otherwise
// in-memory RPCs will have side effects.
rcCopy := rc . Clone ( )
// If there's a defer timer active then we've got a
// potentially spammy check so we don't sync the output
// during this sweep since the timer will mark the check
// out of sync for us. Otherwise, it is safe to sync the
// output now. This is especially important for checks
// that don't change state after they are created, in
// which case we'd never see their output synced back ever.
if lc . DeferCheck != nil {
lcCopy . Output = ""
rcCopy . Output = ""
}
lc . InSync = lcCopy . IsSame ( rcCopy )
}
return nil
}
@ -521,39 +617,38 @@ func (l *State) SyncChanges() error {
// API works.
// Sync the services
for id , status := range l . serviceStatus {
if _ , ok := l . services [ id ] ; ! ok {
if err := l . deleteService ( id ) ; err != nil {
return err
}
} else if ! status . inSync {
if err := l . syncService ( id ) ; err != nil {
return err
}
} else {
for id , s := range l . services {
var err error
switch {
case s . Deleted :
err = l . deleteService ( id )
case ! s . InSync :
err = l . syncService ( id )
default :
l . logger . Printf ( "[DEBUG] agent: Service '%s' in sync" , id )
}
if err != nil {
return err
}
}
// Sync the checks
for id , status := range l . checkStatus {
if _ , ok := l . checks [ id ] ; ! ok {
if err := l . deleteCheck ( id ) ; err != nil {
return err
}
} else if ! status . inSync {
// Cancel a deferred sync
if timer := l . deferCheck [ id ] ; timer != nil {
timer . Stop ( )
delete ( l . deferCheck , id )
}
if err := l . syncCheck ( id ) ; err != nil {
return err
for id , c := range l . checks {
var err error
switch {
case c . Deleted :
err = l . deleteCheck ( id )
case ! c . InSync :
if c . DeferCheck != nil {
c . DeferCheck . Stop ( )
c . DeferCheck = nil
}
} else {
err = l . syncCheck ( id )
default :
l . logger . Printf ( "[DEBUG] agent: Check '%s' in sync" , id )
}
if err != nil {
return err
}
}
// Now sync the node level info if we need to, and didn't do any of
@ -593,9 +688,26 @@ func (l *State) UnloadMetadata() {
func ( l * State ) Stats ( ) map [ string ] string {
l . RLock ( )
defer l . RUnlock ( )
services := 0
for _ , s := range l . services {
if s . Deleted {
continue
}
services ++
}
checks := 0
for _ , c := range l . checks {
if c . Deleted {
continue
}
checks ++
}
return map [ string ] string {
"services" : strconv . Itoa ( len ( l . services ) ) ,
"checks" : strconv . Itoa ( len ( l . checks ) ) ,
"services" : strconv . Itoa ( services ) ,
"checks" : strconv . Itoa ( checks ) ,
}
}
@ -614,12 +726,13 @@ func (l *State) deleteService(id string) error {
var out struct { }
err := l . delegate . RPC ( "Catalog.Deregister" , & req , & out )
if err == nil || strings . Contains ( err . Error ( ) , "Unknown service" ) {
delete ( l . serviceStatus , id )
delete ( l . serviceTokens , id )
delete ( l . services , id )
l . logger . Printf ( "[INFO] agent: Deregistered service '%s'" , id )
return nil
} else if acl . IsErrPermissionDenied ( err ) {
l . serviceStatus [ id ] = syncStatus { inSync : true }
}
if acl . IsErrPermissionDenied ( err ) {
// todo(fs): why is the service in sync here?
l . services [ id ] . InSync = true
l . logger . Printf ( "[WARN] agent: Service '%s' deregistration blocked by ACLs" , id )
return nil
}
@ -641,12 +754,14 @@ func (l *State) deleteCheck(id types.CheckID) error {
var out struct { }
err := l . delegate . RPC ( "Catalog.Deregister" , & req , & out )
if err == nil || strings . Contains ( err . Error ( ) , "Unknown check" ) {
delete ( l . checkStatus , id )
delete ( l . check Token s, id )
// todo(fs): do we need to stop the deferCheck timer here?
delete ( l . check s, id )
l . logger . Printf ( "[INFO] agent: Deregistered check '%s'" , id )
return nil
} else if acl . IsErrPermissionDenied ( err ) {
l . checkStatus [ id ] = syncStatus { inSync : true }
}
if acl . IsErrPermissionDenied ( err ) {
// todo(fs): why is the check in sync here?
l . checks [ id ] . InSync = true
l . logger . Printf ( "[WARN] agent: Check '%s' deregistration blocked by ACLs" , id )
return nil
}
@ -655,17 +770,6 @@ func (l *State) deleteCheck(id types.CheckID) error {
// syncService is used to sync a service to the server
func ( l * State ) syncService ( id string ) error {
req := structs . RegisterRequest {
Datacenter : l . config . Datacenter ,
ID : l . config . NodeID ,
Node : l . config . NodeName ,
Address : l . config . AdvertiseAddr ,
TaggedAddresses : l . config . TaggedAddresses ,
NodeMeta : l . metadata ,
Service : l . services [ id ] ,
WriteRequest : structs . WriteRequest { Token : l . serviceToken ( id ) } ,
}
// If the service has associated checks that are out of sync,
// piggyback them on the service sync so they are part of the
// same transaction and are registered atomically. We only let
@ -673,12 +777,28 @@ func (l *State) syncService(id string) error {
// otherwise we need to register them separately so they don't
// pick up privileges from the service token.
var checks structs . HealthChecks
for _ , check := range l . checks {
if check . ServiceID == id && ( l . serviceToken ( id ) == l . checkToken ( check . CheckID ) ) {
if stat , ok := l . checkStatus [ check . CheckID ] ; ! ok || ! stat . inSync {
checks = append ( checks , check )
}
for checkID , c := range l . checks {
if c . Deleted || c . InSync {
continue
}
if c . Check . ServiceID != id {
continue
}
if l . serviceToken ( id ) != l . checkToken ( checkID ) {
continue
}
checks = append ( checks , c . Check )
}
req := structs . RegisterRequest {
Datacenter : l . config . Datacenter ,
ID : l . config . NodeID ,
Node : l . config . NodeName ,
Address : l . config . AdvertiseAddr ,
TaggedAddresses : l . config . TaggedAddresses ,
NodeMeta : l . metadata ,
Service : l . services [ id ] . Service ,
WriteRequest : structs . WriteRequest { Token : l . serviceToken ( id ) } ,
}
// Backwards-compatibility for Consul < 0.5
@ -691,20 +811,24 @@ func (l *State) syncService(id string) error {
var out struct { }
err := l . delegate . RPC ( "Catalog.Register" , & req , & out )
if err == nil {
l . service Status[ id ] = syncStatus { inSync : true }
l . service s[ id ] . InSync = true
// Given how the register API works, this info is also updated
// every time we sync a service.
l . nodeInfoInSync = true
l . logger . Printf ( "[INFO] agent: Synced service '%s'" , id )
for _ , check := range checks {
l . check Statu s[ check . CheckID ] = syncStatus { inSync : true }
l . check s[ check . CheckID ] . InSync = true
}
} else if acl . IsErrPermissionDenied ( err ) {
l . serviceStatus [ id ] = syncStatus { inSync : true }
l . logger . Printf ( "[WARN] agent: Service '%s' registration blocked by ACLs" , id )
l . logger . Printf ( "[INFO] agent: Synced service '%s'" , id )
return nil
}
if acl . IsErrPermissionDenied ( err ) {
// todo(fs): why are the service and the checks in sync here?
// todo(fs): why is the node info not in sync here?
l . services [ id ] . InSync = true
for _ , check := range checks {
l . checkStatus [ check . CheckID ] = syncStatus { inSync : true }
l . check s[ check . CheckID ] . InSync = true
}
l . logger . Printf ( "[WARN] agent: Service '%s' registration blocked by ACLs" , id )
return nil
}
return err
@ -712,14 +836,7 @@ func (l *State) syncService(id string) error {
// syncCheck is used to sync a check to the server
func ( l * State ) syncCheck ( id types . CheckID ) error {
// Pull in the associated service if any
check := l . checks [ id ]
var service * structs . NodeService
if check . ServiceID != "" {
if serv , ok := l . services [ check . ServiceID ] ; ok {
service = serv
}
}
c := l . checks [ id ]
req := structs . RegisterRequest {
Datacenter : l . config . Datacenter ,
@ -728,20 +845,29 @@ func (l *State) syncCheck(id types.CheckID) error {
Address : l . config . AdvertiseAddr ,
TaggedAddresses : l . config . TaggedAddresses ,
NodeMeta : l . metadata ,
Service : service ,
Check : l . checks [ id ] ,
Check : c . Check ,
WriteRequest : structs . WriteRequest { Token : l . checkToken ( id ) } ,
}
// Pull in the associated service if any
s := l . services [ c . Check . ServiceID ]
if s != nil && ! s . Deleted {
req . Service = s . Service
}
var out struct { }
err := l . delegate . RPC ( "Catalog.Register" , & req , & out )
if err == nil {
l . checkStatus [ id ] = syncStatus { inSync : true }
l . check s[ id ] . InSync = true
// Given how the register API works, this info is also updated
// every time we sync a check.
l . nodeInfoInSync = true
l . logger . Printf ( "[INFO] agent: Synced check '%s'" , id )
} else if acl . IsErrPermissionDenied ( err ) {
l . checkStatus [ id ] = syncStatus { inSync : true }
return nil
}
if acl . IsErrPermissionDenied ( err ) {
// todo(fs): why is the check in sync here?
l . checks [ id ] . InSync = true
l . logger . Printf ( "[WARN] agent: Check '%s' registration blocked by ACLs" , id )
return nil
}
@ -763,7 +889,10 @@ func (l *State) syncNodeInfo() error {
if err == nil {
l . nodeInfoInSync = true
l . logger . Printf ( "[INFO] agent: Synced node info" )
} else if acl . IsErrPermissionDenied ( err ) {
return nil
}
if acl . IsErrPermissionDenied ( err ) {
// todo(fs): why is the node info in sync here?
l . nodeInfoInSync = true
l . logger . Printf ( "[WARN] agent: Node info update blocked by ACLs" )
return nil