mirror of https://github.com/hashicorp/consul
agent: initialize the cache and cache the CA roots
parent
c329b4cb34
commit
6902d721d6
|
@ -21,6 +21,8 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/ae"
|
"github.com/hashicorp/consul/agent/ae"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
|
@ -118,6 +120,9 @@ type Agent struct {
|
||||||
// and the remote state.
|
// and the remote state.
|
||||||
sync *ae.StateSyncer
|
sync *ae.StateSyncer
|
||||||
|
|
||||||
|
// cache is the in-memory cache for data the Agent requests.
|
||||||
|
cache *cache.Cache
|
||||||
|
|
||||||
// checkReapAfter maps the check ID to a timeout after which we should
|
// checkReapAfter maps the check ID to a timeout after which we should
|
||||||
// reap its associated service
|
// reap its associated service
|
||||||
checkReapAfter map[types.CheckID]time.Duration
|
checkReapAfter map[types.CheckID]time.Duration
|
||||||
|
@ -290,6 +295,9 @@ func (a *Agent) Start() error {
|
||||||
// regular and on-demand state synchronizations (anti-entropy).
|
// regular and on-demand state synchronizations (anti-entropy).
|
||||||
a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
|
a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
|
||||||
|
|
||||||
|
// create the cache
|
||||||
|
a.cache = cache.New(nil)
|
||||||
|
|
||||||
// create the config for the rpc server/client
|
// create the config for the rpc server/client
|
||||||
consulCfg, err := a.consulConfig()
|
consulCfg, err := a.consulConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -326,6 +334,9 @@ func (a *Agent) Start() error {
|
||||||
a.State.Delegate = a.delegate
|
a.State.Delegate = a.delegate
|
||||||
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
||||||
|
|
||||||
|
// Register the cache
|
||||||
|
a.registerCache()
|
||||||
|
|
||||||
// Load checks/services/metadata.
|
// Load checks/services/metadata.
|
||||||
if err := a.loadServices(c); err != nil {
|
if err := a.loadServices(c); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -2624,3 +2635,18 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// registerCache configures the cache and registers all the supported
|
||||||
|
// types onto the cache. This is NOT safe to call multiple times so
|
||||||
|
// care should be taken to call this exactly once after the cache
|
||||||
|
// field has been initialized.
|
||||||
|
func (a *Agent) registerCache() {
|
||||||
|
a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{
|
||||||
|
RPC: a.delegate,
|
||||||
|
}, &cache.RegisterOptions{
|
||||||
|
// Maintain a blocking query, retry dropped connections quickly
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/mitchellh/hashstructure"
|
"github.com/mitchellh/hashstructure"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
|
@ -885,10 +886,24 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in
|
||||||
|
|
||||||
// AgentConnectCARoots returns the trusted CA roots.
|
// AgentConnectCARoots returns the trusted CA roots.
|
||||||
func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
// NOTE(mitchellh): for now this is identical to /v1/connect/ca/roots.
|
var args structs.DCSpecificRequest
|
||||||
// In the future, we're going to do some agent-local caching and the
|
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||||
// behavior will differ.
|
return nil, nil
|
||||||
return s.ConnectCARoots(resp, req)
|
}
|
||||||
|
|
||||||
|
raw, err := s.agent.cache.Get(cachetype.ConnectCARootName, &args)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reply, ok := raw.(*structs.IndexedCARoots)
|
||||||
|
if !ok {
|
||||||
|
// This should never happen, but we want to protect against panics
|
||||||
|
return nil, fmt.Errorf("internal error: response type not correct")
|
||||||
|
}
|
||||||
|
defer setMeta(resp, &reply.QueryMeta)
|
||||||
|
|
||||||
|
return *reply, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AgentConnectCALeafCert returns the certificate bundle for a service
|
// AgentConnectCALeafCert returns the certificate bundle for a service
|
||||||
|
|
|
@ -7,12 +7,15 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TypeCARoot supports fetching the Connect CA roots.
|
// Recommended name for registration for ConnectCARoot
|
||||||
type TypeCARoot struct {
|
const ConnectCARootName = "connect-ca"
|
||||||
|
|
||||||
|
// ConnectCARoot supports fetching the Connect CA roots.
|
||||||
|
type ConnectCARoot struct {
|
||||||
RPC RPC
|
RPC RPC
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TypeCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
func (c *ConnectCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
var result cache.FetchResult
|
var result cache.FetchResult
|
||||||
|
|
||||||
// The request should be a DCSpecificRequest.
|
// The request should be a DCSpecificRequest.
|
||||||
|
|
|
@ -10,11 +10,11 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTypeCARoot(t *testing.T) {
|
func TestConnectCARoot(t *testing.T) {
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
rpc := TestRPC(t)
|
rpc := TestRPC(t)
|
||||||
defer rpc.AssertExpectations(t)
|
defer rpc.AssertExpectations(t)
|
||||||
typ := &TypeCARoot{RPC: rpc}
|
typ := &ConnectCARoot{RPC: rpc}
|
||||||
|
|
||||||
// Expect the proper RPC call. This also sets the expected value
|
// Expect the proper RPC call. This also sets the expected value
|
||||||
// since that is return-by-pointer in the arguments.
|
// since that is return-by-pointer in the arguments.
|
||||||
|
@ -42,11 +42,11 @@ func TestTypeCARoot(t *testing.T) {
|
||||||
}, result)
|
}, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTypeCARoot_badReqType(t *testing.T) {
|
func TestConnectCARoot_badReqType(t *testing.T) {
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
rpc := TestRPC(t)
|
rpc := TestRPC(t)
|
||||||
defer rpc.AssertExpectations(t)
|
defer rpc.AssertExpectations(t)
|
||||||
typ := &TypeCARoot{RPC: rpc}
|
typ := &ConnectCARoot{RPC: rpc}
|
||||||
|
|
||||||
// Fetch
|
// Fetch
|
||||||
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
||||||
|
|
Loading…
Reference in New Issue