agent: cache notifications work after error if the underlying RPC returns index=1 (#6547)

Fixes #6521

Ensure that initial failures to fetch an agent cache entry using the
notify API where the underlying RPC returns a synthetic index of 1
correctly recovers when those RPCs resume working.

The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the
index for the next query from 0 to 1 for all queries, when it should
have not done so for queries that errored.

Also fixed some things that made debugging difficult:

- config entry read/list endpoints send back QueryMeta headers
- xds event loops don't swallow the cache notification errors
pull/6558/head
R.B. Boyer 2019-09-26 10:42:17 -05:00 committed by GitHub
parent 76cf54068b
commit 9566df524e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 127 additions and 18 deletions

View File

@ -20,6 +20,8 @@ import (
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
@ -4011,3 +4013,99 @@ func TestAgent_RerouteNewHTTPChecks(t *testing.T) {
}
})
}
func TestAgentCache_serviceInConfigFile_initialFetchErrors_Issue6521(t *testing.T) {
t.Parallel()
// Ensure that initial failures to fetch the discovery chain via the agent
// cache using the notify API for a service with no config entries
// correctly recovers when those RPCs resume working. The key here is that
// the lack of config entries guarantees that the RPC will come back with a
// synthetic index of 1.
//
// The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the
// index for the next query from 0 to 1 for all queries, when it should
// have not done so for queries that errored.
a1 := NewTestAgent(t, t.Name()+"-a1", "")
defer a1.Shutdown()
testrpc.WaitForLeader(t, a1.RPC, "dc1")
a2 := NewTestAgent(t, t.Name()+"-a2", `
server = false
bootstrap = false
services {
name = "echo-client"
port = 8080
connect {
sidecar_service {
proxy {
upstreams {
destination_name = "echo"
local_bind_port = 9191
}
}
}
}
}
services {
name = "echo"
port = 9090
connect {
sidecar_service {}
}
}
`)
defer a2.Shutdown()
// Starting a client agent disconnected from a server with services.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan cache.UpdateEvent, 1)
require.NoError(t, a2.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: "dc1",
Name: "echo",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
}, "foo", ch))
{ // The first event is an error because we are not joined yet.
evt := <-ch
require.Equal(t, "foo", evt.CorrelationID)
require.Nil(t, evt.Result)
require.Error(t, evt.Err)
require.Equal(t, evt.Err, structs.ErrNoServers)
}
t.Logf("joining client to server")
// Now connect to server
_, err := a1.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN),
})
require.NoError(t, err)
t.Logf("joined client to server")
deadlineCh := time.After(10 * time.Second)
start := time.Now()
LOOP:
for {
select {
case evt := <-ch:
// We may receive several notifications of an error until we get the
// first successful reply.
require.Equal(t, "foo", evt.CorrelationID)
if evt.Err != nil {
break LOOP
}
require.NoError(t, evt.Err)
require.NotNil(t, evt.Result)
t.Logf("took %s to get first success", time.Since(start))
case <-deadlineCh:
t.Fatal("did not get notified successfully")
}
}
}

View File

@ -126,7 +126,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
}
}
// Sanity check we always request blocking on second pass
if index < 1 {
if err == nil && index < 1 {
index = 1
}
}

View File

@ -33,8 +33,9 @@ func TestCacheNotify(t *testing.T) {
// initially.
typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once()
// Configure the type
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
// Configure the type. The first time we use the fake index of "1" to verify we
// don't regress on https://github.com/hashicorp/consul/issues/6521 .
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
// Assert the right request type - all real Fetch implementations do this so
// it keeps us honest that Watch doesn't require type mangling which will
// break in real life (hint: it did on the first attempt)
@ -79,7 +80,7 @@ func TestCacheNotify(t *testing.T) {
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 1,
Meta: ResultMeta{Hit: false, Index: 4},
Meta: ResultMeta{Hit: false, Index: 1},
Err: nil,
})

View File

@ -42,6 +42,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
if err := s.agent.RPC("ConfigEntry.Get", &args, &reply); err != nil {
return nil, err
}
setMeta(resp, &reply.QueryMeta)
if reply.Entry == nil {
return nil, NotFoundError{Reason: fmt.Sprintf("Config entry not found for %q / %q", pathArgs[0], pathArgs[1])}
@ -56,6 +57,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
if err := s.agent.RPC("ConfigEntry.List", &args, &reply); err != nil {
return nil, err
}
setMeta(resp, &reply.QueryMeta)
return reply.Entries, nil
default:

View File

@ -470,18 +470,22 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
}
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
switch {
case u.CorrelationID == rootsWatchID:
roots, ok := u.Result.(*structs.IndexedCARoots)
if !ok {
return fmt.Errorf("invalid type for roots response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.Roots = roots
case u.CorrelationID == leafWatchID:
leaf, ok := u.Result.(*structs.IssuedCert)
if !ok {
return fmt.Errorf("invalid type for leaf response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.ConnectProxy.Leaf = leaf
@ -491,7 +495,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain
@ -503,7 +507,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
targetID, svc, ok := removeColonPrefix(correlationID)
@ -521,7 +525,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
dc, svc, ok := removeColonPrefix(correlationID)
@ -538,7 +542,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix)
snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes
@ -546,7 +550,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
if !ok {
return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes
@ -560,7 +564,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
default:
return errors.New("unknown correlation ID")
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
}
return nil
}
@ -668,17 +672,21 @@ func (s *state) resetWatchesFromChain(
}
func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
switch u.CorrelationID {
case rootsWatchID:
roots, ok := u.Result.(*structs.IndexedCARoots)
if !ok {
return fmt.Errorf("invalid type for roots response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
snap.Roots = roots
case serviceListWatchID:
services, ok := u.Result.(*structs.IndexedServices)
if !ok {
return fmt.Errorf("invalid type for services response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
for svcName := range services.Services {
@ -721,7 +729,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
case datacentersWatchID:
datacentersRaw, ok := u.Result.(*[]string)
if !ok {
return fmt.Errorf("invalid type for datacenters response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
if datacentersRaw == nil {
return fmt.Errorf("invalid response with a nil datacenter list")
@ -771,7 +779,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
case serviceResolversWatchID:
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
if !ok {
return fmt.Errorf("invalid type for services response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
resolvers := make(map[string]*structs.ServiceResolverConfigEntry)
@ -786,7 +794,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "connect-service:")
@ -799,7 +807,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for service response: %T", u.Result)
return fmt.Errorf("invalid type for response: %T", u.Result)
}
dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")