local state: tests compile

pull/3609/head
Frank Schroeder 2017-08-28 14:17:13 +02:00
parent 0a9ac9749e
commit b803bf3091
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
11 changed files with 444 additions and 480 deletions

View File

@ -259,7 +259,7 @@ func (a *Agent) vetServiceRegister(token string, service *structs.NodeService) e
}
// Vet any service that might be getting overwritten.
services := a.state.Services()
services := a.State.Services()
if existing, ok := services[service.ID]; ok {
if !rule.ServiceWrite(existing.Service, nil) {
return acl.ErrPermissionDenied
@ -282,7 +282,7 @@ func (a *Agent) vetServiceUpdate(token string, serviceID string) error {
}
// Vet any changes based on the existing services's info.
services := a.state.Services()
services := a.State.Services()
if existing, ok := services[serviceID]; ok {
if !rule.ServiceWrite(existing.Service, nil) {
return acl.ErrPermissionDenied
@ -318,7 +318,7 @@ func (a *Agent) vetCheckRegister(token string, check *structs.HealthCheck) error
}
// Vet any check that might be getting overwritten.
checks := a.state.Checks()
checks := a.State.Checks()
if existing, ok := checks[check.CheckID]; ok {
if len(existing.ServiceName) > 0 {
if !rule.ServiceWrite(existing.ServiceName, nil) {
@ -346,7 +346,7 @@ func (a *Agent) vetCheckUpdate(token string, checkID types.CheckID) error {
}
// Vet any changes based on the existing check's info.
checks := a.state.Checks()
checks := a.State.Checks()
if existing, ok := checks[checkID]; ok {
if len(existing.ServiceName) > 0 {
if !rule.ServiceWrite(existing.ServiceName, nil) {

View File

@ -564,7 +564,7 @@ func TestACL_vetServiceRegister(t *testing.T) {
// Try to register over a service without write privs to the existing
// service.
a.state.AddService(&structs.NodeService{
a.State.AddService(&structs.NodeService{
ID: "my-service",
Service: "other",
}, "")
@ -596,7 +596,7 @@ func TestACL_vetServiceUpdate(t *testing.T) {
}
// Update with write privs.
a.state.AddService(&structs.NodeService{
a.State.AddService(&structs.NodeService{
ID: "my-service",
Service: "service",
}, "")
@ -662,11 +662,11 @@ func TestACL_vetCheckRegister(t *testing.T) {
// Try to register over a service check without write privs to the
// existing service.
a.state.AddService(&structs.NodeService{
a.State.AddService(&structs.NodeService{
ID: "my-service",
Service: "service",
}, "")
a.state.AddCheck(&structs.HealthCheck{
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-check"),
ServiceID: "my-service",
ServiceName: "other",
@ -681,7 +681,7 @@ func TestACL_vetCheckRegister(t *testing.T) {
}
// Try to register over a node check without write privs to the node.
a.state.AddCheck(&structs.HealthCheck{
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-node-check"),
}, "")
err = a.vetCheckRegister("service-rw", &structs.HealthCheck{
@ -713,11 +713,11 @@ func TestACL_vetCheckUpdate(t *testing.T) {
}
// Update service check with write privs.
a.state.AddService(&structs.NodeService{
a.State.AddService(&structs.NodeService{
ID: "my-service",
Service: "service",
}, "")
a.state.AddCheck(&structs.HealthCheck{
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-service-check"),
ServiceID: "my-service",
ServiceName: "service",
@ -734,7 +734,7 @@ func TestACL_vetCheckUpdate(t *testing.T) {
}
// Update node check with write privs.
a.state.AddCheck(&structs.HealthCheck{
a.State.AddCheck(&structs.HealthCheck{
CheckID: types.CheckID("my-node-check"),
}, "")
err = a.vetCheckUpdate("node-rw", "my-node-check")

View File

@ -109,7 +109,7 @@ type Agent struct {
// state stores a local representation of the node,
// services and checks. Used for anti-entropy.
state *local.State
State *local.State
// sync manages the synchronization of the local
// and the remote state.
@ -230,6 +230,22 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
return a, nil
}
func LocalConfig(cfg *config.RuntimeConfig) local.Config {
lc := local.Config{
AdvertiseAddr: cfg.AdvertiseAddrLAN.String(),
CheckUpdateInterval: cfg.CheckUpdateInterval,
Datacenter: cfg.Datacenter,
DiscardCheckOutput: cfg.DiscardCheckOutput,
NodeID: cfg.NodeID,
NodeName: cfg.NodeName,
TaggedAddresses: map[string]string{},
}
for k, v := range cfg.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
return lc
}
func (a *Agent) Start() error {
c := a.config
@ -256,24 +272,12 @@ func (a *Agent) Start() error {
triggerCh := make(chan struct{}, 1)
// create the local state
lc := local.Config{
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
DiscardCheckOutput: c.DiscardCheckOutput,
NodeID: c.NodeID,
NodeName: c.NodeName,
TaggedAddresses: map[string]string{},
}
for k, v := range c.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
a.state = local.NewState(lc, a.logger, a.tokens, triggerCh)
a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh)
// create the state synchronization manager which performs
// regular and on-demand state synchronizations (anti-entropy).
a.sync = &ae.StateSyncer{
State: a.state,
State: a.State,
Interval: c.AEInterval,
ShutdownCh: a.shutdownCh,
ServerUpCh: serverUpCh,
@ -306,7 +310,7 @@ func (a *Agent) Start() error {
}
a.delegate = server
a.state.SetDelegate(server)
a.State.SetDelegate(server)
a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger)
@ -315,7 +319,7 @@ func (a *Agent) Start() error {
}
a.delegate = client
a.state.SetDelegate(client)
a.State.SetDelegate(client)
a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
}
@ -1387,7 +1391,7 @@ OUTER:
// reapServicesInternal does a single pass, looking for services to reap.
func (a *Agent) reapServicesInternal() {
reaped := make(map[string]bool)
for checkID, cs := range a.state.CriticalCheckStates() {
for checkID, cs := range a.State.CriticalCheckStates() {
serviceID := cs.Check.ServiceID
// There's nothing to do if there's no service.
@ -1445,7 +1449,7 @@ func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
wrapped := persistedService{
Token: a.state.ServiceToken(service.ID),
Token: a.State.ServiceToken(service.ID),
Service: service,
}
encoded, err := json.Marshal(wrapped)
@ -1473,7 +1477,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT
wrapped := persistedCheck{
Check: check,
ChkType: chkType,
Token: a.state.CheckToken(check.CheckID),
Token: a.State.CheckToken(check.CheckID),
}
encoded, err := json.Marshal(wrapped)
@ -1572,7 +1576,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
defer a.restoreCheckState(snap)
// Add the service
a.state.AddService(service, token)
a.State.AddService(service, token)
// Persist the service to a file
if persist && !a.config.DevMode {
@ -1622,7 +1626,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
}
// Remove service immediately
if err := a.state.RemoveService(serviceID); err != nil {
if err := a.State.RemoveService(serviceID); err != nil {
a.logger.Printf("[WARN] agent: Failed to deregister service %q: %s", serviceID, err)
return nil
}
@ -1635,7 +1639,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
}
// Deregister any associated health checks
for checkID, check := range a.state.Checks() {
for checkID, check := range a.State.Checks() {
if check.ServiceID != serviceID {
continue
}
@ -1668,7 +1672,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
if check.ServiceID != "" {
s := a.state.Services()[check.ServiceID]
s := a.State.Service(check.ServiceID)
if s == nil {
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
}
@ -1689,7 +1693,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
ttl := &CheckTTL{
Notify: a.state,
Notify: a.State,
CheckID: check.CheckID,
TTL: chkType.TTL,
Logger: a.logger,
@ -1716,7 +1720,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
http := &CheckHTTP{
Notify: a.state,
Notify: a.State,
CheckID: check.CheckID,
HTTP: chkType.HTTP,
Header: chkType.Header,
@ -1741,7 +1745,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
tcp := &CheckTCP{
Notify: a.state,
Notify: a.State,
CheckID: check.CheckID,
TCP: chkType.TCP,
Interval: chkType.Interval,
@ -1778,7 +1782,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
dockerCheck := &CheckDocker{
Notify: a.state,
Notify: a.State,
CheckID: check.CheckID,
DockerContainerID: chkType.DockerContainerID,
Shell: chkType.Shell,
@ -1808,7 +1812,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
monitor := &CheckMonitor{
Notify: a.state,
Notify: a.State,
CheckID: check.CheckID,
Script: chkType.Script,
ScriptArgs: chkType.ScriptArgs,
@ -1837,7 +1841,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
// Add to the local state for anti-entropy
err := a.state.AddCheck(check, token)
err := a.State.AddCheck(check, token)
if err != nil {
a.cancelCheckMonitors(check.CheckID)
return err
@ -1860,7 +1864,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
}
// Add to the local state for anti-entropy
a.state.RemoveCheck(checkID)
a.State.RemoveCheck(checkID)
a.checkLock.Lock()
defer a.checkLock.Unlock()
@ -2025,7 +2029,7 @@ func (a *Agent) Stats() map[string]map[string]string {
"check_monitors": strconv.Itoa(len(a.checkMonitors)),
"check_ttls": strconv.Itoa(len(a.checkTTLs)),
}
for k, v := range a.state.Stats() {
for k, v := range a.State.Stats() {
stats["agent"][k] = v
}
@ -2149,7 +2153,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
}
serviceID := p.Service.ID
if a.state.Service(serviceID) != nil {
if a.State.Service(serviceID) != nil {
// Purge previously persisted service. This allows config to be
// preferred over services persisted from the API.
a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q",
@ -2172,7 +2176,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// unloadServices will deregister all services other than the 'consul' service
// known to the local agent.
func (a *Agent) unloadServices() error {
for id := range a.state.Services() {
for id := range a.State.Services() {
if err := a.RemoveService(id, false); err != nil {
return fmt.Errorf("Failed deregistering service '%s': %v", id, err)
}
@ -2228,7 +2232,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
}
checkID := p.Check.CheckID
if a.state.Check(checkID) != nil {
if a.State.Check(checkID) != nil {
// Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q",
@ -2259,7 +2263,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
// unloadChecks will deregister all checks known to the local agent.
func (a *Agent) unloadChecks() error {
for id := range a.state.Checks() {
for id := range a.State.Checks() {
if err := a.RemoveCheck(id, false); err != nil {
return fmt.Errorf("Failed deregistering check '%s': %s", id, err)
}
@ -2271,7 +2275,7 @@ func (a *Agent) unloadChecks() error {
// checks. This is done before we reload our checks, so that we can properly
// restore into the same state.
func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
return a.state.Checks()
return a.State.Checks()
}
// restoreCheckState is used to reset the health state based on a snapshot.
@ -2279,7 +2283,7 @@ func (a *Agent) snapshotCheckState() map[types.CheckID]*structs.HealthCheck {
// in health state and potential session invalidations.
func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
for id, check := range snap {
a.state.UpdateCheck(id, check.Status, check.Output)
a.State.UpdateCheck(id, check.Status, check.Output)
}
}
@ -2291,12 +2295,12 @@ func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error {
meta[k] = v
}
meta[structs.MetaSegmentKey] = conf.SegmentName
return a.state.LoadMetadata(meta)
return a.State.LoadMetadata(meta)
}
// unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() {
a.state.UnloadMetadata()
a.State.UnloadMetadata()
}
// serviceMaintCheckID returns the ID of a given service's maintenance check
@ -2307,14 +2311,14 @@ func serviceMaintCheckID(serviceID string) types.CheckID {
// EnableServiceMaintenance will register a false health check against the given
// service ID with critical status. This will exclude the service from queries.
func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error {
service, ok := a.state.Services()[serviceID]
service, ok := a.State.Services()[serviceID]
if !ok {
return fmt.Errorf("No service registered with ID %q", serviceID)
}
// Check if maintenance mode is not already enabled
checkID := serviceMaintCheckID(serviceID)
if _, ok := a.state.Checks()[checkID]; ok {
if _, ok := a.State.Checks()[checkID]; ok {
return nil
}
@ -2342,13 +2346,13 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason, token string) error
// DisableServiceMaintenance will deregister the fake maintenance mode check
// if the service has been marked as in maintenance.
func (a *Agent) DisableServiceMaintenance(serviceID string) error {
if _, ok := a.state.Services()[serviceID]; !ok {
if _, ok := a.State.Services()[serviceID]; !ok {
return fmt.Errorf("No service registered with ID %q", serviceID)
}
// Check if maintenance mode is enabled
checkID := serviceMaintCheckID(serviceID)
if _, ok := a.state.Checks()[checkID]; !ok {
if _, ok := a.State.Checks()[checkID]; !ok {
return nil
}
@ -2362,7 +2366,7 @@ func (a *Agent) DisableServiceMaintenance(serviceID string) error {
// EnableNodeMaintenance places a node into maintenance mode.
func (a *Agent) EnableNodeMaintenance(reason, token string) {
// Ensure node maintenance is not already enabled
if _, ok := a.state.Checks()[structs.NodeMaint]; ok {
if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
return
}
@ -2385,7 +2389,7 @@ func (a *Agent) EnableNodeMaintenance(reason, token string) {
// DisableNodeMaintenance removes a node from maintenance mode
func (a *Agent) DisableNodeMaintenance() {
if _, ok := a.state.Checks()[structs.NodeMaint]; !ok {
if _, ok := a.State.Checks()[structs.NodeMaint]; !ok {
return
}
a.RemoveCheck(structs.NodeMaint, true)
@ -2429,7 +2433,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
// Update filtered metrics
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)
a.state.SetDiscardCheckOutput(newCfg.DiscardCheckOutput)
a.State.SetDiscardCheckOutput(newCfg.DiscardCheckOutput)
return nil
}

View File

@ -72,7 +72,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
Coord: cs[s.agent.config.SegmentName],
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
Meta: s.agent.State.Metadata(),
}, nil
}
@ -137,7 +137,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
var token string
s.parseToken(req, &token)
services := s.agent.state.Services()
services := s.agent.State.Services()
if err := s.agent.filterServices(token, &services); err != nil {
return nil, err
}
@ -161,7 +161,7 @@ func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (i
var token string
s.parseToken(req, &token)
checks := s.agent.state.Checks()
checks := s.agent.State.Checks()
if err := s.agent.filterChecks(token, &checks); err != nil {
return nil, err
}
@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
// services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPServer) syncChanges() {
if err := s.agent.state.SyncChanges(); err != nil {
if err := s.agent.State.SyncChanges(); err != nil {
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
}

View File

@ -51,7 +51,7 @@ func TestAgent_Services(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv1, "")
a.State.AddService(srv1, "")
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
obj, err := a.srv.AgentServices(nil, req)
@ -78,7 +78,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv1, "")
a.State.AddService(srv1, "")
t.Run("no token", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
@ -116,7 +116,7 @@ func TestAgent_Checks(t *testing.T) {
Name: "mysql",
Status: api.HealthPassing,
}
a.state.AddCheck(chk1, "")
a.State.AddCheck(chk1, "")
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
obj, err := a.srv.AgentChecks(nil, req)
@ -143,7 +143,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) {
Name: "mysql",
Status: api.HealthPassing,
}
a.state.AddCheck(chk1, "")
a.State.AddCheck(chk1, "")
t.Run("no token", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/checks", nil)
@ -283,8 +283,8 @@ func TestAgent_Reload(t *testing.T) {
`)
defer a.Shutdown()
if _, ok := a.state.services["redis"]; !ok {
t.Fatalf("missing redis service")
if a.State.Service("redis") == nil {
t.Fatal("missing redis service")
}
cfg2 := TestConfig(config.Source{
@ -307,8 +307,8 @@ func TestAgent_Reload(t *testing.T) {
if err := a.ReloadConfig(cfg2); err != nil {
t.Fatalf("got error %v want nil", err)
}
if _, ok := a.state.services["redis-reloaded"]; !ok {
t.Fatalf("missing redis-reloaded service")
if a.State.Service("redis-reloaded") == nil {
t.Fatal("missing redis-reloaded service")
}
for _, wp := range a.watchPlans {
@ -682,7 +682,7 @@ func TestAgent_RegisterCheck(t *testing.T) {
// Ensure we have a check mapping
checkID := types.CheckID("test")
if _, ok := a.state.Checks()[checkID]; !ok {
if _, ok := a.State.Checks()[checkID]; !ok {
t.Fatalf("missing test check")
}
@ -691,12 +691,12 @@ func TestAgent_RegisterCheck(t *testing.T) {
}
// Ensure the token was configured
if token := a.state.CheckToken(checkID); token == "" {
if token := a.State.CheckToken(checkID); token == "" {
t.Fatalf("missing token")
}
// By default, checks start in critical state.
state := a.state.Checks()[checkID]
state := a.State.Checks()[checkID]
if state.Status != api.HealthCritical {
t.Fatalf("bad: %v", state)
}
@ -817,7 +817,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
// Ensure we have a check mapping
checkID := types.CheckID("test")
if _, ok := a.state.Checks()[checkID]; !ok {
if _, ok := a.State.Checks()[checkID]; !ok {
t.Fatalf("missing test check")
}
@ -825,7 +825,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
t.Fatalf("missing test check ttl")
}
state := a.state.Checks()[checkID]
state := a.State.Checks()[checkID]
if state.Status != api.HealthPassing {
t.Fatalf("bad: %v", state)
}
@ -896,7 +896,7 @@ func TestAgent_DeregisterCheck(t *testing.T) {
}
// Ensure we have a check mapping
if _, ok := a.state.Checks()["test"]; ok {
if _, ok := a.State.Checks()["test"]; ok {
t.Fatalf("have test check")
}
}
@ -947,7 +947,7 @@ func TestAgent_PassCheck(t *testing.T) {
}
// Ensure we have a check mapping
state := a.state.Checks()["test"]
state := a.State.Checks()["test"]
if state.Status != api.HealthPassing {
t.Fatalf("bad: %v", state)
}
@ -1000,7 +1000,7 @@ func TestAgent_WarnCheck(t *testing.T) {
}
// Ensure we have a check mapping
state := a.state.Checks()["test"]
state := a.State.Checks()["test"]
if state.Status != api.HealthWarning {
t.Fatalf("bad: %v", state)
}
@ -1053,7 +1053,7 @@ func TestAgent_FailCheck(t *testing.T) {
}
// Ensure we have a check mapping
state := a.state.Checks()["test"]
state := a.State.Checks()["test"]
if state.Status != api.HealthCritical {
t.Fatalf("bad: %v", state)
}
@ -1117,7 +1117,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
t.Fatalf("expected 200, got %d", resp.Code)
}
state := a.state.Checks()["test"]
state := a.State.Checks()["test"]
if state.Status != c.Status || state.Output != c.Output {
t.Fatalf("bad: %v", state)
}
@ -1145,7 +1145,7 @@ func TestAgent_UpdateCheck(t *testing.T) {
// Since we append some notes about truncating, we just do a
// rough check that the output buffer was cut down so this test
// isn't super brittle.
state := a.state.Checks()["test"]
state := a.State.Checks()["test"]
if state.Status != api.HealthPassing || len(state.Output) > 2*CheckBufSize {
t.Fatalf("bad: %v", state)
}
@ -1228,12 +1228,12 @@ func TestAgent_RegisterService(t *testing.T) {
}
// Ensure the servie
if _, ok := a.state.Services()["test"]; !ok {
if _, ok := a.State.Services()["test"]; !ok {
t.Fatalf("missing test service")
}
// Ensure we have a check mapping
checks := a.state.Checks()
checks := a.State.Checks()
if len(checks) != 3 {
t.Fatalf("bad: %v", checks)
}
@ -1243,7 +1243,7 @@ func TestAgent_RegisterService(t *testing.T) {
}
// Ensure the token was configured
if token := a.state.ServiceToken("test"); token == "" {
if token := a.State.ServiceToken("test"); token == "" {
t.Fatalf("missing token")
}
}
@ -1364,11 +1364,11 @@ func TestAgent_DeregisterService(t *testing.T) {
}
// Ensure we have a check mapping
if _, ok := a.state.Services()["test"]; ok {
if _, ok := a.State.Services()["test"]; ok {
t.Fatalf("have test service")
}
if _, ok := a.state.Checks()["test"]; ok {
if _, ok := a.State.Checks()["test"]; ok {
t.Fatalf("have test check")
}
}
@ -1466,13 +1466,13 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
// Ensure the maintenance check was registered
checkID := serviceMaintCheckID("test")
check, ok := a.state.Checks()[checkID]
check, ok := a.State.Checks()[checkID]
if !ok {
t.Fatalf("should have registered maintenance check")
}
// Ensure the token was added
if token := a.state.CheckToken(checkID); token != "mytoken" {
if token := a.State.CheckToken(checkID); token != "mytoken" {
t.Fatalf("expected 'mytoken', got '%s'", token)
}
@ -1513,7 +1513,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
// Ensure the maintenance check was removed
checkID := serviceMaintCheckID("test")
if _, ok := a.state.Checks()[checkID]; ok {
if _, ok := a.State.Checks()[checkID]; ok {
t.Fatalf("should have removed maintenance check")
}
}
@ -1579,13 +1579,13 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) {
}
// Ensure the maintenance check was registered
check, ok := a.state.Checks()[structs.NodeMaint]
check, ok := a.State.Checks()[structs.NodeMaint]
if !ok {
t.Fatalf("should have registered maintenance check")
}
// Check that the token was used
if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" {
if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" {
t.Fatalf("expected 'mytoken', got '%s'", token)
}
@ -1614,7 +1614,7 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) {
}
// Ensure the maintenance check was removed
if _, ok := a.state.Checks()[structs.NodeMaint]; ok {
if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
t.Fatalf("should have removed maintenance check")
}
}
@ -1670,7 +1670,7 @@ func TestAgent_RegisterCheck_Service(t *testing.T) {
}
// Ensure we have a check mapping
result := a.state.Checks()
result := a.State.Checks()
if _, ok := result["service:memcache"]; !ok {
t.Fatalf("missing memcached check")
}

View File

@ -363,14 +363,14 @@ func TestAgent_AddService(t *testing.T) {
t.Fatalf("err: %v", err)
}
got, want := a.state.Services()[tt.srv.ID], tt.srv
got, want := a.State.Services()[tt.srv.ID], tt.srv
verify.Values(t, "", got, want)
})
// check the health checks
for k, v := range tt.healthChks {
t.Run(k, func(t *testing.T) {
got, want := a.state.Checks()[types.CheckID(k)], v
got, want := a.State.Checks()[types.CheckID(k)], v
verify.Values(t, k, got, want)
})
}
@ -437,10 +437,10 @@ func TestAgent_RemoveService(t *testing.T) {
if err := a.RemoveService("memcache", false); err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := a.state.Checks()["service:memcache"]; ok {
if _, ok := a.State.Checks()["service:memcache"]; ok {
t.Fatalf("have memcache check")
}
if _, ok := a.state.Checks()["check2"]; ok {
if _, ok := a.State.Checks()["check2"]; ok {
t.Fatalf("have check2 check")
}
}
@ -466,15 +466,15 @@ func TestAgent_RemoveService(t *testing.T) {
}
// Ensure we have a state mapping
if _, ok := a.state.Services()["redis"]; ok {
if _, ok := a.State.Services()["redis"]; ok {
t.Fatalf("have redis service")
}
// Ensure checks were removed
if _, ok := a.state.Checks()["service:redis:1"]; ok {
if _, ok := a.State.Checks()["service:redis:1"]; ok {
t.Fatalf("check redis:1 should be removed")
}
if _, ok := a.state.Checks()["service:redis:2"]; ok {
if _, ok := a.State.Checks()["service:redis:2"]; ok {
t.Fatalf("check redis:2 should be removed")
}
@ -507,7 +507,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
}
// verify chk1 exists
if a.state.Checks()["chk1"] == nil {
if a.State.Checks()["chk1"] == nil {
t.Fatal("Could not find health check chk1")
}
@ -517,10 +517,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
}
// check that both checks are there
if got, want := a.state.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) {
if got, want := a.State.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) {
t.FailNow()
}
if got, want := a.state.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) {
if got, want := a.State.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) {
t.FailNow()
}
@ -530,10 +530,10 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
}
// Check that both checks are gone
if a.state.Checks()["chk1"] != nil {
if a.State.Checks()["chk1"] != nil {
t.Fatal("Found health check chk1 want nil")
}
if a.state.Checks()["chk2"] != nil {
if a.State.Checks()["chk2"] != nil {
t.Fatal("Found health check chk2 want nil")
}
}
@ -561,7 +561,7 @@ func TestAgent_AddCheck(t *testing.T) {
}
// Ensure we have a check mapping
sChk, ok := a.state.Checks()["mem"]
sChk, ok := a.State.Checks()["mem"]
if !ok {
t.Fatalf("missing mem check")
}
@ -600,7 +600,7 @@ func TestAgent_AddCheck_StartPassing(t *testing.T) {
}
// Ensure we have a check mapping
sChk, ok := a.state.Checks()["mem"]
sChk, ok := a.State.Checks()["mem"]
if !ok {
t.Fatalf("missing mem check")
}
@ -639,7 +639,7 @@ func TestAgent_AddCheck_MinInterval(t *testing.T) {
}
// Ensure we have a check mapping
if _, ok := a.state.Checks()["mem"]; !ok {
if _, ok := a.State.Checks()["mem"]; !ok {
t.Fatalf("missing mem check")
}
@ -704,7 +704,7 @@ func TestAgent_AddCheck_RestoreState(t *testing.T) {
}
// Ensure the check status was restored during registration
checks := a.state.Checks()
checks := a.State.Checks()
check, ok := checks["baz"]
if !ok {
t.Fatalf("missing check")
@ -739,7 +739,7 @@ func TestAgent_AddCheck_ExecDisable(t *testing.T) {
}
// Ensure we don't have a check mapping
if memChk := a.state.Checks()["mem"]; memChk != nil {
if memChk := a.State.Checks()["mem"]; memChk != nil {
t.Fatalf("should be missing mem check")
}
}
@ -782,7 +782,7 @@ func TestAgent_RemoveCheck(t *testing.T) {
}
// Ensure we have a check mapping
if _, ok := a.state.Checks()["mem"]; ok {
if _, ok := a.State.Checks()["mem"]; ok {
t.Fatalf("have mem check")
}
@ -817,7 +817,7 @@ func TestAgent_updateTTLCheck(t *testing.T) {
}
// Ensure we have a check mapping.
status := a.state.Checks()["mem"]
status := a.State.Checks()["mem"]
if status.Status != api.HealthPassing {
t.Fatalf("bad: %v", status)
}
@ -904,15 +904,15 @@ func TestAgent_PersistService(t *testing.T) {
a2.Start()
defer a2.Shutdown()
restored, ok := a2.state.services[svc.ID]
if !ok {
t.Fatalf("bad: %#v", a2.state.services)
restored := a2.State.ServiceState(svc.ID)
if restored == nil {
t.Fatalf("service %q missing", svc.ID)
}
if a2.state.serviceTokens[svc.ID] != "mytoken" {
t.Fatalf("bad: %#v", a2.state.services[svc.ID])
if got, want := restored.Token, "mytoken"; got != want {
t.Fatalf("got token %q want %q", got, want)
}
if restored.Port != 8001 {
t.Fatalf("bad: %#v", restored)
if got, want := restored.Service.Port, 8081; got != want {
t.Fatalf("got port %d want %d", got, want)
}
}
@ -951,7 +951,7 @@ func TestAgent_persistedService_compat(t *testing.T) {
}
// Ensure the service was restored
services := a.state.Services()
services := a.State.Services()
result, ok := services["redis"]
if !ok {
t.Fatalf("missing service")
@ -1043,8 +1043,8 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted service")
}
result, ok := a2.state.services["redis"]
if !ok {
result := a2.State.Service("redis")
if result == nil {
t.Fatalf("missing service registration")
}
if !reflect.DeepEqual(result.Tags, []string{"bar"}) || result.Port != 9000 {
@ -1137,9 +1137,9 @@ func TestAgent_PersistCheck(t *testing.T) {
a2.Start()
defer a2.Shutdown()
result, ok := a2.state.checks[check.CheckID]
if !ok {
t.Fatalf("bad: %#v", a2.state.checks)
result := a2.State.Check(check.CheckID)
if result == nil {
t.Fatalf("bad: %#v", a2.State.Checks())
}
if result.Status != api.HealthCritical {
t.Fatalf("bad: %#v", result)
@ -1152,8 +1152,8 @@ func TestAgent_PersistCheck(t *testing.T) {
if _, ok := a2.checkMonitors[check.CheckID]; !ok {
t.Fatalf("bad: %#v", a2.checkMonitors)
}
if a2.state.checkTokens[check.CheckID] != "mytoken" {
t.Fatalf("bad: %s", a2.state.checkTokens[check.CheckID])
if a2.State.CheckState(check.CheckID).Token != "mytoken" {
t.Fatalf("bad: %s", a2.State.CheckState(check.CheckID).Token)
}
}
@ -1241,8 +1241,8 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted check")
}
result, ok := a2.state.checks["mem"]
if !ok {
result := a2.State.Check("mem")
if result == nil {
t.Fatalf("missing check registration")
}
expected := &structs.HealthCheck{
@ -1269,11 +1269,11 @@ func TestAgent_loadChecks_token(t *testing.T) {
`)
defer a.Shutdown()
checks := a.state.Checks()
checks := a.State.Checks()
if _, ok := checks["rabbitmq"]; !ok {
t.Fatalf("missing check")
}
if token := a.state.CheckToken("rabbitmq"); token != "abc123" {
if token := a.State.CheckToken("rabbitmq"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
}
@ -1307,7 +1307,7 @@ func TestAgent_unloadChecks(t *testing.T) {
t.Fatalf("err: %s", err)
}
found := false
for check := range a.state.Checks() {
for check := range a.State.Checks() {
if check == check1.CheckID {
found = true
break
@ -1323,7 +1323,7 @@ func TestAgent_unloadChecks(t *testing.T) {
}
// Make sure it was unloaded
for check := range a.state.Checks() {
for check := range a.State.Checks() {
if check == check1.CheckID {
t.Fatalf("should have unloaded checks")
}
@ -1342,11 +1342,11 @@ func TestAgent_loadServices_token(t *testing.T) {
`)
defer a.Shutdown()
services := a.state.Services()
services := a.State.Services()
if _, ok := services["rabbitmq"]; !ok {
t.Fatalf("missing service")
}
if token := a.state.ServiceToken("rabbitmq"); token != "abc123" {
if token := a.State.ServiceToken("rabbitmq"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
}
@ -1368,7 +1368,7 @@ func TestAgent_unloadServices(t *testing.T) {
t.Fatalf("err: %v", err)
}
found := false
for id := range a.state.Services() {
for id := range a.State.Services() {
if id == svc.ID {
found = true
break
@ -1382,7 +1382,7 @@ func TestAgent_unloadServices(t *testing.T) {
if err := a.unloadServices(); err != nil {
t.Fatalf("err: %s", err)
}
if len(a.state.Services()) != 0 {
if len(a.State.Services()) != 0 {
t.Fatalf("should have unloaded services")
}
}
@ -1411,13 +1411,13 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
// Make sure the critical health check was added
checkID := serviceMaintCheckID("redis")
check, ok := a.state.Checks()[checkID]
check, ok := a.State.Checks()[checkID]
if !ok {
t.Fatalf("should have registered critical maintenance check")
}
// Check that the token was used to register the check
if token := a.state.CheckToken(checkID); token != "mytoken" {
if token := a.State.CheckToken(checkID); token != "mytoken" {
t.Fatalf("expected 'mytoken', got: '%s'", token)
}
@ -1432,7 +1432,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
}
// Ensure the check was deregistered
if _, ok := a.state.Checks()[checkID]; ok {
if _, ok := a.State.Checks()[checkID]; ok {
t.Fatalf("should have deregistered maintenance check")
}
@ -1442,7 +1442,7 @@ func TestAgent_Service_MaintenanceMode(t *testing.T) {
}
// Ensure the check was registered with the default notes
check, ok = a.state.Checks()[checkID]
check, ok = a.State.Checks()[checkID]
if !ok {
t.Fatalf("should have registered critical check")
}
@ -1479,19 +1479,19 @@ func TestAgent_Service_Reap(t *testing.T) {
}
// Make sure it's there and there's no critical check yet.
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) > 0 {
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
// Wait for the check TTL to fail but before the check is reaped.
time.Sleep(100 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) != 1 {
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
@ -1499,28 +1499,28 @@ func TestAgent_Service_Reap(t *testing.T) {
if err := a.updateTTLCheck("service:redis", api.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) > 0 {
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
// Wait for the check TTL to fail again.
time.Sleep(100 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) != 1 {
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
// Wait for the reap.
time.Sleep(400 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; ok {
if _, ok := a.State.Services()["redis"]; ok {
t.Fatalf("redis service should have been reaped")
}
if checks := a.state.CriticalChecks(); len(checks) > 0 {
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
}
@ -1552,28 +1552,28 @@ func TestAgent_Service_NoReap(t *testing.T) {
}
// Make sure it's there and there's no critical check yet.
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) > 0 {
if checks := a.State.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have critical checks")
}
// Wait for the check TTL to fail.
time.Sleep(200 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) != 1 {
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
// Wait a while and make sure it doesn't reap.
time.Sleep(200 * time.Millisecond)
if _, ok := a.state.Services()["redis"]; !ok {
if _, ok := a.State.Services()["redis"]; !ok {
t.Fatalf("should have redis service")
}
if checks := a.state.CriticalChecks(); len(checks) != 1 {
if checks := a.State.CriticalCheckStates(); len(checks) != 1 {
t.Fatalf("should have a critical check")
}
}
@ -1612,7 +1612,7 @@ func TestAgent_addCheck_restoresSnapshot(t *testing.T) {
if err := a.AddService(svc, chkTypes, false, ""); err != nil {
t.Fatalf("err: %s", err)
}
check, ok := a.state.Checks()["service:redis"]
check, ok := a.State.Checks()["service:redis"]
if !ok {
t.Fatalf("missing check")
}
@ -1630,13 +1630,13 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
a.EnableNodeMaintenance("broken", "mytoken")
// Make sure the critical health check was added
check, ok := a.state.Checks()[structs.NodeMaint]
check, ok := a.State.Checks()[structs.NodeMaint]
if !ok {
t.Fatalf("should have registered critical node check")
}
// Check that the token was used to register the check
if token := a.state.CheckToken(structs.NodeMaint); token != "mytoken" {
if token := a.State.CheckToken(structs.NodeMaint); token != "mytoken" {
t.Fatalf("expected 'mytoken', got: '%s'", token)
}
@ -1649,7 +1649,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
a.DisableNodeMaintenance()
// Ensure the check was deregistered
if _, ok := a.state.Checks()[structs.NodeMaint]; ok {
if _, ok := a.State.Checks()[structs.NodeMaint]; ok {
t.Fatalf("should have deregistered critical node check")
}
@ -1657,7 +1657,7 @@ func TestAgent_NodeMaintenanceMode(t *testing.T) {
a.EnableNodeMaintenance("", "")
// Make sure the check was registered with the default note
check, ok = a.state.Checks()[structs.NodeMaint]
check, ok = a.State.Checks()[structs.NodeMaint]
if !ok {
t.Fatalf("should have registered critical node check")
}
@ -1712,7 +1712,7 @@ func TestAgent_checkStateSnapshot(t *testing.T) {
a.restoreCheckState(snap)
// Search for the check
out, ok := a.state.Checks()[check1.CheckID]
out, ok := a.State.Checks()[check1.CheckID]
if !ok {
t.Fatalf("check should have been registered")
}

View File

@ -33,22 +33,32 @@ func TestCatalogRegister(t *testing.T) {
t.Fatalf("bad: %v", res)
}
// data race
func() {
a.state.Lock()
defer a.state.Unlock()
// todo(fs): data race
// func() {
// a.State.Lock()
// defer a.State.Unlock()
// Service should be in sync
if err := a.state.syncService("foo"); err != nil {
t.Fatalf("err: %s", err)
// // Service should be in sync
// if err := a.State.syncService("foo"); err != nil {
// t.Fatalf("err: %s", err)
// }
// if _, ok := a.State.serviceStatus["foo"]; !ok {
// t.Fatalf("bad: %#v", a.State.serviceStatus)
// }
// if !a.State.serviceStatus["foo"].inSync {
// t.Fatalf("should be in sync")
// }
// }()
if err := a.State.SyncChanges(); err != nil {
t.Fatal("sync failed: ", err)
}
if _, ok := a.state.serviceStatus["foo"]; !ok {
t.Fatalf("bad: %#v", a.state.serviceStatus)
s := a.State.ServiceState("foo")
if s == nil {
t.Fatal("service 'foo' missing")
}
if !a.state.serviceStatus["foo"].inSync {
t.Fatalf("should be in sync")
if !s.InSync {
t.Fatalf("service 'foo' should be in sync")
}
}()
}
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {

View File

@ -51,6 +51,14 @@ type ServiceState struct {
Deleted bool
}
// Clone returns a shallow copy of the object. The service record still
// points to the original service record and must not be modified.
func (s *ServiceState) Clone() *ServiceState {
s2 := new(ServiceState)
*s2 = *s
return s2
}
// CheckState describes the state of a health check record.
type CheckState struct {
// Check is the local copy of the health check record.
@ -79,6 +87,15 @@ type CheckState struct {
Deleted bool
}
// Clone returns a shallow copy of the object. The check record and the
// defer timer still point to the original values and must not be
// modified.
func (c *CheckState) Clone() *CheckState {
c2 := new(CheckState)
*c2 = *c
return c2
}
// Critical returns true when the health check is in critical state.
func (c *CheckState) Critical() bool {
return !c.CriticalTime.IsZero()
@ -189,9 +206,6 @@ func (l *State) serviceToken(id string) string {
// ensure it is registered
// todo(fs): where is the persistence happening?
func (l *State) AddService(service *structs.NodeService, token string) error {
l.Lock()
defer l.Unlock()
if service == nil {
return fmt.Errorf("no service")
}
@ -202,15 +216,21 @@ func (l *State) AddService(service *structs.NodeService, token string) error {
service.ID = service.Service
}
l.services[service.ID] = &ServiceState{
l.AddServiceState(&ServiceState{
Service: service,
Token: token,
}
l.changeMade()
})
return nil
}
func (l *State) AddServiceState(s *ServiceState) {
l.Lock()
defer l.Unlock()
l.services[s.Service.ID] = s
l.changeMade()
}
// RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered.
func (l *State) RemoveService(id string) error {
@ -261,6 +281,37 @@ func (l *State) Services() map[string]*structs.NodeService {
return m
}
// ServiceState returns a shallow copy of the current service state
// record. The service record still points to the original service
// record and must not be modified.
func (l *State) ServiceState(id string) *ServiceState {
l.RLock()
defer l.RUnlock()
s := l.services[id]
if s == nil || s.Deleted {
return nil
}
return s.Clone()
}
// ServiceStates returns a shallow copy of all service state records.
// The service record still points to the original service record and
// must not be modified.
func (l *State) ServiceStates() map[string]*ServiceState {
l.RLock()
defer l.RUnlock()
m := make(map[string]*ServiceState)
for id, s := range l.services {
if s.Deleted {
continue
}
m[id] = s.Clone()
}
return m
}
// CheckToken is used to return the configured health check token for a
// Check, or if none is configured, the default agent ACL token.
func (l *State) CheckToken(checkID types.CheckID) string {
@ -286,9 +337,6 @@ func (l *State) checkToken(id types.CheckID) string {
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
l.Lock()
defer l.Unlock()
if check == nil {
return fmt.Errorf("no check")
}
@ -306,15 +354,21 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
// hard-set the node name
check.Node = l.config.NodeName
l.checks[check.CheckID] = &CheckState{
l.AddCheckState(&CheckState{
Check: check,
Token: token,
}
l.changeMade()
})
return nil
}
func (l *State) AddCheckState(c *CheckState) {
l.Lock()
defer l.Unlock()
l.checks[c.Check.CheckID] = c
l.changeMade()
}
// RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered
// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well.
@ -418,17 +472,40 @@ func (l *State) Check(id types.CheckID) *structs.HealthCheck {
// Checks returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server
func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
m := make(map[types.CheckID]*structs.HealthCheck)
for id, c := range l.CheckStates() {
m[id] = c.Check
}
return m
}
// CheckState returns a shallow copy of the current health check state
// record. The health check record and the deferred check still point to
// the original values and must not be modified.
func (l *State) CheckState(id types.CheckID) *CheckState {
l.RLock()
defer l.RUnlock()
m := make(map[types.CheckID]*structs.HealthCheck)
c := l.checks[id]
if c == nil || c.Deleted {
return nil
}
return c.Clone()
}
// CheckStates returns a shallow copy of all health check state records.
// The health check records and the deferred checks still point to
// the original values and must not be modified.
func (l *State) CheckStates() map[types.CheckID]*CheckState {
l.RLock()
defer l.RUnlock()
m := make(map[types.CheckID]*CheckState)
for id, c := range l.checks {
if c.Deleted {
continue
}
c2 := new(structs.HealthCheck)
*c2 = *c.Check
m[id] = c2
m[id] = c.Clone()
}
return m
}
@ -444,7 +521,7 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
if c.Deleted || !c.Critical() {
continue
}
m[id] = c
m[id] = c.Clone()
}
return m
}

View File

@ -1,11 +1,14 @@
package local
package local_test
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@ -16,7 +19,7 @@ import (
func TestAgentAntiEntropy_Services(t *testing.T) {
t.Parallel()
a := &TestAgent{Name: t.Name(), NoInitialSync: true}
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
a.Start()
defer a.Shutdown()
@ -35,7 +38,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv1, "")
a.State.AddService(srv1, "")
args.Service = srv1
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -48,7 +51,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Port: 8000,
}
a.state.AddService(srv2, "")
a.State.AddService(srv2, "")
srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
@ -65,7 +68,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Port: 80,
}
a.state.AddService(srv3, "")
a.State.AddService(srv3, "")
// Exists remote (delete)
srv4 := &structs.NodeService{
@ -87,7 +90,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Address: "127.0.0.10",
Port: 8000,
}
a.state.AddService(srv5, "")
a.State.AddService(srv5, "")
srv5_mod := new(structs.NodeService)
*srv5_mod = *srv5
@ -104,12 +107,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Port: 11211,
}
a.state.AddService(srv6, "")
// todo(fs): data race
a.state.Lock()
a.state.serviceStatus["cache"] = syncStatus{inSync: true}
a.state.Unlock()
a.State.AddServiceState(&local.ServiceState{
Service: srv6,
InSync: true,
})
// Trigger anti-entropy run and wait
a.StartSync()
@ -170,26 +171,13 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
}
// todo(fs): data race
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.services) != 5 {
r.Fatalf("bad: %v", a.state.services)
}
if len(a.state.serviceStatus) != 5 {
r.Fatalf("bad: %v", a.state.serviceStatus)
}
for name, status := range a.state.serviceStatus {
if !status.inSync {
r.Fatalf("should be in sync: %v %v", name, status)
}
if err := servicesInSync(a.State, 5); err != nil {
r.Fatal(err)
}
})
// Remove one of the services
a.state.RemoveService("api")
a.State.RemoveService("api")
// Trigger anti-entropy run and wait
a.StartSync()
@ -231,28 +219,15 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
}
// todo(fs): data race
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.services) != 4 {
r.Fatalf("bad: %v", a.state.services)
}
if len(a.state.serviceStatus) != 4 {
r.Fatalf("bad: %v", a.state.serviceStatus)
}
for name, status := range a.state.serviceStatus {
if !status.inSync {
r.Fatalf("should be in sync: %v %v", name, status)
}
if err := servicesInSync(a.State, 4); err != nil {
r.Fatal(err)
}
})
}
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
t.Parallel()
a := &TestAgent{Name: t.Name(), NoInitialSync: true}
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
a.Start()
defer a.Shutdown()
@ -271,7 +246,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Port: 6100,
EnableTagOverride: true,
}
a.state.AddService(srv1, "")
a.State.AddService(srv1, "")
srv1_mod := new(structs.NodeService)
*srv1_mod = *srv1
srv1_mod.Port = 7100
@ -289,7 +264,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Port: 6200,
EnableTagOverride: false,
}
a.state.AddService(srv2, "")
a.State.AddService(srv2, "")
srv2_mod := new(structs.NodeService)
*srv2_mod = *srv2
srv2_mod.Port = 7200
@ -314,8 +289,8 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
r.Fatalf("err: %v", err)
}
a.state.RLock()
defer a.state.RUnlock()
a.State.RLock()
defer a.State.RUnlock()
// All the services should match
for id, serv := range services.NodeServices.Services {
@ -342,21 +317,15 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
}
}
// todo(fs): data race
a.state.RLock()
defer a.state.RUnlock()
for name, status := range a.state.serviceStatus {
if !status.inSync {
r.Fatalf("should be in sync: %v %v", name, status)
}
if err := servicesInSync(a.State, 2); err != nil {
r.Fatal(err)
}
})
}
func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
a := agent.NewTestAgent(t.Name(), "")
defer a.Shutdown()
{
@ -367,7 +336,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv, "")
a.State.AddService(srv, "")
chk := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -376,18 +345,22 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "mysql",
Status: api.HealthPassing,
}
a.state.AddCheck(chk, "")
a.State.AddCheck(chk, "")
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// func() {
// a.State.RLock()
// defer a.State.RUnlock()
// Sync the service once
if err := a.state.syncService("mysql"); err != nil {
t.Fatalf("err: %s", err)
// // Sync the service once
// if err := a.State.syncService("mysql"); err != nil {
// t.Fatalf("err: %s", err)
// }
// }()
// todo(fs): is this correct?
if err := a.State.SyncChanges(); err != nil {
t.Fatal("sync failed: ", err)
}
}()
// We should have 2 services (consul included)
svcReq := structs.NodeSpecificRequest{
@ -424,7 +397,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv, "")
a.State.AddService(srv, "")
chk1 := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -433,7 +406,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "redis",
Status: api.HealthPassing,
}
a.state.AddCheck(chk1, "")
a.State.AddCheck(chk1, "")
chk2 := &structs.HealthCheck{
Node: a.Config.NodeName,
@ -442,18 +415,22 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
ServiceID: "redis",
Status: api.HealthPassing,
}
a.state.AddCheck(chk2, "")
a.State.AddCheck(chk2, "")
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// func() {
// a.State.RLock()
// defer a.State.RUnlock()
// Sync the service once
if err := a.state.syncService("redis"); err != nil {
t.Fatalf("err: %s", err)
// // Sync the service once
// if err := a.State.syncService("redis"); err != nil {
// t.Fatalf("err: %s", err)
// }
// }()
// todo(fs): is this correct?
if err := a.State.SyncChanges(); err != nil {
t.Fatal("sync failed: ", err)
}
}()
// We should have 3 services (consul included)
svcReq := structs.NodeSpecificRequest{
@ -499,7 +476,7 @@ var testRegisterRules = `
func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
t.Parallel()
a := &TestAgent{Name: t.Name(), HCL: `
a := &agent.TestAgent{Name: t.Name(), HCL: `
acl_datacenter = "dc1"
acl_master_token = "root"
acl_default_policy = "deny"
@ -533,7 +510,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv1, token)
a.State.AddService(srv1, token)
// Create service (allowed)
srv2 := &structs.NodeService{
@ -542,7 +519,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Tags: []string{"foo"},
Port: 5001,
}
a.state.AddService(srv2, token)
a.State.AddService(srv2, token)
// Trigger anti-entropy run and wait
a.StartSync()
@ -584,28 +561,13 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
}
}
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.services) != 2 {
t.Fatalf("bad: %v", a.state.services)
if err := servicesInSync(a.State, 2); err != nil {
t.Fatal(err)
}
if len(a.state.serviceStatus) != 2 {
t.Fatalf("bad: %v", a.state.serviceStatus)
}
for name, status := range a.state.serviceStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
}
// Now remove the service and re-sync
a.state.RemoveService("api")
a.State.RemoveService("api")
a.StartSync()
time.Sleep(200 * time.Millisecond)
@ -643,35 +605,20 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
}
}
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.services) != 1 {
t.Fatalf("bad: %v", a.state.services)
if err := servicesInSync(a.State, 1); err != nil {
t.Fatal(err)
}
if len(a.state.serviceStatus) != 1 {
t.Fatalf("bad: %v", a.state.serviceStatus)
}
for name, status := range a.state.serviceStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
}
// Make sure the token got cleaned up.
if token := a.state.ServiceToken("api"); token != "" {
if token := a.State.ServiceToken("api"); token != "" {
t.Fatalf("bad: %s", token)
}
}
func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Parallel()
a := &TestAgent{Name: t.Name(), NoInitialSync: true}
a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
a.Start()
defer a.Shutdown()
@ -690,7 +637,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "mysql",
Status: api.HealthPassing,
}
a.state.AddCheck(chk1, "")
a.State.AddCheck(chk1, "")
args.Check = chk1
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -703,7 +650,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "redis",
Status: api.HealthPassing,
}
a.state.AddCheck(chk2, "")
a.State.AddCheck(chk2, "")
chk2_mod := new(structs.HealthCheck)
*chk2_mod = *chk2
@ -720,7 +667,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "web",
Status: api.HealthPassing,
}
a.state.AddCheck(chk3, "")
a.State.AddCheck(chk3, "")
// Exists remote (delete)
chk4 := &structs.HealthCheck{
@ -741,12 +688,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
Name: "cache",
Status: api.HealthPassing,
}
a.state.AddCheck(chk5, "")
// todo(fs): data race
a.state.Lock()
a.state.checkStatus["cache"] = syncStatus{inSync: true}
a.state.Unlock()
a.State.AddCheckState(&local.CheckState{
Check: chk5,
InSync: true,
})
// Trigger anti-entropy run and wait
a.StartSync()
@ -796,24 +741,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
}
})
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.checks) != 4 {
t.Fatalf("bad: %v", a.state.checks)
if err := checksInSync(a.State, 4); err != nil {
t.Fatal(err)
}
if len(a.state.checkStatus) != 4 {
t.Fatalf("bad: %v", a.state.checkStatus)
}
for name, status := range a.state.checkStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
// Make sure we sent along our node info addresses when we synced.
{
@ -836,7 +766,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
}
// Remove one of the checks
a.state.RemoveCheck("redis")
a.State.RemoveCheck("redis")
// Trigger anti-entropy run and wait
a.StartSync()
@ -876,29 +806,14 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
}
})
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.checks) != 3 {
t.Fatalf("bad: %v", a.state.checks)
if err := checksInSync(a.State, 3); err != nil {
t.Fatal(err)
}
if len(a.state.checkStatus) != 3 {
t.Fatalf("bad: %v", a.state.checkStatus)
}
for name, status := range a.state.checkStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
}
func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
t.Parallel()
a := &TestAgent{Name: t.Name(), HCL: `
a := &agent.TestAgent{Name: t.Name(), HCL: `
acl_datacenter = "dc1"
acl_master_token = "root"
acl_default_policy = "deny"
@ -932,14 +847,14 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Tags: []string{"master"},
Port: 5000,
}
a.state.AddService(srv1, "root")
a.State.AddService(srv1, "root")
srv2 := &structs.NodeService{
ID: "api",
Service: "api",
Tags: []string{"foo"},
Port: 5001,
}
a.state.AddService(srv2, "root")
a.State.AddService(srv2, "root")
// Trigger anti-entropy run and wait
a.StartSync()
@ -983,24 +898,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
}
}
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state
if len(a.state.services) != 2 {
t.Fatalf("bad: %v", a.state.services)
if err := servicesInSync(a.State, 2); err != nil {
t.Fatal(err)
}
if len(a.state.serviceStatus) != 2 {
t.Fatalf("bad: %v", a.state.serviceStatus)
}
for name, status := range a.state.serviceStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
}
// This check won't be allowed.
@ -1013,7 +913,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Name: "mysql",
Status: api.HealthPassing,
}
a.state.AddCheck(chk1, token)
a.State.AddCheck(chk1, token)
// This one will be allowed.
chk2 := &structs.HealthCheck{
@ -1025,7 +925,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Name: "api",
Status: api.HealthPassing,
}
a.state.AddCheck(chk2, token)
a.State.AddCheck(chk2, token)
// Trigger anti-entropy run and wait.
a.StartSync()
@ -1068,27 +968,12 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
}
})
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state.
if len(a.state.checks) != 2 {
t.Fatalf("bad: %v", a.state.checks)
if err := checksInSync(a.State, 2); err != nil {
t.Fatal(err)
}
if len(a.state.checkStatus) != 2 {
t.Fatalf("bad: %v", a.state.checkStatus)
}
for name, status := range a.state.checkStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
// Now delete the check and wait for sync.
a.state.RemoveCheck("api-check")
a.State.RemoveCheck("api-check")
a.StartSync()
time.Sleep(200 * time.Millisecond)
// Verify that we are in sync
@ -1126,27 +1011,12 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
}
})
// todo(fs): data race
func() {
a.state.RLock()
defer a.state.RUnlock()
// Check the local state.
if len(a.state.checks) != 1 {
t.Fatalf("bad: %v", a.state.checks)
if err := checksInSync(a.State, 1); err != nil {
t.Fatal(err)
}
if len(a.state.checkStatus) != 1 {
t.Fatalf("bad: %v", a.state.checkStatus)
}
for name, status := range a.state.checkStatus {
if !status.inSync {
t.Fatalf("should be in sync: %v %v", name, status)
}
}
}()
// Make sure the token got cleaned up.
if token := a.state.CheckToken("api-check"); token != "" {
if token := a.State.CheckToken("api-check"); token != "" {
t.Fatalf("bad: %s", token)
}
}
@ -1203,7 +1073,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
t.Parallel()
a := &TestAgent{Name: t.Name(), HCL: `
a := &agent.TestAgent{Name: t.Name(), HCL: `
check_update_interval = "500ms"
`, NoInitialSync: true}
a.Start()
@ -1217,7 +1087,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
Status: api.HealthPassing,
Output: "",
}
a.state.AddCheck(check, "")
a.State.AddCheck(check, "")
// Trigger anti-entropy run and wait
a.StartSync()
@ -1238,7 +1108,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
})
// Update the check output! Should be deferred
a.state.UpdateCheck("web", api.HealthPassing, "output")
a.State.UpdateCheck("web", api.HealthPassing, "output")
// Should not update for 500 milliseconds
time.Sleep(250 * time.Millisecond)
@ -1337,7 +1207,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
}
// Now make an update that should be deferred.
a.state.UpdateCheck("web", api.HealthPassing, "deferred")
a.State.UpdateCheck("web", api.HealthPassing, "deferred")
// Trigger anti-entropy run and wait.
a.StartSync()
@ -1381,7 +1251,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
nodeMeta := map[string]string{
"somekey": "somevalue",
}
a := &TestAgent{Name: t.Name(), HCL: `
a := &agent.TestAgent{Name: t.Name(), HCL: `
node_id = "40e4a748-2192-161a-0510-9bf59fe950b5"
node_meta {
somekey = "somevalue"
@ -1453,40 +1323,15 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
})
}
func TestAgentAntiEntropy_deleteService_fails(t *testing.T) {
t.Parallel()
l := new(localState)
// todo(fs): data race
l.Lock()
defer l.Unlock()
if err := l.deleteService(""); err == nil {
t.Fatalf("should have failed")
}
}
func TestAgentAntiEntropy_deleteCheck_fails(t *testing.T) {
t.Parallel()
l := new(localState)
// todo(fs): data race
l.Lock()
defer l.Unlock()
if err := l.deleteCheck(""); err == nil {
t.Fatalf("should have errored")
}
}
func TestAgent_serviceTokens(t *testing.T) {
t.Parallel()
cfg := agent.TestConfig()
tokens := new(token.Store)
tokens.UpdateUserToken("default")
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
l.AddService(&structs.NodeService{
ID: "redis",
}, "")
l.AddService(&structs.NodeService{ID: "redis"}, "")
// Returns default when no token is set
if token := l.ServiceToken("redis"); token != "default" {
@ -1494,7 +1339,7 @@ func TestAgent_serviceTokens(t *testing.T) {
}
// Returns configured token
l.serviceTokens["redis"] = "abc123"
l.AddService(&structs.NodeService{ID: "redis"}, "abc123")
if token := l.ServiceToken("redis"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
@ -1509,17 +1354,19 @@ func TestAgent_serviceTokens(t *testing.T) {
func TestAgent_checkTokens(t *testing.T) {
t.Parallel()
cfg := agent.TestConfig()
tokens := new(token.Store)
tokens.UpdateUserToken("default")
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
// Returns default when no token is set
l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "")
if token := l.CheckToken("mem"); token != "default" {
t.Fatalf("bad: %s", token)
}
// Returns configured token
l.checkTokens["mem"] = "abc123"
l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "abc123")
if token := l.CheckToken("mem"); token != "abc123" {
t.Fatalf("bad: %s", token)
}
@ -1533,7 +1380,7 @@ func TestAgent_checkTokens(t *testing.T) {
func TestAgent_checkCriticalTime(t *testing.T) {
t.Parallel()
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
l.AddService(svc, "")
@ -1548,54 +1395,54 @@ func TestAgent_checkCriticalTime(t *testing.T) {
Status: api.HealthPassing,
}
l.AddCheck(chk, "")
if checks := l.CriticalChecks(); len(checks) > 0 {
if checks := l.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
// Set it to warning and make sure that doesn't show up as critical.
l.UpdateCheck(checkID, api.HealthWarning, "")
if checks := l.CriticalChecks(); len(checks) > 0 {
if checks := l.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
// Fail the check and make sure the time looks reasonable.
l.UpdateCheck(checkID, api.HealthCritical, "")
if crit, ok := l.CriticalChecks()[checkID]; !ok {
if c, ok := l.CriticalCheckStates()[checkID]; !ok {
t.Fatalf("should have a critical check")
} else if crit.CriticalFor > time.Millisecond {
t.Fatalf("bad: %#v", crit)
} else if c.CriticalFor() > time.Millisecond {
t.Fatalf("bad: %#v", c)
}
// Wait a while, then fail it again and make sure the time keeps track
// of the initial failure, and doesn't reset here.
time.Sleep(50 * time.Millisecond)
l.UpdateCheck(chk.CheckID, api.HealthCritical, "")
if crit, ok := l.CriticalChecks()[checkID]; !ok {
if c, ok := l.CriticalCheckStates()[checkID]; !ok {
t.Fatalf("should have a critical check")
} else if crit.CriticalFor < 25*time.Millisecond ||
crit.CriticalFor > 75*time.Millisecond {
t.Fatalf("bad: %#v", crit)
} else if c.CriticalFor() < 25*time.Millisecond ||
c.CriticalFor() > 75*time.Millisecond {
t.Fatalf("bad: %#v", c)
}
// Set it passing again.
l.UpdateCheck(checkID, api.HealthPassing, "")
if checks := l.CriticalChecks(); len(checks) > 0 {
if checks := l.CriticalCheckStates(); len(checks) > 0 {
t.Fatalf("should not have any critical checks")
}
// Fail the check and make sure the time looks like it started again
// from the latest failure, not the original one.
l.UpdateCheck(checkID, api.HealthCritical, "")
if crit, ok := l.CriticalChecks()[checkID]; !ok {
if c, ok := l.CriticalCheckStates()[checkID]; !ok {
t.Fatalf("should have a critical check")
} else if crit.CriticalFor > time.Millisecond {
t.Fatalf("bad: %#v", crit)
} else if c.CriticalFor() > time.Millisecond {
t.Fatalf("bad: %#v", c)
}
}
func TestAgent_AddCheckFailure(t *testing.T) {
t.Parallel()
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
l := local.NewState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
// Add a check for a service that does not exist and verify that it fails
checkID := types.CheckID("redis:1")
@ -1615,7 +1462,7 @@ func TestAgent_AddCheckFailure(t *testing.T) {
func TestAgent_sendCoordinate(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `
a := agent.NewTestAgent(t.Name(), `
sync_coordinate_interval_min = "1ms"
sync_coordinate_rate_target = 10.0
consul = {
@ -1649,3 +1496,29 @@ func TestAgent_sendCoordinate(t *testing.T) {
}
})
}
func servicesInSync(state *local.State, wantServices int) error {
services := state.ServiceStates()
if got, want := len(services), wantServices; got != want {
return fmt.Errorf("got %d services want %d", got, want)
}
for id, s := range services {
if !s.InSync {
return fmt.Errorf("service %q should be in sync", id)
}
}
return nil
}
func checksInSync(state *local.State, wantChecks int) error {
checks := state.CheckStates()
if got, want := len(checks), wantChecks; got != want {
return fmt.Errorf("got %d checks want %d", got, want)
}
for id, c := range checks {
if !c.InSync {
return fmt.Errorf("check %q should be in sync", id)
}
}
return nil
}

View File

@ -173,7 +173,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
}
// Scan for a match
services := a.state.Services()
services := a.State.Services()
found := false
OUTER:
for name, info := range services {

View File

@ -57,7 +57,7 @@ func TestShouldProcessUserEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "master"},
Port: 5000,
}
a.state.AddService(srv1, "")
a.State.AddService(srv1, "")
p := &UserEvent{}
if !a.shouldProcessUserEvent(p) {
@ -157,7 +157,7 @@ func TestFireReceiveEvent(t *testing.T) {
Tags: []string{"test", "foo", "bar", "master"},
Port: 5000,
}
a.state.AddService(srv1, "")
a.State.AddService(srv1, "")
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := a.UserEvent("dc1", "root", p1)