diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 85abe19287..cc0cb9a071 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -42,7 +42,6 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) { // this outside the Server to avoid exposing this outside the package. type FSM struct { logger hclog.Logger - path string // apply is built off the commands global and is used to route apply // operations to their appropriate handlers. diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index b42d47fc64..4b7ee11932 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -1,47 +1,44 @@ package state import ( + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/proto/pbsubscribe" ) -type changeOp int - -const ( - OpDelete changeOp = iota - OpCreate - OpUpdate -) - -type eventPayload struct { - Op changeOp - Obj interface{} +// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to +// indicates changes to a CheckServiceNode for service health. +type EventPayloadCheckServiceNode struct { + Op pbsubscribe.CatalogOp + Value *structs.CheckServiceNode } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot // of stream.Events that describe the current state of a service health query. // // TODO: no tests for this yet -func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { +func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { tx := s.db.Txn(false) defer tx.Abort() - connect := topic == TopicServiceHealthConnect + connect := topic == topicServiceHealthConnect // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) if err != nil { return 0, err } - for _, n := range nodes { + for i := range nodes { + n := nodes[i] event := stream.Event{ Index: idx, Topic: topic, - Payload: eventPayload{ - Op: OpCreate, - Obj: &n, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &n, }, } @@ -254,7 +251,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S } e := newServiceHealthEventDeregister(idx, before) - e.Topic = TopicServiceHealthConnect + e.Topic = topicServiceHealthConnect e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName return e, true } @@ -290,7 +287,7 @@ func changeTypeFromChange(change memdb.Change) changeType { func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { var result []stream.Event for _, event := range events { - if event.Topic != TopicServiceHealth { + if event.Topic != topicServiceHealth { // Skip non-health or any events already emitted to Connect topic continue } @@ -300,7 +297,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { } connectEvent := event - connectEvent.Topic = TopicServiceHealthConnect + connectEvent.Topic = topicServiceHealthConnect switch { case node.Service.Connect.Native: @@ -320,15 +317,11 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { } func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { - ep, ok := payload.(eventPayload) + ep, ok := payload.(EventPayloadCheckServiceNode) if !ok { return nil } - csn, ok := ep.Obj.(*structs.CheckServiceNode) - if !ok { - return nil - } - return csn + return ep.Value } // newServiceHealthEventsForNode returns health events for all services on the @@ -437,12 +430,12 @@ func newServiceHealthEventRegister( Checks: checks, } return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: sn.ServiceName, Index: idx, - Payload: eventPayload{ - Op: OpCreate, - Obj: csn, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: csn, }, } } @@ -464,12 +457,12 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream } return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: sn.ServiceName, Index: idx, - Payload: eventPayload{ - Op: OpDelete, - Obj: csn, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: csn, }, } } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 5cf610604f..9d6d91d54f 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" "github.com/stretchr/testify/require" ) @@ -1137,7 +1138,7 @@ func evConnectNative(e *stream.Event) error { // depending on which topic they are published to and they determin this from // the event. func evConnectTopic(e *stream.Event) error { - e.Topic = TopicServiceHealthConnect + e.Topic = topicServiceHealthConnect return nil } @@ -1171,7 +1172,7 @@ func evSidecar(e *stream.Event) error { // Update event key to be the proxy service name, but only if this is not // already in the connect topic - if e.Topic != TopicServiceHealthConnect { + if e.Topic != topicServiceHealthConnect { e.Key = csn.Service.Service } return nil @@ -1261,7 +1262,7 @@ func evRenameService(e *stream.Event) error { csn.Service.Proxy.DestinationServiceName += "_changed" // If this is the connect topic we need to change the key too - if e.Topic == TopicServiceHealthConnect { + if e.Topic == topicServiceHealthConnect { e.Key += "_changed" } return nil @@ -1391,12 +1392,12 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: svc, Index: index, - Payload: eventPayload{ - Op: OpCreate, - Obj: &structs.CheckServiceNode{ + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ Node: &structs.Node{ ID: nodeID, Node: node, @@ -1459,12 +1460,12 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st // adding too many options to callers. func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: svc, Index: index, - Payload: eventPayload{ - Op: OpDelete, - Obj: &structs.CheckServiceNode{ + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ Node: &structs.Node{ Node: fmt.Sprintf("node%d", nodeNum), }, diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3fd72dfaa7..5e6bbb604a 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/go-memdb" ) @@ -36,14 +37,10 @@ type Changes struct { // 2. Sent to the eventPublisher which will create and emit change events type changeTrackerDB struct { db *memdb.MemDB - publisher eventPublisher + publisher *stream.EventPublisher processChanges func(ReadTxn, Changes) ([]stream.Event, error) } -type eventPublisher interface { - Publish(events []stream.Event) -} - // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting // code may use it to create a read-only transaction, but it will panic if called // with write=true. @@ -158,18 +155,9 @@ func (tx *txn) Commit() error { return nil } -// TODO: may be replaced by a gRPC type. -type topic string - -func (t topic) String() string { - return string(t) -} - var ( - // TopicServiceHealth contains events for all registered service instances. - TopicServiceHealth topic = "topic-service-health" - // TopicServiceHealthConnect contains events for connect-enabled service instances. - TopicServiceHealthConnect topic = "topic-service-health-connect" + topicServiceHealth = pbsubscribe.Topic_ServiceHealth + topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect ) func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { @@ -191,7 +179,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth), - TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect), + topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), } } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 3a7229607c..a02cd864da 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -168,14 +168,23 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { lockDelay: NewDelay(), stopEventPublisher: cancel, } + pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second) s.db = &changeTrackerDB{ db: db, - publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second), + publisher: pub, processChanges: processDBChanges, } + + go pub.Run(ctx) return s, nil } +// EventPublisher returns the stream.EventPublisher used by the Store to +// publish events. +func (s *Store) EventPublisher() *stream.EventPublisher { + return s.db.publisher +} + // Snapshot is used to create a point-in-time snapshot of the entire db. func (s *Store) Snapshot() *Snapshot { tx := s.db.Txn(false) diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 6b2e9d1fe6..f9c978ed3c 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -28,7 +28,8 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -111,7 +112,8 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -227,7 +229,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -372,7 +375,13 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } -var topicService stream.Topic = topic("test-topic-service") +type topic string + +func (t topic) String() string { + return string(t) +} + +var topicService topic = "test-topic-service" func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ @@ -427,7 +436,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) + s.db.publisher = publisher sub, err := publisher.Subscribe(req) require.NoError(t, err) diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 815a68a261..1de9e10580 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -79,7 +79,7 @@ type SnapshotAppender interface { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[Topic]*eventBuffer), @@ -91,8 +91,6 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache snapshotHandlers: handlers, } - go e.handleUpdates(ctx) - return e } @@ -103,7 +101,9 @@ func (e *EventPublisher) Publish(events []Event) { } } -func (e *EventPublisher) handleUpdates(ctx context.Context) { +// Run the event publisher until ctx is cancelled. Run should be called from a +// goroutine to forward events from Publish to all the appropriate subscribers. +func (e *EventPublisher) Run(ctx context.Context) { for { select { case <-ctx.Done(): diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 4448e68454..63f7d763d9 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -25,7 +25,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0) + publisher := NewEventPublisher(newTestSnapshotHandlers(), 0) + go publisher.Run(ctx) + sub, err := publisher.Subscribe(subscription) require.NoError(t, err) eventCh := consumeSubscription(ctx, sub) @@ -123,7 +125,8 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { handlers[intTopic(22)] = fn handlers[intTopic(33)] = fn - publisher := NewEventPublisher(ctx, handlers, time.Second) + publisher := NewEventPublisher(handlers, time.Second) + go publisher.Run(ctx) sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)}) require.NoError(t, err)