diff --git a/agent/consul/state/acl.go b/agent/consul/state/acl.go index 7bd11194f6..bb0b15755e 100644 --- a/agent/consul/state/acl.go +++ b/agent/consul/state/acl.go @@ -285,7 +285,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le defer tx.Abort() // We must have initialized before this will ever be possible. - existing, err := tx.First("index", "id", "acl-token-bootstrap") + existing, err := tx.First(tableIndex, indexID, "acl-token-bootstrap") if err != nil { return fmt.Errorf("bootstrap check failed: %v", err) } @@ -300,7 +300,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le if err := aclTokenSetTxn(tx, idx, token, ACLTokenSetOptions{Legacy: legacy}); err != nil { return fmt.Errorf("failed inserting bootstrap token: %v", err) } - if err := tx.Insert("index", &IndexEntry{"acl-token-bootstrap", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"acl-token-bootstrap", idx}); err != nil { return fmt.Errorf("failed to mark ACL bootstrapping as complete: %v", err) } return tx.Commit() @@ -311,7 +311,7 @@ func (s *Store) CanBootstrapACLToken() (bool, uint64, error) { tx := s.db.Txn(false) // Lookup the bootstrap sentinel - out, err := tx.First("index", "id", "acl-token-bootstrap") + out, err := tx.First(tableIndex, indexID, "acl-token-bootstrap") if err != nil { return false, 0, err } diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 71a42f0e25..7030b465ba 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -324,7 +324,7 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod if err := tx.Insert("nodes", node); err != nil { return fmt.Errorf("failed inserting node: %s", err) } - if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } // Update the node's service indexes as the node information is included @@ -557,7 +557,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { if err := tx.Delete("coordinates", coord); err != nil { return fmt.Errorf("failed deleting coordinate: %s", err) } - if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"coordinates", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } } @@ -566,7 +566,7 @@ func (s *Store) deleteNodeTxn(tx WriteTxn, idx uint64, nodeName string) error { if err := tx.Delete("nodes", node); err != nil { return fmt.Errorf("failed deleting node: %s", err) } - if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"nodes", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -1367,7 +1367,7 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st _, serviceIndex, err := catalogServiceMaxIndex(tx, svc.ServiceName, entMeta) if err == nil && serviceIndex != nil { // we found service. index, garbage collect it - if errW := tx.Delete("index", serviceIndex); errW != nil { + if errW := tx.Delete(tableIndex, serviceIndex); errW != nil { return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) } } diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index 4ca5b40d25..cce6cb9435 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _ } func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { - if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{indexServiceExtinction, idx}); err != nil { return fmt.Errorf("failed updating missing service extinction index: %s", err) } return nil @@ -86,7 +86,7 @@ func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 { } func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { - return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil)) + return tx.FirstWatch(tableIndex, "id", serviceIndexName(serviceName, nil)) } func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 { @@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En } func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) { - return tx.First("index", "id", indexServiceExtinction) + return tx.First(tableIndex, "id", indexServiceExtinction) } func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { @@ -129,7 +129,7 @@ func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMe func catalogUpdateCheckIndexes(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { // update the universal index entry - if err := tx.Insert("index", &IndexEntry{"checks", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"checks", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } return nil diff --git a/agent/consul/state/kvs_oss.go b/agent/consul/state/kvs_oss.go index b1559af254..25f427e1fa 100644 --- a/agent/consul/state/kvs_oss.go +++ b/agent/consul/state/kvs_oss.go @@ -5,8 +5,9 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) func kvsIndexer() *memdb.StringFieldIndex { @@ -26,7 +27,7 @@ func insertKVTxn(tx WriteTxn, entry *structs.DirEntry, updateMax bool, _ bool) e return fmt.Errorf("failed updating kvs index: %v", err) } } else { - if err := tx.Insert("index", &IndexEntry{"kvs", entry.ModifyIndex}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"kvs", entry.ModifyIndex}); err != nil { return fmt.Errorf("failed updating kvs index: %s", err) } } @@ -70,7 +71,7 @@ func (s *Store) kvsDeleteTreeTxn(tx WriteTxn, idx uint64, prefix string, entMeta } } - if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } } @@ -87,7 +88,7 @@ func kvsDeleteWithEntry(tx WriteTxn, entry *structs.DirEntry, idx uint64) error return fmt.Errorf("failed deleting kvs entry: %s", err) } - if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{"kvs", idx}); err != nil { return fmt.Errorf("failed updating kvs index: %s", err) } diff --git a/agent/consul/state/schema.go b/agent/consul/state/schema.go index 402efe985e..cecb2a2a34 100644 --- a/agent/consul/state/schema.go +++ b/agent/consul/state/schema.go @@ -58,6 +58,8 @@ type IndexEntry struct { Value uint64 } +const tableIndex = "index" + // indexTableSchema returns a new table schema used for tracking various the // latest raft index for a table or entities within a table. // @@ -67,10 +69,10 @@ type IndexEntry struct { // table, even when that update is a delete of the most recent item. func indexTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: "index", + Name: tableIndex, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, Indexer: &memdb.StringFieldIndex{ diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index d0436c1847..a0a26243e8 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -209,16 +209,12 @@ func (s *Snapshot) LastIndex() uint64 { } func (s *Snapshot) Indexes() (memdb.ResultIterator, error) { - iter, err := s.tx.Get("index", "id") - if err != nil { - return nil, err - } - return iter, nil + return s.tx.Get(tableIndex, indexID) } // IndexRestore is used to restore an index func (s *Restore) IndexRestore(idx *IndexEntry) error { - if err := s.tx.Insert("index", idx); err != nil { + if err := s.tx.Insert(tableIndex, idx); err != nil { return fmt.Errorf("index insert failed: %v", err) } return nil @@ -279,7 +275,7 @@ func maxIndexTxn(tx ReadTxn, tables ...string) uint64 { func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 { var lindex uint64 for _, table := range tables { - ch, ti, err := tx.FirstWatch("index", "id", table) + ch, ti, err := tx.FirstWatch(tableIndex, "id", table) if err != nil { panic(fmt.Sprintf("unknown index: %s err: %s", table, err)) } @@ -294,21 +290,22 @@ func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 { // indexUpdateMaxTxn is used when restoring entries and sets the table's index to // the given idx only if it's greater than the current index. func indexUpdateMaxTxn(tx WriteTxn, idx uint64, table string) error { - ti, err := tx.First("index", "id", table) + ti, err := tx.First(tableIndex, indexID, table) if err != nil { return fmt.Errorf("failed to retrieve existing index: %s", err) } // Always take the first update, otherwise do the > check. if ti == nil { - if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil { + if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil { return fmt.Errorf("failed updating index %s", err) } - } else if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value { - if err := tx.Insert("index", &IndexEntry{table, idx}); err != nil { + return nil + } + if cur, ok := ti.(*IndexEntry); ok && idx > cur.Value { + if err := tx.Insert(tableIndex, &IndexEntry{table, idx}); err != nil { return fmt.Errorf("failed updating index %s", err) } } - return nil }