mirror of https://github.com/hashicorp/consul
agent/cache: Make all cache options RegisterOptions
Previously the SupportsBlocking option was specified by a method on the type, and all the other options were specified from RegisterOptions. This change moves RegisterOptions to a method on the type, and moves SupportsBlocking into the options struct. Currently there are only 2 cache-types. So all cache-types can implement this method by embedding a struct with those predefined values. In the future if a cache type needs to be registered more than once with different options it can remove the embedded type and implement the method in a way that allows for paramaterization.pull/7647/head
parent
e9e8c0e730
commit
5fe7043439
140
agent/agent.go
140
agent/agent.go
|
@ -4182,151 +4182,45 @@ func (a *Agent) registerCache() {
|
||||||
// the a.delegate directly, otherwise tests that rely on overriding RPC
|
// the a.delegate directly, otherwise tests that rely on overriding RPC
|
||||||
// routing via a.registerEndpoint will not work.
|
// routing via a.registerEndpoint will not work.
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{
|
a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
|
a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
|
||||||
RPC: a,
|
RPC: a,
|
||||||
Cache: a.cache,
|
Cache: a.cache,
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: a.config.Datacenter,
|
||||||
TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread,
|
TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread,
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{
|
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{
|
a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{
|
a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{
|
a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Prepared queries don't support blocking
|
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{
|
a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{
|
a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{
|
a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.CatalogServiceListName, &cachetype.CatalogServiceList{
|
a.cache.RegisterType(cachetype.CatalogServiceListName, &cachetype.CatalogServiceList{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{
|
a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{
|
a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{
|
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{
|
a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{
|
a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{RPC: a})
|
||||||
RPC: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
// Maintain a blocking query, retry dropped connections quickly
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{
|
a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{Agent: a})
|
||||||
Agent: a,
|
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, &cachetype.FederationStateListMeshGateways{
|
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName,
|
||||||
RPC: a,
|
&cachetype.FederationStateListMeshGateways{RPC: a})
|
||||||
}, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0 * time.Second,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LocalState returns the agent's local state
|
// LocalState returns the agent's local state
|
||||||
|
|
|
@ -12,6 +12,7 @@ const CatalogDatacentersName = "catalog-datacenters"
|
||||||
|
|
||||||
// Datacenters supports fetching discovering all the known datacenters
|
// Datacenters supports fetching discovering all the known datacenters
|
||||||
type CatalogDatacenters struct {
|
type CatalogDatacenters struct {
|
||||||
|
RegisterOptionsNoRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +76,3 @@ func (c *CatalogDatacenters) Fetch(opts cache.FetchOptions, req cache.Request) (
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CatalogDatacenters) SupportsBlocking() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ const CatalogListServicesName = "catalog-list-services"
|
||||||
|
|
||||||
// CatalogListServices supports fetching discovering service names via the catalog.
|
// CatalogListServices supports fetching discovering service names via the catalog.
|
||||||
type CatalogListServices struct {
|
type CatalogListServices struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,3 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request)
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CatalogListServices) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ const CatalogServiceListName = "catalog-services-list"
|
||||||
|
|
||||||
// CatalogServiceList supports fetching service names via the catalog.
|
// CatalogServiceList supports fetching service names via the catalog.
|
||||||
type CatalogServiceList struct {
|
type CatalogServiceList struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,3 @@ func (c *CatalogServiceList) Fetch(opts cache.FetchOptions, req cache.Request) (
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CatalogServiceList) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ const CatalogServicesName = "catalog-services"
|
||||||
// CatalogServices supports fetching discovering service instances via the
|
// CatalogServices supports fetching discovering service instances via the
|
||||||
// catalog.
|
// catalog.
|
||||||
type CatalogServices struct {
|
type CatalogServices struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +51,3 @@ func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CatalogServices) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ const ConfigEntriesName = "config-entries"
|
||||||
|
|
||||||
// ConfigEntries supports fetching discovering configuration entries
|
// ConfigEntries supports fetching discovering configuration entries
|
||||||
type ConfigEntries struct {
|
type ConfigEntries struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,3 @@ func (c *ConfigEntries) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConfigEntries) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ const caChangeJitterWindow = 30 * time.Second
|
||||||
// ConnectCALeaf supports fetching and generating Connect leaf
|
// ConnectCALeaf supports fetching and generating Connect leaf
|
||||||
// certificates.
|
// certificates.
|
||||||
type ConnectCALeaf struct {
|
type ConnectCALeaf struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
caIndex uint64 // Current index for CA roots
|
caIndex uint64 // Current index for CA roots
|
||||||
|
|
||||||
// rootWatchMu protects access to the rootWatchSubscribers map and
|
// rootWatchMu protects access to the rootWatchSubscribers map and
|
||||||
|
@ -629,10 +630,6 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConnectCALeaf) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConnectCALeafRequest is the cache.Request implementation for the
|
// ConnectCALeafRequest is the cache.Request implementation for the
|
||||||
// ConnectCALeaf cache type. This is implemented here and not in structs
|
// ConnectCALeaf cache type. This is implemented here and not in structs
|
||||||
// since this is only used for cache-related requests and not forwarded
|
// since this is only used for cache-related requests and not forwarded
|
||||||
|
|
|
@ -966,6 +966,21 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testConnectCaRoot wraps ConnectCARoot to disable refresh so that the gated
|
||||||
|
// channel controls the request directly. Otherwise, we get background refreshes and
|
||||||
|
// it screws up the ordering of the channel reads of the testGatedRootsRPC
|
||||||
|
// implementation.
|
||||||
|
type testConnectCaRoot struct {
|
||||||
|
ConnectCARoot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r testConnectCaRoot) RegisterOptions() cache.RegisterOptions {
|
||||||
|
return cache.RegisterOptions{
|
||||||
|
Refresh: false,
|
||||||
|
SupportsBlocking: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// testCALeafType returns a *ConnectCALeaf that is pre-configured to
|
// testCALeafType returns a *ConnectCALeaf that is pre-configured to
|
||||||
// use the given RPC implementation for "ConnectCA.Sign" operations.
|
// use the given RPC implementation for "ConnectCA.Sign" operations.
|
||||||
func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.IndexedCARoots) {
|
func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.IndexedCARoots) {
|
||||||
|
@ -977,14 +992,9 @@ func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.Indexed
|
||||||
|
|
||||||
// Create a cache
|
// Create a cache
|
||||||
c := cache.TestCache(t)
|
c := cache.TestCache(t)
|
||||||
c.RegisterType(ConnectCARootName, &ConnectCARoot{RPC: rootsRPC}, &cache.RegisterOptions{
|
c.RegisterType(ConnectCARootName, &testConnectCaRoot{
|
||||||
// Disable refresh so that the gated channel controls the
|
ConnectCARoot: ConnectCARoot{RPC: rootsRPC},
|
||||||
// request directly. Otherwise, we get background refreshes and
|
|
||||||
// it screws up the ordering of the channel reads of the
|
|
||||||
// testGatedRootsRPC implementation.
|
|
||||||
Refresh: false,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Create the leaf type
|
// Create the leaf type
|
||||||
return &ConnectCALeaf{
|
return &ConnectCALeaf{
|
||||||
RPC: rpc,
|
RPC: rpc,
|
||||||
|
|
|
@ -14,6 +14,7 @@ const ConnectCARootName = "connect-ca-root"
|
||||||
// straightforward cache type since it only has to block on the given
|
// straightforward cache type since it only has to block on the given
|
||||||
// index and return the data.
|
// index and return the data.
|
||||||
type ConnectCARoot struct {
|
type ConnectCARoot struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +46,3 @@ func (c *ConnectCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConnectCARoot) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ const CompiledDiscoveryChainName = "compiled-discovery-chain"
|
||||||
// CompiledDiscoveryChain supports fetching the complete discovery chain for a
|
// CompiledDiscoveryChain supports fetching the complete discovery chain for a
|
||||||
// service and caching its compilation.
|
// service and caching its compilation.
|
||||||
type CompiledDiscoveryChain struct {
|
type CompiledDiscoveryChain struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +51,3 @@ func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Reques
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CompiledDiscoveryChain) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ const FederationStateListMeshGatewaysName = "federation-state-list-mesh-gateways
|
||||||
|
|
||||||
// FederationState supports fetching federation states.
|
// FederationState supports fetching federation states.
|
||||||
type FederationStateListMeshGateways struct {
|
type FederationStateListMeshGateways struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,3 @@ func (c *FederationStateListMeshGateways) Fetch(opts cache.FetchOptions, req cac
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FederationStateListMeshGateways) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ const GatewayServicesName = "gateway-services"
|
||||||
|
|
||||||
// GatewayUpstreams supports fetching upstreams for a given gateway name.
|
// GatewayUpstreams supports fetching upstreams for a given gateway name.
|
||||||
type GatewayServices struct {
|
type GatewayServices struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,3 @@ func (g *GatewayServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GatewayServices) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ const HealthServicesName = "health-services"
|
||||||
// HealthServices supports fetching discovering service instances via the
|
// HealthServices supports fetching discovering service instances via the
|
||||||
// catalog.
|
// catalog.
|
||||||
type HealthServices struct {
|
type HealthServices struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +51,3 @@ func (c *HealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cach
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *HealthServices) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ const IntentionMatchName = "intention-match"
|
||||||
|
|
||||||
// IntentionMatch supports fetching the intentions via match queries.
|
// IntentionMatch supports fetching the intentions via match queries.
|
||||||
type IntentionMatch struct {
|
type IntentionMatch struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +44,3 @@ func (c *IntentionMatch) Fetch(opts cache.FetchOptions, req cache.Request) (cach
|
||||||
result.Index = reply.Index
|
result.Index = reply.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *IntentionMatch) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ const NodeServicesName = "node-services"
|
||||||
// NodeServices supports fetching discovering service instances via the
|
// NodeServices supports fetching discovering service instances via the
|
||||||
// catalog.
|
// catalog.
|
||||||
type NodeServices struct {
|
type NodeServices struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +51,3 @@ func (c *NodeServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *NodeServices) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RegisterOptionsBlockingRefresh can be embedded into a struct to implement
|
||||||
|
// part of the agent/cache.Type interface.
|
||||||
|
// When embedded into a struct it identifies the cache type as one which
|
||||||
|
// supports blocking, and uses refresh to keep the cache fresh.
|
||||||
|
type RegisterOptionsBlockingRefresh struct{}
|
||||||
|
|
||||||
|
func (r RegisterOptionsBlockingRefresh) RegisterOptions() cache.RegisterOptions {
|
||||||
|
return cache.RegisterOptions{
|
||||||
|
// Maintain a blocking query, retry dropped connections quickly
|
||||||
|
Refresh: true,
|
||||||
|
SupportsBlocking: true,
|
||||||
|
RefreshTimer: 0 * time.Second,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterOptionsNoRefresh can be embedded into a struct to implement
|
||||||
|
// part of the agent/cache.Type interface.
|
||||||
|
// When embedded into a struct it identifies the cache type as one which
|
||||||
|
// does not support blocking, and should not be refreshed.
|
||||||
|
type RegisterOptionsNoRefresh struct{}
|
||||||
|
|
||||||
|
func (r RegisterOptionsNoRefresh) RegisterOptions() cache.RegisterOptions {
|
||||||
|
return cache.RegisterOptions{
|
||||||
|
Refresh: false,
|
||||||
|
SupportsBlocking: false,
|
||||||
|
}
|
||||||
|
}
|
|
@ -13,10 +13,11 @@ const PreparedQueryName = "prepared-query"
|
||||||
// PreparedQuery supports fetching discovering service instances via prepared
|
// PreparedQuery supports fetching discovering service instances via prepared
|
||||||
// queries.
|
// queries.
|
||||||
type PreparedQuery struct {
|
type PreparedQuery struct {
|
||||||
|
RegisterOptionsNoRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
func (c *PreparedQuery) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
var result cache.FetchResult
|
var result cache.FetchResult
|
||||||
|
|
||||||
// The request should be a PreparedQueryExecuteRequest.
|
// The request should be a PreparedQueryExecuteRequest.
|
||||||
|
@ -47,8 +48,3 @@ func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PreparedQuery) SupportsBlocking() bool {
|
|
||||||
// Prepared queries don't support blocking.
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ const ResolvedServiceConfigName = "resolved-service-config"
|
||||||
// ResolvedServiceConfig supports fetching the config for a service resolved from
|
// ResolvedServiceConfig supports fetching the config for a service resolved from
|
||||||
// the global proxy defaults and the centrally registered service config.
|
// the global proxy defaults and the centrally registered service config.
|
||||||
type ResolvedServiceConfig struct {
|
type ResolvedServiceConfig struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +51,3 @@ func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ResolvedServiceConfig) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ type Agent interface {
|
||||||
|
|
||||||
// ServiceHTTPBasedChecks supports fetching discovering checks in the local state
|
// ServiceHTTPBasedChecks supports fetching discovering checks in the local state
|
||||||
type ServiceHTTPChecks struct {
|
type ServiceHTTPChecks struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
Agent Agent
|
Agent Agent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,10 +92,6 @@ func (c *ServiceHTTPChecks) Fetch(opts cache.FetchOptions, req cache.Request) (c
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceHTTPChecks) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServiceHTTPChecksRequest is the cache.Request implementation for the
|
// ServiceHTTPChecksRequest is the cache.Request implementation for the
|
||||||
// ServiceHTTPBasedChecks cache type. This is implemented here and not in structs
|
// ServiceHTTPBasedChecks cache type. This is implemented here and not in structs
|
||||||
// since this is only used for cache-related requests and not forwarded
|
// since this is only used for cache-related requests and not forwarded
|
||||||
|
|
|
@ -12,6 +12,7 @@ const InternalServiceDumpName = "service-dump"
|
||||||
|
|
||||||
// InternalServiceDump supports fetching discovering service names via the catalog.
|
// InternalServiceDump supports fetching discovering service names via the catalog.
|
||||||
type InternalServiceDump struct {
|
type InternalServiceDump struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +50,3 @@ func (c *InternalServiceDump) Fetch(opts cache.FetchOptions, req cache.Request)
|
||||||
result.Index = reply.QueryMeta.Index
|
result.Index = reply.QueryMeta.Index
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *InternalServiceDump) SupportsBlocking() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -160,6 +160,12 @@ type RegisterOptions struct {
|
||||||
// is to only request data on explicit Get.
|
// is to only request data on explicit Get.
|
||||||
Refresh bool
|
Refresh bool
|
||||||
|
|
||||||
|
// SupportsBlocking should be set to true if the type supports blocking queries.
|
||||||
|
// Types that do not support blocking queries will not be able to use
|
||||||
|
// background refresh nor will the cache attempt blocking fetches if the
|
||||||
|
// client requests them with MinIndex.
|
||||||
|
SupportsBlocking bool
|
||||||
|
|
||||||
// RefreshTimer is the time between attempting to refresh data.
|
// RefreshTimer is the time between attempting to refresh data.
|
||||||
// If this is zero, then data is refreshed immediately when a fetch
|
// If this is zero, then data is refreshed immediately when a fetch
|
||||||
// is returned.
|
// is returned.
|
||||||
|
@ -185,17 +191,15 @@ type RegisterOptions struct {
|
||||||
//
|
//
|
||||||
// This makes the type available for Get but does not automatically perform
|
// This makes the type available for Get but does not automatically perform
|
||||||
// any prefetching. In order to populate the cache, Get must be called.
|
// any prefetching. In order to populate the cache, Get must be called.
|
||||||
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
|
func (c *Cache) RegisterType(n string, typ Type) {
|
||||||
if opts == nil {
|
opts := typ.RegisterOptions()
|
||||||
opts = &RegisterOptions{}
|
|
||||||
}
|
|
||||||
if opts.LastGetTTL == 0 {
|
if opts.LastGetTTL == 0 {
|
||||||
opts.LastGetTTL = 72 * time.Hour // reasonable default is days
|
opts.LastGetTTL = 72 * time.Hour // reasonable default is days
|
||||||
}
|
}
|
||||||
|
|
||||||
c.typesLock.Lock()
|
c.typesLock.Lock()
|
||||||
defer c.typesLock.Unlock()
|
defer c.typesLock.Unlock()
|
||||||
c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts}
|
c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get loads the data for the given type and request. If data satisfying the
|
// Get loads the data for the given type and request. If data satisfying the
|
||||||
|
@ -241,7 +245,7 @@ func (c *Cache) getEntryLocked(
|
||||||
|
|
||||||
// Check index is not specified or lower than value, or the type doesn't
|
// Check index is not specified or lower than value, or the type doesn't
|
||||||
// support blocking.
|
// support blocking.
|
||||||
if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index {
|
if tEntry.Opts.SupportsBlocking && minIndex > 0 && minIndex >= entry.Index {
|
||||||
// MinIndex was given and matches or is higher than current value so we
|
// MinIndex was given and matches or is higher than current value so we
|
||||||
// ignore the cache and fallthrough to blocking on a new value below.
|
// ignore the cache and fallthrough to blocking on a new value below.
|
||||||
return true, false, entry
|
return true, false, entry
|
||||||
|
@ -465,7 +469,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
|
||||||
}
|
}
|
||||||
|
|
||||||
fOpts := FetchOptions{}
|
fOpts := FetchOptions{}
|
||||||
if tEntry.Type.SupportsBlocking() {
|
if tEntry.Opts.SupportsBlocking {
|
||||||
fOpts.MinIndex = entry.Index
|
fOpts.MinIndex = entry.Index
|
||||||
fOpts.Timeout = tEntry.Opts.RefreshTimeout
|
fOpts.Timeout = tEntry.Opts.RefreshTimeout
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ func TestCacheGet_noIndex(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 42}, nil).Times(1)
|
typ.Static(FetchResult{Value: 42}, nil).Times(1)
|
||||||
|
@ -56,7 +56,7 @@ func TestCacheGet_initError(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
fetcherr := fmt.Errorf("error")
|
fetcherr := fmt.Errorf("error")
|
||||||
|
@ -91,7 +91,7 @@ func TestCacheGet_cachedErrorsDontStick(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
fetcherr := fmt.Errorf("initial error")
|
fetcherr := fmt.Errorf("initial error")
|
||||||
|
@ -152,7 +152,7 @@ func TestCacheGet_blankCacheKey(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 42}, nil).Times(2)
|
typ.Static(FetchResult{Value: 42}, nil).Times(2)
|
||||||
|
@ -183,7 +183,7 @@ func TestCacheGet_blockingInitSameKey(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
triggerCh := make(chan time.Time)
|
triggerCh := make(chan time.Time)
|
||||||
|
@ -220,7 +220,7 @@ func TestCacheGet_blockingInitDiffKeys(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Keep track of the keys
|
// Keep track of the keys
|
||||||
var keysLock sync.Mutex
|
var keysLock sync.Mutex
|
||||||
|
@ -270,7 +270,7 @@ func TestCacheGet_blockingIndex(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
triggerCh := make(chan time.Time)
|
triggerCh := make(chan time.Time)
|
||||||
|
@ -304,7 +304,7 @@ func TestCacheGet_blockingIndexTimeout(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
triggerCh := make(chan time.Time)
|
triggerCh := make(chan time.Time)
|
||||||
|
@ -340,7 +340,7 @@ func TestCacheGet_blockingIndexError(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var retries uint32
|
var retries uint32
|
||||||
|
@ -377,7 +377,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
stateCh := make(chan int, 1)
|
stateCh := make(chan int, 1)
|
||||||
|
|
||||||
|
@ -440,14 +440,15 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||||
func TestCacheGet_periodicRefresh(t *testing.T) {
|
func TestCacheGet_periodicRefresh(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
defer typ.AssertExpectations(t)
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
c := TestCache(t)
|
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
Refresh: true,
|
Refresh: true,
|
||||||
RefreshTimer: 100 * time.Millisecond,
|
RefreshTimer: 100 * time.Millisecond,
|
||||||
RefreshTimeout: 5 * time.Minute,
|
RefreshTimeout: 5 * time.Minute,
|
||||||
})
|
})
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// This is a bit weird, but we do this to ensure that the final
|
// This is a bit weird, but we do this to ensure that the final
|
||||||
// call to the Fetch (if it happens, depends on timing) just blocks.
|
// call to the Fetch (if it happens, depends on timing) just blocks.
|
||||||
|
@ -479,14 +480,15 @@ func TestCacheGet_periodicRefresh(t *testing.T) {
|
||||||
func TestCacheGet_periodicRefreshMultiple(t *testing.T) {
|
func TestCacheGet_periodicRefreshMultiple(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
defer typ.AssertExpectations(t)
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
c := TestCache(t)
|
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
Refresh: true,
|
Refresh: true,
|
||||||
RefreshTimer: 0 * time.Millisecond,
|
RefreshTimer: 0 * time.Millisecond,
|
||||||
RefreshTimeout: 5 * time.Minute,
|
RefreshTimeout: 5 * time.Minute,
|
||||||
})
|
})
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// This is a bit weird, but we do this to ensure that the final
|
// This is a bit weird, but we do this to ensure that the final
|
||||||
// call to the Fetch (if it happens, depends on timing) just blocks.
|
// call to the Fetch (if it happens, depends on timing) just blocks.
|
||||||
|
@ -527,14 +529,15 @@ func TestCacheGet_periodicRefreshMultiple(t *testing.T) {
|
||||||
func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
|
func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
defer typ.AssertExpectations(t)
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
c := TestCache(t)
|
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
Refresh: true,
|
Refresh: true,
|
||||||
RefreshTimer: 0,
|
RefreshTimer: 0,
|
||||||
RefreshTimeout: 5 * time.Minute,
|
RefreshTimeout: 5 * time.Minute,
|
||||||
})
|
})
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var retries uint32
|
var retries uint32
|
||||||
|
@ -568,14 +571,15 @@ func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
|
||||||
func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) {
|
func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
defer typ.AssertExpectations(t)
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
c := TestCache(t)
|
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
Refresh: true,
|
Refresh: true,
|
||||||
RefreshTimer: 0,
|
RefreshTimer: 0,
|
||||||
RefreshTimeout: 5 * time.Minute,
|
RefreshTimeout: 5 * time.Minute,
|
||||||
})
|
})
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var retries uint32
|
var retries uint32
|
||||||
|
@ -611,14 +615,16 @@ func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) {
|
||||||
func TestCacheGet_noIndexSetsOne(t *testing.T) {
|
func TestCacheGet_noIndexSetsOne(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
|
SupportsBlocking: true,
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0,
|
||||||
|
RefreshTimeout: 5 * time.Minute,
|
||||||
|
})
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0,
|
|
||||||
RefreshTimeout: 5 * time.Minute,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Simulate "well behaved" RPC with no data yet but returning 1
|
// Simulate "well behaved" RPC with no data yet but returning 1
|
||||||
{
|
{
|
||||||
|
@ -671,15 +677,17 @@ func TestCacheGet_fetchTimeout(t *testing.T) {
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
|
timeout := 10 * time.Minute
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
|
RefreshTimeout: timeout,
|
||||||
|
SupportsBlocking: true,
|
||||||
|
})
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
|
|
||||||
// Register the type with a timeout
|
// Register the type with a timeout
|
||||||
timeout := 10 * time.Minute
|
c.RegisterType("t", typ)
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
RefreshTimeout: timeout,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var actual time.Duration
|
var actual time.Duration
|
||||||
|
@ -705,14 +713,15 @@ func TestCacheGet_expire(t *testing.T) {
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
|
LastGetTTL: 400 * time.Millisecond,
|
||||||
|
})
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
|
|
||||||
// Register the type with a timeout
|
// Register the type with a timeout
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
LastGetTTL: 400 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 42}, nil).Times(2)
|
typ.Static(FetchResult{Value: 42}, nil).Times(2)
|
||||||
|
@ -760,14 +769,15 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
|
LastGetTTL: 150 * time.Millisecond,
|
||||||
|
})
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
|
|
||||||
// Register the type with a timeout
|
// Register the type with a timeout
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
LastGetTTL: 150 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 42}, nil).Times(2)
|
typ.Static(FetchResult{Value: 42}, nil).Times(2)
|
||||||
|
@ -821,8 +831,8 @@ func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
|
||||||
defer typ2.AssertExpectations(t)
|
defer typ2.AssertExpectations(t)
|
||||||
|
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
c.RegisterType("t2", typ2, nil)
|
c.RegisterType("t2", typ2)
|
||||||
|
|
||||||
// Configure the types
|
// Configure the types
|
||||||
typ.Static(FetchResult{Value: 100}, nil)
|
typ.Static(FetchResult{Value: 100}, nil)
|
||||||
|
@ -863,7 +873,7 @@ func TestCacheGet_partitionDC(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", &testPartitionType{}, nil)
|
c.RegisterType("t", &testPartitionType{})
|
||||||
|
|
||||||
// Perform multiple gets
|
// Perform multiple gets
|
||||||
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
|
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
|
||||||
|
@ -882,7 +892,7 @@ func TestCacheGet_partitionToken(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", &testPartitionType{}, nil)
|
c.RegisterType("t", &testPartitionType{})
|
||||||
|
|
||||||
// Perform multiple gets
|
// Perform multiple gets
|
||||||
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
|
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
|
||||||
|
@ -907,8 +917,10 @@ func (t *testPartitionType) Fetch(opts FetchOptions, r Request) (FetchResult, er
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *testPartitionType) SupportsBlocking() bool {
|
func (t *testPartitionType) RegisterOptions() RegisterOptions {
|
||||||
return true
|
return RegisterOptions{
|
||||||
|
SupportsBlocking: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that background refreshing reports correct Age in failure and happy
|
// Test that background refreshing reports correct Age in failure and happy
|
||||||
|
@ -918,14 +930,15 @@ func TestCacheGet_refreshAge(t *testing.T) {
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
defer typ.AssertExpectations(t)
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
c := TestCache(t)
|
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
Refresh: true,
|
Refresh: true,
|
||||||
RefreshTimer: 0,
|
RefreshTimer: 0,
|
||||||
RefreshTimeout: 5 * time.Minute,
|
RefreshTimeout: 5 * time.Minute,
|
||||||
})
|
})
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var index, shouldFail uint64
|
var index, shouldFail uint64
|
||||||
|
@ -1035,13 +1048,14 @@ func TestCacheGet_nonRefreshAge(t *testing.T) {
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := &MockType{}
|
||||||
defer typ.AssertExpectations(t)
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
c := TestCache(t)
|
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
|
||||||
Refresh: false,
|
Refresh: false,
|
||||||
LastGetTTL: 100 * time.Millisecond,
|
LastGetTTL: 100 * time.Millisecond,
|
||||||
})
|
})
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var index uint64
|
var index uint64
|
||||||
|
@ -1121,7 +1135,7 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
||||||
|
|
||||||
typ := TestTypeNonBlocking(t)
|
typ := TestTypeNonBlocking(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, nil)
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once()
|
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once()
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
// Code generated by mockery v1.0.0
|
// Code generated by mockery v1.0.0. DO NOT EDIT.
|
||||||
|
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import mock "github.com/stretchr/testify/mock"
|
import mock "github.com/stretchr/testify/mock"
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
// Code generated by mockery v1.0.0. DO NOT EDIT.
|
// Code generated by mockery v1.0.0. DO NOT EDIT.
|
||||||
|
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import mock "github.com/stretchr/testify/mock"
|
import mock "github.com/stretchr/testify/mock"
|
||||||
|
@ -29,15 +30,15 @@ func (_m *MockType) Fetch(_a0 FetchOptions, _a1 Request) (FetchResult, error) {
|
||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// SupportsBlocking provides a mock function with given fields:
|
// RegisterOptions provides a mock function with given fields:
|
||||||
func (_m *MockType) SupportsBlocking() bool {
|
func (_m *MockType) RegisterOptions() RegisterOptions {
|
||||||
ret := _m.Called()
|
ret := _m.Called()
|
||||||
|
|
||||||
var r0 bool
|
var r0 RegisterOptions
|
||||||
if rf, ok := ret.Get(0).(func() bool); ok {
|
if rf, ok := ret.Get(0).(func() RegisterOptions); ok {
|
||||||
r0 = rf()
|
r0 = rf()
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Get(0).(bool)
|
r0 = ret.Get(0).(RegisterOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r0
|
return r0
|
||||||
|
|
|
@ -96,20 +96,21 @@ func TestRequest(t testing.T, info RequestInfo) *MockRequest {
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestType returns a MockType that can be used to setup expectations
|
// TestType returns a MockType that sets default RegisterOptions.
|
||||||
// on data fetching.
|
|
||||||
func TestType(t testing.T) *MockType {
|
func TestType(t testing.T) *MockType {
|
||||||
return testTypeInternal(t, true)
|
typ := &MockType{}
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
|
SupportsBlocking: true,
|
||||||
|
})
|
||||||
|
return typ
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestTypeNonBlocking returns a MockType that returns false to SupportsBlocking.
|
// TestTypeNonBlocking returns a MockType that returns false to SupportsBlocking.
|
||||||
func TestTypeNonBlocking(t testing.T) *MockType {
|
func TestTypeNonBlocking(t testing.T) *MockType {
|
||||||
return testTypeInternal(t, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testTypeInternal(t testing.T, enableBlocking bool) *MockType {
|
|
||||||
typ := &MockType{}
|
typ := &MockType{}
|
||||||
typ.On("SupportsBlocking").Return(enableBlocking).Maybe()
|
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||||
|
SupportsBlocking: false,
|
||||||
|
})
|
||||||
return typ
|
return typ
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,11 +30,9 @@ type Type interface {
|
||||||
// metadata even when there is no result.
|
// metadata even when there is no result.
|
||||||
Fetch(FetchOptions, Request) (FetchResult, error)
|
Fetch(FetchOptions, Request) (FetchResult, error)
|
||||||
|
|
||||||
// SupportsBlocking should return true if the type supports blocking queries.
|
// RegisterOptions are used when the type is registered to configure the
|
||||||
// Types that do not support blocking queries will not be able to use
|
// behaviour of cache entries for this type.
|
||||||
// background refresh nor will the cache attempt blocking fetches if the
|
RegisterOptions() RegisterOptions
|
||||||
// client requests them with MinIndex.
|
|
||||||
SupportsBlocking() bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FetchOptions are various settable options when a Fetch is called.
|
// FetchOptions are various settable options when a Fetch is called.
|
||||||
|
|
|
@ -64,7 +64,7 @@ func (c *Cache) Notify(
|
||||||
return fmt.Errorf("unknown type in cache: %s", t)
|
return fmt.Errorf("unknown type in cache: %s", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
if tEntry.Type.SupportsBlocking() {
|
if tEntry.Opts.SupportsBlocking {
|
||||||
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
|
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,10 @@ func TestCacheNotify(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{})
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Setup triggers to control when "updates" should be delivered
|
// Setup triggers to control when "updates" should be delivered
|
||||||
trigger := make([]chan time.Time, 5)
|
trigger := make([]chan time.Time, 5)
|
||||||
|
@ -167,9 +166,7 @@ func TestCacheNotifyPolling(t *testing.T) {
|
||||||
typ := TestTypeNonBlocking(t)
|
typ := TestTypeNonBlocking(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
|
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
|
||||||
|
@ -280,11 +277,10 @@ func TestCacheWatch_ErrorBackoff(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
typ := TestType(t)
|
typ := TestType(t)
|
||||||
|
typ.On("RegisterOptions").Return(RegisterOptions{})
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var retries uint32
|
var retries uint32
|
||||||
|
@ -345,9 +341,7 @@ func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) {
|
||||||
typ := TestTypeNonBlocking(t)
|
typ := TestTypeNonBlocking(t)
|
||||||
defer typ.AssertExpectations(t)
|
defer typ.AssertExpectations(t)
|
||||||
c := TestCache(t)
|
c := TestCache(t)
|
||||||
c.RegisterType("t", typ, &RegisterOptions{
|
c.RegisterType("t", typ)
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
var retries uint32
|
var retries uint32
|
||||||
|
|
|
@ -50,35 +50,13 @@ func NewTestCacheTypes(t testing.T) *TestCacheTypes {
|
||||||
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
|
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
|
||||||
func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
|
func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
|
||||||
c := cache.TestCache(t)
|
c := cache.TestCache(t)
|
||||||
c.RegisterType(cachetype.ConnectCARootName, types.roots, &cache.RegisterOptions{
|
c.RegisterType(cachetype.ConnectCARootName, types.roots)
|
||||||
Refresh: true,
|
c.RegisterType(cachetype.ConnectCALeafName, types.leaf)
|
||||||
RefreshTimer: 0,
|
c.RegisterType(cachetype.IntentionMatchName, types.intentions)
|
||||||
RefreshTimeout: 10 * time.Minute,
|
c.RegisterType(cachetype.HealthServicesName, types.health)
|
||||||
})
|
c.RegisterType(cachetype.PreparedQueryName, types.query)
|
||||||
c.RegisterType(cachetype.ConnectCALeafName, types.leaf, &cache.RegisterOptions{
|
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain)
|
||||||
Refresh: true,
|
c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks)
|
||||||
RefreshTimer: 0,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
c.RegisterType(cachetype.IntentionMatchName, types.intentions, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
c.RegisterType(cachetype.HealthServicesName, types.health, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{
|
|
||||||
Refresh: false,
|
|
||||||
})
|
|
||||||
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain, &cache.RegisterOptions{
|
|
||||||
Refresh: true,
|
|
||||||
RefreshTimer: 0,
|
|
||||||
RefreshTimeout: 10 * time.Minute,
|
|
||||||
})
|
|
||||||
c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks, &cache.RegisterOptions{})
|
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -1221,7 +1199,10 @@ func (ct *ControllableCacheType) Fetch(opts cache.FetchOptions, req cache.Reques
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SupportsBlocking implements cache.Type
|
func (ct *ControllableCacheType) RegisterOptions() cache.RegisterOptions {
|
||||||
func (ct *ControllableCacheType) SupportsBlocking() bool {
|
return cache.RegisterOptions{
|
||||||
return ct.blocking
|
Refresh: ct.blocking,
|
||||||
|
SupportsBlocking: ct.blocking,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue