mirror of https://github.com/hashicorp/consul
First pass at local state + anti-entropy
parent
554ebdfc9a
commit
1feec1adf8
|
@ -3,6 +3,7 @@ package agent
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/consul"
|
"github.com/hashicorp/consul/consul"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
@ -36,6 +37,10 @@ type Agent struct {
|
||||||
shutdown bool
|
shutdown bool
|
||||||
shutdownCh chan struct{}
|
shutdownCh chan struct{}
|
||||||
shutdownLock sync.Mutex
|
shutdownLock sync.Mutex
|
||||||
|
|
||||||
|
// state stores a local representation of the node,
|
||||||
|
// services and checks. Used for anti-entropy.
|
||||||
|
state localState
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create is used to create a new Agent. Returns
|
// Create is used to create a new Agent. Returns
|
||||||
|
@ -77,6 +82,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
||||||
logger: log.New(logOutput, "", log.LstdFlags),
|
logger: log.New(logOutput, "", log.LstdFlags),
|
||||||
logOutput: logOutput,
|
logOutput: logOutput,
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
|
state: localState{
|
||||||
|
delaySync: make(chan struct{}, 1),
|
||||||
|
services: make(map[string]*structs.NodeService),
|
||||||
|
serviceStatus: make(map[string]syncStatus),
|
||||||
|
checks: make(map[string]*structs.HealthCheck),
|
||||||
|
checkStatus: make(map[string]syncStatus),
|
||||||
|
triggerCh: make(chan struct{}, 1),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup either the client or the server
|
// Setup either the client or the server
|
||||||
|
@ -90,6 +103,8 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start the anti entropy routine
|
||||||
|
go agent.antiEntropy()
|
||||||
return agent, nil
|
return agent, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -196,6 +196,11 @@ func (c *Command) Run(args []string) int {
|
||||||
defer c.httpServer.Shutdown()
|
defer c.httpServer.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Register services/checks
|
||||||
|
|
||||||
|
// Let the agent know we've finished registration
|
||||||
|
c.agent.RegistrationDone()
|
||||||
|
|
||||||
c.Ui.Output("Consul agent running!")
|
c.Ui.Output("Consul agent running!")
|
||||||
c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
|
c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
|
||||||
c.Ui.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
|
c.Ui.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
|
||||||
|
|
|
@ -0,0 +1,322 @@
|
||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Timing parameters for the anti-entropy routine.
const (
	// syncRetryIntv is the base interval handed to aeScale to compute
	// the back-off after a failed read of the remote state.
	syncRetryIntv = 30 * time.Second

	// maxDelaySync bounds how long the anti-entropy routine waits for
	// RegistrationDone before it starts syncing anyway.
	maxDelaySync = 30 * time.Second
)
|
||||||
|
|
||||||
|
// syncStatus records how a single service or check compares with the
// server's copy, and therefore what action (if any) is still pending.
// The zero value means "not in sync, needs to be registered".
type syncStatus struct {
	// remoteDelete is set when the entry should be deleted from the server.
	remoteDelete bool

	// inSync is set when the entry is in sync with the server.
	inSync bool
}
|
||||||
|
|
||||||
|
// localState is used to represent the node's services,
|
||||||
|
// and checks. We used it to perform anti-entropy with the
|
||||||
|
// catalog representation
|
||||||
|
type localState struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
|
// delaySync is used to delay the initial sync until
|
||||||
|
// the client has registered its services and checks.
|
||||||
|
delaySync chan struct{}
|
||||||
|
|
||||||
|
// Services tracks the local services
|
||||||
|
services map[string]*structs.NodeService
|
||||||
|
serviceStatus map[string]syncStatus
|
||||||
|
|
||||||
|
// Checks tracks the local checks
|
||||||
|
checks map[string]*structs.HealthCheck
|
||||||
|
checkStatus map[string]syncStatus
|
||||||
|
|
||||||
|
// triggerCh is used to inform of a change to local state
|
||||||
|
// that requires anti-entropy with the server
|
||||||
|
triggerCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// changeMade is used to trigger an anti-entropy run
|
||||||
|
func (l *localState) changeMade() {
|
||||||
|
select {
|
||||||
|
case l.triggerCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegistrationDone is called by the Agent client once base Services
|
||||||
|
// and Checks are registered. This is called to prevent a race
|
||||||
|
// between clients and the anti-entropy routines
|
||||||
|
func (a *Agent) RegistrationDone() {
|
||||||
|
select {
|
||||||
|
case a.state.delaySync <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddService is used to add a service entry to the local state.
|
||||||
|
// This entry is persistent and the agent will make a best effort to
|
||||||
|
// ensure it is registered
|
||||||
|
func (a *Agent) AddService(service *structs.NodeService) {
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
a.state.services[service.ID] = service
|
||||||
|
a.state.serviceStatus[service.ID] = syncStatus{}
|
||||||
|
a.state.changeMade()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveService is used to remove a service entry from the local state.
|
||||||
|
// The agent will make a best effort to ensure it is deregistered
|
||||||
|
func (a *Agent) RemoveService(serviceID string) {
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
delete(a.state.services, serviceID)
|
||||||
|
a.state.serviceStatus[serviceID] = syncStatus{remoteDelete: true}
|
||||||
|
a.state.changeMade()
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddCheck is used to add a health check to the local state.
|
||||||
|
// This entry is persistent and the agent will make a best effort to
|
||||||
|
// ensure it is registered
|
||||||
|
func (a *Agent) AddCheck(check *structs.HealthCheck) {
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
a.state.checks[check.CheckID] = check
|
||||||
|
a.state.checkStatus[check.CheckID] = syncStatus{}
|
||||||
|
a.state.changeMade()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveCheck is used to remove a health check from the local state.
|
||||||
|
// The agent will make a best effort to ensure it is deregistered
|
||||||
|
func (a *Agent) RemoveCheck(checkID string) {
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
delete(a.state.checks, checkID)
|
||||||
|
a.state.checkStatus[checkID] = syncStatus{remoteDelete: true}
|
||||||
|
a.state.changeMade()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateCheck is used to update the status of a check
|
||||||
|
func (a *Agent) UpdateCheck(checkID, status string) {
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
check, ok := a.state.checks[checkID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do nothing if update is idempotent
|
||||||
|
if check.Status == status {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update status and mark out of sync
|
||||||
|
check.Status = status
|
||||||
|
a.state.checkStatus[checkID] = syncStatus{inSync: false}
|
||||||
|
a.state.changeMade()
|
||||||
|
}
|
||||||
|
|
||||||
|
// antiEntropy is a long running method used to perform anti-entropy
|
||||||
|
// between local and remote state.
|
||||||
|
func (a *Agent) antiEntropy() {
|
||||||
|
// Delay the initial sync until client has a chance to register
|
||||||
|
select {
|
||||||
|
case <-a.state.delaySync:
|
||||||
|
case <-time.After(maxDelaySync):
|
||||||
|
a.logger.Printf("[WARN] Client failed to call RegisterDone within %v", maxDelaySync)
|
||||||
|
case <-a.shutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
SYNC:
|
||||||
|
// Sync our state with the servers
|
||||||
|
for !a.shutdown {
|
||||||
|
if err := a.setSyncState(); err != nil {
|
||||||
|
a.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||||
|
time.Sleep(aeScale(syncRetryIntv, len(a.LANMembers())))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force-trigger AE to pickup any changes
|
||||||
|
a.state.changeMade()
|
||||||
|
|
||||||
|
// Schedule the next full sync, with a random stagger
|
||||||
|
aeIntv := aeScale(a.config.AEInterval, len(a.LANMembers()))
|
||||||
|
aeIntv = aeIntv + randomStagger(aeIntv)
|
||||||
|
aeTimer := time.After(aeIntv)
|
||||||
|
|
||||||
|
// Wait for sync events
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-aeTimer:
|
||||||
|
goto SYNC
|
||||||
|
case <-a.state.triggerCh:
|
||||||
|
if err := a.syncChanges(); err != nil {
|
||||||
|
a.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||||
|
}
|
||||||
|
case <-a.shutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// setSyncState does a read of the server state, and updates
|
||||||
|
// the local syncStatus as appropriate
|
||||||
|
func (a *Agent) setSyncState() error {
|
||||||
|
req := structs.NodeSpecificRequest{
|
||||||
|
Datacenter: a.config.Datacenter,
|
||||||
|
Node: a.config.NodeName,
|
||||||
|
}
|
||||||
|
var services structs.NodeServices
|
||||||
|
var checks structs.HealthChecks
|
||||||
|
if e := a.RPC("Catalog.NodeServices", &req, &services); e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
for id, service := range services.Services {
|
||||||
|
// If we don't have the service locally, deregister it
|
||||||
|
existing, ok := a.state.services[id]
|
||||||
|
if !ok {
|
||||||
|
a.state.serviceStatus[id] = syncStatus{remoteDelete: true}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If our definition is different, we need to update it
|
||||||
|
equal := !reflect.DeepEqual(existing, service)
|
||||||
|
a.state.serviceStatus[id] = syncStatus{inSync: equal}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, check := range checks {
|
||||||
|
// If we don't have the check locally, deregister it
|
||||||
|
id := check.CheckID
|
||||||
|
existing, ok := a.state.checks[id]
|
||||||
|
if !ok {
|
||||||
|
a.state.checkStatus[id] = syncStatus{remoteDelete: true}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If our definition is different, we need to update it
|
||||||
|
equal := !reflect.DeepEqual(existing, check)
|
||||||
|
a.state.checkStatus[id] = syncStatus{inSync: equal}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncChanges is used to scan the status our local services and checks
|
||||||
|
// and update any that are out of sync with the server
|
||||||
|
func (a *Agent) syncChanges() error {
|
||||||
|
a.state.Lock()
|
||||||
|
defer a.state.Unlock()
|
||||||
|
|
||||||
|
// Sync the services
|
||||||
|
for id, status := range a.state.serviceStatus {
|
||||||
|
if status.remoteDelete {
|
||||||
|
if err := a.deleteService(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if !status.inSync {
|
||||||
|
if err := a.syncService(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync the checks
|
||||||
|
for id, status := range a.state.checkStatus {
|
||||||
|
if status.remoteDelete {
|
||||||
|
if err := a.deleteCheck(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if !status.inSync {
|
||||||
|
if err := a.syncCheck(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteService is used to delete a service from the server
|
||||||
|
func (a *Agent) deleteService(id string) error {
|
||||||
|
req := structs.DeregisterRequest{
|
||||||
|
Datacenter: a.config.Datacenter,
|
||||||
|
Node: a.config.NodeName,
|
||||||
|
ServiceID: id,
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
err := a.RPC("Catalog.Deregister", &req, &out)
|
||||||
|
if err == nil {
|
||||||
|
delete(a.state.serviceStatus, id)
|
||||||
|
a.logger.Printf("[INFO] Deregistered service '%s'", id)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteCheck is used to delete a service from the server
|
||||||
|
func (a *Agent) deleteCheck(id string) error {
|
||||||
|
req := structs.DeregisterRequest{
|
||||||
|
Datacenter: a.config.Datacenter,
|
||||||
|
Node: a.config.NodeName,
|
||||||
|
CheckID: id,
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
err := a.RPC("Catalog.Deregister", &req, &out)
|
||||||
|
if err == nil {
|
||||||
|
delete(a.state.checkStatus, id)
|
||||||
|
a.logger.Printf("[INFO] Deregistered check '%s'", id)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncService is used to sync a service to the server
|
||||||
|
func (a *Agent) syncService(id string) error {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: a.config.Datacenter,
|
||||||
|
Node: a.config.NodeName,
|
||||||
|
Address: a.config.AdvertiseAddr,
|
||||||
|
Service: a.state.services[id],
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
err := a.RPC("Catalog.Register", &req, &out)
|
||||||
|
if err == nil {
|
||||||
|
a.state.serviceStatus[id] = syncStatus{inSync: true}
|
||||||
|
a.logger.Printf("[INFO] Synced service '%s'", id)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncCheck is used to sync a service to the server
|
||||||
|
func (a *Agent) syncCheck(id string) error {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: a.config.Datacenter,
|
||||||
|
Node: a.config.NodeName,
|
||||||
|
Address: a.config.AdvertiseAddr,
|
||||||
|
Check: a.state.checks[id],
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
err := a.RPC("Catalog.Register", &req, &out)
|
||||||
|
if err == nil {
|
||||||
|
a.state.checkStatus[id] = syncStatus{inSync: true}
|
||||||
|
a.logger.Printf("[INFO] Synced check '%s'", id)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in New Issue