Merge pull request #3391 from hashicorp/refactor-local-state-2

This patch decouples the local state and the anti-entropy code from the
agent and from each other to have cleaner separation of concerns and to
ensure that data structures are properly locked.
pull/3609/head
Frank Schroeder 2017-10-23 10:59:55 +02:00
commit c88733d144
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
18 changed files with 2478 additions and 1754 deletions

View File

@ -259,7 +259,7 @@ func (a *Agent) vetServiceRegister(token string, service *structs.NodeService) e
} }
// Vet any service that might be getting overwritten. // Vet any service that might be getting overwritten.
services := a.state.Services() services := a.State.Services()
if existing, ok := services[service.ID]; ok { if existing, ok := services[service.ID]; ok {
if !rule.ServiceWrite(existing.Service, nil) { if !rule.ServiceWrite(existing.Service, nil) {
return acl.ErrPermissionDenied return acl.ErrPermissionDenied
@ -282,7 +282,7 @@ func (a *Agent) vetServiceUpdate(token string, serviceID string) error {
} }
// Vet any changes based on the existing services's info. // Vet any changes based on the existing services's info.
services := a.state.Services() services := a.State.Services()
if existing, ok := services[serviceID]; ok { if existing, ok := services[serviceID]; ok {
if !rule.ServiceWrite(existing.Service, nil) { if !rule.ServiceWrite(existing.Service, nil) {
return acl.ErrPermissionDenied return acl.ErrPermissionDenied
@ -318,7 +318,7 @@ func (a *Agent) vetCheckRegister(token string, check *structs.HealthCheck) error
} }
// Vet any check that might be getting overwritten. // Vet any check that might be getting overwritten.
checks := a.state.Checks() checks := a.State.Checks()
if existing, ok := checks[check.CheckID]; ok { if existing, ok := checks[check.CheckID]; ok {
if len(existing.ServiceName) > 0 { if len(existing.ServiceName) > 0 {
if !rule.ServiceWrite(existing.ServiceName, nil) { if !rule.ServiceWrite(existing.ServiceName, nil) {
@ -346,7 +346,7 @@ func (a *Agent) vetCheckUpdate(token string, checkID types.CheckID) error {
} }
// Vet any changes based on the existing check's info. // Vet any changes based on the existing check's info.
checks := a.state.Checks() checks := a.State.Checks()
if existing, ok := checks[checkID]; ok { if existing, ok := checks[checkID]; ok {
if len(existing.ServiceName) > 0 { if len(existing.ServiceName) > 0 {
if !rule.ServiceWrite(existing.ServiceName, nil) { if !rule.ServiceWrite(existing.ServiceName, nil) {

View File

@ -564,7 +564,7 @@ func TestACL_vetServiceRegister(t *testing.T) {
// Try to register over a service without write privs to the existing // Try to register over a service without write privs to the existing
// service. // service.
a.state.AddService(&structs.NodeService{ a.State.AddService(&structs.NodeService{
ID: "my-service", ID: "my-service",
Service: "other", Service: "other",
}, "") }, "")
@ -596,7 +596,7 @@ func TestACL_vetServiceUpdate(t *testing.T) {
} }
// Update with write privs. // Update with write privs.
a.state.AddService(&structs.NodeService{ a.State.AddService(&structs.NodeService{
ID: "my-service", ID: "my-service",
Service: "service", Service: "service",
}, "") }, "")
@ -662,11 +662,11 @@ func TestACL_vetCheckRegister(t *testing.T) {
// Try to register over a service check without write privs to the // Try to register over a service check without write privs to the
// existing service. // existing service.
a.state.AddService(&structs.NodeService{ a.State.AddService(&structs.NodeService{
ID: "my-service", ID: "my-service",
Service: "service", Service: "service",
}, "") }, "")
a.state.AddCheck(&structs.HealthCheck{ a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-check"), CheckID: types.CheckID("my-check"),
ServiceID: "my-service", ServiceID: "my-service",
ServiceName: "other", ServiceName: "other",
@ -681,7 +681,7 @@ func TestACL_vetCheckRegister(t *testing.T) {
} }
// Try to register over a node check without write privs to the node. // Try to register over a node check without write privs to the node.
a.state.AddCheck(&structs.HealthCheck{ a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-node-check"), CheckID: types.CheckID("my-node-check"),
}, "") }, "")
err = a.vetCheckRegister("service-rw", &structs.HealthCheck{ err = a.vetCheckRegister("service-rw", &structs.HealthCheck{
@ -713,11 +713,11 @@ func TestACL_vetCheckUpdate(t *testing.T) {
} }
// Update service check with write privs. // Update service check with write privs.
a.state.AddService(&structs.NodeService{ a.State.AddService(&structs.NodeService{
ID: "my-service", ID: "my-service",
Service: "service", Service: "service",
}, "") }, "")
a.state.AddCheck(&structs.HealthCheck{ a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-service-check"), CheckID: types.CheckID("my-service-check"),
ServiceID: "my-service", ServiceID: "my-service",
ServiceName: "service", ServiceName: "service",
@ -734,7 +734,7 @@ func TestACL_vetCheckUpdate(t *testing.T) {
} }
// Update node check with write privs. // Update node check with write privs.
a.state.AddCheck(&structs.HealthCheck{ a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-node-check"), CheckID: types.CheckID("my-node-check"),
}, "") }, "")
err = a.vetCheckUpdate("node-rw", "my-node-check") err = a.vetCheckUpdate("node-rw", "my-node-check")

321
agent/ae/ae.go Normal file
View File

@ -0,0 +1,321 @@
// Package ae provides tools to synchronize state between local and remote consul servers.
package ae
import (
"fmt"
"log"
"math"
"sync"
"time"
"github.com/hashicorp/consul/lib"
)
// scaleThreshold is the number of nodes after which regular sync runs are
// spread out farther apart. The value should be a power of 2 since the
// scale function uses log2.
//
// When set to 128 nodes the delay between regular runs is doubled when the
// cluster is larger than 128 nodes. It doubles again when it passes 256
// nodes, and again at 512 nodes and so forth. At 8192 nodes, the delay
// factor is 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
const scaleThreshold = 128
// scaleFactor returns a factor by which the next sync run should be delayed to
// avoid saturation of the cluster. The larger the cluster grows the farther
// the sync runs should be spread apart.
//
// The current implementation uses a log2 scale which doubles the delay between
// runs every time the cluster doubles in size.
func scaleFactor(nodes int) int {
if nodes <= scaleThreshold {
return 1.0
}
return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
}
type SyncState interface {
SyncChanges() error
SyncFull() error
}
// StateSyncer manages background synchronization of the given state.
//
// The state is synchronized on a regular basis or on demand when either
// the state has changed or a new Consul server has joined the cluster.
//
// The regular state sychronization provides a self-healing mechanism
// for the cluster which is also called anti-entropy.
type StateSyncer struct {
// State contains the data that needs to be synchronized.
State SyncState
// Interval is the time between two full sync runs.
Interval time.Duration
// ShutdownCh is closed when the application is shutting down.
ShutdownCh chan struct{}
// Logger is the logger.
Logger *log.Logger
// ClusterSize returns the number of members in the cluster to
// allow staggering the sync runs based on cluster size.
// This needs to be set before Run() is called.
ClusterSize func() int
// SyncFull allows triggering an immediate but staggered full sync
// in a non-blocking way.
SyncFull *Trigger
// SyncChanges allows triggering an immediate partial sync
// in a non-blocking way.
SyncChanges *Trigger
// paused stores whether sync runs are temporarily disabled.
pauseLock sync.Mutex
paused int
// serverUpInterval is the max time after which a full sync is
// performed when a server has been added to the cluster.
serverUpInterval time.Duration
// retryFailInterval is the time after which a failed full sync is retried.
retryFailInterval time.Duration
// stagger randomly picks a duration between 0s and the given duration.
stagger func(time.Duration) time.Duration
// retrySyncFullEvent generates an event based on multiple conditions
// when the state machine is trying to retry a full state sync.
retrySyncFullEvent func() event
// syncChangesEvent generates an event based on multiple conditions
// when the state machine is performing partial state syncs.
syncChangesEvent func() event
}
const (
// serverUpIntv is the max time to wait before a sync is triggered
// when a consul server has been added to the cluster.
serverUpIntv = 3 * time.Second
// retryFailIntv is the min time to wait before a failed sync is retried.
retryFailIntv = 15 * time.Second
)
func NewStateSyncer(state SyncState, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
s := &StateSyncer{
State: state,
Interval: intv,
ShutdownCh: shutdownCh,
Logger: logger,
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
serverUpInterval: serverUpIntv,
retryFailInterval: retryFailIntv,
}
// retain these methods as member variables so that
// we can mock them for testing.
s.retrySyncFullEvent = s.retrySyncFullEventFn
s.syncChangesEvent = s.syncChangesEventFn
s.stagger = s.staggerFn
return s
}
// fsmState defines states for the state machine.
type fsmState string
const (
doneState fsmState = "done"
fullSyncState fsmState = "fullSync"
partialSyncState fsmState = "partialSync"
retryFullSyncState fsmState = "retryFullSync"
)
// Run is the long running method to perform state synchronization
// between local and remote servers.
func (s *StateSyncer) Run() {
if s.ClusterSize == nil {
panic("ClusterSize not set")
}
s.runFSM(fullSyncState, s.nextFSMState)
}
// runFSM runs the state machine.
func (s *StateSyncer) runFSM(fs fsmState, next func(fsmState) fsmState) {
for {
if fs = next(fs); fs == doneState {
return
}
}
}
// nextFSMState determines the next state based on the current state.
func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {
switch fs {
case fullSyncState:
if s.Paused() {
return retryFullSyncState
}
err := s.State.SyncFull()
if err != nil {
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
return retryFullSyncState
}
return partialSyncState
case retryFullSyncState:
e := s.retrySyncFullEvent()
switch e {
case syncFullNotifEvent, syncFullTimerEvent:
return fullSyncState
case shutdownEvent:
return doneState
default:
panic(fmt.Sprintf("invalid event: %s", e))
}
case partialSyncState:
e := s.syncChangesEvent()
switch e {
case syncFullNotifEvent, syncFullTimerEvent:
return fullSyncState
case syncChangesNotifEvent:
if s.Paused() {
return partialSyncState
}
err := s.State.SyncChanges()
if err != nil {
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
return partialSyncState
case shutdownEvent:
return doneState
default:
panic(fmt.Sprintf("invalid event: %s", e))
}
default:
panic(fmt.Sprintf("invalid state: %s", fs))
}
}
// event defines a timing or notification event from multiple timers and
// channels.
type event string
const (
shutdownEvent event = "shutdown"
syncFullNotifEvent event = "syncFullNotif"
syncFullTimerEvent event = "syncFullTimer"
syncChangesNotifEvent event = "syncChangesNotif"
)
// retrySyncFullEventFn waits for an event which triggers a retry
// of a full sync or a termination signal. This function should not be
// called directly but through s.retryFullSyncState to allow mocking for
// testing.
func (s *StateSyncer) retrySyncFullEventFn() event {
select {
// trigger a full sync immediately.
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// retry full sync after some time
// todo(fs): why don't we use s.Interval here?
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
return syncFullTimerEvent
case <-s.ShutdownCh:
return shutdownEvent
}
}
// syncChangesEventFn waits for a event which either triggers a full
// or a partial sync or a termination signal. This function should not
// be called directly but through s.syncChangesEvent to allow mocking
// for testing.
func (s *StateSyncer) syncChangesEventFn() event {
select {
// trigger a full sync immediately
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// time for a full sync again
case <-time.After(s.Interval + s.stagger(s.Interval)):
return syncFullTimerEvent
// do partial syncs on demand
case <-s.SyncChanges.Notif():
return syncChangesNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
}
// stubbed out for testing
var libRandomStagger = lib.RandomStagger
// staggerFn returns a random duration which depends on the cluster size
// and a random factor which should provide some timely distribution of
// cluster wide events. This function should not be called directly
// but through s.stagger to allow mocking for testing.
func (s *StateSyncer) staggerFn(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return libRandomStagger(time.Duration(f) * d)
}
// Pause temporarily disables sync runs.
func (s *StateSyncer) Pause() {
s.pauseLock.Lock()
s.paused++
s.pauseLock.Unlock()
}
// Paused returns whether sync runs are temporarily disabled.
func (s *StateSyncer) Paused() bool {
s.pauseLock.Lock()
defer s.pauseLock.Unlock()
return s.paused != 0
}
// Resume re-enables sync runs.
func (s *StateSyncer) Resume() {
s.pauseLock.Lock()
s.paused--
if s.paused < 0 {
panic("unbalanced pause/resume")
}
trigger := s.paused == 0
s.pauseLock.Unlock()
if trigger {
s.SyncChanges.Trigger()
}
}

397
agent/ae/ae_test.go Normal file
View File

@ -0,0 +1,397 @@
package ae
import (
"errors"
"fmt"
"log"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/lib"
)
func TestAE_scaleFactor(t *testing.T) {
t.Parallel()
tests := []struct {
nodes int
scale int
}{
{100, 1},
{200, 2},
{1000, 4},
{10000, 8},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%d nodes", tt.nodes), func(t *testing.T) {
if got, want := scaleFactor(tt.nodes), tt.scale; got != want {
t.Fatalf("got scale factor %d want %d", got, want)
}
})
}
}
func TestAE_Pause_nestedPauseResume(t *testing.T) {
t.Parallel()
l := NewStateSyncer(nil, 0, nil, nil)
if l.Paused() != false {
t.Fatal("syncer should be unPaused after init")
}
l.Pause()
if l.Paused() != true {
t.Fatal("syncer should be Paused after first call to Pause()")
}
l.Pause()
if l.Paused() != true {
t.Fatal("syncer should STILL be Paused after second call to Pause()")
}
l.Resume()
if l.Paused() != true {
t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
}
l.Resume()
if l.Paused() != false {
t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
}
defer func() {
err := recover()
if err == nil {
t.Fatal("unbalanced Resume() should panic")
}
}()
l.Resume()
}
func TestAE_Pause_ResumeTriggersSyncChanges(t *testing.T) {
l := NewStateSyncer(nil, 0, nil, nil)
l.Pause()
l.Resume()
select {
case <-l.SyncChanges.Notif():
// expected
case <-l.SyncFull.Notif():
t.Fatal("resume triggered SyncFull instead of SyncChanges")
default:
t.Fatal("resume did not trigger SyncFull")
}
}
func TestAE_staggerDependsOnClusterSize(t *testing.T) {
libRandomStagger = func(d time.Duration) time.Duration { return d }
defer func() { libRandomStagger = lib.RandomStagger }()
l := testSyncer()
if got, want := l.staggerFn(10*time.Millisecond), 10*time.Millisecond; got != want {
t.Fatalf("got %v want %v", got, want)
}
l.ClusterSize = func() int { return 256 }
if got, want := l.staggerFn(10*time.Millisecond), 20*time.Millisecond; got != want {
t.Fatalf("got %v want %v", got, want)
}
}
func TestAE_Run_SyncFullBeforeChanges(t *testing.T) {
shutdownCh := make(chan struct{})
state := &mock{
syncChanges: func() error {
close(shutdownCh)
return nil
},
}
// indicate that we have partial changes before starting Run
l := testSyncer()
l.State = state
l.ShutdownCh = shutdownCh
l.SyncChanges.Trigger()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
l.Run()
}()
wg.Wait()
if got, want := state.seq, []string{"full", "changes"}; !reflect.DeepEqual(got, want) {
t.Fatalf("got call sequence %v want %v", got, want)
}
}
func TestAE_Run_Quit(t *testing.T) {
t.Run("Run panics without ClusterSize", func(t *testing.T) {
defer func() {
err := recover()
if err == nil {
t.Fatal("Run should panic")
}
}()
l := testSyncer()
l.ClusterSize = nil
l.Run()
})
t.Run("runFSM quits", func(t *testing.T) {
// start timer which explodes if runFSM does not quit
tm := time.AfterFunc(time.Second, func() { panic("timeout") })
l := testSyncer()
l.runFSM(fullSyncState, func(fsmState) fsmState { return doneState })
// should just quit
tm.Stop()
})
}
func TestAE_FSM(t *testing.T) {
t.Run("fullSyncState", func(t *testing.T) {
t.Run("Paused -> retryFullSyncState", func(t *testing.T) {
l := testSyncer()
l.Pause()
fs := l.nextFSMState(fullSyncState)
if got, want := fs, retryFullSyncState; got != want {
t.Fatalf("got state %v want %v", got, want)
}
})
t.Run("SyncFull() error -> retryFullSyncState", func(t *testing.T) {
l := testSyncer()
l.State = &mock{syncFull: func() error { return errors.New("boom") }}
fs := l.nextFSMState(fullSyncState)
if got, want := fs, retryFullSyncState; got != want {
t.Fatalf("got state %v want %v", got, want)
}
})
t.Run("SyncFull() OK -> partialSyncState", func(t *testing.T) {
l := testSyncer()
l.State = &mock{}
fs := l.nextFSMState(fullSyncState)
if got, want := fs, partialSyncState; got != want {
t.Fatalf("got state %v want %v", got, want)
}
})
})
t.Run("retryFullSyncState", func(t *testing.T) {
// helper for testing state transitions from retrySyncFullState
test := func(ev event, to fsmState) {
l := testSyncer()
l.retrySyncFullEvent = func() event { return ev }
fs := l.nextFSMState(retryFullSyncState)
if got, want := fs, to; got != want {
t.Fatalf("got state %v want %v", got, want)
}
}
t.Run("shutdownEvent -> doneState", func(t *testing.T) {
test(shutdownEvent, doneState)
})
t.Run("syncFullNotifEvent -> fullSyncState", func(t *testing.T) {
test(syncFullNotifEvent, fullSyncState)
})
t.Run("syncFullTimerEvent -> fullSyncState", func(t *testing.T) {
test(syncFullTimerEvent, fullSyncState)
})
t.Run("invalid event -> panic ", func(t *testing.T) {
defer func() {
err := recover()
if err == nil {
t.Fatal("invalid event should panic")
}
}()
test(event("invalid"), fsmState(""))
})
})
t.Run("partialSyncState", func(t *testing.T) {
// helper for testing state transitions from partialSyncState
test := func(ev event, to fsmState) {
l := testSyncer()
l.syncChangesEvent = func() event { return ev }
fs := l.nextFSMState(partialSyncState)
if got, want := fs, to; got != want {
t.Fatalf("got state %v want %v", got, want)
}
}
t.Run("shutdownEvent -> doneState", func(t *testing.T) {
test(shutdownEvent, doneState)
})
t.Run("syncFullNotifEvent -> fullSyncState", func(t *testing.T) {
test(syncFullNotifEvent, fullSyncState)
})
t.Run("syncFullTimerEvent -> fullSyncState", func(t *testing.T) {
test(syncFullTimerEvent, fullSyncState)
})
t.Run("syncChangesEvent+Paused -> partialSyncState", func(t *testing.T) {
l := testSyncer()
l.Pause()
l.syncChangesEvent = func() event { return syncChangesNotifEvent }
fs := l.nextFSMState(partialSyncState)
if got, want := fs, partialSyncState; got != want {
t.Fatalf("got state %v want %v", got, want)
}
})
t.Run("syncChangesEvent+SyncChanges() error -> partialSyncState", func(t *testing.T) {
l := testSyncer()
l.State = &mock{syncChanges: func() error { return errors.New("boom") }}
l.syncChangesEvent = func() event { return syncChangesNotifEvent }
fs := l.nextFSMState(partialSyncState)
if got, want := fs, partialSyncState; got != want {
t.Fatalf("got state %v want %v", got, want)
}
})
t.Run("syncChangesEvent+SyncChanges() OK -> partialSyncState", func(t *testing.T) {
l := testSyncer()
l.State = &mock{}
l.syncChangesEvent = func() event { return syncChangesNotifEvent }
fs := l.nextFSMState(partialSyncState)
if got, want := fs, partialSyncState; got != want {
t.Fatalf("got state %v want %v", got, want)
}
})
t.Run("invalid event -> panic ", func(t *testing.T) {
defer func() {
err := recover()
if err == nil {
t.Fatal("invalid event should panic")
}
}()
test(event("invalid"), fsmState(""))
})
})
t.Run("invalid state -> panic ", func(t *testing.T) {
defer func() {
err := recover()
if err == nil {
t.Fatal("invalid state should panic")
}
}()
l := testSyncer()
l.nextFSMState(fsmState("invalid"))
})
}
func TestAE_RetrySyncFullEvent(t *testing.T) {
t.Run("trigger shutdownEvent", func(t *testing.T) {
l := testSyncer()
l.ShutdownCh = make(chan struct{})
evch := make(chan event)
go func() { evch <- l.retrySyncFullEvent() }()
close(l.ShutdownCh)
if got, want := <-evch, shutdownEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger shutdownEvent during FullNotif", func(t *testing.T) {
l := testSyncer()
l.ShutdownCh = make(chan struct{})
evch := make(chan event)
go func() { evch <- l.retrySyncFullEvent() }()
l.SyncFull.Trigger()
time.Sleep(100 * time.Millisecond)
close(l.ShutdownCh)
if got, want := <-evch, shutdownEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger syncFullNotifEvent", func(t *testing.T) {
l := testSyncer()
l.serverUpInterval = 10 * time.Millisecond
evch := make(chan event)
go func() { evch <- l.retrySyncFullEvent() }()
l.SyncFull.Trigger()
if got, want := <-evch, syncFullNotifEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger syncFullTimerEvent", func(t *testing.T) {
l := testSyncer()
l.retryFailInterval = 10 * time.Millisecond
evch := make(chan event)
go func() { evch <- l.retrySyncFullEvent() }()
if got, want := <-evch, syncFullTimerEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
}
func TestAE_SyncChangesEvent(t *testing.T) {
t.Run("trigger shutdownEvent", func(t *testing.T) {
l := testSyncer()
l.ShutdownCh = make(chan struct{})
evch := make(chan event)
go func() { evch <- l.syncChangesEvent() }()
close(l.ShutdownCh)
if got, want := <-evch, shutdownEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger shutdownEvent during FullNotif", func(t *testing.T) {
l := testSyncer()
l.ShutdownCh = make(chan struct{})
evch := make(chan event)
go func() { evch <- l.syncChangesEvent() }()
l.SyncFull.Trigger()
time.Sleep(100 * time.Millisecond)
close(l.ShutdownCh)
if got, want := <-evch, shutdownEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger syncFullNotifEvent", func(t *testing.T) {
l := testSyncer()
l.serverUpInterval = 10 * time.Millisecond
evch := make(chan event)
go func() { evch <- l.syncChangesEvent() }()
l.SyncFull.Trigger()
if got, want := <-evch, syncFullNotifEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger syncFullTimerEvent", func(t *testing.T) {
l := testSyncer()
l.Interval = 10 * time.Millisecond
evch := make(chan event)
go func() { evch <- l.syncChangesEvent() }()
if got, want := <-evch, syncFullTimerEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
t.Run("trigger syncChangesNotifEvent", func(t *testing.T) {
l := testSyncer()
evch := make(chan event)
go func() { evch <- l.syncChangesEvent() }()
l.SyncChanges.Trigger()
if got, want := <-evch, syncChangesNotifEvent; got != want {
t.Fatalf("got event %q want %q", got, want)
}
})
}
type mock struct {
seq []string
syncFull, syncChanges func() error
}
func (m *mock) SyncFull() error {
m.seq = append(m.seq, "full")
if m.syncFull != nil {
return m.syncFull()
}
return nil
}
func (m *mock) SyncChanges() error {
m.seq = append(m.seq, "changes")
if m.syncChanges != nil {
return m.syncChanges()
}
return nil
}
func testSyncer() *StateSyncer {
logger := log.New(os.Stderr, "", 0)
l := NewStateSyncer(nil, time.Second, nil, logger)
l.stagger = func(d time.Duration) time.Duration { return d }
l.ClusterSize = func() int { return 1 }
return l
}

23
agent/ae/trigger.go Normal file
View File

@ -0,0 +1,23 @@
package ae
// Trigger implements a non-blocking event notifier. Events can be
// triggered without blocking and notifications happen only when the
// previous event was consumed.
type Trigger struct {
ch chan struct{}
}
func NewTrigger() *Trigger {
return &Trigger{make(chan struct{}, 1)}
}
func (t Trigger) Trigger() {
select {
case t.ch <- struct{}{}:
default:
}
}
func (t Trigger) Notif() <-chan struct{} {
return t.ch
}

View File

@ -20,8 +20,10 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd" "github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
@ -107,7 +109,11 @@ type Agent struct {
// state stores a local representation of the node, // state stores a local representation of the node,
// services and checks. Used for anti-entropy. // services and checks. Used for anti-entropy.
state *localState State *local.State
// sync manages the synchronization of the local
// and the remote state.
sync *ae.StateSyncer
// checkReapAfter maps the check ID to a timeout after which we should // checkReapAfter maps the check ID to a timeout after which we should
// reap its associated service // reap its associated service
@ -224,6 +230,22 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
return a, nil return a, nil
} }
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
lc := local.Config{
AdvertiseAddr: cfg.AdvertiseAddrLAN.String(),
CheckUpdateInterval: cfg.CheckUpdateInterval,
Datacenter: cfg.Datacenter,
DiscardCheckOutput: cfg.DiscardCheckOutput,
NodeID: cfg.NodeID,
NodeName: cfg.NodeName,
TaggedAddresses: map[string]string{},
}
for k, v := range cfg.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
return lc
}
func (a *Agent) Start() error { func (a *Agent) Start() error {
c := a.config c := a.config
@ -242,7 +264,11 @@ func (a *Agent) Start() error {
} }
// create the local state // create the local state
a.state = NewLocalState(c, a.logger, a.tokens) a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)
// create the state synchronization manager which performs
// regular and on-demand state synchronizations (anti-entropy).
a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
// create the config for the rpc server/client // create the config for the rpc server/client
consulCfg, err := a.consulConfig() consulCfg, err := a.consulConfig()
@ -250,8 +276,10 @@ func (a *Agent) Start() error {
return err return err
} }
// link consul client/server with the state // ServerUp is used to inform that a new consul server is now
consulCfg.ServerUp = a.state.ConsulServerUp // up. This can be used to speed up the sync process if we are blocking
// waiting to discover a consul server
consulCfg.ServerUp = a.sync.SyncFull.Trigger
// Setup either the client or the server. // Setup either the client or the server.
if c.ServerMode { if c.ServerMode {
@ -259,19 +287,25 @@ func (a *Agent) Start() error {
if err != nil { if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err) return fmt.Errorf("Failed to start Consul server: %v", err)
} }
a.delegate = server a.delegate = server
a.state.delegate = server
} else { } else {
client, err := consul.NewClientLogger(consulCfg, a.logger) client, err := consul.NewClientLogger(consulCfg, a.logger)
if err != nil { if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err) return fmt.Errorf("Failed to start Consul client: %v", err)
} }
a.delegate = client a.delegate = client
a.state.delegate = client
} }
// the staggering of the state syncing depends on the cluster size.
a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }
// link the state with the consul server/client and the state syncer
// via callbacks. After several attempts this was easier than using
// channels since the event notification needs to be non-blocking
// and that should be hidden in the state syncer implementation.
a.State.Delegate = a.delegate
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
// Load checks/services/metadata. // Load checks/services/metadata.
if err := a.loadServices(c); err != nil { if err := a.loadServices(c); err != nil {
return err return err
@ -1264,18 +1298,18 @@ func (a *Agent) WANMembers() []serf.Member {
// StartSync is called once Services and Checks are registered. // StartSync is called once Services and Checks are registered.
// This is called to prevent a race between clients and the anti-entropy routines // This is called to prevent a race between clients and the anti-entropy routines
func (a *Agent) StartSync() { func (a *Agent) StartSync() {
// Start the anti entropy routine go a.sync.Run()
go a.state.antiEntropy(a.shutdownCh) a.logger.Printf("[INFO] agent: started state syncer")
} }
// PauseSync is used to pause anti-entropy while bulk changes are make // PauseSync is used to pause anti-entropy while bulk changes are make
func (a *Agent) PauseSync() { func (a *Agent) PauseSync() {
a.state.Pause() a.sync.Pause()
} }
// ResumeSync is used to unpause anti-entropy after bulk changes are make // ResumeSync is used to unpause anti-entropy after bulk changes are make
func (a *Agent) ResumeSync() { func (a *Agent) ResumeSync() {
a.state.Resume() a.sync.Resume()
} }
// GetLANCoordinate returns the coordinates of this node in the local pools // GetLANCoordinate returns the coordinates of this node in the local pools
@ -1339,29 +1373,31 @@ OUTER:
// reapServicesInternal does a single pass, looking for services to reap. // reapServicesInternal does a single pass, looking for services to reap.
func (a *Agent) reapServicesInternal() { func (a *Agent) reapServicesInternal() {
reaped := make(map[string]struct{}) reaped := make(map[string]bool)
for checkID, check := range a.state.CriticalChecks() { for checkID, cs := range a.State.CriticalCheckStates() {
serviceID := cs.Check.ServiceID
// There's nothing to do if there's no service. // There's nothing to do if there's no service.
if check.Check.ServiceID == "" { if serviceID == "" {
continue continue
} }
// There might be multiple checks for one service, so // There might be multiple checks for one service, so
// we don't need to reap multiple times. // we don't need to reap multiple times.
serviceID := check.Check.ServiceID if reaped[serviceID] {
if _, ok := reaped[serviceID]; ok {
continue continue
} }
// See if there's a timeout. // See if there's a timeout.
// todo(fs): this looks fishy... why is there anoter data structure in the agent with its own lock?
a.checkLock.Lock() a.checkLock.Lock()
timeout, ok := a.checkReapAfter[checkID] timeout := a.checkReapAfter[checkID]
a.checkLock.Unlock() a.checkLock.Unlock()
// Reap, if necessary. We keep track of which service // Reap, if necessary. We keep track of which service
// this is so that we won't try to remove it again. // this is so that we won't try to remove it again.
if ok && check.CriticalFor > timeout { if timeout > 0 && cs.CriticalFor() > timeout {
reaped[serviceID] = struct{}{} reaped[serviceID] = true
a.RemoveService(serviceID, true) a.RemoveService(serviceID, true)
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service", a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
checkID, serviceID) checkID, serviceID)
@ -1396,7 +1432,7 @@ func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
wrapped := persistedService{ wrapped := persistedService{
Token: a.state.ServiceToken(service.ID), Token: a.State.ServiceToken(service.ID),
Service: service, Service: service,
} }
encoded, err := json.Marshal(wrapped) encoded, err := json.Marshal(wrapped)
@ -1424,7 +1460,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT
wrapped := persistedCheck{ wrapped := persistedCheck{
Check: check, Check: check,
ChkType: chkType, ChkType: chkType,
Token: a.state.CheckToken(check.CheckID), Token: a.State.CheckToken(check.CheckID),
} }
encoded, err := json.Marshal(wrapped) encoded, err := json.Marshal(wrapped)
@ -1523,7 +1559,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
defer a.restoreCheckState(snap) defer a.restoreCheckState(snap)
// Add the service // Add the service
a.state.AddService(service, token) a.State.AddService(service, token)
// Persist the service to a file // Persist the service to a file
if persist && !a.config.DevMode { if persist && !a.config.DevMode {
@ -1573,7 +1609,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
} }
// Remove service immediately // Remove service immediately
if err := a.state.RemoveService(serviceID); err != nil { if err := a.State.RemoveService(serviceID); err != nil {
a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err) a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
return nil return nil
} }
@ -1586,8 +1622,8 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
} }
// Deregister any associated health checks // Deregister any associated health checks
for checkID, health := range a.state.Checks() { for checkID, check := range a.State.Checks() {
if health.ServiceID != serviceID { if check.ServiceID != serviceID {
continue continue
} }
if err := a.RemoveCheck(checkID, persist); err != nil { if err := a.RemoveCheck(checkID, persist); err != nil {
@ -1619,11 +1655,11 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
if check.ServiceID != "" { if check.ServiceID != "" {
svc, ok := a.state.Services()[check.ServiceID] s := a.State.Service(check.ServiceID)
if !ok { if s == nil {
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID) return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
} }
check.ServiceName = svc.Service check.ServiceName = s.Service
} }
a.checkLock.Lock() a.checkLock.Lock()
@ -1640,7 +1676,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
ttl := &CheckTTL{ ttl := &CheckTTL{
Notify: a.state, Notify: a.State,
CheckID: check.CheckID, CheckID: check.CheckID,
TTL: chkType.TTL, TTL: chkType.TTL,
Logger: a.logger, Logger: a.logger,
@ -1667,7 +1703,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
http := &CheckHTTP{ http := &CheckHTTP{
Notify: a.state, Notify: a.State,
CheckID: check.CheckID, CheckID: check.CheckID,
HTTP: chkType.HTTP, HTTP: chkType.HTTP,
Header: chkType.Header, Header: chkType.Header,
@ -1692,7 +1728,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
tcp := &CheckTCP{ tcp := &CheckTCP{
Notify: a.state, Notify: a.State,
CheckID: check.CheckID, CheckID: check.CheckID,
TCP: chkType.TCP, TCP: chkType.TCP,
Interval: chkType.Interval, Interval: chkType.Interval,
@ -1729,7 +1765,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
dockerCheck := &CheckDocker{ dockerCheck := &CheckDocker{
Notify: a.state, Notify: a.State,
CheckID: check.CheckID, CheckID: check.CheckID,
DockerContainerID: chkType.DockerContainerID, DockerContainerID: chkType.DockerContainerID,
Shell: chkType.Shell, Shell: chkType.Shell,
@ -1759,7 +1795,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
monitor := &CheckMonitor{ monitor := &CheckMonitor{
Notify: a.state, Notify: a.State,
CheckID: check.CheckID, CheckID: check.CheckID,
Script: chkType.Script, Script: chkType.Script,
ScriptArgs: chkType.ScriptArgs, ScriptArgs: chkType.ScriptArgs,
@ -1788,7 +1824,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
// Add to the local state for anti-entropy // Add to the local state for anti-entropy
err := a.state.AddCheck(check, token) err := a.State.AddCheck(check, token)
if err != nil { if err != nil {
a.cancelCheckMonitors(check.CheckID) a.cancelCheckMonitors(check.CheckID)
return err return err
@ -1811,7 +1847,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
} }
// Add to the local state for anti-entropy // Add to the local state for anti-entropy
a.state.RemoveCheck(checkID) a.State.RemoveCheck(checkID)
a.checkLock.Lock() a.checkLock.Lock()
defer a.checkLock.Unlock() defer a.checkLock.Unlock()
@ -1971,15 +2007,13 @@ func (a *Agent) GossipEncrypted() bool {
// Stats is used to get various debugging state from the sub-systems // Stats is used to get various debugging state from the sub-systems
func (a *Agent) Stats() map[string]map[string]string { func (a *Agent) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
stats := a.delegate.Stats() stats := a.delegate.Stats()
stats["agent"] = map[string]string{ stats["agent"] = map[string]string{
"check_monitors": toString(uint64(len(a.checkMonitors))), "check_monitors": strconv.Itoa(len(a.checkMonitors)),
"check_ttls": toString(uint64(len(a.checkTTLs))), "check_ttls": strconv.Itoa(len(a.checkTTLs)),
"checks": toString(uint64(len(a.state.checks))), }
"services": toString(uint64(len(a.state.services))), for k, v := range a.State.Stats() {
stats["agent"][k] = v
} }
revision := a.config.Revision revision := a.config.Revision
@ -2102,7 +2136,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
} }
serviceID := p.Service.ID serviceID := p.Service.ID
if _, ok := a.state.services[serviceID]; ok { if a.State.Service(serviceID) != nil {
// Purge previously persisted service. This allows config to be // Purge previously persisted service. This allows config to be
// preferred over services persisted from the API. // preferred over services persisted from the API.
a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q", a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q",
@ -2122,15 +2156,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
return nil return nil
} }
// unloadServices will deregister all services other than the 'consul' service // unloadServices will deregister all services.
// known to the local agent.
func (a *Agent) unloadServices() error { func (a *Agent) unloadServices() error {
for _, service := range a.state.Services() { for id := range a.State.Services() {
if err := a.RemoveService(service.ID, false); err != nil { if err := a.RemoveService(id, false); err != nil {
return fmt.Errorf("Failed deregistering service '%s': %v", service.ID, err) return fmt.Errorf("Failed deregistering service '%s': %v", id, err)
} }
} }
return nil return nil
} }
@ -2182,7 +2214,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
} }
checkID := p.Check.CheckID checkID := p.Check.CheckID
if _, ok := a.state.checks[checkID]; ok { if a.State.Check(checkID) != nil {
// Purge previously persisted check. This allows config to be // Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API. // preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q", a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q",
@ -2213,12 +2245,11 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
// unloadChecks will deregister all checks known to the local agent. // unloadChecks will deregister all checks known to the local agent.
func (a *Agent) unloadChecks() error { func (a *Agent) unloadChecks() error {
for _, check := range a.state.Checks() { for id := range a.State.Checks() {
if err := a.RemoveCheck(check.CheckID, false); err != nil { if err := a.RemoveCheck(id, false); err != nil {
return fmt.Errorf("Failed deregistering check '%s': %s", check.CheckID, err) return fmt.Errorf("Failed deregistering check '%s': %s", id, err)
} }
} }
return nil return nil
} }
@ -2226,7 +2257,7 @@ func (a *Agent) unloadChecks() error {
// checks. This is done before we reload our checks, so that we can properly // checks. This is done before we reload our checks, so that we can properly
// restore into the same state. // restore into the same state.
func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck { func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
return a.state.Checks() return a.State.Checks()
} }
// restoreCheckState is used to reset the health state based on a snapshot. // restoreCheckState is used to reset the health state based on a snapshot.
@ -2234,33 +2265,24 @@ func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
// in health state and potential session invalidations. // in health state and potential session invalidations.
func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
for id, check := range snap { for id, check := range snap {
a.state.UpdateCheck(id, check.Status, check.Output) a.State.UpdateCheck(id, check.Status, check.Output)
} }
} }
// loadMetadata loads node metadata fields from the agent config and // loadMetadata loads node metadata fields from the agent config and
// updates them on the local agent. // updates them on the local agent.
func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error { func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error {
a.state.Lock() meta := map[string]string{}
defer a.state.Unlock() for k, v := range conf.NodeMeta {
meta[k] = v
for key, value := range conf.NodeMeta {
a.state.metadata[key] = value
} }
meta[structs.MetaSegmentKey] = conf.SegmentName
a.state.metadata[structs.MetaSegmentKey] = conf.SegmentName return a.State.LoadMetadata(meta)
a.state.changeMade()
return nil
} }
// unloadMetadata resets the local metadata state // unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() { func (a *Agent) unloadMetadata() {
a.state.Lock() a.State.UnloadMetadata()
defer a.state.Unlock()
a.state.metadata = make(map[string]string)
} }
// serviceMaintCheckID returns the ID of a given service's maintenance check // serviceMaintCheckID returns the ID of a given service's maintenance check
@ -2271,14 +2293,14 @@ func serviceMaintCheckID(serviceID string) types.CheckID {
// EnableServiceMaintenance will register a false health check against the given // EnableServiceMaintenance will register a false health check against the given
// service ID with critical status. This will exclude the service from queries. // service ID with critical status. This will exclude the service from queries.
func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error { func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error {
service, ok := a.state.Services()[serviceID] service, ok := a.State.Services()[serviceID]
if !ok { if !ok {
return fmt.Errorf("No service registered with ID %q", serviceID) return fmt.Errorf("No service registered with ID %q", serviceID)
} }
// Check if maintenance mode is not already enabled // Check if maintenance mode is not already enabled
checkID := serviceMaintCheckID(serviceID) checkID := serviceMaintCheckID(serviceID)
if _, ok := a.state.Checks()[checkID]; ok { if _, ok := a.State.Checks()[checkID]; ok {
return nil return nil
} }
@ -2306,13 +2328,13 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error
// DisableServiceMaintenance will deregister the fake maintenance mode check // DisableServiceMaintenance will deregister the fake maintenance mode check
// if the service has been marked as in maintenance. // if the service has been marked as in maintenance.
func (a *Agent) DisableServiceMaintenance(serviceID string) error { func (a *Agent) DisableServiceMaintenance(serviceID string) error {
if _, ok := a.state.Services()[serviceID]; !ok { if _, ok := a.State.Services()[serviceID]; !ok {
return fmt.Errorf("No service registered with ID %q", serviceID) return fmt.Errorf("No service registered with ID %q", serviceID)
} }
// Check if maintenance mode is enabled // Check if maintenance mode is enabled
checkID := serviceMaintCheckID(serviceID) checkID := serviceMaintCheckID(serviceID)
if _, ok := a.state.Checks()[checkID]; !ok { if _, ok := a.State.Checks()[checkID]; !ok {
return nil return nil
} }
@ -2326,7 +2348,7 @@ func (a *Agent) DisableServiceMaintenance(serviceID string) error {
// EnableNodeMaintenance places a node into maintenance mode. // EnableNodeMaintenance places a node into maintenance mode.
func (a *Agent) EnableNodeMaintenance(reason, token string) { func (a *Agent) EnableNodeMaintenance(reason, token string) {
// Ensure node maintenance is not already enabled // Ensure node maintenance is not already enabled
if _, ok := a.state.Checks()[structs.NodeMaint]; ok { if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
return return
} }
@ -2349,7 +2371,7 @@ func (a *Agent) EnableNodeMaintenance(reason, token string) {
// DisableNodeMaintenance removes a node from maintenance mode // DisableNodeMaintenance removes a node from maintenance mode
func (a *Agent) DisableNodeMaintenance() { func (a *Agent) DisableNodeMaintenance() {
if _, ok := a.state.Checks()[structs.NodeMaint]; !ok { if _, ok := a.State.Checks()[structs.NodeMaint]; !ok {
return return
} }
a.RemoveCheck(structs.NodeMaint, true) a.RemoveCheck(structs.NodeMaint, true)
@ -2393,7 +2415,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
// Update filtered metrics // Update filtered metrics
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)
a.state.SetDiscardCheckOutput(newCfg.DiscardCheckOutput) a.State.SetDiscardCheckOutput(newCfg.DiscardCheckOutput)
return nil return nil
} }

View File

@ -72,7 +72,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
Coord: cs[s.agent.config.SegmentName], Coord: cs[s.agent.config.SegmentName],
Member: s.agent.LocalMember(), Member: s.agent.LocalMember(),
Stats: s.agent.Stats(), Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(), Meta: s.agent.State.Metadata(),
}, nil }, nil
} }
@ -137,7 +137,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
var token string var token string
s.parseToken(req, &token) s.parseToken(req, &token)
services := s.agent.state.Services() services := s.agent.State.Services()
if err := s.agent.filterServices(token, &services); err != nil { if err := s.agent.filterServices(token, &services); err != nil {
return nil, err return nil, err
} }
@ -161,7 +161,7 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i
var token string var token string
s.parseToken(req, &token) s.parseToken(req, &token)
checks := s.agent.state.Checks() checks := s.agent.State.Checks()
if err := s.agent.filterChecks(token, &checks); err != nil { if err := s.agent.filterChecks(token, &checks); err != nil {
return nil, err return nil, err
} }
@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
// services and checks to the server. If the operation fails, we only // services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later. // only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPServer) syncChanges() { func (s *HTTPServer) syncChanges() {
if err := s.agent.state.syncChanges(); err != nil { if err := s.agent.State.SyncChanges(); err != nil {
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err) s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
} }
} }

View File

@ -51,7 +51,7 @@ func TestAgent_Services(t *testing.T) {
Tags: []string{"master"}, Tags: []string{"master"},
Port: 5000, Port: 5000,
} }
a.state.AddService(srv1, "") a.State.AddService(srv1, "")
req, _ := http.NewRequest("GET", "/v1/agent/services", nil) req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
obj, err := a.srv.AgentServices(nil, req) obj, err := a.srv.AgentServices(nil, req)
@ -78,7 +78,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
Tags: []string{"master"}, Tags: []string{"master"},
Port: 5000, Port: 5000,
} }
a.state.AddService(srv1, "") a.State.AddService(srv1, "")
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/services", nil) req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
@ -116,7 +116,7 @@ func TestAgent_Checks(t *testing.T) {
Name: "mysql", Name: "mysql",
Status: api.HealthPassing, Status: api.HealthPassing,
} }
a.state.AddCheck(chk1, "") a.State.AddCheck(chk1, "")
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil) req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
obj, err := a.srv.AgentChecks(nil, req) obj, err := a.srv.AgentChecks(nil, req)
@ -143,7 +143,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) {
Name: "mysql", Name: "mysql",
Status: api.HealthPassing, Status: api.HealthPassing,
} }
a.state.AddCheck(chk1, "") a.State.AddCheck(chk1, "")
t.Run("no token", func(t *testing.T) { t.Run("no token", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil) req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
@ -283,8 +283,8 @@ func TestAgent_Reload(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
if _, ok := a.state.services["redis"]; !ok { if a.State.Service("redis") == nil {
t.Fatalf("missing redis service") t.Fatal("missing redis service")
} }
cfg2 := TestConfig(config.Source{ cfg2 := TestConfig(config.Source{
@ -307,8 +307,8 @@ func TestAgent_Reload(t *testing.T) {
if err := a.ReloadConfig(cfg2); err != nil { if err := a.ReloadConfig(cfg2); err != nil {
t.Fatalf("got error %v want nil", err) t.Fatalf("got error %v want nil", err)
} }
if _, ok := a.state.services["redis-reloaded"]; !ok { if a.State.Service("redis-reloaded") == nil {
t.Fatalf("missing redis-reloaded service") t.Fatal("missing redis-reloaded service")
} }
for _, wp := range a.watchPlans { for _, wp := range a.watchPlans {
@ -682,7 +682,7 @@ func TestAgent_RegisterCheck(t *testing.T) {
// Ensure we have a check mapping // Ensure we have a check mapping
checkID := types.CheckID("test") checkID := types.CheckID("test")
if _, ok := a.state.Checks()[checkID]; !ok { if _, ok := a.State.Checks()[checkID]; !ok {
t.Fatalf("missing test check") t.Fatalf("missing test check")
} }
@ -691,12 +691,12 @@ func TestAgent_RegisterCheck(t *testing.T) {
} }
// Ensure the token was configured // Ensure the token was configured
if token := a.state.CheckToken(checkID); token == "" { if token := a.State.CheckToken(checkID); token == "" {
t.Fatalf("missing token") t.Fatalf("missing token")
} }
// By default, checks start in critical state. // By default, checks start in critical state.
state := a.state.Checks()[checkID] state := a.State.Checks()[checkID]
if state.Status != api.HealthCritical { if state.Status != api.HealthCritical {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -817,7 +817,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
// Ensure we have a check mapping // Ensure we have a check mapping
checkID := types.CheckID("test") checkID := types.CheckID("test")
if _, ok := a.state.Checks()[checkID]; !ok { if _, ok := a.State.Checks()[checkID]; !ok {
t.Fatalf("missing test check") t.Fatalf("missing test check")
} }
@ -825,7 +825,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
t.Fatalf("missing test check ttl") t.Fatalf("missing test check ttl")
} }
state := a.state.Checks()[checkID] state := a.State.Checks()[checkID]
if state.Status != api.HealthPassing { if state.Status != api.HealthPassing {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -896,7 +896,7 @@ func TestAgent_DeregisterCheck(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
if _, ok := a.state.Checks()["test"]; ok { if _, ok := a.State.Checks()["test"]; ok {
t.Fatalf("have test check") t.Fatalf("have test check")
} }
} }
@ -947,7 +947,7 @@ func TestAgent_PassCheck(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
state := a.state.Checks()["test"] state := a.State.Checks()["test"]
if state.Status != api.HealthPassing { if state.Status != api.HealthPassing {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -1000,7 +1000,7 @@ func TestAgent_WarnCheck(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
state := a.state.Checks()["test"] state := a.State.Checks()["test"]
if state.Status != api.HealthWarning { if state.Status != api.HealthWarning {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -1053,7 +1053,7 @@ func TestAgent_FailCheck(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
state := a.state.Checks()["test"] state := a.State.Checks()["test"]
if state.Status != api.HealthCritical { if state.Status != api.HealthCritical {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -1117,7 +1117,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
t.Fatalf("expected 200, got %d", resp.Code) t.Fatalf("expected 200, got %d", resp.Code)
} }
state := a.state.Checks()["test"] state := a.State.Checks()["test"]
if state.Status != c.Status || state.Output != c.Output { if state.Status != c.Status || state.Output != c.Output {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -1145,7 +1145,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
// Since we append some notes about truncating, we just do a // Since we append some notes about truncating, we just do a
// rough check that the output buffer was cut down so this test // rough check that the output buffer was cut down so this test
// isn't super brittle. // isn't super brittle.
state := a.state.Checks()["test"] state := a.State.Checks()["test"]
if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize { if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize {
t.Fatalf("bad: %v", state) t.Fatalf("bad: %v", state)
} }
@ -1228,12 +1228,12 @@ func TestAgent_RegisterService(t *testing.T) {
} }
// Ensure the servie // Ensure the servie
if _, ok := a.state.Services()["test"]; !ok { if _, ok := a.State.Services()["test"]; !ok {
t.Fatalf("missing test service") t.Fatalf("missing test service")
} }
// Ensure we have a check mapping // Ensure we have a check mapping
checks := a.state.Checks() checks := a.State.Checks()
if len(checks) != 3 { if len(checks) != 3 {
t.Fatalf("bad: %v", checks) t.Fatalf("bad: %v", checks)
} }
@ -1243,7 +1243,7 @@ func TestAgent_RegisterService(t *testing.T) {
} }
// Ensure the token was configured // Ensure the token was configured
if token := a.state.ServiceToken("test"); token == "" { if token := a.State.ServiceToken("test"); token == "" {
t.Fatalf("missing token") t.Fatalf("missing token")
} }
} }
@ -1271,7 +1271,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
EnableTagOverride: true, EnableTagOverride: true,
} }
if got, want := a.state.Services()["test"], svc; !verify.Values(t, "", got, want) { if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) {
t.Fail() t.Fail()
} }
} }
@ -1364,11 +1364,11 @@ func TestAgent_DeregisterService(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
if _, ok := a.state.Services()["test"]; ok { if _, ok := a.State.Services()["test"]; ok {
t.Fatalf("have test service") t.Fatalf("have test service")
} }
if _, ok := a.state.Checks()["test"]; ok { if _, ok := a.State.Checks()["test"]; ok {
t.Fatalf("have test check") t.Fatalf("have test check")
} }
} }
@ -1466,13 +1466,13 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
// Ensure the maintenance check was registered // Ensure the maintenance check was registered
checkID := serviceMaintCheckID("test") checkID := serviceMaintCheckID("test")
check, ok := a.state.Checks()[checkID] check, ok := a.State.Checks()[checkID]
if !ok { if !ok {
t.Fatalf("should have registered maintenance check") t.Fatalf("should have registered maintenance check")
} }
// Ensure the token was added // Ensure the token was added
if token := a.state.CheckToken(checkID); token != "mytoken" { if token := a.State.CheckToken(checkID); token != "mytoken" {
t.Fatalf("expected 'mytoken', got '%s'", token) t.Fatalf("expected 'mytoken', got '%s'", token)
} }
@ -1513,7 +1513,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
// Ensure the maintenance check was removed // Ensure the maintenance check was removed
checkID := serviceMaintCheckID("test") checkID := serviceMaintCheckID("test")
if _, ok := a.state.Checks()[checkID]; ok { if _, ok := a.State.Checks()[checkID]; ok {
t.Fatalf("should have removed maintenance check") t.Fatalf("should have removed maintenance check")
} }
} }
@ -1579,13 +1579,13 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) {
} }
// Ensure the maintenance check was registered // Ensure the maintenance check was registered
check, ok := a.state.Checks()[structs.NodeMaint] check, ok := a.State.Checks()[structs.NodeMaint]
if !ok { if !ok {
t.Fatalf("should have registered maintenance check") t.Fatalf("should have registered maintenance check")
} }
// Check that the token was used // Check that the token was used
if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" { if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" {
t.Fatalf("expected 'mytoken', got '%s'", token) t.Fatalf("expected 'mytoken', got '%s'", token)
} }
@ -1614,7 +1614,7 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) {
} }
// Ensure the maintenance check was removed // Ensure the maintenance check was removed
if _, ok := a.state.Checks()[structs.NodeMaint]; ok { if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
t.Fatalf("should have removed maintenance check") t.Fatalf("should have removed maintenance check")
} }
} }
@ -1670,7 +1670,7 @@ func TestAgent_RegisterCheck_Service(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
result := a.state.Checks() result := a.State.Checks()
if _, ok := result["service:memcache"]; !ok { if _, ok := result["service:memcache"]; !ok {
t.Fatalf("missing memcached check") t.Fatalf("missing memcached check")
} }

View File

@ -363,14 +363,14 @@ func TestAgent_AddService(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
got, want := a.state.Services()[tt.srv.ID], tt.srv got, want := a.State.Services()[tt.srv.ID], tt.srv
verify.Values(t, "", got, want) verify.Values(t, "", got, want)
}) })
// check the health checks // check the health checks
for k, v := range tt.healthChks { for k, v := range tt.healthChks {
t.Run(k, func(t *testing.T) { t.Run(k, func(t *testing.T) {
got, want := a.state.Checks()[types.CheckID(k)], v got, want := a.State.Checks()[types.CheckID(k)], v
verify.Values(t, k, got, want) verify.Values(t, k, got, want)
}) })
} }
@ -437,10 +437,10 @@ func TestAgent_RemoveService(t *testing.T) {
if err := a.RemoveService("memcache", false); err != nil { if err := a.RemoveService("memcache", false); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if _, ok := a.state.Checks()["service:memcache"]; ok { if _, ok := a.State.Checks()["service:memcache"]; ok {
t.Fatalf("have memcache check") t.Fatalf("have memcache check")
} }
if _, ok := a.state.Checks()["check2"]; ok { if _, ok := a.State.Checks()["check2"]; ok {
t.Fatalf("have check2 check") t.Fatalf("have check2 check")
} }
} }
@ -466,15 +466,15 @@ func TestAgent_RemoveService(t *testing.T) {
} }
// Ensure we have a state mapping // Ensure we have a state mapping
if _, ok := a.state.Services()["redis"]; ok { if _, ok := a.State.Services()["redis"]; ok {
t.Fatalf("have redis service") t.Fatalf("have redis service")
} }
// Ensure checks were removed // Ensure checks were removed
if _, ok := a.state.Checks()["service:redis:1"]; ok { if _, ok := a.State.Checks()["service:redis:1"]; ok {
t.Fatalf("check redis:1 should be removed") t.Fatalf("check redis:1 should be removed")
} }
if _, ok := a.state.Checks()["service:redis:2"]; ok { if _, ok := a.State.Checks()["service:redis:2"]; ok {
t.Fatalf("check redis:2 should be removed") t.Fatalf("check redis:2 should be removed")
} }
@ -507,7 +507,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
} }
// verify chk1 exists // verify chk1 exists
if a.state.Checks()["chk1"] == nil { if a.State.Checks()["chk1"] == nil {
t.Fatal("Could not find health check chk1") t.Fatal("Could not find health check chk1")
} }
@ -517,10 +517,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
} }
// check that both checks are there // check that both checks are there
if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { if got, want := a.State.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) {
t.FailNow() t.FailNow()
} }
if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) { if got, want := a.State.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) {
t.FailNow() t.FailNow()
} }
@ -530,10 +530,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
} }
// Check that both checks are gone // Check that both checks are gone
if a.state.Checks()["chk1"] != nil { if a.State.Checks()["chk1"] != nil {
t.Fatal("Found health check chk1 want nil") t.Fatal("Found health check chk1 want nil")
} }
if a.state.Checks()["chk2"] != nil { if a.State.Checks()["chk2"] != nil {
t.Fatal("Found health check chk2 want nil") t.Fatal("Found health check chk2 want nil")
} }
} }
@ -561,7 +561,7 @@ func TestAgent_AddCheck(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
sChk, ok := a.state.Checks()["mem"] sChk, ok := a.State.Checks()["mem"]
if !ok { if !ok {
t.Fatalf("missing mem check") t.Fatalf("missing mem check")
} }
@ -600,7 +600,7 @@ func TestAgent_AddCheck_StartPassing(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
sChk, ok := a.state.Checks()["mem"] sChk, ok := a.State.Checks()["mem"]
if !ok { if !ok {
t.Fatalf("missing mem check") t.Fatalf("missing mem check")
} }
@ -639,7 +639,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
if _, ok := a.state.Checks()["mem"]; !ok { if _, ok := a.State.Checks()["mem"]; !ok {
t.Fatalf("missing mem check") t.Fatalf("missing mem check")
} }
@ -704,7 +704,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) {
} }
// Ensure the check status was restored during registration // Ensure the check status was restored during registration
checks := a.state.Checks() checks := a.State.Checks()
check, ok := checks["baz"] check, ok := checks["baz"]
if !ok { if !ok {
t.Fatalf("missing check") t.Fatalf("missing check")
@ -739,7 +739,7 @@ func TestAgent_AddCheck_ExecDisable(t *testing.T) {
} }
// Ensure we don't have a check mapping // Ensure we don't have a check mapping
if memChk := a.state.Checks()["mem"]; memChk != nil { if memChk := a.State.Checks()["mem"]; memChk != nil {
t.Fatalf("should be missing mem check") t.Fatalf("should be missing mem check")
} }
} }
@ -782,7 +782,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
} }
// Ensure we have a check mapping // Ensure we have a check mapping
if _, ok := a.state.Checks()["mem"]; ok { if _, ok := a.State.Checks()["mem"]; ok {
t.Fatalf("have mem check") t.Fatalf("have mem check")
} }
@ -817,7 +817,7 @@ func TestAgent_updateTTLCheck(t *testing.T) {
} }
// Ensure we have a check mapping. // Ensure we have a check mapping.
status := a.state.Checks()["mem"] status := a.State.Checks()["mem"]
if status.Status != api.HealthPassing { if status.Status != api.HealthPassing {
t.Fatalf("bad: %v", status) t.Fatalf("bad: %v", status)
} }
@ -904,15 +904,15 @@ func TestAgent_PersistService(t *testing.T) {
a2.Start() a2.Start()
defer a2.Shutdown() defer a2.Shutdown()
restored, ok := a2.state.services[svc.ID] restored := a2.State.ServiceState(svc.ID)
if !ok { if restored == nil {
t.Fatalf("bad: %#v", a2.state.services) t.Fatalf("service %q missing", svc.ID)
} }
if a2.state.serviceTokens[svc.ID] != "mytoken" { if got, want := restored.Token, "mytoken"; got != want {
t.Fatalf("bad: %#v", a2.state.services[svc.ID]) t.Fatalf("got token %q want %q", got, want)
} }
if restored.Port != 8001 { if got, want := restored.Service.Port, 8001; got != want {
t.Fatalf("bad: %#v", restored) t.Fatalf("got port %d want %d", got, want)
} }
} }
@ -951,7 +951,7 @@ func TestAgent_persistedService_compat(t *testing.T) {
} }
// Ensure the service was restored // Ensure the service was restored
services := a.state.Services() services := a.State.Services()
result, ok := services["redis"] result, ok := services["redis"]
if !ok { if !ok {
t.Fatalf("missing service") t.Fatalf("missing service")
@ -1043,8 +1043,8 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
if _, err := os.Stat(file); err == nil { if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted service") t.Fatalf("should have removed persisted service")
} }
result, ok := a2.state.services["redis"] result := a2.State.Service("redis")
if !ok { if result == nil {
t.Fatalf("missing service registration") t.Fatalf("missing service registration")
} }
if !reflect.DeepEqual(result.Tags, []string{"bar"}) || result.Port != 9000 { if !reflect.DeepEqual(result.Tags, []string{"bar"}) || result.Port != 9000 {
@ -1137,9 +1137,9 @@ func TestAgent_PersistCheck(t *testing.T) {
a2.Start() a2.Start()
defer a2.Shutdown() defer a2.Shutdown()
result, ok := a2.state.checks[check.CheckID] result := a2.State.Check(check.CheckID)
if !ok { if result == nil {
t.Fatalf("bad: %#v", a2.state.checks) t.Fatalf("bad: %#v", a2.State.Checks())
} }
if result.Status != api.HealthCritical { if result.Status != api.HealthCritical {
t.Fatalf("bad: %#v", result) t.Fatalf("bad: %#v", result)
@ -1152,8 +1152,8 @@ func TestAgent_PersistCheck(t *testing.T) {
if _, ok := a2.checkMonitors[check.CheckID]; !ok { if _, ok := a2.checkMonitors[check.CheckID]; !ok {
t.Fatalf("bad: %#v", a2.checkMonitors) t.Fatalf("bad: %#v", a2.checkMonitors)
} }
if a2.state.checkTokens[check.CheckID] != "mytoken" { if a2.State.CheckState(check.CheckID).Token != "mytoken" {
t.Fatalf("bad: %s", a2.state.checkTokens[check.CheckID]) t.Fatalf("bad: %s", a2.State.CheckState(check.CheckID).Token)
} }
} }
@ -1241,8 +1241,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
if _, err := os.Stat(file); err == nil { if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted check") t.Fatalf("should have removed persisted check")
} }
result, ok := a2.state.checks["mem"] result := a2.State.Check("mem")
if !ok { if result == nil {
t.Fatalf("missing check registration") t.Fatalf("missing check registration")
} }
expected := &structs.HealthCheck{ expected := &structs.HealthCheck{
@ -1269,11 +1269,11 @@ func TestAgent_loadChecks_token(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
checks := a.state.Checks() checks := a.State.Checks()
if _, ok := checks["rabbitmq"]; !ok { if _, ok := checks["rabbitmq"]; !ok {
t.Fatalf("missing check") t.Fatalf("missing check")
} }
if token := a.state.CheckToken("rabbitmq"); token != "abc123" { if token := a.State.CheckToken("rabbitmq"); token != "abc123" {
t.Fatalf("bad: %s", token) t.Fatalf("bad: %s", token)
} }
} }
@ -1307,7 +1307,7 @@ func TestAgent_unloadChecks(t *testing.T) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
found := false found := false
for check := range a.state.Checks() { for check := range a.State.Checks() {
if check == check1.CheckID { if check == check1.CheckID {
found = true found = true
break break
@ -1323,7 +1323,7 @@ func TestAgent_unloadChecks(t *testing.T) {
} }
// Make sure it was unloaded // Make sure it was unloaded
for check := range a.state.Checks() { for check := range a.State.Checks() {
if check == check1.CheckID { if check == check1.CheckID {
t.Fatalf("should have unloaded checks") t.Fatalf("should have unloaded checks")
} }
@ -1342,11 +1342,11 @@ func TestAgent_loadServices_token(t *testing.T) {
`) `)
defer a.Shutdown() defer a.Shutdown()
services := a.state.Services() services := a.State.Services()
if _, ok := services["rabbitmq"]; !ok { if _, ok := services["rabbitmq"]; !ok {
t.Fatalf("missing service") t.Fatalf("missing service")
} }
if token := a.state.ServiceToken("rabbitmq"); token != "abc123" { if token := a.State.ServiceToken("rabbitmq"); token != "abc123" {
t.Fatalf("bad: %s", token) t.Fatalf("bad: %s", token)
} }
} }
@ -1368,7 +1368,7 @@ func TestAgent_unloadServices(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
found := false found := false
for id := range a.state.Services() { for id := range a.State.Services() {
if id == svc.ID { if id == svc.ID {
found = true found = true
break break
@ -1382,7 +1382,7 @@ func TestAgent_unloadServices(t *testing.T) {
if err := a.unloadServices(); err != nil { if err := a.unloadServices(); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if len(a.state.Services()) != 0 { if len(a.State.Services()) != 0 {
t.Fatalf("should have unloaded services") t.Fatalf("should have unloaded services")
} }
} }
@ -1411,13 +1411,13 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
// Make sure the critical health check was added // Make sure the critical health check was added
checkID := serviceMaintCheckID("redis") checkID := serviceMaintCheckID("redis")
check, ok := a.state.Checks()[checkID] check, ok := a.State.Checks()[checkID]
if !ok { if !ok {
t.Fatalf("should have registered critical maintenance check") t.Fatalf("should have registered critical maintenance check")
} }
// Check that the token was used to register the check // Check that the token was used to register the check
if token := a.state.CheckToken(checkID); token != "mytoken" { if token := a.State.CheckToken(checkID); token != "mytoken" {
t.Fatalf("expected 'mytoken', got: '%s'", token) t.Fatalf("expected 'mytoken', got: '%s'", token)
} }
@ -1432,7 +1432,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
} }
// Ensure the check was deregistered // Ensure the check was deregistered
if _, ok := a.state.Checks()[checkID]; ok { if _, ok := a.State.Checks()[checkID]; ok {
t.Fatalf("should have deregistered maintenance check") t.Fatalf("should have deregistered maintenance check")
} }
@ -1442,7 +1442,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
} }
// Ensure the check was registered with the default notes // Ensure the check was registered with the default notes
check, ok = a.state.Checks()[checkID] check, ok = a.State.Checks()[checkID]
if !ok { if !ok {
t.Fatalf("should have registered critical check") t.Fatalf("should have registered critical check")
} }
@ -1479,19 +1479,19 @@ func TestAgent_Service_Reap(t *testing.T) {
} }
// Make sure it's there and there's no critical check yet. // Make sure it's there and there's no critical check yet.
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) > 0 { if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks") t.Fatalf("should not have critical checks")
} }
// Wait for the check TTL to fail but before the check is reaped. // Wait for the check TTL to fail but before the check is reaped.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) != 1 { if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check") t.Fatalf("should have a critical check")
} }
@ -1499,28 +1499,28 @@ func TestAgent_Service_Reap(t *testing.T) {
if err := a.updateTTLCheck("service:redis", api.HealthPassing, "foo"); err != nil { if err := a.updateTTLCheck("service:redis", api.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) > 0 { if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks") t.Fatalf("should not have critical checks")
} }
// Wait for the check TTL to fail again. // Wait for the check TTL to fail again.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) != 1 { if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check") t.Fatalf("should have a critical check")
} }
// Wait for the reap. // Wait for the reap.
time.Sleep(400 * time.Millisecond) time.Sleep(400 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; ok { if _, ok := a.State.Services()["redis"]; ok {
t.Fatalf("redis service should have been reaped") t.Fatalf("redis service should have been reaped")
} }
if checks := a.state.CriticalChecks(); len(checks) > 0 { if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks") t.Fatalf("should not have critical checks")
} }
} }
@ -1552,28 +1552,28 @@ func TestAgent_Service_NoReap(t *testing.T) {
} }
// Make sure it's there and there's no critical check yet. // Make sure it's there and there's no critical check yet.
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) > 0 { if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks") t.Fatalf("should not have critical checks")
} }
// Wait for the check TTL to fail. // Wait for the check TTL to fail.
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) != 1 { if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check") t.Fatalf("should have a critical check")
} }
// Wait a while and make sure it doesn't reap. // Wait a while and make sure it doesn't reap.
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok { if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service") t.Fatalf("should have redis service")
} }
if checks := a.state.CriticalChecks(); len(checks) != 1 { if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check") t.Fatalf("should have a critical check")
} }
} }
@ -1612,7 +1612,7 @@ func TestAgent_addCheck_restoresSnapshot(t *testing.T) {
if err := a.AddService(svc, chkTypes, false, ""); err != nil { if err := a.AddService(svc, chkTypes, false, ""); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
check, ok := a.state.Checks()["service:redis"] check, ok := a.State.Checks()["service:redis"]
if !ok { if !ok {
t.Fatalf("missing check") t.Fatalf("missing check")
} }
@ -1630,13 +1630,13 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
a.EnableNodeMaintenance("broken", "mytoken") a.EnableNodeMaintenance("broken", "mytoken")
// Make sure the critical health check was added // Make sure the critical health check was added
check, ok := a.state.Checks()[structs.NodeMaint] check, ok := a.State.Checks()[structs.NodeMaint]
if !ok { if !ok {
t.Fatalf("should have registered critical node check") t.Fatalf("should have registered critical node check")
} }
// Check that the token was used to register the check // Check that the token was used to register the check
if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" { if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" {
t.Fatalf("expected 'mytoken', got: '%s'", token) t.Fatalf("expected 'mytoken', got: '%s'", token)
} }
@ -1649,7 +1649,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
a.DisableNodeMaintenance() a.DisableNodeMaintenance()
// Ensure the check was deregistered // Ensure the check was deregistered
if _, ok := a.state.Checks()[structs.NodeMaint]; ok { if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
t.Fatalf("should have deregistered critical node check") t.Fatalf("should have deregistered critical node check")
} }
@ -1657,7 +1657,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
a.EnableNodeMaintenance("", "") a.EnableNodeMaintenance("", "")
// Make sure the check was registered with the default note // Make sure the check was registered with the default note
check, ok = a.state.Checks()[structs.NodeMaint] check, ok = a.State.Checks()[structs.NodeMaint]
if !ok { if !ok {
t.Fatalf("should have registered critical node check") t.Fatalf("should have registered critical node check")
} }
@ -1712,7 +1712,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
a.restoreCheckState(snap) a.restoreCheckState(snap)
// Search for the check // Search for the check
out, ok := a.state.Checks()[check1.CheckID] out, ok := a.State.Checks()[check1.CheckID]
if !ok { if !ok {
t.Fatalf("check should have been registered") t.Fatalf("check should have been registered")
} }

View File

@ -12,45 +12,6 @@ import (
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
) )
func TestCatalogRegister(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
// Register node
args := &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
obj, err := a.srv.CatalogRegister(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
res := obj.(bool)
if res != true {
t.Fatalf("bad: %v", res)
}
// data race
func() {
a.state.Lock()
defer a.state.Unlock()
// Service should be in sync
if err := a.state.syncService("foo"); err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := a.state.serviceStatus["foo"]; !ok {
t.Fatalf("bad: %#v", a.state.serviceStatus)
}
if !a.state.serviceStatus["foo"].inSync {
t.Fatalf("should be in sync")
}
}()
}
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
t.Parallel() t.Parallel()
a := NewTestAgent(t.Name(), "") a := NewTestAgent(t.Name(), "")

View File

@ -1,822 +0,0 @@
package agent
import (
"fmt"
"log"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
)
const (
syncStaggerIntv = 3 * time.Second
syncRetryIntv = 15 * time.Second
)
// syncStatus is used to represent the difference between
// the local and remote state, and if action needs to be taken
type syncStatus struct {
inSync bool // Is this in sync with the server
}
// localStateConfig is the configuration for the localState. It is
// populated during NewLocalAgent from the agent configuration to avoid
// race conditions with the agent configuration.
type localStateConfig struct {
AEInterval time.Duration
AdvertiseAddr string
CheckUpdateInterval time.Duration
Datacenter string
NodeID types.NodeID
NodeName string
TaggedAddresses map[string]string
Tokens *token.Store
}
// localState is used to represent the node's services,
// and checks. We used it to perform anti-entropy with the
// catalog representation
type localState struct {
// paused is used to check if we are paused. Must be the first
// element due to a go bug.
paused int32
sync.RWMutex
logger *log.Logger
// Config is the agent config
config localStateConfig
// delegate is the consul interface to use for keeping in sync
delegate delegate
// nodeInfoInSync tracks whether the server has our correct top-level
// node information in sync
nodeInfoInSync bool
// Services tracks the local services
services map[string]*structs.NodeService
serviceStatus map[string]syncStatus
serviceTokens map[string]string
// Checks tracks the local checks
checks map[types.CheckID]*structs.HealthCheck
checkStatus map[types.CheckID]syncStatus
checkTokens map[types.CheckID]string
checkCriticalTime map[types.CheckID]time.Time
// Used to track checks that are being deferred
deferCheck map[types.CheckID]*time.Timer
// metadata tracks the local metadata fields
metadata map[string]string
// consulCh is used to inform of a change to the known
// consul nodes. This may be used to retry a sync run
consulCh chan struct{}
// triggerCh is used to inform of a change to local state
// that requires anti-entropy with the server
triggerCh chan struct{}
// discardCheckOutput stores whether the output of health checks
// is stored in the raft log.
discardCheckOutput atomic.Value // bool
}
// NewLocalState creates a is used to initialize the local state
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store) *localState {
lc := localStateConfig{
AEInterval: c.AEInterval,
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
NodeID: c.NodeID,
NodeName: c.NodeName,
TaggedAddresses: map[string]string{},
Tokens: tokens,
}
for k, v := range c.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
l := &localState{
config: lc,
logger: lg,
services: make(map[string]*structs.NodeService),
serviceStatus: make(map[string]syncStatus),
serviceTokens: make(map[string]string),
checks: make(map[types.CheckID]*structs.HealthCheck),
checkStatus: make(map[types.CheckID]syncStatus),
checkTokens: make(map[types.CheckID]string),
checkCriticalTime: make(map[types.CheckID]time.Time),
deferCheck: make(map[types.CheckID]*time.Timer),
metadata: make(map[string]string),
consulCh: make(chan struct{}, 1),
triggerCh: make(chan struct{}, 1),
}
l.discardCheckOutput.Store(c.DiscardCheckOutput)
return l
}
// changeMade is used to trigger an anti-entropy run
func (l *localState) changeMade() {
select {
case l.triggerCh <- struct{}{}:
default:
}
}
// ConsulServerUp is used to inform that a new consul server is now
// up. This can be used to speed up the sync process if we are blocking
// waiting to discover a consul server
func (l *localState) ConsulServerUp() {
select {
case l.consulCh <- struct{}{}:
default:
}
}
// Pause is used to pause state synchronization, this can be
// used to make batch changes
func (l *localState) Pause() {
atomic.AddInt32(&l.paused, 1)
}
// Resume is used to resume state synchronization
func (l *localState) Resume() {
paused := atomic.AddInt32(&l.paused, -1)
if paused < 0 {
panic("unbalanced localState.Resume() detected")
}
l.changeMade()
}
// isPaused is used to check if we are paused
func (l *localState) isPaused() bool {
return atomic.LoadInt32(&l.paused) > 0
}
func (l *localState) SetDiscardCheckOutput(b bool) {
l.discardCheckOutput.Store(b)
}
// ServiceToken returns the configured ACL token for the given
// service ID. If none is present, the agent's token is returned.
func (l *localState) ServiceToken(id string) string {
l.RLock()
defer l.RUnlock()
return l.serviceToken(id)
}
// serviceToken returns an ACL token associated with a service.
func (l *localState) serviceToken(id string) string {
token := l.serviceTokens[id]
if token == "" {
token = l.config.Tokens.UserToken()
}
return token
}
// AddService is used to add a service entry to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *localState) AddService(service *structs.NodeService, token string) {
// Assign the ID if none given
if service.ID == "" && service.Service != "" {
service.ID = service.Service
}
l.Lock()
defer l.Unlock()
l.services[service.ID] = service
l.serviceStatus[service.ID] = syncStatus{}
l.serviceTokens[service.ID] = token
l.changeMade()
}
// RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered
func (l *localState) RemoveService(serviceID string) error {
l.Lock()
defer l.Unlock()
if _, ok := l.services[serviceID]; ok {
delete(l.services, serviceID)
// Leave the service token around, if any, until we successfully
// delete the service.
l.serviceStatus[serviceID] = syncStatus{inSync: false}
l.changeMade()
} else {
return fmt.Errorf("Service does not exist")
}
return nil
}
// Services returns the locally registered services that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Services() map[string]*structs.NodeService {
services := make(map[string]*structs.NodeService)
l.RLock()
defer l.RUnlock()
for name, serv := range l.services {
services[name] = serv
}
return services
}
// CheckToken is used to return the configured health check token for a
// Check, or if none is configured, the default agent ACL token.
func (l *localState) CheckToken(checkID types.CheckID) string {
l.RLock()
defer l.RUnlock()
return l.checkToken(checkID)
}
// checkToken returns an ACL token associated with a check.
func (l *localState) checkToken(checkID types.CheckID) string {
token := l.checkTokens[checkID]
if token == "" {
token = l.config.Tokens.UserToken()
}
return token
}
// AddCheck is used to add a health check to the local state.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *localState) AddCheck(check *structs.HealthCheck, token string) error {
l.Lock()
defer l.Unlock()
// Set the node name
check.Node = l.config.NodeName
if l.discardCheckOutput.Load().(bool) {
check.Output = ""
}
// if there is a serviceID associated with the check, make sure it exists before adding it
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
if check.ServiceID != "" && l.services[check.ServiceID] == nil {
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
}
l.checks[check.CheckID] = check
l.checkStatus[check.CheckID] = syncStatus{}
l.checkTokens[check.CheckID] = token
delete(l.checkCriticalTime, check.CheckID)
l.changeMade()
return nil
}
// RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered
func (l *localState) RemoveCheck(checkID types.CheckID) {
l.Lock()
defer l.Unlock()
delete(l.checks, checkID)
// Leave the check token around, if any, until we successfully delete
// the check.
delete(l.checkCriticalTime, checkID)
l.checkStatus[checkID] = syncStatus{inSync: false}
l.changeMade()
}
// UpdateCheck is used to update the status of a check
func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
l.Lock()
defer l.Unlock()
check, ok := l.checks[checkID]
if !ok {
return
}
if l.discardCheckOutput.Load().(bool) {
output = ""
}
// Update the critical time tracking (this doesn't cause a server updates
// so we can always keep this up to date).
if status == api.HealthCritical {
_, wasCritical := l.checkCriticalTime[checkID]
if !wasCritical {
l.checkCriticalTime[checkID] = time.Now()
}
} else {
delete(l.checkCriticalTime, checkID)
}
// Do nothing if update is idempotent
if check.Status == status && check.Output == output {
return
}
// Defer a sync if the output has changed. This is an optimization around
// frequent updates of output. Instead, we update the output internally,
// and periodically do a write-back to the servers. If there is a status
// change we do the write immediately.
if l.config.CheckUpdateInterval > 0 && check.Status == status {
check.Output = output
if _, ok := l.deferCheck[checkID]; !ok {
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
deferSync := time.AfterFunc(intv, func() {
l.Lock()
if _, ok := l.checkStatus[checkID]; ok {
l.checkStatus[checkID] = syncStatus{inSync: false}
l.changeMade()
}
delete(l.deferCheck, checkID)
l.Unlock()
})
l.deferCheck[checkID] = deferSync
}
return
}
// Update status and mark out of sync
check.Status = status
check.Output = output
l.checkStatus[checkID] = syncStatus{inSync: false}
l.changeMade()
}
// Checks returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck {
l.RLock()
defer l.RUnlock()
checks := make(map[types.CheckID]*structs.HealthCheck)
for id, c := range l.checks {
c2 := new(structs.HealthCheck)
*c2 = *c
checks[id] = c2
}
return checks
}
// CriticalCheck is used to return the duration a check has been critical along
// with its associated health check.
type CriticalCheck struct {
CriticalFor time.Duration
Check *structs.HealthCheck
}
// CriticalChecks returns locally registered health checks that the agent is
// aware of and are being kept in sync with the server, and that are in a
// critical state. This also returns information about how long each check has
// been critical.
func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
checks := make(map[types.CheckID]CriticalCheck)
l.RLock()
defer l.RUnlock()
now := time.Now()
for checkID, criticalTime := range l.checkCriticalTime {
checks[checkID] = CriticalCheck{
CriticalFor: now.Sub(criticalTime),
Check: l.checks[checkID],
}
}
return checks
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Metadata() map[string]string {
metadata := make(map[string]string)
l.RLock()
defer l.RUnlock()
for key, value := range l.metadata {
metadata[key] = value
}
return metadata
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
SYNC:
// Sync our state with the servers
for {
err := l.setSyncState()
if err == nil {
break
}
l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
select {
case <-l.consulCh:
// Stagger the retry on leader election, avoid a thundering heard
select {
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))):
case <-shutdownCh:
return
}
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))):
case <-shutdownCh:
return
}
}
// Force-trigger AE to pickup any changes
l.changeMade()
// Schedule the next full sync, with a random stagger
aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers()))
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
aeTimer := time.After(aeIntv)
// Wait for sync events
for {
select {
case <-aeTimer:
goto SYNC
case <-l.triggerCh:
// Skip the sync if we are paused
if l.isPaused() {
continue
}
if err := l.syncChanges(); err != nil {
l.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
case <-shutdownCh:
return
}
}
}
// setSyncState does a read of the server state, and updates
// the local syncStatus as appropriate
func (l *localState) setSyncState() error {
req := structs.NodeSpecificRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
QueryOptions: structs.QueryOptions{Token: l.config.Tokens.AgentToken()},
}
var out1 structs.IndexedNodeServices
var out2 structs.IndexedHealthChecks
if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil {
return e
}
if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
return err
}
checks := out2.HealthChecks
l.Lock()
defer l.Unlock()
// Check the node info
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
out1.NodeServices.Node.ID != l.config.NodeID ||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
!reflect.DeepEqual(out1.NodeServices.Node.Meta, l.metadata) {
l.nodeInfoInSync = false
}
// Check all our services
services := make(map[string]*structs.NodeService)
if out1.NodeServices != nil {
services = out1.NodeServices.Services
}
for id := range l.services {
// If the local service doesn't exist remotely, then sync it
if _, ok := services[id]; !ok {
l.serviceStatus[id] = syncStatus{inSync: false}
}
}
for id, service := range services {
// If we don't have the service locally, deregister it
existing, ok := l.services[id]
if !ok {
// The consul service is created automatically, and does
// not need to be deregistered.
if id == structs.ConsulServiceID {
continue
}
l.serviceStatus[id] = syncStatus{inSync: false}
continue
}
// If our definition is different, we need to update it. Make a
// copy so that we don't retain a pointer to any actual state
// store info for in-memory RPCs.
if existing.EnableTagOverride {
existing.Tags = make([]string, len(service.Tags))
copy(existing.Tags, service.Tags)
}
equal := existing.IsSame(service)
l.serviceStatus[id] = syncStatus{inSync: equal}
}
// Index the remote health checks to improve efficiency
checkIndex := make(map[types.CheckID]*structs.HealthCheck, len(checks))
for _, check := range checks {
checkIndex[check.CheckID] = check
}
// Sync any check which doesn't exist on the remote side
for id := range l.checks {
if _, ok := checkIndex[id]; !ok {
l.checkStatus[id] = syncStatus{inSync: false}
}
}
for _, check := range checks {
// If we don't have the check locally, deregister it
id := check.CheckID
existing, ok := l.checks[id]
if !ok {
// The Serf check is created automatically, and does not
// need to be deregistered.
if id == structs.SerfCheckID {
continue
}
l.checkStatus[id] = syncStatus{inSync: false}
continue
}
// If our definition is different, we need to update it
var equal bool
if l.config.CheckUpdateInterval == 0 {
equal = existing.IsSame(check)
} else {
// Copy the existing check before potentially modifying
// it before the compare operation.
eCopy := existing.Clone()
// Copy the server's check before modifying, otherwise
// in-memory RPCs will have side effects.
cCopy := check.Clone()
// If there's a defer timer active then we've got a
// potentially spammy check so we don't sync the output
// during this sweep since the timer will mark the check
// out of sync for us. Otherwise, it is safe to sync the
// output now. This is especially important for checks
// that don't change state after they are created, in
// which case we'd never see their output synced back ever.
if _, ok := l.deferCheck[id]; ok {
eCopy.Output = ""
cCopy.Output = ""
}
equal = eCopy.IsSame(cCopy)
}
// Update the status
l.checkStatus[id] = syncStatus{inSync: equal}
}
return nil
}
// syncChanges is used to scan the status our local services and checks
// and update any that are out of sync with the server
func (l *localState) syncChanges() error {
l.Lock()
defer l.Unlock()
// We will do node-level info syncing at the end, since it will get
// updated by a service or check sync anyway, given how the register
// API works.
// Sync the services
for id, status := range l.serviceStatus {
if _, ok := l.services[id]; !ok {
if err := l.deleteService(id); err != nil {
return err
}
} else if !status.inSync {
if err := l.syncService(id); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
}
}
// Sync the checks
for id, status := range l.checkStatus {
if _, ok := l.checks[id]; !ok {
if err := l.deleteCheck(id); err != nil {
return err
}
} else if !status.inSync {
// Cancel a deferred sync
if timer := l.deferCheck[id]; timer != nil {
timer.Stop()
delete(l.deferCheck, id)
}
if err := l.syncCheck(id); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
}
}
// Now sync the node level info if we need to, and didn't do any of
// the other sync operations.
if !l.nodeInfoInSync {
if err := l.syncNodeInfo(); err != nil {
return err
}
} else {
l.logger.Printf("[DEBUG] agent: Node info in sync")
}
return nil
}
// deleteService is used to delete a service from the server
func (l *localState) deleteService(id string) error {
if id == "" {
return fmt.Errorf("ServiceID missing")
}
req := structs.DeregisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
ServiceID: id,
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
}
var out struct{}
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
if err == nil || strings.Contains(err.Error(), "Unknown service") {
delete(l.serviceStatus, id)
delete(l.serviceTokens, id)
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
return nil
} else if acl.IsErrPermissionDenied(err) {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
return nil
}
return err
}
// deleteCheck is used to delete a check from the server
func (l *localState) deleteCheck(id types.CheckID) error {
if id == "" {
return fmt.Errorf("CheckID missing")
}
req := structs.DeregisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
CheckID: id,
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
}
var out struct{}
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
if err == nil || strings.Contains(err.Error(), "Unknown check") {
delete(l.checkStatus, id)
delete(l.checkTokens, id)
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
return nil
} else if acl.IsErrPermissionDenied(err) {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
return nil
}
return err
}
// syncService is used to sync a service to the server
func (l *localState) syncService(id string) error {
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
ID: l.config.NodeID,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
}
// If the service has associated checks that are out of sync,
// piggyback them on the service sync so they are part of the
// same transaction and are registered atomically. We only let
// checks ride on service registrations with the same token,
// otherwise we need to register them separately so they don't
// pick up privileges from the service token.
var checks structs.HealthChecks
for _, check := range l.checks {
if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) {
if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
checks = append(checks, check)
}
}
}
// Backwards-compatibility for Consul < 0.5
if len(checks) == 1 {
req.Check = checks[0]
} else {
req.Checks = checks
}
var out struct{}
err := l.delegate.RPC("Catalog.Register", &req, &out)
if err == nil {
l.serviceStatus[id] = syncStatus{inSync: true}
// Given how the register API works, this info is also updated
// every time we sync a service.
l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
}
} else if acl.IsErrPermissionDenied(err) {
l.serviceStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
}
return nil
}
return err
}
// syncCheck is used to sync a check to the server
func (l *localState) syncCheck(id types.CheckID) error {
// Pull in the associated service if any
check := l.checks[id]
var service *structs.NodeService
if check.ServiceID != "" {
if serv, ok := l.services[check.ServiceID]; ok {
service = serv
}
}
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
ID: l.config.NodeID,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
Service: service,
Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
}
var out struct{}
err := l.delegate.RPC("Catalog.Register", &req, &out)
if err == nil {
l.checkStatus[id] = syncStatus{inSync: true}
// Given how the register API works, this info is also updated
// every time we sync a check.
l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
} else if acl.IsErrPermissionDenied(err) {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
return nil
}
return err
}
func (l *localState) syncNodeInfo() error {
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
ID: l.config.NodeID,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
WriteRequest: structs.WriteRequest{Token: l.config.Tokens.AgentToken()},
}
var out struct{}
err := l.delegate.RPC("Catalog.Register", &req, &out)
if err == nil {
l.nodeInfoInSync = true
l.logger.Printf("[INFO] agent: Synced node info")
} else if acl.IsErrPermissionDenied(err) {
l.nodeInfoInSync = true
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
return nil
}
return err
}

1034
agent/local/state.go Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -65,10 +65,6 @@ type TestAgent struct {
// Key is the optional encryption key for the LAN and WAN keyring. // Key is the optional encryption key for the LAN and WAN keyring.
Key string Key string
// NoInitialSync determines whether an anti-entropy run
// will be scheduled after the agent started.
NoInitialSync bool
// dns is a reference to the first started DNS endpoint. // dns is a reference to the first started DNS endpoint.
// It is valid after Start(). // It is valid after Start().
dns *DNSServer dns *DNSServer
@ -175,9 +171,9 @@ func (a *TestAgent) Start() *TestAgent {
} }
} }
} }
if !a.NoInitialSync {
a.Agent.StartSync() // Start the anti-entropy syncer
} a.Agent.StartSync()
var out structs.IndexedNodes var out structs.IndexedNodes
retry.Run(&panicFailer{}, func(r *retry.R) { retry.Run(&panicFailer{}, func(r *retry.R) {
@ -200,7 +196,7 @@ func (a *TestAgent) Start() *TestAgent {
r.Fatal(a.Name, "No leader") r.Fatal(a.Name, "No leader")
} }
if out.Index == 0 { if out.Index == 0 {
r.Fatal(a.Name, "Consul index is 0") r.Fatal(a.Name, ": Consul index is 0")
} }
} else { } else {
req, _ := http.NewRequest("GET", "/v1/agent/self", nil) req, _ := http.NewRequest("GET", "/v1/agent/self", nil)

View File

@ -173,7 +173,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
} }
// Scan for a match // Scan for a match
services := a.state.Services() services := a.State.Services()
found := false found := false
OUTER: OUTER:
for name, info := range services { for name, info := range services {

View File

@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "master"}, Tags: []string{"test", "foo", "bar", "master"},
Port: 5000, Port: 5000,
} }
a.state.AddService(srv1, "") a.State.AddService(srv1, "")
p := &UserEvent{} p := &UserEvent{}
if !a.shouldProcessUserEvent(p) { if !a.shouldProcessUserEvent(p) {
@ -157,7 +157,7 @@ func TestFireReceiveEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "master"}, Tags: []string{"test", "foo", "bar", "master"},
Port: 5000, Port: 5000,
} }
a.state.AddService(srv1, "") a.State.AddService(srv1, "")
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := a.UserEvent("dc1", "root", p1) err := a.UserEvent("dc1", "root", p1)

View File

@ -4,28 +4,16 @@ import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"math"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
osuser "os/user" osuser "os/user"
"strconv" "strconv"
"time"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
) )
const (
// This scale factor means we will add a minute after we cross 128 nodes,
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
// by a factor of 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
aeScaleThreshold = 128
)
// msgpackHandle is a shared handle for encoding/decoding of // msgpackHandle is a shared handle for encoding/decoding of
// messages // messages
var msgpackHandle = &codec.MsgpackHandle{ var msgpackHandle = &codec.MsgpackHandle{
@ -33,18 +21,6 @@ var msgpackHandle = &codec.MsgpackHandle{
WriteExt: true, WriteExt: true,
} }
// aeScale is used to scale the time interval at which anti-entropy updates take
// place. It is used to prevent saturation as the cluster size grows.
func aeScale(interval time.Duration, n int) time.Duration {
// Don't scale until we cross the threshold
if n <= aeScaleThreshold {
return interval
}
multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
return time.Duration(multiplier) * interval
}
// decodeMsgPack is used to decode a MsgPack encoded object // decodeMsgPack is used to decode a MsgPack encoded object
func decodeMsgPack(buf []byte, out interface{}) error { func decodeMsgPack(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)

View File

@ -4,28 +4,10 @@ import (
"os" "os"
"runtime" "runtime"
"testing" "testing"
"time"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
) )
func TestAEScale(t *testing.T) {
t.Parallel()
intv := time.Minute
if v := aeScale(intv, 100); v != intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 200); v != 2*intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 1000); v != 4*intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 10000); v != 8*intv {
t.Fatalf("Bad: %v", v)
}
}
func TestStringHash(t *testing.T) { func TestStringHash(t *testing.T) {
t.Parallel() t.Parallel()
in := "hello world" in := "hello world"