NET-5879 - move the filter for non-passing to occur in the health RPC layer rather than the callers of the RPC (#21098)

* NET-5879 - move the filter for non-passing to occur in the health RPC layer rather than the callers of the RPC

* fix import of slices

* fix test
pull/21107/head
John Murret 6 months ago committed by GitHub
parent 48df56f7d2
commit a975b04302
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -25,6 +25,12 @@ import (
"github.com/hashicorp/consul/testrpc"
)
func addQueryParam(req *http.Request, param, value string) {
q := req.URL.Query()
q.Add(param, value)
req.URL.RawQuery = q.Encode()
}
func TestCatalogEndpointsFailInV2(t *testing.T) {
t.Parallel()

@ -301,7 +301,8 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
if err != nil {
return err
}
thisReply.Nodes = raw.(structs.CheckServiceNodes)
filteredNodes := raw.(structs.CheckServiceNodes)
thisReply.Nodes = filteredNodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: args.HealthFilterType})
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include

@ -562,8 +562,14 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
}
// Filter out any unhealthy nodes.
nodes = nodes.FilterIgnore(query.Service.OnlyPassing,
query.Service.IgnoreCheckIDs)
filterType := structs.HealthFilterExcludeCritical
if query.Service.OnlyPassing {
filterType = structs.HealthFilterIncludeOnlyPassing
}
nodes = nodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: filterType,
IgnoreCheckIDs: query.Service.IgnoreCheckIDs})
// Apply the node metadata filters, if any.
if len(query.Service.NodeMeta) > 0 {

@ -573,14 +573,21 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa
if req.Tag != "" {
serviceTags = []string{req.Tag}
}
healthFilterType := structs.HealthFilterExcludeCritical
if cfg.OnlyPassing {
healthFilterType = structs.HealthFilterIncludeOnlyPassing
}
args := structs.ServiceSpecificRequest{
PeerName: req.Tenancy.Peer,
Connect: lookupType == LookupTypeConnect,
Ingress: lookupType == LookupTypeIngress,
Datacenter: datacenter,
ServiceName: req.Name,
ServiceTags: serviceTags,
TagFilter: req.Tag != "",
PeerName: req.Tenancy.Peer,
Connect: lookupType == LookupTypeConnect,
Ingress: lookupType == LookupTypeIngress,
Datacenter: datacenter,
ServiceName: req.Name,
ServiceTags: serviceTags,
TagFilter: req.Tag != "",
HealthFilterType: healthFilterType,
QueryOptions: structs.QueryOptions{
Token: ctx.Token,
AllowStale: cfg.AllowStale,
@ -604,15 +611,6 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa
return nil, ErrNotFound
}
// Filter out any service nodes due to health checks
// We copy the slice to avoid modifying the result if it comes from the cache
nodes := make(structs.CheckServiceNodes, len(out.Nodes))
copy(nodes, out.Nodes)
out.Nodes = nodes.Filter(cfg.OnlyPassing)
if err != nil {
return nil, fmt.Errorf("rpc request failed: %w", err)
}
// If we have no nodes, return not found!
if len(out.Nodes) == 0 {
return nil, ErrNotFound

@ -1450,14 +1450,19 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st
if lookup.Tag != "" {
serviceTags = []string{lookup.Tag}
}
healthFilterType := structs.HealthFilterExcludeCritical
if cfg.OnlyPassing {
healthFilterType = structs.HealthFilterIncludeOnlyPassing
}
args := structs.ServiceSpecificRequest{
PeerName: lookup.PeerName,
Connect: lookup.Connect,
Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter,
ServiceName: lookup.Service,
ServiceTags: serviceTags,
TagFilter: lookup.Tag != "",
PeerName: lookup.PeerName,
Connect: lookup.Connect,
Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter,
ServiceName: lookup.Service,
ServiceTags: serviceTags,
TagFilter: lookup.Tag != "",
HealthFilterType: healthFilterType,
QueryOptions: structs.QueryOptions{
Token: d.coalesceDNSToken(),
AllowStale: cfg.AllowStale,
@ -1473,11 +1478,6 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st
return out, err
}
// Filter out any service nodes due to health checks
// We copy the slice to avoid modifying the result if it comes from the cache
nodes := make(structs.CheckServiceNodes, len(out.Nodes))
copy(nodes, out.Nodes)
out.Nodes = nodes.Filter(cfg.OnlyPassing)
return out, nil
}

@ -214,12 +214,24 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
prefix = "/v1/health/service/"
}
// Pull out the service name
// Parse out the service name from the query params
args.ServiceName = strings.TrimPrefix(req.URL.Path, prefix)
if args.ServiceName == "" {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"}
}
// Parse the passing flag from the query params and use to set the health filter type
// to HealthFilterIncludeOnlyPassing if it is present. Otherwise, do not filter by health.
passing, err := getBoolQueryParam(params, api.HealthPassing)
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Invalid value for ?passing"}
}
healthFilterType := structs.HealthFilterIncludeAll
if passing {
healthFilterType = structs.HealthFilterIncludeOnlyPassing
}
args.HealthFilterType = healthFilterType
out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
if err != nil {
return nil, err
@ -229,19 +241,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
setCacheMeta(resp, &md)
}
out.QueryMeta.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
setMeta(resp, &out.QueryMeta)
// FIXME: argument parsing should be done before performing the rpc
// Filter to only passing if specified
filter, err := getBoolQueryParam(params, api.HealthPassing)
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Invalid value for ?passing"}
}
// FIXME: remove filterNonPassing, replace with nodes.Filter, which is used by DNSServer
if filter {
out.Nodes = filterNonPassing(out.Nodes)
}
_ = setMeta(resp, &out.QueryMeta)
// Translate addresses after filtering so we don't waste effort.
s.agent.TranslateAddresses(args.Datacenter, out.Nodes, dnsutil.TranslateAddressAcceptAny)
@ -290,25 +290,3 @@ func getBoolQueryParam(params url.Values, key string) (bool, error) {
}
return param, nil
}
// filterNonPassing is used to filter out any nodes that have check that are not passing
func filterNonPassing(nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
n := len(nodes)
// Make a copy of the cached nodes rather than operating on the cache directly
out := append(nodes[:0:0], nodes...)
OUTER:
for i := 0; i < n; i++ {
node := out[i]
for _, check := range node.Checks {
if check.Status != api.HealthPassing {
out[i], out[n-1] = out[n-1], structs.CheckServiceNode{}
n--
i--
continue OUTER
}
}
}
return out[:n]
}

@ -9,7 +9,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strconv"
"strings"
"testing"
@ -627,13 +626,54 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) {
})
}
type queryBackendConfiguration struct {
name string
config string
cached bool
queryBackend string
}
var queryBackendConfigs = []*queryBackendConfiguration{
{
name: "blocking-query-without-cache",
config: `use_streaming_backend=false`,
cached: false,
queryBackend: "blocking-query",
},
{
name: "blocking-query-with-cache",
config: `use_streaming_backend=false`,
cached: true,
queryBackend: "blocking-query",
},
{
name: "streaming-with-cache-setting",
config: `
rpc{
enable_streaming=true
}
use_streaming_backend=true
`,
cached: true,
queryBackend: "streaming",
},
}
func TestHealthServiceNodes(t *testing.T) {
for _, cfg := range queryBackendConfigs {
t.Run(cfg.name, func(t *testing.T) {
testHealthServiceNodes(t, cfg)
})
}
}
func testHealthServiceNodes(t *testing.T, backendCfg *queryBackendConfiguration) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
a := StartTestAgent(t, TestAgent{HCL: backendCfg.config, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -718,30 +758,44 @@ func TestHealthServiceNodes(t *testing.T) {
// Test caching
{
// List instances with cache enabled
req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil)
req, err := http.NewRequest("GET", "/v1/health/service/test?"+peerQuerySuffix(peerName), nil)
require.NoError(t, err)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
require.NoError(t, err)
nodes := obj.(structs.CheckServiceNodes)
verify(t, peerName, nodes)
// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
if backendCfg.cached {
// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
}
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}
{
// List instances with cache enabled
req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil)
req, err := http.NewRequest("GET", "/v1/health/service/test?"+peerQuerySuffix(peerName), nil)
require.NoError(t, err)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
require.NoError(t, err)
nodes := obj.(structs.CheckServiceNodes)
verify(t, peerName, nodes)
// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
if backendCfg.cached {
// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
}
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}
}
@ -761,8 +815,11 @@ func TestHealthServiceNodes(t *testing.T) {
for _, peerName := range testingPeerNames {
retry.Run(t, func(r *retry.R) {
// List it again
req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil)
req, err := http.NewRequest("GET", "/v1/health/service/test?"+peerQuerySuffix(peerName), nil)
require.NoError(r, err)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
require.NoError(r, err)
@ -780,9 +837,15 @@ func TestHealthServiceNodes(t *testing.T) {
// Should be a cache hit! The data should've updated in the cache
// in the background so this should've been fetched directly from
// the cache.
if resp.Header().Get("X-Cache") != "HIT" {
r.Fatalf("should be a cache hit")
if backendCfg.cached {
// Should be a cache hit! The data should've updated in the cache
// in the background so this should've been fetched directly from
// the cache.
if resp.Header().Get("X-Cache") != "HIT" {
r.Fatalf("should be a cache hit")
}
}
require.Equal(r, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
}
}
@ -1237,12 +1300,20 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
}
func TestHealthServiceNodes_Filter(t *testing.T) {
for _, cfg := range queryBackendConfigs {
t.Run(cfg.name, func(t *testing.T) {
testHealthServiceNodes_Filter(t, cfg)
})
}
}
func testHealthServiceNodes_Filter(t *testing.T, backendCfg *queryBackendConfiguration) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
a := NewTestAgent(t, backendCfg.config)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -1290,6 +1361,9 @@ func TestHealthServiceNodes_Filter(t *testing.T) {
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
req, _ = http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&filter="+url.QueryEscape("Node.Node == `test-health-node`"), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp = httptest.NewRecorder()
obj, err = a.srv.HealthServiceNodes(resp, req)
require.NoError(t, err)
@ -1300,6 +1374,8 @@ func TestHealthServiceNodes_Filter(t *testing.T) {
nodes = obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 1)
require.Len(t, nodes[0].Checks, 1)
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}
func TestHealthServiceNodes_DistanceSort(t *testing.T) {
@ -1386,12 +1462,20 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) {
}
func TestHealthServiceNodes_PassingFilter(t *testing.T) {
for _, cfg := range queryBackendConfigs {
t.Run(cfg.name, func(t *testing.T) {
testHealthServiceNodes_PassingFilter(t, cfg)
})
}
}
func testHealthServiceNodes_PassingFilter(t *testing.T, backendCfg *queryBackendConfiguration) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
a := NewTestAgent(t, backendCfg.config)
defer a.Shutdown()
dc := "dc1"
@ -1417,6 +1501,9 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
t.Run("bc_no_query_value", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/health/service/consul?passing", nil)
if backendCfg.cached {
addQueryParam(req, "cached", "")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
@ -1428,12 +1515,16 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
// Should be 0 health check for consul
nodes := obj.(structs.CheckServiceNodes)
if len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
t.Fatalf("bad: %v", nodes)
}
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
t.Run("passing_true", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/health/service/consul?passing=true", nil)
if backendCfg.cached {
addQueryParam(req, "cached", "")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
@ -1447,10 +1538,14 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
if len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
t.Run("passing_false", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/health/service/consul?passing=false", nil)
if backendCfg.cached {
addQueryParam(req, "cached", "")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
@ -1465,16 +1560,21 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
t.Run("passing_bad", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/health/service/consul?passing=nope-nope-nope", nil)
if backendCfg.cached {
addQueryParam(req, "cached", "")
}
resp := httptest.NewRecorder()
_, err := a.srv.HealthServiceNodes(resp, req)
require.True(t, isHTTPBadRequest(err), fmt.Sprintf("Expected bad request HTTP error but got %v", err))
if !strings.Contains(err.Error(), "Invalid value for ?passing") {
t.Errorf("bad %s", err.Error())
}
require.Equal(t, "", resp.Header().Get("X-Consul-Query-Backend"))
})
}
@ -1626,13 +1726,21 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
}
func TestHealthConnectServiceNodes(t *testing.T) {
for _, cfg := range queryBackendConfigs {
t.Run(cfg.name, func(t *testing.T) {
testHealthConnectServiceNodes(t, cfg)
})
}
}
func testHealthConnectServiceNodes(t *testing.T, backendCfg *queryBackendConfiguration) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
a := NewTestAgent(t, backendCfg.config)
defer a.Shutdown()
// Register
@ -1643,6 +1751,9 @@ func TestHealthConnectServiceNodes(t *testing.T) {
// Request
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?dc=dc1", args.Service.Proxy.DestinationServiceName), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(t, err)
@ -1771,13 +1882,21 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
}
func TestHealthConnectServiceNodes_Filter(t *testing.T) {
for _, cfg := range queryBackendConfigs {
t.Run(cfg.name, func(t *testing.T) {
testHealthConnectServiceNodes_Filter(t, cfg)
})
}
}
func testHealthConnectServiceNodes_Filter(t *testing.T, backendCfg *queryBackendConfiguration) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
a := NewTestAgent(t, backendCfg.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -1788,6 +1907,12 @@ func TestHealthConnectServiceNodes_Filter(t *testing.T) {
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
args = structs.TestRegisterRequestProxy(t)
// when using streaming these come back as empty slices rather than nil.
// rather than changing structs.TestRegisterRequestProxy(t) and the many places
// it is referenced without using caching, we'll just set them to empty slices here.
args.Service.Proxy.EnvoyExtensions = []structs.EnvoyExtension{}
args.Service.Proxy.Expose.Paths = []structs.ExposePath{}
args.Service.Address = "127.0.0.55"
args.Service.Meta = map[string]string{
"version": "2",
@ -1800,6 +1925,9 @@ func TestHealthConnectServiceNodes_Filter(t *testing.T) {
"/v1/health/connect/%s?filter=%s",
args.Service.Proxy.DestinationServiceName,
url.QueryEscape("Service.Meta.version == 2")), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
require.NoError(t, err)
@ -1810,16 +1938,26 @@ func TestHealthConnectServiceNodes_Filter(t *testing.T) {
require.Equal(t, structs.ServiceKindConnectProxy, nodes[0].Service.Kind)
require.Equal(t, args.Service.Address, nodes[0].Service.Address)
require.Equal(t, args.Service.Proxy, nodes[0].Service.Proxy)
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}
func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
for _, cfg := range queryBackendConfigs {
t.Run(cfg.name, func(t *testing.T) {
testHealthConnectServiceNodes_PassingFilter(t, cfg)
})
}
}
func testHealthConnectServiceNodes_PassingFilter(t *testing.T, backendCfg *queryBackendConfiguration) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
a := NewTestAgent(t, backendCfg.config)
defer a.Shutdown()
// Register
@ -1836,6 +1974,9 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
t.Run("bc_no_query_value", func(t *testing.T) {
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing", args.Service.Proxy.DestinationServiceName), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(t, err)
@ -1844,11 +1985,16 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
// Should be 0 health check for consul
nodes := obj.(structs.CheckServiceNodes)
assert.Len(t, nodes, 0)
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
t.Run("passing_true", func(t *testing.T) {
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing=true", args.Service.Proxy.DestinationServiceName), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(t, err)
@ -1857,11 +2003,16 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
// Should be 0 health check for consul
nodes := obj.(structs.CheckServiceNodes)
assert.Len(t, nodes, 0)
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
t.Run("passing_false", func(t *testing.T) {
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing=false", args.Service.Proxy.DestinationServiceName), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(t, err)
@ -1870,57 +2021,26 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
// Should be 1
nodes := obj.(structs.CheckServiceNodes)
assert.Len(t, nodes, 1)
require.Equal(t, backendCfg.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
t.Run("passing_bad", func(t *testing.T) {
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing=nope-nope", args.Service.Proxy.DestinationServiceName), nil)
if backendCfg.cached {
addQueryParam(req, "cached", "true")
}
resp := httptest.NewRecorder()
_, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.NotNil(t, err)
assert.True(t, isHTTPBadRequest(err))
assert.True(t, strings.Contains(err.Error(), "Invalid value for ?passing"))
require.Equal(t, "", resp.Header().Get("X-Consul-Query-Backend"))
})
}
func TestFilterNonPassing(t *testing.T) {
t.Parallel()
nodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Checks: structs.HealthChecks{
&structs.HealthCheck{
Status: api.HealthCritical,
},
&structs.HealthCheck{
Status: api.HealthCritical,
},
},
},
structs.CheckServiceNode{
Checks: structs.HealthChecks{
&structs.HealthCheck{
Status: api.HealthCritical,
},
&structs.HealthCheck{
Status: api.HealthCritical,
},
},
},
structs.CheckServiceNode{
Checks: structs.HealthChecks{
&structs.HealthCheck{
Status: api.HealthPassing,
},
},
},
}
out := filterNonPassing(nodes)
if len(out) != 1 && reflect.DeepEqual(out[0], nodes[2]) {
t.Fatalf("bad: %v", out)
}
}
func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

@ -128,7 +128,8 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
if err != nil {
return 0, nil, err
}
thisReply.Nodes = raw.(structs.CheckServiceNodes)
filteredNodes := raw.(structs.CheckServiceNodes)
thisReply.Nodes = filteredNodes.Filter(structs.CheckServiceNodeFilterOptions{FilterType: args.HealthFilterType})
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include

@ -46,10 +46,11 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
return nil, err
}
return &HealthView{
state: make(map[string]structs.CheckServiceNode),
filter: fe,
connect: req.Connect,
kind: req.ServiceKind,
state: make(map[string]structs.CheckServiceNode),
filter: fe,
connect: req.Connect,
kind: req.ServiceKind,
healthFilterType: req.HealthFilterType,
}, nil
}
@ -61,10 +62,11 @@ var _ submatview.View = (*HealthView)(nil)
// (IndexedCheckServiceNodes) and update it in place for each event - that
// involves re-sorting each time etc. though.
type HealthView struct {
connect bool
kind structs.ServiceKind
state map[string]structs.CheckServiceNode
filter filterEvaluator
connect bool
kind structs.ServiceKind
state map[string]structs.CheckServiceNode
filter filterEvaluator
healthFilterType structs.HealthFilterType
}
// Update implements View
@ -87,6 +89,11 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
return errors.New("check service node was unexpectedly nil")
}
if csn.ExcludeBasedOnChecks(structs.CheckServiceNodeFilterOptions{FilterType: s.healthFilterType}) {
delete(s.state, id)
continue
}
// check if we intentionally need to skip the filter
if s.skipFilter(csn) {
s.state[id] = *csn

@ -13,6 +13,7 @@ import (
"os"
"reflect"
"regexp"
"slices"
"sort"
"strconv"
"strings"
@ -757,11 +758,12 @@ type ServiceSpecificRequest struct {
ServiceKind ServiceKind
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
// with 1.2.x is not required.
ServiceTag string
ServiceTags []string
ServiceAddress string
TagFilter bool // Controls tag filtering
Source QuerySource
ServiceTag string
ServiceTags []string
ServiceAddress string
TagFilter bool // Controls tag filtering
HealthFilterType HealthFilterType
Source QuerySource
// Connect if true will only search for Connect-compatible services.
Connect bool
@ -822,6 +824,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo {
r.Ingress,
r.ServiceKind,
r.MergeCentralConfig,
r.HealthFilterType,
}, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces
@ -2122,6 +2125,19 @@ func (csn *CheckServiceNode) Locality() *Locality {
return nil
}
func (csn *CheckServiceNode) ExcludeBasedOnChecks(opts CheckServiceNodeFilterOptions) bool {
for _, check := range csn.Checks {
if slices.Contains(opts.IgnoreCheckIDs, check.CheckID) {
// Skip this _check_ but keep looking at other checks for this node.
continue
}
if opts.FilterType.ExcludeBasedOnStatus(check.Status) {
return true
}
}
return false
}
type CheckServiceNodes []CheckServiceNode
func (csns CheckServiceNodes) DeepCopy() CheckServiceNodes {
@ -2161,39 +2177,42 @@ func (nodes CheckServiceNodes) ShallowClone() CheckServiceNodes {
return dup
}
// HealthFilterType is used to filter nodes based on their health status.
type HealthFilterType int32
func (h HealthFilterType) ExcludeBasedOnStatus(status string) bool {
switch {
case h == HealthFilterExcludeCritical && status == api.HealthCritical:
return true
case h == HealthFilterIncludeOnlyPassing && status != api.HealthPassing:
return true
}
return false
}
// These are listed from most to least inclusive.
const (
HealthFilterIncludeAll HealthFilterType = 0
HealthFilterExcludeCritical HealthFilterType = 1
HealthFilterIncludeOnlyPassing HealthFilterType = 2
)
type CheckServiceNodeFilterOptions struct {
FilterType HealthFilterType
IgnoreCheckIDs []types.CheckID
disableReceiverModification bool
}
// Filter removes nodes that are failing health checks (and any non-passing
// check if that option is selected). Note that this returns the filtered
// results AND modifies the receiver for performance.
func (nodes CheckServiceNodes) Filter(onlyPassing bool) CheckServiceNodes {
return nodes.FilterIgnore(onlyPassing, nil)
}
// FilterIgnore removes nodes that are failing health checks just like Filter.
// It also ignores the status of any check with an ID present in ignoreCheckIDs
// as if that check didn't exist. Note that this returns the filtered results
// AND modifies the receiver for performance.
func (nodes CheckServiceNodes) FilterIgnore(onlyPassing bool,
ignoreCheckIDs []types.CheckID) CheckServiceNodes {
func (nodes CheckServiceNodes) Filter(opts CheckServiceNodeFilterOptions) CheckServiceNodes {
n := len(nodes)
OUTER:
for i := 0; i < n; i++ {
node := nodes[i]
INNER:
for _, check := range node.Checks {
for _, ignore := range ignoreCheckIDs {
if check.CheckID == ignore {
// Skip this _check_ but keep looking at other checks for this node.
continue INNER
}
}
if check.Status == api.HealthCritical ||
(onlyPassing && check.Status != api.HealthPassing) {
nodes[i], nodes[n-1] = nodes[n-1], CheckServiceNode{}
n--
i--
// Skip this _node_ now we've swapped it off the end of the list.
continue OUTER
}
if nodes[i].ExcludeBasedOnChecks(opts) {
nodes[i], nodes[n-1] = nodes[n-1], CheckServiceNode{}
n--
i--
}
}
return nodes[:n]

@ -1725,7 +1725,7 @@ func TestCheckServiceNodes_Filter(t *testing.T) {
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.Filter(false)
filtered := twiddle.Filter(CheckServiceNodeFilterOptions{FilterType: HealthFilterExcludeCritical})
expected := CheckServiceNodes{
nodes[0],
nodes[1],
@ -1741,7 +1741,7 @@ func TestCheckServiceNodes_Filter(t *testing.T) {
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.Filter(true)
filtered := twiddle.Filter(CheckServiceNodeFilterOptions{FilterType: HealthFilterIncludeOnlyPassing})
expected := CheckServiceNodes{
nodes[1],
}
@ -1757,7 +1757,7 @@ func TestCheckServiceNodes_Filter(t *testing.T) {
if n := copy(twiddle, nodes); n != len(nodes) {
t.Fatalf("bad: %d", n)
}
filtered := twiddle.FilterIgnore(true, []types.CheckID{""})
filtered := twiddle.Filter(CheckServiceNodeFilterOptions{FilterType: HealthFilterIncludeOnlyPassing, IgnoreCheckIDs: []types.CheckID{""}})
expected := CheckServiceNodes{
nodes[0],
nodes[1],

@ -16,6 +16,11 @@ upgrade flow.
## Consul v1.19.x
### Health endpoint status filtering is now processed on the server side when using client agents
In previous versions of Consul, client agents responded to health queries by requesting all results from the Consul servers and then locally filtering out service nodes with a `critical` status. The client agent also processed the `?passing` parameter by filtering node results locally. This process was not resource efficient in large deployments because large numbers of services and health check results must propagate to many clients before Consul can return a healthy node.
In this release, the Consul server is responsible for filtering services according to their health status before it sends the data to a client agent. When upgrading the version of Consul your deployment runs to this release, you must upgrade the Consul servers before you upgrade the client agents. We recommend this upgrade order to avoid a scenario where Consul returns results for an unhealthy service because the client agent no longer filters nodes but the servers do not yet understand the `?passing` query argument.
### Metrics removal
In previous versions, Consul emitted redundant state store usage metrics that contained two instances of `consul` in the metric name. As an example, config entry usage counts were emitted as both:

Loading…
Cancel
Save