consul: Adding new session tables

pull/162/head
Armon Dadgar 11 years ago
parent 36a1b59348
commit 1e5c8a445c

@ -17,6 +17,8 @@ const (
dbServices = "services" dbServices = "services"
dbChecks = "checks" dbChecks = "checks"
dbKVS = "kvs" dbKVS = "kvs"
dbSessions = "sessions"
dbSessionChecks = "sessionChecks"
dbMaxMapSize32bit uint64 = 512 * 1024 * 1024 // 512MB maximum size dbMaxMapSize32bit uint64 = 512 * 1024 * 1024 // 512MB maximum size
dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size
) )
@ -29,16 +31,18 @@ const (
// implementation uses the Lightning Memory-Mapped Database (MDB). // implementation uses the Lightning Memory-Mapped Database (MDB).
// This gives us Multi-Version Concurrency Control for "free" // This gives us Multi-Version Concurrency Control for "free"
type StateStore struct { type StateStore struct {
logger *log.Logger logger *log.Logger
path string path string
env *mdb.Env env *mdb.Env
nodeTable *MDBTable nodeTable *MDBTable
serviceTable *MDBTable serviceTable *MDBTable
checkTable *MDBTable checkTable *MDBTable
kvsTable *MDBTable kvsTable *MDBTable
tables MDBTables sessionTable *MDBTable
watch map[*MDBTable]*NotifyGroup sessionChecksTable *MDBTable
queryTables map[string]MDBTables tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
} }
// StateSnapshot is used to provide a point-in-time snapshot // StateSnapshot is used to provide a point-in-time snapshot
@ -49,6 +53,15 @@ type StateSnapshot struct {
lastIndex uint64 lastIndex uint64
} }
// sessionCheck is used to create a many-to-one table such
// that each check registered by a session can be mapped back
// to the session row.
type sessionCheck struct {
Node string
CheckID string
Session string
}
// Close is used to abort the transaction and allow for cleanup // Close is used to abort the transaction and allow for cleanup
func (s *StateSnapshot) Close() error { func (s *StateSnapshot) Close() error {
s.tx.Abort() s.tx.Abort()
@ -219,8 +232,47 @@ func (s *StateStore) initialize() error {
}, },
} }
s.sessionTable = &MDBTable{
Name: dbSessions,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"ID"},
},
"node": &MDBIndex{
AllowBlank: true,
Fields: []string{"Node"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(structs.Session)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
s.sessionChecksTable = &MDBTable{
Name: dbSessionChecks,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Node", "CheckID", "Session"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(sessionCheck)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
// Store the set of tables // Store the set of tables
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable} s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
s.kvsTable, s.sessionTable, s.sessionChecksTable}
for _, table := range s.tables { for _, table := range s.tables {
table.Env = s.env table.Env = s.env
table.Encoder = encoder table.Encoder = encoder

@ -335,6 +335,14 @@ type IndexedKeyList struct {
QueryMeta QueryMeta
} }
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
ID string
Node string
Checks []string
}
// Decode is used to decode a MsgPack encoded object // Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error { func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle var handle codec.MsgpackHandle

Loading…
Cancel
Save