mirror of https://github.com/hashicorp/consul
Seperate localState from Agent
parent
0964285761
commit
410a0de0c8
|
@ -3,7 +3,6 @@ package agent
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/consul"
|
"github.com/hashicorp/consul/consul"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
@ -82,29 +81,21 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
||||||
logger: log.New(logOutput, "", log.LstdFlags),
|
logger: log.New(logOutput, "", log.LstdFlags),
|
||||||
logOutput: logOutput,
|
logOutput: logOutput,
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
state: localState{
|
|
||||||
delaySync: make(chan struct{}, 1),
|
|
||||||
services: make(map[string]*structs.NodeService),
|
|
||||||
serviceStatus: make(map[string]syncStatus),
|
|
||||||
checks: make(map[string]*structs.HealthCheck),
|
|
||||||
checkStatus: make(map[string]syncStatus),
|
|
||||||
triggerCh: make(chan struct{}, 1),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup either the client or the server
|
// Setup either the client or the server
|
||||||
var err error
|
var err error
|
||||||
if config.Server {
|
if config.Server {
|
||||||
err = agent.setupServer()
|
err = agent.setupServer()
|
||||||
|
agent.state.Init(config, agent.server, agent.logger)
|
||||||
} else {
|
} else {
|
||||||
err = agent.setupClient()
|
err = agent.setupClient()
|
||||||
|
agent.state.Init(config, agent.client, agent.logger)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the anti entropy routine
|
|
||||||
go agent.antiEntropy()
|
|
||||||
return agent, nil
|
return agent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,3 +280,10 @@ func (a *Agent) WANMembers() []serf.Member {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartSync is called once Services and Checks are registered.
|
||||||
|
// This is called to prevent a race between clients and the anti-entropy routines
|
||||||
|
func (a *Agent) StartSync() {
|
||||||
|
// Start the anti entropy routine
|
||||||
|
go a.state.antiEntropy(a.shutdownCh)
|
||||||
|
}
|
||||||
|
|
|
@ -6,12 +6,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
services := s.agent.Services()
|
services := s.agent.state.Services()
|
||||||
return services, nil
|
return services, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
checks := s.agent.Checks()
|
checks := s.agent.state.Checks()
|
||||||
return checks, nil
|
return checks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ func TestHTTPAgentServices(t *testing.T) {
|
||||||
Tag: "master",
|
Tag: "master",
|
||||||
Port: 5000,
|
Port: 5000,
|
||||||
}
|
}
|
||||||
srv.agent.AddService(srv1)
|
srv.agent.state.AddService(srv1)
|
||||||
|
|
||||||
obj, err := srv.AgentServices(nil, nil)
|
obj, err := srv.AgentServices(nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -49,7 +49,7 @@ func TestHTTPAgentChecks(t *testing.T) {
|
||||||
Name: "mysql",
|
Name: "mysql",
|
||||||
Status: structs.HealthPassing,
|
Status: structs.HealthPassing,
|
||||||
}
|
}
|
||||||
srv.agent.AddCheck(chk1)
|
srv.agent.state.AddCheck(chk1)
|
||||||
|
|
||||||
obj, err := srv.AgentChecks(nil, nil)
|
obj, err := srv.AgentChecks(nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -120,7 +120,7 @@ func TestCatalogNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := obj.(structs.Nodes)
|
nodes := obj.(structs.Nodes)
|
||||||
if len(nodes) != 1 {
|
if len(nodes) != 2 {
|
||||||
t.Fatalf("bad: %v", obj)
|
t.Fatalf("bad: %v", obj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ func TestCatalogServices(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
services := obj.(structs.Services)
|
services := obj.(structs.Services)
|
||||||
if len(services) != 1 {
|
if len(services) != 2 {
|
||||||
t.Fatalf("bad: %v", obj)
|
t.Fatalf("bad: %v", obj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ func TestCheckTTL(t *testing.T) {
|
||||||
defer check.Stop()
|
defer check.Stop()
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
check.SetStatus(structs.HealthPassing)
|
check.SetStatus(structs.HealthPassing, "")
|
||||||
|
|
||||||
if mock.updates["foo"] != 1 {
|
if mock.updates["foo"] != 1 {
|
||||||
t.Fatalf("should have 1 updates %v", mock.updates)
|
t.Fatalf("should have 1 updates %v", mock.updates)
|
||||||
|
|
|
@ -200,7 +200,7 @@ func (c *Command) Run(args []string) int {
|
||||||
// TODO: Register services/checks
|
// TODO: Register services/checks
|
||||||
|
|
||||||
// Let the agent know we've finished registration
|
// Let the agent know we've finished registration
|
||||||
c.agent.RegistrationDone()
|
c.agent.StartSync()
|
||||||
|
|
||||||
c.Ui.Output("Consul agent running!")
|
c.Ui.Output("Consul agent running!")
|
||||||
c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
|
c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
|
||||||
|
|
|
@ -3,6 +3,7 @@ package agent
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/consul/consul"
|
"github.com/hashicorp/consul/consul"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"log"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -10,7 +11,6 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
syncRetryIntv = 30 * time.Second
|
syncRetryIntv = 30 * time.Second
|
||||||
maxDelaySync = 30 * time.Second
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// syncStatus is used to represent the difference between
|
// syncStatus is used to represent the difference between
|
||||||
|
@ -25,10 +25,13 @@ type syncStatus struct {
|
||||||
// catalog representation
|
// catalog representation
|
||||||
type localState struct {
|
type localState struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
logger *log.Logger
|
||||||
|
|
||||||
// delaySync is used to delay the initial sync until
|
// Config is the agent config
|
||||||
// the client has registered its services and checks.
|
config *Config
|
||||||
delaySync chan struct{}
|
|
||||||
|
// iface is the consul interface to use for keeping in sync
|
||||||
|
iface consul.Interface
|
||||||
|
|
||||||
// Services tracks the local services
|
// Services tracks the local services
|
||||||
services map[string]*structs.NodeService
|
services map[string]*structs.NodeService
|
||||||
|
@ -43,6 +46,18 @@ type localState struct {
|
||||||
triggerCh chan struct{}
|
triggerCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Init is used to initialize the local state
|
||||||
|
func (l *localState) Init(config *Config, iface consul.Interface, logger *log.Logger) {
|
||||||
|
l.config = config
|
||||||
|
l.iface = iface
|
||||||
|
l.logger = logger
|
||||||
|
l.services = make(map[string]*structs.NodeService)
|
||||||
|
l.serviceStatus = make(map[string]syncStatus)
|
||||||
|
l.checks = make(map[string]*structs.HealthCheck)
|
||||||
|
l.checkStatus = make(map[string]syncStatus)
|
||||||
|
l.triggerCh = make(chan struct{}, 1)
|
||||||
|
}
|
||||||
|
|
||||||
// changeMade is used to trigger an anti-entropy run
|
// changeMade is used to trigger an anti-entropy run
|
||||||
func (l *localState) changeMade() {
|
func (l *localState) changeMade() {
|
||||||
select {
|
select {
|
||||||
|
@ -51,52 +66,42 @@ func (l *localState) changeMade() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegistrationDone is called by the Agent client once base Services
|
|
||||||
// and Checks are registered. This is called to prevent a race
|
|
||||||
// between clients and the anti-entropy routines
|
|
||||||
func (a *Agent) RegistrationDone() {
|
|
||||||
select {
|
|
||||||
case a.state.delaySync <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddService is used to add a service entry to the local state.
|
// AddService is used to add a service entry to the local state.
|
||||||
// This entry is persistent and the agent will make a best effort to
|
// This entry is persistent and the agent will make a best effort to
|
||||||
// ensure it is registered
|
// ensure it is registered
|
||||||
func (a *Agent) AddService(service *structs.NodeService) {
|
func (l *localState) AddService(service *structs.NodeService) {
|
||||||
// Assign the ID if none given
|
// Assign the ID if none given
|
||||||
if service.ID == "" && service.Service != "" {
|
if service.ID == "" && service.Service != "" {
|
||||||
service.ID = service.Service
|
service.ID = service.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
a.state.services[service.ID] = service
|
l.services[service.ID] = service
|
||||||
a.state.serviceStatus[service.ID] = syncStatus{}
|
l.serviceStatus[service.ID] = syncStatus{}
|
||||||
a.state.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveService is used to remove a service entry from the local state.
|
// RemoveService is used to remove a service entry from the local state.
|
||||||
// The agent will make a best effort to ensure it is deregistered
|
// The agent will make a best effort to ensure it is deregistered
|
||||||
func (a *Agent) RemoveService(serviceID string) {
|
func (l *localState) RemoveService(serviceID string) {
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
delete(a.state.services, serviceID)
|
delete(l.services, serviceID)
|
||||||
a.state.serviceStatus[serviceID] = syncStatus{remoteDelete: true}
|
l.serviceStatus[serviceID] = syncStatus{remoteDelete: true}
|
||||||
a.state.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Services returns the locally registered services that the
|
// Services returns the locally registered services that the
|
||||||
// agent is aware of and are being kept in sync with the server
|
// agent is aware of and are being kept in sync with the server
|
||||||
func (a *Agent) Services() map[string]*structs.NodeService {
|
func (l *localState) Services() map[string]*structs.NodeService {
|
||||||
services := make(map[string]*structs.NodeService)
|
services := make(map[string]*structs.NodeService)
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
for name, serv := range a.state.services {
|
for name, serv := range l.services {
|
||||||
services[name] = serv
|
services[name] = serv
|
||||||
}
|
}
|
||||||
return services
|
return services
|
||||||
|
@ -105,35 +110,35 @@ func (a *Agent) Services() map[string]*structs.NodeService {
|
||||||
// AddCheck is used to add a health check to the local state.
|
// AddCheck is used to add a health check to the local state.
|
||||||
// This entry is persistent and the agent will make a best effort to
|
// This entry is persistent and the agent will make a best effort to
|
||||||
// ensure it is registered
|
// ensure it is registered
|
||||||
func (a *Agent) AddCheck(check *structs.HealthCheck) {
|
func (l *localState) AddCheck(check *structs.HealthCheck) {
|
||||||
// Set the node name
|
// Set the node name
|
||||||
check.Node = a.config.NodeName
|
check.Node = l.config.NodeName
|
||||||
|
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
a.state.checks[check.CheckID] = check
|
l.checks[check.CheckID] = check
|
||||||
a.state.checkStatus[check.CheckID] = syncStatus{}
|
l.checkStatus[check.CheckID] = syncStatus{}
|
||||||
a.state.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveCheck is used to remove a health check from the local state.
|
// RemoveCheck is used to remove a health check from the local state.
|
||||||
// The agent will make a best effort to ensure it is deregistered
|
// The agent will make a best effort to ensure it is deregistered
|
||||||
func (a *Agent) RemoveCheck(checkID string) {
|
func (l *localState) RemoveCheck(checkID string) {
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
delete(a.state.checks, checkID)
|
delete(l.checks, checkID)
|
||||||
a.state.checkStatus[checkID] = syncStatus{remoteDelete: true}
|
l.checkStatus[checkID] = syncStatus{remoteDelete: true}
|
||||||
a.state.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateCheck is used to update the status of a check
|
// UpdateCheck is used to update the status of a check
|
||||||
func (a *Agent) UpdateCheck(checkID, status, output string) {
|
func (l *localState) UpdateCheck(checkID, status, output string) {
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
check, ok := a.state.checks[checkID]
|
check, ok := l.checks[checkID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -146,18 +151,18 @@ func (a *Agent) UpdateCheck(checkID, status, output string) {
|
||||||
// Update status and mark out of sync
|
// Update status and mark out of sync
|
||||||
check.Status = status
|
check.Status = status
|
||||||
check.Notes = output
|
check.Notes = output
|
||||||
a.state.checkStatus[checkID] = syncStatus{inSync: false}
|
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||||
a.state.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks returns the locally registered checks that the
|
// Checks returns the locally registered checks that the
|
||||||
// agent is aware of and are being kept in sync with the server
|
// agent is aware of and are being kept in sync with the server
|
||||||
func (a *Agent) Checks() map[string]*structs.HealthCheck {
|
func (l *localState) Checks() map[string]*structs.HealthCheck {
|
||||||
checks := make(map[string]*structs.HealthCheck)
|
checks := make(map[string]*structs.HealthCheck)
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
for name, check := range a.state.checks {
|
for name, check := range l.checks {
|
||||||
checks[name] = check
|
checks[name] = check
|
||||||
}
|
}
|
||||||
return checks
|
return checks
|
||||||
|
@ -165,32 +170,27 @@ func (a *Agent) Checks() map[string]*structs.HealthCheck {
|
||||||
|
|
||||||
// antiEntropy is a long running method used to perform anti-entropy
|
// antiEntropy is a long running method used to perform anti-entropy
|
||||||
// between local and remote state.
|
// between local and remote state.
|
||||||
func (a *Agent) antiEntropy() {
|
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
|
||||||
// Delay the initial sync until client has a chance to register
|
|
||||||
select {
|
|
||||||
case <-a.state.delaySync:
|
|
||||||
case <-time.After(maxDelaySync):
|
|
||||||
a.logger.Printf("[WARN] Client failed to call RegisterDone within %v", maxDelaySync)
|
|
||||||
case <-a.shutdownCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
SYNC:
|
SYNC:
|
||||||
// Sync our state with the servers
|
// Sync our state with the servers
|
||||||
for !a.shutdown {
|
for {
|
||||||
if err := a.setSyncState(); err != nil {
|
if err := l.setSyncState(); err != nil {
|
||||||
a.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||||
time.Sleep(aeScale(syncRetryIntv, len(a.LANMembers())))
|
select {
|
||||||
continue
|
case <-time.After(aeScale(syncRetryIntv, len(l.iface.LANMembers()))):
|
||||||
|
continue
|
||||||
|
case <-shutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Force-trigger AE to pickup any changes
|
// Force-trigger AE to pickup any changes
|
||||||
a.state.changeMade()
|
l.changeMade()
|
||||||
|
|
||||||
// Schedule the next full sync, with a random stagger
|
// Schedule the next full sync, with a random stagger
|
||||||
aeIntv := aeScale(a.config.AEInterval, len(a.LANMembers()))
|
aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers()))
|
||||||
aeIntv = aeIntv + randomStagger(aeIntv)
|
aeIntv = aeIntv + randomStagger(aeIntv)
|
||||||
aeTimer := time.After(aeIntv)
|
aeTimer := time.After(aeIntv)
|
||||||
|
|
||||||
|
@ -199,11 +199,11 @@ SYNC:
|
||||||
select {
|
select {
|
||||||
case <-aeTimer:
|
case <-aeTimer:
|
||||||
goto SYNC
|
goto SYNC
|
||||||
case <-a.state.triggerCh:
|
case <-l.triggerCh:
|
||||||
if err := a.syncChanges(); err != nil {
|
if err := l.syncChanges(); err != nil {
|
||||||
a.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
l.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||||
}
|
}
|
||||||
case <-a.shutdownCh:
|
case <-shutdownCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -211,89 +211,89 @@ SYNC:
|
||||||
|
|
||||||
// setSyncState does a read of the server state, and updates
|
// setSyncState does a read of the server state, and updates
|
||||||
// the local syncStatus as appropriate
|
// the local syncStatus as appropriate
|
||||||
func (a *Agent) setSyncState() error {
|
func (l *localState) setSyncState() error {
|
||||||
req := structs.NodeSpecificRequest{
|
req := structs.NodeSpecificRequest{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: a.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
}
|
}
|
||||||
var services structs.NodeServices
|
var services structs.NodeServices
|
||||||
var checks structs.HealthChecks
|
var checks structs.HealthChecks
|
||||||
if e := a.RPC("Catalog.NodeServices", &req, &services); e != nil {
|
if e := l.iface.RPC("Catalog.NodeServices", &req, &services); e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
if err := l.iface.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
for id, service := range services.Services {
|
for id, service := range services.Services {
|
||||||
// If we don't have the service locally, deregister it
|
// If we don't have the service locally, deregister it
|
||||||
existing, ok := a.state.services[id]
|
existing, ok := l.services[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
// The Consul service is created automatically, and
|
// The Consul service is created automatically, and
|
||||||
// does not need to be registered
|
// does not need to be registered
|
||||||
if id == consul.ConsulServiceID && a.config.Server {
|
if id == consul.ConsulServiceID && l.config.Server {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
a.state.serviceStatus[id] = syncStatus{remoteDelete: true}
|
l.serviceStatus[id] = syncStatus{remoteDelete: true}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If our definition is different, we need to update it
|
// If our definition is different, we need to update it
|
||||||
equal := reflect.DeepEqual(existing, service)
|
equal := reflect.DeepEqual(existing, service)
|
||||||
a.state.serviceStatus[id] = syncStatus{inSync: equal}
|
l.serviceStatus[id] = syncStatus{inSync: equal}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, check := range checks {
|
for _, check := range checks {
|
||||||
// If we don't have the check locally, deregister it
|
// If we don't have the check locally, deregister it
|
||||||
id := check.CheckID
|
id := check.CheckID
|
||||||
existing, ok := a.state.checks[id]
|
existing, ok := l.checks[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
// The Serf check is created automatically, and does not
|
// The Serf check is created automatically, and does not
|
||||||
// need to be registered
|
// need to be registered
|
||||||
if id == consul.SerfCheckID {
|
if id == consul.SerfCheckID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
a.state.checkStatus[id] = syncStatus{remoteDelete: true}
|
l.checkStatus[id] = syncStatus{remoteDelete: true}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If our definition is different, we need to update it
|
// If our definition is different, we need to update it
|
||||||
equal := reflect.DeepEqual(existing, check)
|
equal := reflect.DeepEqual(existing, check)
|
||||||
a.state.checkStatus[id] = syncStatus{inSync: equal}
|
l.checkStatus[id] = syncStatus{inSync: equal}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncChanges is used to scan the status our local services and checks
|
// syncChanges is used to scan the status our local services and checks
|
||||||
// and update any that are out of sync with the server
|
// and update any that are out of sync with the server
|
||||||
func (a *Agent) syncChanges() error {
|
func (l *localState) syncChanges() error {
|
||||||
a.state.Lock()
|
l.Lock()
|
||||||
defer a.state.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
// Sync the services
|
// Sync the services
|
||||||
for id, status := range a.state.serviceStatus {
|
for id, status := range l.serviceStatus {
|
||||||
if status.remoteDelete {
|
if status.remoteDelete {
|
||||||
if err := a.deleteService(id); err != nil {
|
if err := l.deleteService(id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if !status.inSync {
|
} else if !status.inSync {
|
||||||
if err := a.syncService(id); err != nil {
|
if err := l.syncService(id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync the checks
|
// Sync the checks
|
||||||
for id, status := range a.state.checkStatus {
|
for id, status := range l.checkStatus {
|
||||||
if status.remoteDelete {
|
if status.remoteDelete {
|
||||||
if err := a.deleteCheck(id); err != nil {
|
if err := l.deleteCheck(id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if !status.inSync {
|
} else if !status.inSync {
|
||||||
if err := a.syncCheck(id); err != nil {
|
if err := l.syncCheck(id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -302,67 +302,67 @@ func (a *Agent) syncChanges() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteService is used to delete a service from the server
|
// deleteService is used to delete a service from the server
|
||||||
func (a *Agent) deleteService(id string) error {
|
func (l *localState) deleteService(id string) error {
|
||||||
req := structs.DeregisterRequest{
|
req := structs.DeregisterRequest{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: a.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
ServiceID: id,
|
ServiceID: id,
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := a.RPC("Catalog.Deregister", &req, &out)
|
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
delete(a.state.serviceStatus, id)
|
delete(l.serviceStatus, id)
|
||||||
a.logger.Printf("[INFO] Deregistered service '%s'", id)
|
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteCheck is used to delete a service from the server
|
// deleteCheck is used to delete a service from the server
|
||||||
func (a *Agent) deleteCheck(id string) error {
|
func (l *localState) deleteCheck(id string) error {
|
||||||
req := structs.DeregisterRequest{
|
req := structs.DeregisterRequest{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: a.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
CheckID: id,
|
CheckID: id,
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := a.RPC("Catalog.Deregister", &req, &out)
|
err := l.iface.RPC("Catalog.Deregister", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
delete(a.state.checkStatus, id)
|
delete(l.checkStatus, id)
|
||||||
a.logger.Printf("[INFO] Deregistered check '%s'", id)
|
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncService is used to sync a service to the server
|
// syncService is used to sync a service to the server
|
||||||
func (a *Agent) syncService(id string) error {
|
func (l *localState) syncService(id string) error {
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: a.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
Address: a.config.AdvertiseAddr,
|
Address: l.config.AdvertiseAddr,
|
||||||
Service: a.state.services[id],
|
Service: l.services[id],
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := a.RPC("Catalog.Register", &req, &out)
|
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
a.state.serviceStatus[id] = syncStatus{inSync: true}
|
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||||
a.logger.Printf("[INFO] Synced service '%s'", id)
|
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncCheck is used to sync a service to the server
|
// syncCheck is used to sync a service to the server
|
||||||
func (a *Agent) syncCheck(id string) error {
|
func (l *localState) syncCheck(id string) error {
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: a.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
Address: a.config.AdvertiseAddr,
|
Address: l.config.AdvertiseAddr,
|
||||||
Check: a.state.checks[id],
|
Check: l.checks[id],
|
||||||
}
|
}
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := a.RPC("Catalog.Register", &req, &out)
|
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
a.state.checkStatus[id] = syncStatus{inSync: true}
|
l.checkStatus[id] = syncStatus{inSync: true}
|
||||||
a.logger.Printf("[INFO] Synced check '%s'", id)
|
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||||
Tag: "master",
|
Tag: "master",
|
||||||
Port: 5000,
|
Port: 5000,
|
||||||
}
|
}
|
||||||
agent.AddService(srv1)
|
agent.state.AddService(srv1)
|
||||||
args.Service = srv1
|
args.Service = srv1
|
||||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -45,7 +45,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||||
Tag: "",
|
Tag: "",
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
}
|
}
|
||||||
agent.AddService(srv2)
|
agent.state.AddService(srv2)
|
||||||
|
|
||||||
srv2_mod := new(structs.NodeService)
|
srv2_mod := new(structs.NodeService)
|
||||||
*srv2_mod = *srv2
|
*srv2_mod = *srv2
|
||||||
|
@ -62,7 +62,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||||
Tag: "",
|
Tag: "",
|
||||||
Port: 80,
|
Port: 80,
|
||||||
}
|
}
|
||||||
agent.AddService(srv3)
|
agent.state.AddService(srv3)
|
||||||
|
|
||||||
// Exists remote (delete)
|
// Exists remote (delete)
|
||||||
srv4 := &structs.NodeService{
|
srv4 := &structs.NodeService{
|
||||||
|
@ -77,7 +77,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger anti-entropy run and wait
|
// Trigger anti-entropy run and wait
|
||||||
agent.RegistrationDone()
|
agent.StartSync()
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
// Verify that we are in sync
|
// Verify that we are in sync
|
||||||
|
@ -155,7 +155,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
Name: "mysql",
|
Name: "mysql",
|
||||||
Status: structs.HealthPassing,
|
Status: structs.HealthPassing,
|
||||||
}
|
}
|
||||||
agent.AddCheck(chk1)
|
agent.state.AddCheck(chk1)
|
||||||
args.Check = chk1
|
args.Check = chk1
|
||||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -168,7 +168,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
Name: "redis",
|
Name: "redis",
|
||||||
Status: structs.HealthPassing,
|
Status: structs.HealthPassing,
|
||||||
}
|
}
|
||||||
agent.AddCheck(chk2)
|
agent.state.AddCheck(chk2)
|
||||||
|
|
||||||
chk2_mod := new(structs.HealthCheck)
|
chk2_mod := new(structs.HealthCheck)
|
||||||
*chk2_mod = *chk2
|
*chk2_mod = *chk2
|
||||||
|
@ -185,7 +185,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
Name: "web",
|
Name: "web",
|
||||||
Status: structs.HealthPassing,
|
Status: structs.HealthPassing,
|
||||||
}
|
}
|
||||||
agent.AddCheck(chk3)
|
agent.state.AddCheck(chk3)
|
||||||
|
|
||||||
// Exists remote (delete)
|
// Exists remote (delete)
|
||||||
chk4 := &structs.HealthCheck{
|
chk4 := &structs.HealthCheck{
|
||||||
|
@ -200,7 +200,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger anti-entropy run and wait
|
// Trigger anti-entropy run and wait
|
||||||
agent.RegistrationDone()
|
agent.StartSync()
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
// Verify that we are in sync
|
// Verify that we are in sync
|
||||||
|
|
Loading…
Reference in New Issue