|
|
|
@ -125,6 +125,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|
|
|
|
|
|
|
|
|
func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
|
|
|
|
|
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
|
|
|
|
|
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
|
|
|
|
|
var req structs.RegisterRequest
|
|
|
|
|
if err := structs.Decode(buf, &req); err != nil {
|
|
|
|
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
|
|
@ -140,6 +141,7 @@ func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
|
|
|
|
|
|
|
|
|
|
func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
|
|
|
|
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
|
|
|
|
|
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
|
|
|
|
|
var req structs.DeregisterRequest
|
|
|
|
|
if err := structs.Decode(buf, &req); err != nil {
|
|
|
|
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
|
|
@ -174,6 +176,8 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
switch req.Op {
|
|
|
|
|
case api.KVSet:
|
|
|
|
|
return c.state.KVSSet(index, &req.DirEnt)
|
|
|
|
@ -219,6 +223,8 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
switch req.Op {
|
|
|
|
|
case structs.SessionCreate:
|
|
|
|
|
if err := c.state.SessionCreate(index, &req.Session); err != nil {
|
|
|
|
@ -240,6 +246,8 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
switch req.Op {
|
|
|
|
|
case structs.ACLBootstrapInit:
|
|
|
|
|
enabled, err := c.state.ACLBootstrapInit(index)
|
|
|
|
@ -272,6 +280,8 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
switch req.Op {
|
|
|
|
|
case structs.TombstoneReap:
|
|
|
|
|
return c.state.ReapTombstones(req.ReapIndex)
|
|
|
|
@ -291,6 +301,7 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
|
|
|
|
|
panic(fmt.Errorf("failed to decode batch updates: %v", err))
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
|
|
|
|
|
defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now())
|
|
|
|
|
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -307,6 +318,8 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
|
|
|
|
|
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(),
|
|
|
|
|
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
|
|
|
|
switch req.Op {
|
|
|
|
|
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
|
|
|
|
|
return c.state.PreparedQuerySet(index, req.Query)
|
|
|
|
@ -324,6 +337,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
|
|
|
|
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
|
|
|
|
|
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
|
|
|
|
|
results, errors := c.state.TxnRW(index, req.Ops)
|
|
|
|
|
return structs.TxnResponse{
|
|
|
|
|
Results: results,
|
|
|
|
@ -337,6 +351,7 @@ func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
|
|
|
|
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
|
|
|
}
|
|
|
|
|
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
|
|
|
|
|
defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now())
|
|
|
|
|
|
|
|
|
|
if req.CAS {
|
|
|
|
|
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
|
|
|
|
@ -506,6 +521,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|
|
|
|
|
|
|
|
|
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
|
|
|
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
|
|
|
|
|
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())
|
|
|
|
|
|
|
|
|
|
// Register the nodes
|
|
|
|
|
encoder := codec.NewEncoder(sink, msgpackHandle)
|
|
|
|
|