From 7b1534ef0548a9c6b01953506c51569b7ad38851 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 8 Sep 2020 18:13:24 -0400 Subject: [PATCH 1/3] state: rename and export EventPayload The subscribe endpoint needs to be able to inspect the payload to filter events, and convert them into the protobuf types. Use the protobuf CatalogOp type for the operation field, for now. In the future if we end up with multiple interfaces we should be able to remove the protobuf dependency by changing this to an int32 and adding a test for the mapping between the values. Make the value of the payload a concrete type instead of interface{}. We can create other payloads for other event types. --- agent/consul/state/catalog_events.go | 43 +++++++++-------------- agent/consul/state/catalog_events_test.go | 13 +++---- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index b42d47fc64..d68180ed60 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -3,20 +3,15 @@ package state import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbsubscribe" memdb "github.com/hashicorp/go-memdb" ) -type changeOp int - -const ( - OpDelete changeOp = iota - OpCreate - OpUpdate -) - -type eventPayload struct { - Op changeOp - Obj interface{} +// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to +// indicates changes to a CheckServiceNode for service health. +type EventPayloadCheckServiceNode struct { + Op pbsubscribe.CatalogOp + Value *structs.CheckServiceNode } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -39,9 +34,9 @@ func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { event := stream.Event{ Index: idx, Topic: topic, - Payload: eventPayload{ - Op: OpCreate, - Obj: &n, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &n, }, } @@ -320,15 +315,11 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { } func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { - ep, ok := payload.(eventPayload) + ep, ok := payload.(EventPayloadCheckServiceNode) if !ok { return nil } - csn, ok := ep.Obj.(*structs.CheckServiceNode) - if !ok { - return nil - } - return csn + return ep.Value } // newServiceHealthEventsForNode returns health events for all services on the @@ -440,9 +431,9 @@ func newServiceHealthEventRegister( Topic: TopicServiceHealth, Key: sn.ServiceName, Index: idx, - Payload: eventPayload{ - Op: OpCreate, - Obj: csn, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: csn, }, } } @@ -467,9 +458,9 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream Topic: TopicServiceHealth, Key: sn.ServiceName, Index: idx, - Payload: eventPayload{ - Op: OpDelete, - Obj: csn, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: csn, }, } } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 5cf610604f..d4d5416157 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" "github.com/stretchr/testify/require" ) @@ -1394,9 +1395,9 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st Topic: TopicServiceHealth, Key: svc, Index: index, - Payload: eventPayload{ - Op: OpCreate, - Obj: &structs.CheckServiceNode{ + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ Node: &structs.Node{ ID: nodeID, Node: node, @@ -1462,9 +1463,9 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) Topic: TopicServiceHealth, Key: svc, Index: index, - Payload: eventPayload{ - Op: OpDelete, - Obj: &structs.CheckServiceNode{ + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ Node: &structs.Node{ Node: fmt.Sprintf("node%d", nodeNum), }, From 0fb2a5b9921dc2ef7fa5c95afa789f506113c5ec Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 15 Sep 2020 15:04:33 -0400 Subject: [PATCH 2/3] state: use pbsubscribe.Topic for topic values --- agent/consul/state/catalog_events.go | 20 +++++++++++--------- agent/consul/state/catalog_events_test.go | 10 +++++----- agent/consul/state/memdb.go | 18 +++++------------- agent/consul/state/store_integration_test.go | 8 +++++++- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index d68180ed60..4b7ee11932 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -1,10 +1,11 @@ package state import ( + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbsubscribe" - memdb "github.com/hashicorp/go-memdb" ) // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to @@ -18,19 +19,20 @@ type EventPayloadCheckServiceNode struct { // of stream.Events that describe the current state of a service health query. // // TODO: no tests for this yet -func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { +func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { tx := s.db.Txn(false) defer tx.Abort() - connect := topic == TopicServiceHealthConnect + connect := topic == topicServiceHealthConnect // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) if err != nil { return 0, err } - for _, n := range nodes { + for i := range nodes { + n := nodes[i] event := stream.Event{ Index: idx, Topic: topic, @@ -249,7 +251,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S } e := newServiceHealthEventDeregister(idx, before) - e.Topic = TopicServiceHealthConnect + e.Topic = topicServiceHealthConnect e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName return e, true } @@ -285,7 +287,7 @@ func changeTypeFromChange(change memdb.Change) changeType { func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { var result []stream.Event for _, event := range events { - if event.Topic != TopicServiceHealth { + if event.Topic != topicServiceHealth { // Skip non-health or any events already emitted to Connect topic continue } @@ -295,7 +297,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { } connectEvent := event - connectEvent.Topic = TopicServiceHealthConnect + connectEvent.Topic = topicServiceHealthConnect switch { case node.Service.Connect.Native: @@ -428,7 +430,7 @@ func newServiceHealthEventRegister( Checks: checks, } return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ @@ -455,7 +457,7 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream } return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index d4d5416157..9d6d91d54f 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1138,7 +1138,7 @@ func evConnectNative(e *stream.Event) error { // depending on which topic they are published to and they determin this from // the event. func evConnectTopic(e *stream.Event) error { - e.Topic = TopicServiceHealthConnect + e.Topic = topicServiceHealthConnect return nil } @@ -1172,7 +1172,7 @@ func evSidecar(e *stream.Event) error { // Update event key to be the proxy service name, but only if this is not // already in the connect topic - if e.Topic != TopicServiceHealthConnect { + if e.Topic != topicServiceHealthConnect { e.Key = csn.Service.Service } return nil @@ -1262,7 +1262,7 @@ func evRenameService(e *stream.Event) error { csn.Service.Proxy.DestinationServiceName += "_changed" // If this is the connect topic we need to change the key too - if e.Topic == TopicServiceHealthConnect { + if e.Topic == topicServiceHealthConnect { e.Key += "_changed" } return nil @@ -1392,7 +1392,7 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ @@ -1460,7 +1460,7 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st // adding too many options to callers. func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3fd72dfaa7..beb46d62d8 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/go-memdb" ) @@ -158,18 +159,9 @@ func (tx *txn) Commit() error { return nil } -// TODO: may be replaced by a gRPC type. -type topic string - -func (t topic) String() string { - return string(t) -} - var ( - // TopicServiceHealth contains events for all registered service instances. - TopicServiceHealth topic = "topic-service-health" - // TopicServiceHealthConnect contains events for connect-enabled service instances. - TopicServiceHealthConnect topic = "topic-service-health-connect" + topicServiceHealth = pbsubscribe.Topic_ServiceHealth + topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect ) func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { @@ -191,7 +183,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth), - TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect), + topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), } } diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 6b2e9d1fe6..14d667ce39 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -372,7 +372,13 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } -var topicService stream.Topic = topic("test-topic-service") +type topic string + +func (t topic) String() string { + return string(t) +} + +var topicService topic = "test-topic-service" func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ From b7ca15e91091c31e76c6149356e5e1e20f89f397 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 9 Sep 2020 16:26:11 -0400 Subject: [PATCH 3/3] stream: move goroutine out of New This change will make it easier to manage goroutine lifecycle from the caller. Also expose EventPublisher from state.Store --- agent/consul/fsm/fsm.go | 1 - agent/consul/state/memdb.go | 6 +----- agent/consul/state/state_store.go | 11 ++++++++++- agent/consul/state/store_integration_test.go | 13 +++++++++---- agent/consul/stream/event_publisher.go | 8 ++++---- agent/consul/stream/event_publisher_test.go | 7 +++++-- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 85abe19287..cc0cb9a071 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -42,7 +42,6 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) { // this outside the Server to avoid exposing this outside the package. type FSM struct { logger hclog.Logger - path string // apply is built off the commands global and is used to route apply // operations to their appropriate handlers. diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index beb46d62d8..5e6bbb604a 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -37,14 +37,10 @@ type Changes struct { // 2. Sent to the eventPublisher which will create and emit change events type changeTrackerDB struct { db *memdb.MemDB - publisher eventPublisher + publisher *stream.EventPublisher processChanges func(ReadTxn, Changes) ([]stream.Event, error) } -type eventPublisher interface { - Publish(events []stream.Event) -} - // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting // code may use it to create a read-only transaction, but it will panic if called // with write=true. diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 3a7229607c..a02cd864da 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -168,14 +168,23 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) { lockDelay: NewDelay(), stopEventPublisher: cancel, } + pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second) s.db = &changeTrackerDB{ db: db, - publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second), + publisher: pub, processChanges: processDBChanges, } + + go pub.Run(ctx) return s, nil } +// EventPublisher returns the stream.EventPublisher used by the Store to +// publish events. +func (s *Store) EventPublisher() *stream.EventPublisher { + return s.db.publisher +} + // Snapshot is used to create a point-in-time snapshot of the entire db. func (s *Store) Snapshot() *Snapshot { tx := s.db.Txn(false) diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 14d667ce39..f9c978ed3c 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -28,7 +28,8 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -111,7 +112,8 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -227,7 +229,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) s.db.publisher = publisher sub, err := publisher.Subscribe(subscription) require.NoError(err) @@ -433,7 +436,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0) + publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0) + go publisher.Run(ctx) + s.db.publisher = publisher sub, err := publisher.Subscribe(req) require.NoError(t, err) diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 815a68a261..1de9e10580 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -79,7 +79,7 @@ type SnapshotAppender interface { // A goroutine is run in the background to publish events to all subscribes. // Cancelling the context will shutdown the goroutine, to free resources, // and stop all publishing. -func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { +func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, topicBuffers: make(map[Topic]*eventBuffer), @@ -91,8 +91,6 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache snapshotHandlers: handlers, } - go e.handleUpdates(ctx) - return e } @@ -103,7 +101,9 @@ func (e *EventPublisher) Publish(events []Event) { } } -func (e *EventPublisher) handleUpdates(ctx context.Context) { +// Run the event publisher until ctx is cancelled. Run should be called from a +// goroutine to forward events from Publish to all the appropriate subscribers. +func (e *EventPublisher) Run(ctx context.Context) { for { select { case <-ctx.Done(): diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 4448e68454..63f7d763d9 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -25,7 +25,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0) + publisher := NewEventPublisher(newTestSnapshotHandlers(), 0) + go publisher.Run(ctx) + sub, err := publisher.Subscribe(subscription) require.NoError(t, err) eventCh := consumeSubscription(ctx, sub) @@ -123,7 +125,8 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { handlers[intTopic(22)] = fn handlers[intTopic(33)] = fn - publisher := NewEventPublisher(ctx, handlers, time.Second) + publisher := NewEventPublisher(handlers, time.Second) + go publisher.Run(ctx) sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)}) require.NoError(t, err)