diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 0214b7ef77..f04f9f60f9 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -6,37 +6,6 @@ import ( memdb "github.com/hashicorp/go-memdb" ) -// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot -// of stream.Events that describe the current state of a service health query. -func (s *Store) ServiceHealthSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { - tx := s.db.Txn(false) - defer tx.Abort() - // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest - idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, false, nil) - if err != nil { - return 0, err - } - - _, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealth) - return idx, err -} - -// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot -// of stream.Events that describe the current state of a service connect health -// query. -func (s *Store) ServiceHealthConnectSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { - tx := s.db.Txn(false) - defer tx.Abort() - // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest - idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, true, nil) - if err != nil { - return 0, err - } - - _, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealthConnect) - return idx, err -} - type changeOp int const ( @@ -50,35 +19,42 @@ type eventPayload struct { Obj interface{} } -// checkServiceNodesToServiceHealth converts a list of structs.CheckServiceNodes -// to stream.ServiceHealth events for streaming. If a non-nil event buffer is -// passed, events are appended to the buffer one at a time and an nil slice is -// returned to avoid keeping a full copy in memory. -func checkServiceNodesToServiceHealth(idx uint64, nodes structs.CheckServiceNodes, - buf stream.SnapshotAppender, topic topic) ([]stream.Event, error) { - var events []stream.Event - for _, n := range nodes { - event := stream.Event{ - Index: idx, - Topic: topic, - Payload: eventPayload{ - Op: OpCreate, - Obj: &n, - }, +// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot +// of stream.Events that describe the current state of a service health query. +// +// TODO: no tests for this yet +func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { + return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { + tx := s.db.Txn(false) + defer tx.Abort() + + connect := topic == TopicServiceHealthConnect + // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest + idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) + if err != nil { + return 0, err } - if n.Service != nil { - event.Key = n.Service.Service - } + for _, n := range nodes { + event := stream.Event{ + Index: idx, + Topic: topic, + Payload: eventPayload{ + Op: OpCreate, + Obj: &n, + }, + } - // TODO: always called with a non-nil buf? - if buf != nil { + if n.Service != nil { + event.Key = n.Service.Service + } + + // TODO: could all the events be appended as a single item? buf.Append([]stream.Event{event}) - } else { - events = append(events, event) } + + return idx, err } - return events, nil } type nodeServiceTuple struct { diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 8ac0cf57a2..d16faa6510 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -192,7 +192,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { // TODO: could accept a ReadTxner instead of a Store func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - TopicServiceHealth: s.ServiceHealthSnapshot, - TopicServiceHealthConnect: s.ServiceHealthConnectSnapshot, + TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth), + TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect), } }