store: use a ReadDB for snapshots

to remove the cyclic dependency between the snapshot handlers and the state.Store
pull/9025/head
Daniel Nephin 2020-10-23 14:43:13 -04:00
parent b724e096c2
commit 23eee604c9
3 changed files with 24 additions and 12 deletions

View File

@ -19,9 +19,9 @@ type EventPayloadCheckServiceNode struct {
// of stream.Events that describe the current state of a service health query.
//
// TODO: no tests for this yet
func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc {
func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
tx := s.db.Txn(false)
tx := db.ReadTxn()
defer tx.Abort()
connect := topic == topicServiceHealthConnect

View File

@ -22,6 +22,11 @@ type AbortTxn interface {
Abort()
}
// ReadDB is a DB that provides read-only transactions.
type ReadDB interface {
ReadTxn() AbortTxn
}
// WriteTxn is implemented by memdb.Txn to perform write operations.
type WriteTxn interface {
ReadTxn
@ -160,6 +165,12 @@ func (tx *txn) Commit() error {
return nil
}
type readDB memdb.MemDB
func (db *readDB) ReadTxn() AbortTxn {
return (*memdb.MemDB)(db).Txn(false)
}
var (
topicServiceHealth = pbsubscribe.Topic_ServiceHealth
topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
@ -182,11 +193,11 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
return events, nil
}
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth),
// The connect topic is temporarily disabled until the correct events are
// created for terminating gateway changes.
//topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
//topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
}
}

View File

@ -6,9 +6,10 @@ import (
"fmt"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
var (
@ -160,6 +161,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
return nil, fmt.Errorf("Failed setting up state store: %s", err)
}
pub := stream.NewEventPublisher(newSnapshotHandlers((*readDB)(db)), 10*time.Second)
ctx, cancel := context.WithCancel(context.TODO())
s := &Store{
schema: schema,
@ -167,12 +169,11 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
stopEventPublisher: cancel,
}
pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second)
s.db = &changeTrackerDB{
db: &changeTrackerDB{
db: db,
publisher: pub,
processChanges: processDBChanges,
},
}
go pub.Run(ctx)