state: serviceHealthSnapshot

refactored to remove unused return value and remove duplication
pull/8357/head
Daniel Nephin 2020-07-21 18:42:26 -04:00
parent bf523420ee
commit 09329b542d
2 changed files with 32 additions and 56 deletions

View File

@ -6,37 +6,6 @@ import (
memdb "github.com/hashicorp/go-memdb"
)
// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot
// of stream.Events that describe the current state of a service health query.
func (s *Store) ServiceHealthSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, false, nil)
if err != nil {
return 0, err
}
_, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealth)
return idx, err
}
// ServiceHealthSnapshot is a stream.SnapFn that provides a streaming snapshot
// of stream.Events that describe the current state of a service connect health
// query.
func (s *Store) ServiceHealthConnectSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, true, nil)
if err != nil {
return 0, err
}
_, err = checkServiceNodesToServiceHealth(idx, nodes, buf, TopicServiceHealthConnect)
return idx, err
}
type changeOp int
const (
@ -50,35 +19,42 @@ type eventPayload struct {
Obj interface{}
}
// checkServiceNodesToServiceHealth converts a list of structs.CheckServiceNodes
// to stream.ServiceHealth events for streaming. If a non-nil event buffer is
// passed, events are appended to the buffer one at a time and an nil slice is
// returned to avoid keeping a full copy in memory.
func checkServiceNodesToServiceHealth(idx uint64, nodes structs.CheckServiceNodes,
buf stream.SnapshotAppender, topic topic) ([]stream.Event, error) {
var events []stream.Event
for _, n := range nodes {
event := stream.Event{
Index: idx,
Topic: topic,
Payload: eventPayload{
Op: OpCreate,
Obj: &n,
},
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
// of stream.Events that describe the current state of a service health query.
//
// TODO: no tests for this yet
func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc {
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
tx := s.db.Txn(false)
defer tx.Abort()
connect := topic == TopicServiceHealthConnect
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil)
if err != nil {
return 0, err
}
if n.Service != nil {
event.Key = n.Service.Service
}
for _, n := range nodes {
event := stream.Event{
Index: idx,
Topic: topic,
Payload: eventPayload{
Op: OpCreate,
Obj: &n,
},
}
// TODO: always called with a non-nil buf?
if buf != nil {
if n.Service != nil {
event.Key = n.Service.Service
}
// TODO: could all the events be appended as a single item?
buf.Append([]stream.Event{event})
} else {
events = append(events, event)
}
return idx, err
}
return events, nil
}
type nodeServiceTuple struct {

View File

@ -192,7 +192,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: could accept a ReadTxner instead of a Store
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
TopicServiceHealth: s.ServiceHealthSnapshot,
TopicServiceHealthConnect: s.ServiceHealthConnectSnapshot,
TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth),
TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect),
}
}