From a4c202aa9098bdd8d10c824e3ce5ff1cf8ddf202 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Thu, 3 Sep 2015 19:11:12 -0700 Subject: [PATCH] consul/state: adding session registration --- consul/state/schema.go | 26 +++++++ consul/state/state_store.go | 119 +++++++++++++++++++++++++++++++ consul/state/state_store_test.go | 110 ++++++++++++++++++++++++++++ consul/structs/structs.go | 17 ++--- 4 files changed, 264 insertions(+), 8 deletions(-) diff --git a/consul/state/schema.go b/consul/state/schema.go index f5c5193c39..59c6b63a7e 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -304,6 +304,32 @@ func sessionChecksTableSchema() *memdb.TableSchema { }, }, }, + "session": &memdb.IndexSchema{ + Name: "session", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Session", + Lowercase: false, + }, + }, + "node": &memdb.IndexSchema{ + Name: "node", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Node", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Session", + Lowercase: false, + }, + }, + }, + }, }, } } diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 5973558c80..f194df57c7 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -19,6 +19,10 @@ var ( // ErrMissingService is the error we return if trying an // operation which requires a service but none exists. ErrMissingService = errors.New("Missing service registration") + + // ErrMissingSessionID is returned when a session registration + // is attempted with an empty session ID. + ErrMissingSessionID = errors.New("Missing session ID") ) // StateStore is where we store all of Consul's state, including @@ -36,6 +40,16 @@ type IndexEntry struct { Value uint64 } +// sessionCheck is used to create a many-to-one table such that +// each check registered by a session can be mapped back to the +// session table. This is only used internally in the state +// store and thus it is not exported. +type sessionCheck struct { + Node string + CheckID string + Session string +} + // NewStateStore creates a new in-memory state storage layer. func NewStateStore(logOutput io.Writer) (*StateStore, error) { // Create the in-memory DB @@ -965,3 +979,108 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { tx.Commit() return nil } + +// SessionCreate is used to register a new session in the state store. +func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Call the session creation + if err := s.sessionCreateTxn(idx, sess, tx); err != nil { + return err + } + + tx.Commit() + return nil +} + +// sessionCreateTxn is the inner method used for creating session entries in +// an open transaction. Any health checks registered with the session will be +// checked for failing status. Returns any error encountered. +func (s *StateStore) sessionCreateTxn(idx uint64, sess *structs.Session, tx *memdb.Txn) error { + // Check that we have a session ID + if sess.ID == "" { + return ErrMissingSessionID + } + + // Verify the session behavior is valid + switch sess.Behavior { + case "": + // Release by default to preserve backwards compatibility + sess.Behavior = structs.SessionKeysRelease + case structs.SessionKeysRelease: + case structs.SessionKeysDelete: + default: + return fmt.Errorf("Invalid session behavior: %s", sess.Behavior) + } + + // Assign the indexes. ModifyIndex likely will not be used but + // we set it here anyways for sanity. + sess.CreateIndex = idx + sess.ModifyIndex = idx + + // Check that the node exists + node, err := tx.First("nodes", "id", sess.Node) + if err != nil { + return fmt.Errorf("failed node lookup: %s", err) + } + if node == nil { + return ErrMissingNode + } + + // Go over the session checks and ensure they exist. + for _, checkID := range sess.Checks { + check, err := tx.First("checks", "id", sess.Node, checkID) + if err != nil { + return fmt.Errorf("failed check lookup: %s", err) + } + if check == nil { + return fmt.Errorf("Missing check '%s' registration", checkID) + } + + // Check that the check is not in critical state + status := check.(*structs.HealthCheck).Status + if status == structs.HealthCritical { + return fmt.Errorf("Check '%s' is in %s state", status) + } + } + + // Insert the session + if err := tx.Insert("sessions", sess); err != nil { + return fmt.Errorf("failed inserting session: %s", err) + } + + // Insert the check mappings + for _, checkID := range sess.Checks { + check := &sessionCheck{ + Node: sess.Node, + CheckID: checkID, + Session: sess.ID, + } + if err := tx.Insert("session_checks", check); err != nil { + return fmt.Errorf("failed inserting session check mapping: %s", err) + } + } + + // Update the index + if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + return nil +} + +// GetSession is used to retrieve an active session from the state store. +func (s *StateStore) GetSession(sessionID string) (*structs.Session, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Look up the session by its ID + session, err := tx.First("sessions", "id", sessionID) + if err != nil { + return nil, fmt.Errorf("failed session lookup: %s", err) + } + if session != nil { + return session.(*structs.Session), nil + } + return nil, nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index fa49662653..d6b2c3edd5 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "reflect" + "strings" "testing" "github.com/hashicorp/consul/consul/structs" @@ -1164,3 +1165,112 @@ func TestStateStore_KVSDeleteTree(t *testing.T) { t.Fatalf("bad index: %d", idx) } } + +func TestStateStore_SessionCreate(t *testing.T) { + s := testStateStore(t) + + // Registering without a session ID is disallowed + err := s.SessionCreate(1, &structs.Session{}) + if err != ErrMissingSessionID { + t.Fatalf("expected %#v, got: %#v", ErrMissingSessionID, err) + } + + // Invalid session behavior throws error + sess := &structs.Session{ + ID: "foo", + Behavior: "nope", + } + err = s.SessionCreate(1, sess) + if err == nil || !strings.Contains(err.Error(), "session behavior") { + t.Fatalf("expected session behavior error, got: %#v", err) + } + + // Registering with an unknown node is disallowed + sess = &structs.Session{ID: "foo"} + if err := s.SessionCreate(1, sess); err != ErrMissingNode { + t.Fatalf("expected %#v, got: %#v", ErrMissingNode, err) + } + + // None of the errored operations modified the index + if idx := s.maxIndex("sessions"); idx != 0 { + t.Fatalf("bad index: %d", idx) + } + + // Valid session is able to register + testRegisterNode(t, s, 1, "node1") + sess = &structs.Session{ + ID: "foo", + Node: "node1", + } + if err := s.SessionCreate(2, sess); err != nil { + t.Fatalf("err: %s", err) + } + if idx := s.maxIndex("sessions"); idx != 2 { + t.Fatalf("bad index: %d", err) + } + + // Retrieve the session again + session, err := s.GetSession("foo") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Ensure the session looks correct and was assigned the + // proper default value for session behavior. + expect := &structs.Session{ + ID: "foo", + Behavior: structs.SessionKeysRelease, + Node: "node1", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + } + if !reflect.DeepEqual(expect, session) { + t.Fatalf("bad session: %#v", session) + } + + // Registering with a non-existent check is disallowed + sess = &structs.Session{ + ID: "bar", + Node: "node1", + Checks: []string{"check1"}, + } + err = s.SessionCreate(3, sess) + if err == nil || !strings.Contains(err.Error(), "Missing check") { + t.Fatalf("expected missing check error, got: %#v", err) + } + + // Registering with a critical check is disallowed + testRegisterCheck(t, s, 3, "node1", "", "check1", structs.HealthCritical) + err = s.SessionCreate(4, sess) + if err == nil || !strings.Contains(err.Error(), structs.HealthCritical) { + t.Fatalf("expected critical state error, got: %#v", err) + } + + // Registering with a healthy check succeeds + testRegisterCheck(t, s, 4, "node1", "", "check1", structs.HealthPassing) + if err := s.SessionCreate(5, sess); err != nil { + t.Fatalf("err: %s", err) + } + + tx := s.db.Txn(false) + defer tx.Abort() + + // Check mappings were inserted + check, err := tx.First("session_checks", "session", "bar") + if err != nil { + t.Fatalf("err: %s", err) + } + if check == nil { + t.Fatalf("missing session check") + } + expectCheck := &sessionCheck{ + Node: "node1", + CheckID: "check1", + Session: "bar", + } + if actual := check.(*sessionCheck); !reflect.DeepEqual(actual, expectCheck) { + t.Fatalf("expected %#v, got: %#v", expectCheck, actual) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 01f9c9af30..fdcfaa8fd8 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -430,14 +430,15 @@ const ( // Session is used to represent an open session in the KV store. // This issued to associate node checks with acquired locks. type Session struct { - CreateIndex uint64 - ID string - Name string - Node string - Checks []string - LockDelay time.Duration - Behavior SessionBehavior // What to do when session is invalidated - TTL string + ID string + Name string + Node string + Checks []string + LockDelay time.Duration + Behavior SessionBehavior // What to do when session is invalidated + TTL string + + RaftIndex } type Sessions []*Session