Merge pull request #8987 from hashicorp/dnephin/stream-filter

streaming: apply filter to a single item
pull/8974/head
Daniel Nephin 2020-10-27 16:39:43 -04:00 committed by GitHub
commit 0f81915495
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 38 additions and 21 deletions

View File

@ -3,6 +3,7 @@ package cachetype
import (
"context"
"fmt"
"reflect"
"time"
"github.com/hashicorp/go-bexpr"
@ -138,15 +139,14 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult
}
func newHealthView(filterExpr string) (*healthView, error) {
s := &healthView{state: make(map[string]structs.CheckServiceNode)}
// We apply filtering to the raw CheckServiceNodes before we are done mutating
// state in Update to save from storing stuff in memory we'll only filter
// later. Because the state is just a map of those types, we can simply run
// that map through filter and it will remove any entries that don't match.
var err error
s.filter, err = bexpr.CreateFilter(filterExpr, nil, s.state)
return s, err
fe, err := newFilterEvaluator(filterExpr)
if err != nil {
return nil, err
}
return &healthView{
state: make(map[string]structs.CheckServiceNode),
filter: fe,
}, nil
}
// healthView implements submatview.View for storing the view state
@ -156,7 +156,7 @@ func newHealthView(filterExpr string) (*healthView, error) {
// involves re-sorting each time etc. though.
type healthView struct {
state map[string]structs.CheckServiceNode
filter *bexpr.Filter
filter filterEvaluator
}
// Update implements View
@ -171,24 +171,41 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
id := serviceHealth.CheckServiceNode.UniqueID()
switch serviceHealth.Op {
case pbsubscribe.CatalogOp_Register:
csn := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
s.state[id] = *csn
csn := *pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
passed, err := s.filter.Evaluate(csn)
switch {
case err != nil:
return err
case passed:
s.state[id] = csn
}
case pbsubscribe.CatalogOp_Deregister:
delete(s.state, id)
}
}
// TODO(streaming): should this filter be applied to only the new CheckServiceNode
// instead of the full map, which should already be filtered.
if s.filter != nil {
filtered, err := s.filter.Execute(s.state)
if err != nil {
return err
}
s.state = filtered.(map[string]structs.CheckServiceNode)
}
return nil
}
type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error)
}
func newFilterEvaluator(expr string) (filterEvaluator, error) {
if expr == "" {
return noopFilterEvaluator{}, nil
}
return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{}))
}
// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
// method always return true, so no items will be filtered out.
type noopFilterEvaluator struct{}
func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil
}
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *healthView) Result(index uint64) (interface{}, error) {
result := structs.IndexedCheckServiceNodes{