diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4b7ee11932..9e681bb46c 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -19,9 +19,9 @@ type EventPayloadCheckServiceNode struct { // of stream.Events that describe the current state of a service health query. // // TODO: no tests for this yet -func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc { +func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { - tx := s.db.Txn(false) + tx := db.ReadTxn() defer tx.Abort() connect := topic == topicServiceHealthConnect diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 2c7cc5cd41..96f65ff3ad 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -22,6 +22,11 @@ type AbortTxn interface { Abort() } +// ReadDB is a DB that provides read-only transactions. +type ReadDB interface { + ReadTxn() AbortTxn +} + // WriteTxn is implemented by memdb.Txn to perform write operations. type WriteTxn interface { ReadTxn @@ -160,6 +165,12 @@ func (tx *txn) Commit() error { return nil } +type readDB memdb.MemDB + +func (db *readDB) ReadTxn() AbortTxn { + return (*memdb.MemDB)(db).Txn(false) +} + var ( topicServiceHealth = pbsubscribe.Topic_ServiceHealth topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect @@ -182,11 +193,11 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { return events, nil } -func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { +func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth), // The connect topic is temporarily disabled until the correct events are // created for terminating gateway changes. - //topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), + //topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect), } } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index e71b995f15..1302de8aa7 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -6,9 +6,10 @@ import ( "fmt" "time" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" ) var ( @@ -160,6 +161,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { return nil, fmt.Errorf("Failed setting up state store: %s", err) } + pub := stream.NewEventPublisher(newSnapshotHandlers((*readDB)(db)), 10*time.Second) ctx, cancel := context.WithCancel(context.TODO()) s := &Store{ schema: schema, @@ -167,12 +169,11 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), stopEventPublisher: cancel, - } - pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second) - s.db = &changeTrackerDB{ - db: db, - publisher: pub, - processChanges: processDBChanges, + db: &changeTrackerDB{ + db: db, + publisher: pub, + processChanges: processDBChanges, + }, } go pub.Run(ctx)