|
|
|
@ -5,6 +5,7 @@ import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"reflect"
|
|
|
|
|
"sort"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/go-bexpr"
|
|
|
|
@ -82,7 +83,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
|
|
|
|
return req
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
|
|
|
|
|
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return cache.FetchResult{}, err
|
|
|
|
|
}
|
|
|
|
@ -100,9 +101,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
|
|
|
|
func newMaterializer(
|
|
|
|
|
deps MaterializerDeps,
|
|
|
|
|
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
|
|
|
|
|
filter string,
|
|
|
|
|
req *structs.ServiceSpecificRequest,
|
|
|
|
|
) (*submatview.Materializer, error) {
|
|
|
|
|
view, err := newHealthView(filter)
|
|
|
|
|
view, err := newHealthView(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -139,8 +140,8 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult
|
|
|
|
|
return result, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newHealthView(filterExpr string) (*healthView, error) {
|
|
|
|
|
fe, err := newFilterEvaluator(filterExpr)
|
|
|
|
|
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
|
|
|
|
|
fe, err := newFilterEvaluator(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -192,11 +193,44 @@ type filterEvaluator interface {
|
|
|
|
|
Evaluate(datum interface{}) (bool, error)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newFilterEvaluator(expr string) (filterEvaluator, error) {
|
|
|
|
|
if expr == "" {
|
|
|
|
|
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
|
|
|
|
|
var evaluators []filterEvaluator
|
|
|
|
|
|
|
|
|
|
typ := reflect.TypeOf(structs.CheckServiceNode{})
|
|
|
|
|
if req.Filter != "" {
|
|
|
|
|
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
evaluators = append(evaluators, e)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if req.ServiceTag != "" {
|
|
|
|
|
// Handle backwards compat with old field
|
|
|
|
|
req.ServiceTags = []string{req.ServiceTag}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if req.TagFilter && len(req.ServiceTags) > 0 {
|
|
|
|
|
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for key, value := range req.NodeMetaFilters {
|
|
|
|
|
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
|
|
|
|
|
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
evaluators = append(evaluators, e)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch len(evaluators) {
|
|
|
|
|
case 0:
|
|
|
|
|
return noopFilterEvaluator{}, nil
|
|
|
|
|
case 1:
|
|
|
|
|
return evaluators[0], nil
|
|
|
|
|
default:
|
|
|
|
|
return &multiFilterEvaluator{evaluators: evaluators}, nil
|
|
|
|
|
}
|
|
|
|
|
return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{}))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
|
|
|
|
@ -207,6 +241,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type multiFilterEvaluator struct {
|
|
|
|
|
evaluators []filterEvaluator
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
|
|
|
|
|
for _, e := range m.evaluators {
|
|
|
|
|
match, err := e.Evaluate(data)
|
|
|
|
|
if !match || err != nil {
|
|
|
|
|
return match, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sortCheckServiceNodes sorts the results to match memdb semantics
|
|
|
|
|
// Sort results by Node.Node, if 2 instances match, order by Service.ID
|
|
|
|
|
// Will allow result to be stable sorted and match queries without cache
|
|
|
|
@ -240,3 +288,34 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
|
|
|
|
|
func (s *healthView) Reset() {
|
|
|
|
|
s.state = make(map[string]structs.CheckServiceNode)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// serviceTagEvaluator implements the filterEvaluator to perform filtering
|
|
|
|
|
// by service tags. bexpr can not be used at this time, because the filtering
|
|
|
|
|
// must be case insensitive for backwards compatibility. In the future this
|
|
|
|
|
// may be replaced with bexpr once case insensitive support is added.
|
|
|
|
|
type serviceTagEvaluator struct {
|
|
|
|
|
tags []string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
|
|
|
|
|
csn, ok := data.(structs.CheckServiceNode)
|
|
|
|
|
if !ok {
|
|
|
|
|
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
|
|
|
|
|
}
|
|
|
|
|
for _, tag := range m.tags {
|
|
|
|
|
if !serviceHasTag(csn.Service, tag) {
|
|
|
|
|
// If any one of the expected tags was not found, filter the service
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func serviceHasTag(sn *structs.NodeService, tag string) bool {
|
|
|
|
|
for _, t := range sn.Tags {
|
|
|
|
|
if strings.EqualFold(t, tag) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|