|
|
|
@ -19,6 +19,10 @@ var (
|
|
|
|
|
// ErrMissingService is the error we return if trying an
|
|
|
|
|
// operation which requires a service but none exists.
|
|
|
|
|
ErrMissingService = errors.New("Missing service registration")
|
|
|
|
|
|
|
|
|
|
// ErrMissingSessionID is returned when a session registration
|
|
|
|
|
// is attempted with an empty session ID.
|
|
|
|
|
ErrMissingSessionID = errors.New("Missing session ID")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// StateStore is where we store all of Consul's state, including
|
|
|
|
@ -36,6 +40,16 @@ type IndexEntry struct {
|
|
|
|
|
Value uint64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sessionCheck is used to create a many-to-one table such that
|
|
|
|
|
// each check registered by a session can be mapped back to the
|
|
|
|
|
// session table. This is only used internally in the state
|
|
|
|
|
// store and thus it is not exported.
|
|
|
|
|
type sessionCheck struct {
|
|
|
|
|
Node string
|
|
|
|
|
CheckID string
|
|
|
|
|
Session string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewStateStore creates a new in-memory state storage layer.
|
|
|
|
|
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
|
|
|
|
|
// Create the in-memory DB
|
|
|
|
@ -965,3 +979,108 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
|
|
|
|
|
tx.Commit()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SessionCreate is used to register a new session in the state store.
|
|
|
|
|
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
|
|
|
|
|
tx := s.db.Txn(true)
|
|
|
|
|
defer tx.Abort()
|
|
|
|
|
|
|
|
|
|
// Call the session creation
|
|
|
|
|
if err := s.sessionCreateTxn(idx, sess, tx); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tx.Commit()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sessionCreateTxn is the inner method used for creating session entries in
|
|
|
|
|
// an open transaction. Any health checks registered with the session will be
|
|
|
|
|
// checked for failing status. Returns any error encountered.
|
|
|
|
|
func (s *StateStore) sessionCreateTxn(idx uint64, sess *structs.Session, tx *memdb.Txn) error {
|
|
|
|
|
// Check that we have a session ID
|
|
|
|
|
if sess.ID == "" {
|
|
|
|
|
return ErrMissingSessionID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify the session behavior is valid
|
|
|
|
|
switch sess.Behavior {
|
|
|
|
|
case "":
|
|
|
|
|
// Release by default to preserve backwards compatibility
|
|
|
|
|
sess.Behavior = structs.SessionKeysRelease
|
|
|
|
|
case structs.SessionKeysRelease:
|
|
|
|
|
case structs.SessionKeysDelete:
|
|
|
|
|
default:
|
|
|
|
|
return fmt.Errorf("Invalid session behavior: %s", sess.Behavior)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Assign the indexes. ModifyIndex likely will not be used but
|
|
|
|
|
// we set it here anyways for sanity.
|
|
|
|
|
sess.CreateIndex = idx
|
|
|
|
|
sess.ModifyIndex = idx
|
|
|
|
|
|
|
|
|
|
// Check that the node exists
|
|
|
|
|
node, err := tx.First("nodes", "id", sess.Node)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed node lookup: %s", err)
|
|
|
|
|
}
|
|
|
|
|
if node == nil {
|
|
|
|
|
return ErrMissingNode
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Go over the session checks and ensure they exist.
|
|
|
|
|
for _, checkID := range sess.Checks {
|
|
|
|
|
check, err := tx.First("checks", "id", sess.Node, checkID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed check lookup: %s", err)
|
|
|
|
|
}
|
|
|
|
|
if check == nil {
|
|
|
|
|
return fmt.Errorf("Missing check '%s' registration", checkID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check that the check is not in critical state
|
|
|
|
|
status := check.(*structs.HealthCheck).Status
|
|
|
|
|
if status == structs.HealthCritical {
|
|
|
|
|
return fmt.Errorf("Check '%s' is in %s state", status)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Insert the session
|
|
|
|
|
if err := tx.Insert("sessions", sess); err != nil {
|
|
|
|
|
return fmt.Errorf("failed inserting session: %s", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Insert the check mappings
|
|
|
|
|
for _, checkID := range sess.Checks {
|
|
|
|
|
check := &sessionCheck{
|
|
|
|
|
Node: sess.Node,
|
|
|
|
|
CheckID: checkID,
|
|
|
|
|
Session: sess.ID,
|
|
|
|
|
}
|
|
|
|
|
if err := tx.Insert("session_checks", check); err != nil {
|
|
|
|
|
return fmt.Errorf("failed inserting session check mapping: %s", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update the index
|
|
|
|
|
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
|
|
|
|
|
return fmt.Errorf("failed updating index: %s", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetSession is used to retrieve an active session from the state store.
|
|
|
|
|
func (s *StateStore) GetSession(sessionID string) (*structs.Session, error) {
|
|
|
|
|
tx := s.db.Txn(false)
|
|
|
|
|
defer tx.Abort()
|
|
|
|
|
|
|
|
|
|
// Look up the session by its ID
|
|
|
|
|
session, err := tx.First("sessions", "id", sessionID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed session lookup: %s", err)
|
|
|
|
|
}
|
|
|
|
|
if session != nil {
|
|
|
|
|
return session.(*structs.Session), nil
|
|
|
|
|
}
|
|
|
|
|
return nil, nil
|
|
|
|
|
}
|
|
|
|
|