Merge pull request #8769 from hashicorp/streaming/prep-for-subscribe-service

state: use protobuf Topic and and export payload type
pull/8808/head
Daniel Nephin 2020-10-02 13:30:06 -04:00 committed by GitHub
commit 04b51de783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 81 additions and 77 deletions

View File

@ -42,7 +42,6 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) {
// this outside the Server to avoid exposing this outside the package.
type FSM struct {
logger hclog.Logger
path string
// apply is built off the commands global and is used to route apply
// operations to their appropriate handlers.

View File

@ -1,47 +1,44 @@
package state
import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
type changeOp int
const (
OpDelete changeOp = iota
OpCreate
OpUpdate
)
type eventPayload struct {
Op changeOp
Obj interface{}
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
// indicates changes to a CheckServiceNode for service health.
type EventPayloadCheckServiceNode struct {
Op pbsubscribe.CatalogOp
Value *structs.CheckServiceNode
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
// of stream.Events that describe the current state of a service health query.
//
// TODO: no tests for this yet
func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc {
func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc {
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
tx := s.db.Txn(false)
defer tx.Abort()
connect := topic == TopicServiceHealthConnect
connect := topic == topicServiceHealthConnect
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil)
if err != nil {
return 0, err
}
for _, n := range nodes {
for i := range nodes {
n := nodes[i]
event := stream.Event{
Index: idx,
Topic: topic,
Payload: eventPayload{
Op: OpCreate,
Obj: &n,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
},
}
@ -254,7 +251,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S
}
e := newServiceHealthEventDeregister(idx, before)
e.Topic = TopicServiceHealthConnect
e.Topic = topicServiceHealthConnect
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
return e, true
}
@ -290,7 +287,7 @@ func changeTypeFromChange(change memdb.Change) changeType {
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
var result []stream.Event
for _, event := range events {
if event.Topic != TopicServiceHealth {
if event.Topic != topicServiceHealth {
// Skip non-health or any events already emitted to Connect topic
continue
}
@ -300,7 +297,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
}
connectEvent := event
connectEvent.Topic = TopicServiceHealthConnect
connectEvent.Topic = topicServiceHealthConnect
switch {
case node.Service.Connect.Native:
@ -320,15 +317,11 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
}
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
ep, ok := payload.(eventPayload)
ep, ok := payload.(EventPayloadCheckServiceNode)
if !ok {
return nil
}
csn, ok := ep.Obj.(*structs.CheckServiceNode)
if !ok {
return nil
}
return csn
return ep.Value
}
// newServiceHealthEventsForNode returns health events for all services on the
@ -437,12 +430,12 @@ func newServiceHealthEventRegister(
Checks: checks,
}
return stream.Event{
Topic: TopicServiceHealth,
Topic: topicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: eventPayload{
Op: OpCreate,
Obj: csn,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: csn,
},
}
}
@ -464,12 +457,12 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream
}
return stream.Event{
Topic: TopicServiceHealth,
Topic: topicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: eventPayload{
Op: OpDelete,
Obj: csn,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: csn,
},
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
)
@ -1137,7 +1138,7 @@ func evConnectNative(e *stream.Event) error {
// depending on which topic they are published to and they determin this from
// the event.
func evConnectTopic(e *stream.Event) error {
e.Topic = TopicServiceHealthConnect
e.Topic = topicServiceHealthConnect
return nil
}
@ -1171,7 +1172,7 @@ func evSidecar(e *stream.Event) error {
// Update event key to be the proxy service name, but only if this is not
// already in the connect topic
if e.Topic != TopicServiceHealthConnect {
if e.Topic != topicServiceHealthConnect {
e.Key = csn.Service.Service
}
return nil
@ -1261,7 +1262,7 @@ func evRenameService(e *stream.Event) error {
csn.Service.Proxy.DestinationServiceName += "_changed"
// If this is the connect topic we need to change the key too
if e.Topic == TopicServiceHealthConnect {
if e.Topic == topicServiceHealthConnect {
e.Key += "_changed"
}
return nil
@ -1391,12 +1392,12 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return stream.Event{
Topic: TopicServiceHealth,
Topic: topicServiceHealth,
Key: svc,
Index: index,
Payload: eventPayload{
Op: OpCreate,
Obj: &structs.CheckServiceNode{
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Node: &structs.Node{
ID: nodeID,
Node: node,
@ -1459,12 +1460,12 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
// adding too many options to callers.
func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event {
return stream.Event{
Topic: TopicServiceHealth,
Topic: topicServiceHealth,
Key: svc,
Index: index,
Payload: eventPayload{
Op: OpDelete,
Obj: &structs.CheckServiceNode{
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: &structs.CheckServiceNode{
Node: &structs.Node{
Node: fmt.Sprintf("node%d", nodeNum),
},

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/go-memdb"
)
@ -36,14 +37,10 @@ type Changes struct {
// 2. Sent to the eventPublisher which will create and emit change events
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
publisher *stream.EventPublisher
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
}
type eventPublisher interface {
Publish(events []stream.Event)
}
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
// code may use it to create a read-only transaction, but it will panic if called
// with write=true.
@ -158,18 +155,9 @@ func (tx *txn) Commit() error {
return nil
}
// TODO: may be replaced by a gRPC type.
type topic string
func (t topic) String() string {
return string(t)
}
var (
// TopicServiceHealth contains events for all registered service instances.
TopicServiceHealth topic = "topic-service-health"
// TopicServiceHealthConnect contains events for connect-enabled service instances.
TopicServiceHealthConnect topic = "topic-service-health-connect"
topicServiceHealth = pbsubscribe.Topic_ServiceHealth
topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
)
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
@ -191,7 +179,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth),
TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect),
topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
}
}

View File

@ -168,14 +168,23 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
lockDelay: NewDelay(),
stopEventPublisher: cancel,
}
pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second)
s.db = &changeTrackerDB{
db: db,
publisher: stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second),
publisher: pub,
processChanges: processDBChanges,
}
go pub.Run(ctx)
return s, nil
}
// EventPublisher returns the stream.EventPublisher used by the Store to
// publish events.
func (s *Store) EventPublisher() *stream.EventPublisher {
return s.db.publisher
}
// Snapshot is used to create a point-in-time snapshot of the entire db.
func (s *Store) Snapshot() *Snapshot {
tx := s.db.Txn(false)

View File

@ -28,7 +28,8 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
@ -111,7 +112,8 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
@ -227,7 +229,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher
sub, err := publisher.Subscribe(subscription)
require.NoError(err)
@ -372,7 +375,13 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
}
}
var topicService stream.Topic = topic("test-topic-service")
type topic string
func (t topic) String() string {
return string(t)
}
var topicService topic = "test-topic-service"
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
@ -427,7 +436,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := stream.NewEventPublisher(ctx, newTestSnapshotHandlers(s), 0)
publisher := stream.NewEventPublisher(newTestSnapshotHandlers(s), 0)
go publisher.Run(ctx)
s.db.publisher = publisher
sub, err := publisher.Subscribe(req)
require.NoError(t, err)

View File

@ -79,7 +79,7 @@ type SnapshotAppender interface {
// A goroutine is run in the background to publish events to all subscribes.
// Cancelling the context will shutdown the goroutine, to free resources,
// and stop all publishing.
func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer),
@ -91,8 +91,6 @@ func NewEventPublisher(ctx context.Context, handlers SnapshotHandlers, snapCache
snapshotHandlers: handlers,
}
go e.handleUpdates(ctx)
return e
}
@ -103,7 +101,9 @@ func (e *EventPublisher) Publish(events []Event) {
}
}
func (e *EventPublisher) handleUpdates(ctx context.Context) {
// Run the event publisher until ctx is cancelled. Run should be called from a
// goroutine to forward events from Publish to all the appropriate subscribers.
func (e *EventPublisher) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():

View File

@ -25,7 +25,9 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, newTestSnapshotHandlers(), 0)
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
@ -123,7 +125,8 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
handlers[intTopic(22)] = fn
handlers[intTopic(33)] = fn
publisher := NewEventPublisher(ctx, handlers, time.Second)
publisher := NewEventPublisher(handlers, time.Second)
go publisher.Run(ctx)
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)})
require.NoError(t, err)