test: fix flaky TestHealthServiceNodes_NodeMetaFilter by waiting until the streaming subsystem has a valid grpc connection

Also potentially unflakes TestHealthIngressServiceNodes for similar
reasons.
pull/15019/head
R.B. Boyer 2 years ago
parent fe2d41ddad
commit 62688107af

@ -1128,76 +1128,84 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
} }
for _, tst := range tests { for _, tst := range tests {
t.Run(tst.name, func(t *testing.T) { t.Run(tst.name, func(t *testing.T) {
a := NewTestAgent(t, tst.config) a := NewTestAgent(t, tst.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1") testrpc.WaitForLeader(t, a.RPC, "dc1")
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
waitForStreamingToBeReady(t, a)
req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil) encodedMeta := url.QueryEscape("somekey:somevalue")
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp) var lastIndex uint64
testutil.RunStep(t, "do initial read", func(t *testing.T) {
u := fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=%s", encodedMeta)
cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64) req, err := http.NewRequest("GET", u, nil)
require.NoError(t, err) require.NoError(t, err)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
require.NoError(t, err)
// Should be a non-nil empty list lastIndex = getIndex(t, resp)
nodes := obj.(structs.CheckServiceNodes) require.True(t, lastIndex > 0)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
args := &structs.RegisterRequest{ // Should be a non-nil empty list
Datacenter: "dc1", nodes := obj.(structs.CheckServiceNodes)
Node: "bar", require.NotNil(t, nodes)
Address: "127.0.0.1", require.Empty(t, nodes)
NodeMeta: map[string]string{"somekey": "somevalue"}, })
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}
var out struct{} require.True(t, lastIndex > 0, "lastindex = %d", lastIndex)
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
args = &structs.RegisterRequest{ testutil.RunStep(t, "register item 1", func(t *testing.T) {
Datacenter: "dc1", args := &structs.RegisterRequest{
Node: "bar2", Datacenter: "dc1",
Address: "127.0.0.1", Node: "bar",
NodeMeta: map[string]string{"somekey": "othervalue"}, Address: "127.0.0.1",
Service: &structs.NodeService{ NodeMeta: map[string]string{"somekey": "somevalue"},
ID: "test2", Service: &structs.NodeService{
Service: "test", ID: "test",
}, Service: "test",
} },
}
if err := a.RPC("Catalog.Register", args, &out); err != nil { var ignored struct{}
t.Fatalf("err: %v", err) require.NoError(t, a.RPC("Catalog.Register", args, &ignored))
} })
req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil) testutil.RunStep(t, "register item 2", func(t *testing.T) {
resp = httptest.NewRecorder() args := &structs.RegisterRequest{
obj, err = a.srv.HealthServiceNodes(resp, req) Datacenter: "dc1",
if err != nil { Node: "bar2",
t.Fatalf("err: %v", err) Address: "127.0.0.1",
} NodeMeta: map[string]string{"somekey": "othervalue"},
Service: &structs.NodeService{
ID: "test2",
Service: "test",
},
}
var ignored struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &ignored))
})
assertIndex(t, resp) testutil.RunStep(t, "do blocking read", func(t *testing.T) {
u := fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=%s&index=%d&wait=100ms&cached", encodedMeta, lastIndex)
// Should be a non-nil empty list for checks req, err := http.NewRequest("GET", u, nil)
nodes = obj.(structs.CheckServiceNodes) require.NoError(t, err)
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { resp := httptest.NewRecorder()
t.Fatalf("bad: %v", obj) obj, err := a.srv.HealthServiceNodes(resp, req)
} require.NoError(t, err)
assertIndex(t, resp)
// Should be a non-nil empty list for checks
nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 1)
require.NotNil(t, nodes[0].Checks)
require.Empty(t, nodes[0].Checks)
require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
}) })
} }
} }
@ -1637,6 +1645,7 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
a := NewTestAgent(t, agentHCL) a := NewTestAgent(t, agentHCL)
defer a.Shutdown() defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1") testrpc.WaitForLeader(t, a.RPC, "dc1")
waitForStreamingToBeReady(t, a)
// Register gateway // Register gateway
gatewayArgs := structs.TestRegisterIngressGateway(t) gatewayArgs := structs.TestRegisterIngressGateway(t)
@ -2038,3 +2047,9 @@ func peerQuerySuffix(peerName string) string {
} }
return "&peer=" + peerName return "&peer=" + peerName
} }
func waitForStreamingToBeReady(t *testing.T, a *TestAgent) {
retry.Run(t, func(r *retry.R) {
require.True(r, a.rpcClientHealth.IsReadyForStreaming())
})
}

@ -3,6 +3,8 @@ package health
import ( import (
"context" "context"
"google.golang.org/grpc/connectivity"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/submatview"
@ -34,6 +36,16 @@ type MaterializedViewStore interface {
NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error
} }
// IsReadyForStreaming will indicate if the underlying gRPC connection is ready.
func (c *Client) IsReadyForStreaming() bool {
conn := c.MaterializerDeps.Conn
if conn == nil {
return false
}
return conn.GetState() == connectivity.Ready
}
func (c *Client) ServiceNodes( func (c *Client) ServiceNodes(
ctx context.Context, ctx context.Context,
req structs.ServiceSpecificRequest, req structs.ServiceSpecificRequest,

Loading…
Cancel
Save