|
|
|
@ -14,7 +14,8 @@ import (
|
|
|
|
|
// along with Raft to provide strong consistency. We implement
|
|
|
|
|
// this outside the Server to avoid exposing this outside the package.
|
|
|
|
|
type consulFSM struct { |
|
|
|
|
state *StateStore |
|
|
|
|
logger *log.Logger |
|
|
|
|
state *StateStore |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// consulSnapshot is used to provide a snapshot of the current
|
|
|
|
@ -25,14 +26,15 @@ type consulSnapshot struct {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewFSM is used to construct a new FSM with a blank state
|
|
|
|
|
func NewFSM() (*consulFSM, error) { |
|
|
|
|
func NewFSM(logOutput io.Writer) (*consulFSM, error) { |
|
|
|
|
state, err := NewStateStore() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fsm := &consulFSM{ |
|
|
|
|
state: state, |
|
|
|
|
logger: log.New(logOutput, "", log.LstdFlags), |
|
|
|
|
state: state, |
|
|
|
|
} |
|
|
|
|
return fsm, nil |
|
|
|
|
} |
|
|
|
@ -65,17 +67,26 @@ func (c *consulFSM) decodeRegister(buf []byte) interface{} {
|
|
|
|
|
func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} { |
|
|
|
|
// Ensure the node
|
|
|
|
|
node := structs.Node{req.Node, req.Address} |
|
|
|
|
c.state.EnsureNode(node) |
|
|
|
|
if err := c.state.EnsureNode(node); err != nil { |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: EnsureNode failed: %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Ensure the service if provided
|
|
|
|
|
if req.Service != nil { |
|
|
|
|
c.state.EnsureService(req.Node, req.Service.ID, req.Service.Service, |
|
|
|
|
req.Service.Tag, req.Service.Port) |
|
|
|
|
if err := c.state.EnsureService(req.Node, req.Service.ID, req.Service.Service, |
|
|
|
|
req.Service.Tag, req.Service.Port); err != nil { |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: EnsureService failed: %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Ensure the check if provided
|
|
|
|
|
if req.Check != nil { |
|
|
|
|
c.state.EnsureCheck(req.Check) |
|
|
|
|
if err := c.state.EnsureCheck(req.Check); err != nil { |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: EnsureCheck failed: %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
@ -89,18 +100,27 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} {
|
|
|
|
|
|
|
|
|
|
// Either remove the service entry or the whole node
|
|
|
|
|
if req.ServiceID != "" { |
|
|
|
|
c.state.DeleteNodeService(req.Node, req.ServiceID) |
|
|
|
|
if err := c.state.DeleteNodeService(req.Node, req.ServiceID); err != nil { |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} else if req.CheckID != "" { |
|
|
|
|
c.state.DeleteNodeCheck(req.Node, req.CheckID) |
|
|
|
|
if err := c.state.DeleteNodeCheck(req.Node, req.CheckID); err != nil { |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
c.state.DeleteNode(req.Node) |
|
|
|
|
if err := c.state.DeleteNode(req.Node); err != nil { |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: DeleteNode failed: %v", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { |
|
|
|
|
defer func(start time.Time) { |
|
|
|
|
log.Printf("[INFO] consul: FSM snapshot created in %v", time.Now().Sub(start)) |
|
|
|
|
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) |
|
|
|
|
}(time.Now()) |
|
|
|
|
|
|
|
|
|
// Create a new snapshot
|
|
|
|
|