mirror of https://github.com/hashicorp/consul
Get agent cache tests passing without global hit count (which is racy).
Few other fixes in here just to get a clean run locally - they are all also fixed in other PRs but shouldn't conflict. This should be robust to timing between goroutines now.pull/4275/head
parent
79778635e8
commit
43b48bc06b
|
@ -913,10 +913,13 @@ func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Req
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := s.agent.cache.Get(cachetype.ConnectCARootName, &args)
|
raw, m, err := s.agent.cache.Get(cachetype.ConnectCARootName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer setCacheMeta(resp, &m)
|
||||||
|
|
||||||
|
// Add cache hit
|
||||||
|
|
||||||
reply, ok := raw.(*structs.IndexedCARoots)
|
reply, ok := raw.(*structs.IndexedCARoots)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -953,10 +956,11 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
|
||||||
}
|
}
|
||||||
args.Token = effectiveToken
|
args.Token = effectiveToken
|
||||||
|
|
||||||
raw, err := s.agent.cache.Get(cachetype.ConnectCALeafName, &args)
|
raw, m, err := s.agent.cache.Get(cachetype.ConnectCALeafName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer setCacheMeta(resp, &m)
|
||||||
|
|
||||||
reply, ok := raw.(*structs.IssuedCert)
|
reply, ok := raw.(*structs.IssuedCert)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -1186,7 +1190,7 @@ func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.R
|
||||||
// Validate the trust domain matches ours. Later we will support explicit
|
// Validate the trust domain matches ours. Later we will support explicit
|
||||||
// external federation but not built yet.
|
// external federation but not built yet.
|
||||||
rootArgs := &structs.DCSpecificRequest{Datacenter: s.agent.config.Datacenter}
|
rootArgs := &structs.DCSpecificRequest{Datacenter: s.agent.config.Datacenter}
|
||||||
raw, err := s.agent.cache.Get(cachetype.ConnectCARootName, rootArgs)
|
raw, _, err := s.agent.cache.Get(cachetype.ConnectCARootName, rootArgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1223,10 +1227,11 @@ func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.R
|
||||||
}
|
}
|
||||||
args.Token = token
|
args.Token = token
|
||||||
|
|
||||||
raw, err = s.agent.cache.Get(cachetype.IntentionMatchName, args)
|
raw, m, err := s.agent.cache.Get(cachetype.IntentionMatchName, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
setCacheMeta(resp, &m)
|
||||||
|
|
||||||
reply, ok := raw.(*structs.IndexedIntentionMatches)
|
reply, ok := raw.(*structs.IndexedIntentionMatches)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -2310,9 +2310,6 @@ func TestAgentConnectCARoots_list(t *testing.T) {
|
||||||
a := NewTestAgent(t.Name(), "")
|
a := NewTestAgent(t.Name(), "")
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
// Grab the initial cache hit count
|
|
||||||
cacheHits := a.cache.Hits()
|
|
||||||
|
|
||||||
// Set some CAs. Note that NewTestAgent already bootstraps one CA so this just
|
// Set some CAs. Note that NewTestAgent already bootstraps one CA so this just
|
||||||
// adds a second and makes it active.
|
// adds a second and makes it active.
|
||||||
ca2 := connect.TestCAConfigSet(t, a, nil)
|
ca2 := connect.TestCAConfigSet(t, a, nil)
|
||||||
|
@ -2337,19 +2334,18 @@ func TestAgentConnectCARoots_list(t *testing.T) {
|
||||||
assert.Equal("", r.SigningKey)
|
assert.Equal("", r.SigningKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// That should've been a cache miss, so no hit change
|
assert.Equal("MISS", resp.Header().Get("X-Cache"))
|
||||||
assert.Equal(cacheHits, a.cache.Hits())
|
|
||||||
|
|
||||||
// Test caching
|
// Test caching
|
||||||
{
|
{
|
||||||
// List it again
|
// List it again
|
||||||
obj2, err := a.srv.AgentConnectCARoots(httptest.NewRecorder(), req)
|
resp2 := httptest.NewRecorder()
|
||||||
|
obj2, err := a.srv.AgentConnectCARoots(resp2, req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
assert.Equal(obj, obj2)
|
assert.Equal(obj, obj2)
|
||||||
|
|
||||||
// Should cache hit this time and not make request
|
// Should cache hit this time and not make request
|
||||||
assert.Equal(cacheHits+1, a.cache.Hits())
|
assert.Equal("HIT", resp2.Header().Get("X-Cache"))
|
||||||
cacheHits++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that caching is updated in the background
|
// Test that caching is updated in the background
|
||||||
|
@ -2359,7 +2355,8 @@ func TestAgentConnectCARoots_list(t *testing.T) {
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
// List it again
|
// List it again
|
||||||
obj, err := a.srv.AgentConnectCARoots(httptest.NewRecorder(), req)
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.AgentConnectCARoots(resp, req)
|
||||||
r.Check(err)
|
r.Check(err)
|
||||||
|
|
||||||
value := obj.(structs.IndexedCARoots)
|
value := obj.(structs.IndexedCARoots)
|
||||||
|
@ -2371,15 +2368,14 @@ func TestAgentConnectCARoots_list(t *testing.T) {
|
||||||
if len(value.Roots) != 3 {
|
if len(value.Roots) != 3 {
|
||||||
r.Fatalf("bad len: %d", len(value.Roots))
|
r.Fatalf("bad len: %d", len(value.Roots))
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
|
||||||
// Should be a cache hit! The data should've updated in the cache
|
// Should be a cache hit! The data should've updated in the cache
|
||||||
// in the background so this should've been fetched directly from
|
// in the background so this should've been fetched directly from
|
||||||
// the cache.
|
// the cache.
|
||||||
if v := a.cache.Hits(); v < cacheHits+1 {
|
if resp.Header().Get("X-Cache") != "HIT" {
|
||||||
t.Fatalf("expected at least one more cache hit, still at %d", v)
|
r.Fatalf("should be a cache hit")
|
||||||
}
|
}
|
||||||
cacheHits = a.cache.Hits()
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2651,9 +2647,6 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
||||||
// verify it was signed easily.
|
// verify it was signed easily.
|
||||||
ca1 := connect.TestCAConfigSet(t, a, nil)
|
ca1 := connect.TestCAConfigSet(t, a, nil)
|
||||||
|
|
||||||
// Grab the initial cache hit count
|
|
||||||
cacheHits := a.cache.Hits()
|
|
||||||
|
|
||||||
{
|
{
|
||||||
// Register a local service
|
// Register a local service
|
||||||
args := &structs.ServiceDefinition{
|
args := &structs.ServiceDefinition{
|
||||||
|
@ -2679,9 +2672,7 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
|
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
require.Equal("MISS", resp.Header().Get("X-Cache"))
|
||||||
// That should've been a cache miss, so no hit change.
|
|
||||||
require.Equal(cacheHits, a.cache.Hits())
|
|
||||||
|
|
||||||
// Get the issued cert
|
// Get the issued cert
|
||||||
issued, ok := obj.(*structs.IssuedCert)
|
issued, ok := obj.(*structs.IssuedCert)
|
||||||
|
@ -2698,13 +2689,13 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
||||||
// Test caching
|
// Test caching
|
||||||
{
|
{
|
||||||
// Fetch it again
|
// Fetch it again
|
||||||
obj2, err := a.srv.AgentConnectCALeafCert(httptest.NewRecorder(), req)
|
resp := httptest.NewRecorder()
|
||||||
|
obj2, err := a.srv.AgentConnectCALeafCert(resp, req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(obj, obj2)
|
require.Equal(obj, obj2)
|
||||||
|
|
||||||
// Should cache hit this time and not make request
|
// Should cache hit this time and not make request
|
||||||
require.Equal(cacheHits+1, a.cache.Hits())
|
require.Equal("HIT", resp.Header().Get("X-Cache"))
|
||||||
cacheHits++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that caching is updated in the background
|
// Test that caching is updated in the background
|
||||||
|
@ -2713,9 +2704,10 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
||||||
ca := connect.TestCAConfigSet(t, a, nil)
|
ca := connect.TestCAConfigSet(t, a, nil)
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
// Try and sign again (note no index/wait arg since cache should update in
|
// Try and sign again (note no index/wait arg since cache should update in
|
||||||
// background even if we aren't actively blocking)
|
// background even if we aren't actively blocking)
|
||||||
obj, err := a.srv.AgentConnectCALeafCert(httptest.NewRecorder(), req)
|
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
|
||||||
r.Check(err)
|
r.Check(err)
|
||||||
|
|
||||||
issued2 := obj.(*structs.IssuedCert)
|
issued2 := obj.(*structs.IssuedCert)
|
||||||
|
@ -2731,15 +2723,14 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
||||||
|
|
||||||
// Verify that the cert is signed by the new CA
|
// Verify that the cert is signed by the new CA
|
||||||
requireLeafValidUnderCA(t, issued2, ca)
|
requireLeafValidUnderCA(t, issued2, ca)
|
||||||
})
|
|
||||||
|
|
||||||
// Should be a cache hit! The data should've updated in the cache
|
// Should be a cache hit! The data should've updated in the cache
|
||||||
// in the background so this should've been fetched directly from
|
// in the background so this should've been fetched directly from
|
||||||
// the cache.
|
// the cache.
|
||||||
if v := a.cache.Hits(); v < cacheHits+1 {
|
if resp.Header().Get("X-Cache") != "HIT" {
|
||||||
t.Fatalf("expected at least one more cache hit, still at %d", v)
|
r.Fatalf("should be a cache hit")
|
||||||
}
|
}
|
||||||
cacheHits = a.cache.Hits()
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2757,9 +2748,6 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
|
||||||
// verify it was signed easily.
|
// verify it was signed easily.
|
||||||
ca1 := connect.TestCAConfigSet(t, a, nil)
|
ca1 := connect.TestCAConfigSet(t, a, nil)
|
||||||
|
|
||||||
// Grab the initial cache hit count
|
|
||||||
cacheHits := a.cache.Hits()
|
|
||||||
|
|
||||||
{
|
{
|
||||||
// Register a non-local service (central catalog)
|
// Register a non-local service (central catalog)
|
||||||
args := &structs.RegisterRequest{
|
args := &structs.RegisterRequest{
|
||||||
|
@ -2785,6 +2773,7 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
|
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
require.Equal("MISS", resp.Header().Get("X-Cache"))
|
||||||
|
|
||||||
// Get the issued cert
|
// Get the issued cert
|
||||||
issued, ok := obj.(*structs.IssuedCert)
|
issued, ok := obj.(*structs.IssuedCert)
|
||||||
|
@ -2798,19 +2787,16 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
|
||||||
assert.Equal(fmt.Sprintf("%d", issued.ModifyIndex),
|
assert.Equal(fmt.Sprintf("%d", issued.ModifyIndex),
|
||||||
resp.Header().Get("X-Consul-Index"))
|
resp.Header().Get("X-Consul-Index"))
|
||||||
|
|
||||||
// That should've been a cache miss, so no hit change
|
|
||||||
require.Equal(cacheHits, a.cache.Hits())
|
|
||||||
|
|
||||||
// Test caching
|
// Test caching
|
||||||
{
|
{
|
||||||
// Fetch it again
|
// Fetch it again
|
||||||
obj2, err := a.srv.AgentConnectCALeafCert(httptest.NewRecorder(), req)
|
resp := httptest.NewRecorder()
|
||||||
|
obj2, err := a.srv.AgentConnectCALeafCert(resp, req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(obj, obj2)
|
require.Equal(obj, obj2)
|
||||||
|
|
||||||
// Should cache hit this time and not make request
|
// Should cache hit this time and not make request
|
||||||
require.Equal(cacheHits+1, a.cache.Hits())
|
require.Equal("HIT", resp.Header().Get("X-Cache"))
|
||||||
cacheHits++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that caching is updated in the background
|
// Test that caching is updated in the background
|
||||||
|
@ -2819,9 +2805,10 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
|
||||||
ca := connect.TestCAConfigSet(t, a, nil)
|
ca := connect.TestCAConfigSet(t, a, nil)
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
// Try and sign again (note no index/wait arg since cache should update in
|
// Try and sign again (note no index/wait arg since cache should update in
|
||||||
// background even if we aren't actively blocking)
|
// background even if we aren't actively blocking)
|
||||||
obj, err := a.srv.AgentConnectCALeafCert(httptest.NewRecorder(), req)
|
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
|
||||||
r.Check(err)
|
r.Check(err)
|
||||||
|
|
||||||
issued2 := obj.(*structs.IssuedCert)
|
issued2 := obj.(*structs.IssuedCert)
|
||||||
|
@ -2837,15 +2824,14 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
|
||||||
|
|
||||||
// Verify that the cert is signed by the new CA
|
// Verify that the cert is signed by the new CA
|
||||||
requireLeafValidUnderCA(t, issued2, ca)
|
requireLeafValidUnderCA(t, issued2, ca)
|
||||||
})
|
|
||||||
|
|
||||||
// Should be a cache hit! The data should've updated in the cache
|
// Should be a cache hit! The data should've updated in the cache
|
||||||
// in the background so this should've been fetched directly from
|
// in the background so this should've been fetched directly from
|
||||||
// the cache.
|
// the cache.
|
||||||
if v := a.cache.Hits(); v < cacheHits+1 {
|
if resp.Header().Get("X-Cache") != "HIT" {
|
||||||
t.Fatalf("expected at least one more cache hit, still at %d", v)
|
r.Fatalf("should be a cache hit")
|
||||||
}
|
}
|
||||||
cacheHits = a.cache.Hits()
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3588,9 +3574,6 @@ func TestAgentConnectAuthorize_allow(t *testing.T) {
|
||||||
require.Nil(a.RPC("Intention.Apply", &req, &ixnId))
|
require.Nil(a.RPC("Intention.Apply", &req, &ixnId))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grab the initial cache hit count
|
|
||||||
cacheHits := a.cache.Hits()
|
|
||||||
|
|
||||||
args := &structs.ConnectAuthorizeRequest{
|
args := &structs.ConnectAuthorizeRequest{
|
||||||
Target: target,
|
Target: target,
|
||||||
ClientCertURI: connect.TestSpiffeIDService(t, "web").URI().String(),
|
ClientCertURI: connect.TestSpiffeIDService(t, "web").URI().String(),
|
||||||
|
@ -3600,15 +3583,12 @@ func TestAgentConnectAuthorize_allow(t *testing.T) {
|
||||||
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
|
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
|
||||||
require.Nil(err)
|
require.Nil(err)
|
||||||
require.Equal(200, resp.Code)
|
require.Equal(200, resp.Code)
|
||||||
|
require.Equal("MISS", resp.Header().Get("X-Cache"))
|
||||||
|
|
||||||
obj := respRaw.(*connectAuthorizeResp)
|
obj := respRaw.(*connectAuthorizeResp)
|
||||||
require.True(obj.Authorized)
|
require.True(obj.Authorized)
|
||||||
require.Contains(obj.Reason, "Matched")
|
require.Contains(obj.Reason, "Matched")
|
||||||
|
|
||||||
// That should've been a cache miss (for both Intentions and Roots, so no hit
|
|
||||||
// change).
|
|
||||||
require.Equal(cacheHits, a.cache.Hits())
|
|
||||||
|
|
||||||
// Make the request again
|
// Make the request again
|
||||||
{
|
{
|
||||||
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
|
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
|
||||||
|
@ -3620,11 +3600,10 @@ func TestAgentConnectAuthorize_allow(t *testing.T) {
|
||||||
obj := respRaw.(*connectAuthorizeResp)
|
obj := respRaw.(*connectAuthorizeResp)
|
||||||
require.True(obj.Authorized)
|
require.True(obj.Authorized)
|
||||||
require.Contains(obj.Reason, "Matched")
|
require.Contains(obj.Reason, "Matched")
|
||||||
}
|
|
||||||
|
|
||||||
// That should've been a cache hit. We add 2 (Roots + Intentions).
|
// That should've been a cache hit.
|
||||||
require.Equal(cacheHits+2, a.cache.Hits())
|
require.Equal("HIT", resp.Header().Get("X-Cache"))
|
||||||
cacheHits += 2
|
}
|
||||||
|
|
||||||
// Change the intention
|
// Change the intention
|
||||||
{
|
{
|
||||||
|
@ -3657,12 +3636,11 @@ func TestAgentConnectAuthorize_allow(t *testing.T) {
|
||||||
obj := respRaw.(*connectAuthorizeResp)
|
obj := respRaw.(*connectAuthorizeResp)
|
||||||
require.False(obj.Authorized)
|
require.False(obj.Authorized)
|
||||||
require.Contains(obj.Reason, "Matched")
|
require.Contains(obj.Reason, "Matched")
|
||||||
}
|
|
||||||
|
|
||||||
// That should've been a cache hit, too, since it updated in the
|
// That should've been a cache hit, too, since it updated in the
|
||||||
// background. (again 2 hits for Roots + Intentions)
|
// background.
|
||||||
require.Equal(cacheHits+2, a.cache.Hits())
|
require.Equal("HIT", resp.Header().Get("X-Cache"))
|
||||||
cacheHits += 2
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test when there is an intention denying the connection
|
// Test when there is an intention denying the connection
|
||||||
|
|
|
@ -97,7 +97,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
||||||
|
|
||||||
// Need to lookup RootCAs response to discover trust domain. First just lookup
|
// Need to lookup RootCAs response to discover trust domain. First just lookup
|
||||||
// with no blocking info - this should be a cache hit most of the time.
|
// with no blocking info - this should be a cache hit most of the time.
|
||||||
rawRoots, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
|
rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: reqReal.Datacenter,
|
Datacenter: reqReal.Datacenter,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -176,7 +176,7 @@ func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
|
||||||
|
|
||||||
// Fetch some new roots. This will block until our MinQueryIndex is
|
// Fetch some new roots. This will block until our MinQueryIndex is
|
||||||
// matched or the timeout is reached.
|
// matched or the timeout is reached.
|
||||||
rawRoots, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
|
rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: datacenter,
|
Datacenter: datacenter,
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
MinQueryIndex: minIndex,
|
MinQueryIndex: minIndex,
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
@ -52,11 +51,6 @@ const (
|
||||||
// searching for "metrics." to see the various metrics exposed. These can be
|
// searching for "metrics." to see the various metrics exposed. These can be
|
||||||
// used to explore the performance of the cache.
|
// used to explore the performance of the cache.
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
// Keeps track of the cache hits and misses in total. This is used by
|
|
||||||
// tests currently to verify cache behavior and is not meant for general
|
|
||||||
// analytics; for that, go-metrics emitted values are better.
|
|
||||||
hits, misses uint64
|
|
||||||
|
|
||||||
// types stores the list of data types that the cache knows how to service.
|
// types stores the list of data types that the cache knows how to service.
|
||||||
// These can be dynamically registered with RegisterType.
|
// These can be dynamically registered with RegisterType.
|
||||||
typesLock sync.RWMutex
|
typesLock sync.RWMutex
|
||||||
|
@ -85,6 +79,13 @@ type typeEntry struct {
|
||||||
Opts *RegisterOptions
|
Opts *RegisterOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResultMeta is returned from Get calls along with the value and can be used
|
||||||
|
// to expose information about the cache status for debugging or testing.
|
||||||
|
type ResultMeta struct {
|
||||||
|
// Return whether or not the request was a cache hit
|
||||||
|
Hit bool
|
||||||
|
}
|
||||||
|
|
||||||
// Options are options for the Cache.
|
// Options are options for the Cache.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Nothing currently, reserved.
|
// Nothing currently, reserved.
|
||||||
|
@ -178,7 +179,7 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
|
||||||
// index is retrieved, the last known value (maybe nil) is returned. No
|
// index is retrieved, the last known value (maybe nil) is returned. No
|
||||||
// error is returned on timeout. This matches the behavior of Consul blocking
|
// error is returned on timeout. This matches the behavior of Consul blocking
|
||||||
// queries.
|
// queries.
|
||||||
func (c *Cache) Get(t string, r Request) (interface{}, error) {
|
func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
|
||||||
info := r.CacheInfo()
|
info := r.CacheInfo()
|
||||||
if info.Key == "" {
|
if info.Key == "" {
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
|
||||||
|
@ -209,9 +210,10 @@ RETRY_GET:
|
||||||
// we have.
|
// we have.
|
||||||
if ok && entry.Valid {
|
if ok && entry.Valid {
|
||||||
if info.MinIndex == 0 || info.MinIndex < entry.Index {
|
if info.MinIndex == 0 || info.MinIndex < entry.Index {
|
||||||
|
meta := ResultMeta{}
|
||||||
if first {
|
if first {
|
||||||
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
|
||||||
atomic.AddUint64(&c.hits, 1)
|
meta.Hit = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Touch the expiration and fix the heap.
|
// Touch the expiration and fix the heap.
|
||||||
|
@ -224,7 +226,7 @@ RETRY_GET:
|
||||||
// only works with fetching values that either have a value
|
// only works with fetching values that either have a value
|
||||||
// or have an error, but not both. The Error may be non-nil
|
// or have an error, but not both. The Error may be non-nil
|
||||||
// in the entry because of this to note future fetch errors.
|
// in the entry because of this to note future fetch errors.
|
||||||
return entry.Value, nil
|
return entry.Value, meta, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,13 +236,10 @@ RETRY_GET:
|
||||||
// timeout. Instead, we make one effort to fetch a new value, and if
|
// timeout. Instead, we make one effort to fetch a new value, and if
|
||||||
// there was an error, we return.
|
// there was an error, we return.
|
||||||
if !first && entry.Error != nil {
|
if !first && entry.Error != nil {
|
||||||
return entry.Value, entry.Error
|
return entry.Value, ResultMeta{}, entry.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
if first {
|
if first {
|
||||||
// Record the miss if its our first time through
|
|
||||||
atomic.AddUint64(&c.misses, 1)
|
|
||||||
|
|
||||||
// We increment two different counters for cache misses depending on
|
// We increment two different counters for cache misses depending on
|
||||||
// whether we're missing because we didn't have the data at all,
|
// whether we're missing because we didn't have the data at all,
|
||||||
// or if we're missing because we're blocking on a set index.
|
// or if we're missing because we're blocking on a set index.
|
||||||
|
@ -263,7 +262,7 @@ RETRY_GET:
|
||||||
// value we have is too old. We need to wait for new data.
|
// value we have is too old. We need to wait for new data.
|
||||||
waiterCh, err := c.fetch(t, key, r, true, 0)
|
waiterCh, err := c.fetch(t, key, r, true, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, ResultMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -273,7 +272,7 @@ RETRY_GET:
|
||||||
|
|
||||||
case <-timeoutCh:
|
case <-timeoutCh:
|
||||||
// Timeout on the cache read, just return whatever we have.
|
// Timeout on the cache read, just return whatever we have.
|
||||||
return entry.Value, nil
|
return entry.Value, ResultMeta{}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -420,13 +419,13 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||||
// fetchDirect fetches the given request with no caching. Because this
|
// fetchDirect fetches the given request with no caching. Because this
|
||||||
// bypasses the caching entirely, multiple matching requests will result
|
// bypasses the caching entirely, multiple matching requests will result
|
||||||
// in multiple actual RPC calls (unlike fetch).
|
// in multiple actual RPC calls (unlike fetch).
|
||||||
func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
|
func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error) {
|
||||||
// Get the type that we're fetching
|
// Get the type that we're fetching
|
||||||
c.typesLock.RLock()
|
c.typesLock.RLock()
|
||||||
tEntry, ok := c.types[t]
|
tEntry, ok := c.types[t]
|
||||||
c.typesLock.RUnlock()
|
c.typesLock.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unknown type in cache: %s", t)
|
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch it with the min index specified directly by the request.
|
// Fetch it with the min index specified directly by the request.
|
||||||
|
@ -434,11 +433,11 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
|
||||||
MinIndex: r.CacheInfo().MinIndex,
|
MinIndex: r.CacheInfo().MinIndex,
|
||||||
}, r)
|
}, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, ResultMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the result and ignore the rest
|
// Return the result and ignore the rest
|
||||||
return result.Value, nil
|
return result.Value, ResultMeta{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh triggers a fetch for a specific Request according to the
|
// refresh triggers a fetch for a specific Request according to the
|
||||||
|
@ -515,8 +514,3 @@ func (c *Cache) runExpiryLoop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the number of cache hits. Safe to call concurrently.
|
|
||||||
func (c *Cache) Hits() uint64 {
|
|
||||||
return atomic.LoadUint64(&c.hits)
|
|
||||||
}
|
|
||||||
|
|
|
@ -29,14 +29,16 @@ func TestCacheGet_noIndex(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch since we already have a satisfying value
|
// Get, should not fetch since we already have a satisfying value
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.True(meta.Hit)
|
||||||
|
|
||||||
// Sleep a tiny bit just to let maybe some background calls happen
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
// then verify that we still only got the one call
|
// then verify that we still only got the one call
|
||||||
|
@ -61,14 +63,16 @@ func TestCacheGet_initError(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.Error(err)
|
require.Error(err)
|
||||||
require.Nil(result)
|
require.Nil(result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should fetch again since our last fetch was an error
|
// Get, should fetch again since our last fetch was an error
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.Error(err)
|
require.Error(err)
|
||||||
require.Nil(result)
|
require.Nil(result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Sleep a tiny bit just to let maybe some background calls happen
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
// then verify that we still only got the one call
|
// then verify that we still only got the one call
|
||||||
|
@ -93,14 +97,16 @@ func TestCacheGet_blankCacheKey(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: ""})
|
req := TestRequest(t, RequestInfo{Key: ""})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch since we already have a satisfying value
|
// Get, should not fetch since we already have a satisfying value
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Sleep a tiny bit just to let maybe some background calls happen
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
// then verify that we still only got the one call
|
// then verify that we still only got the one call
|
||||||
|
@ -317,16 +323,18 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch since we already have a satisfying value
|
// Get, should not fetch since we already have a satisfying value
|
||||||
req = TestRequest(t, RequestInfo{
|
req = TestRequest(t, RequestInfo{
|
||||||
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Sleep a tiny bit just to let maybe some background calls happen
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
// then verify that we still only got the one call
|
// then verify that we still only got the one call
|
||||||
|
@ -518,9 +526,10 @@ func TestCacheGet_fetchTimeout(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Test the timeout
|
// Test the timeout
|
||||||
require.Equal(timeout, actual)
|
require.Equal(timeout, actual)
|
||||||
|
@ -546,26 +555,27 @@ func TestCacheGet_expire(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch, verified via the mock assertions above
|
// Get, should not fetch, verified via the mock assertions above
|
||||||
hits := c.Hits()
|
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.Equal(hits+1, c.Hits())
|
require.True(meta.Hit)
|
||||||
|
|
||||||
// Sleep for the expiry
|
// Sleep for the expiry
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Sleep a tiny bit just to let maybe some background calls happen
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
// then verify that we still only got the one call
|
// then verify that we still only got the one call
|
||||||
|
@ -593,9 +603,10 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err := c.Get("t", req)
|
result, meta, err := c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Fetch multiple times, where the total time is well beyond
|
// Fetch multiple times, where the total time is well beyond
|
||||||
// the TTL. We should not trigger any fetches during this time.
|
// the TTL. We should not trigger any fetches during this time.
|
||||||
|
@ -605,18 +616,20 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
|
|
||||||
// Get, should not fetch
|
// Get, should not fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.True(meta.Hit)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, err = c.Get("t", req)
|
result, meta, err = c.Get("t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Sleep a tiny bit just to let maybe some background calls happen
|
// Sleep a tiny bit just to let maybe some background calls happen
|
||||||
// then verify that we still only got the one call
|
// then verify that we still only got the one call
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestCache(t testing.T) *Cache {
|
||||||
func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} {
|
func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} {
|
||||||
resultCh := make(chan interface{})
|
resultCh := make(chan interface{})
|
||||||
go func() {
|
go func() {
|
||||||
result, err := c.Get(typ, r)
|
result, _, err := c.Get(typ, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Error: %s", err)
|
t.Logf("Error: %s", err)
|
||||||
close(resultCh)
|
close(resultCh)
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/NYTimes/gziphandler"
|
"github.com/NYTimes/gziphandler"
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/go-cleanhttp"
|
"github.com/hashicorp/go-cleanhttp"
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
@ -460,6 +461,15 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
|
||||||
setConsistency(resp, m.ConsistencyLevel)
|
setConsistency(resp, m.ConsistencyLevel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setCacheMeta sets http response headers to indicate cache status.
|
||||||
|
func setCacheMeta(resp http.ResponseWriter, m *cache.ResultMeta) {
|
||||||
|
str := "MISS"
|
||||||
|
if m != nil && m.Hit {
|
||||||
|
str = "HIT"
|
||||||
|
}
|
||||||
|
resp.Header().Set("X-Cache", str)
|
||||||
|
}
|
||||||
|
|
||||||
// setHeaders is used to set canonical response header fields
|
// setHeaders is used to set canonical response header fields
|
||||||
func setHeaders(resp http.ResponseWriter, headers map[string]string) {
|
func setHeaders(resp http.ResponseWriter, headers map[string]string) {
|
||||||
for field, value := range headers {
|
for field, value := range headers {
|
||||||
|
|
|
@ -1052,7 +1052,7 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) {
|
||||||
agent := c.Agent()
|
agent := c.Agent()
|
||||||
list, meta, err := agent.ConnectCARoots(nil)
|
list, meta, err := agent.ConnectCARoots(nil)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(uint64(0), meta.LastIndex)
|
require.Equal(uint64(1), meta.LastIndex)
|
||||||
require.Len(list.Roots, 0)
|
require.Len(list.Roots, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue