mirror of https://github.com/hashicorp/consul
Make central service config opt-in and rework the initial registration
parent b58572afbd
commit c269369760
@@ -1898,18 +1898,21 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType
 	return a.addServiceLocked(service, chkTypes, persist, token, source)
 }

+// addServiceLocked adds a service entry to the service manager if enabled, or directly
+// to the local state if it is not. This function assumes the state lock is already held.
 func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
 	if err := a.validateService(service, chkTypes); err != nil {
 		return err
 	}

-	if err := a.serviceManager.AddService(service, chkTypes, persist, token, source); err != nil {
-		return err
+	if a.config.EnableCentralServiceConfig {
+		return a.serviceManager.AddService(service, chkTypes, persist, token, source)
 	}

-	return nil
+	return a.addServiceInternal(service, chkTypes, persist, token, source)
 }

 // addServiceInternal adds the given service and checks to the local state.
 func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
 	// Pause the service syncs during modification
 	a.PauseSync()
@@ -30,9 +30,9 @@ func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request
 	reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
 	reqReal.QueryOptions.MaxQueryTime = opts.Timeout

-	// Allways allow stale - there's no point in hitting leader if the request is
+	// Always allow stale - there's no point in hitting leader if the request is
 	// going to be served from cache and endup arbitrarily stale anyway. This
-	// allows cached service-discover to automatically read scale across all
+	// allows cached resolved-service-config to automatically read scale across all
 	// servers too.
 	reqReal.AllowStale = true
@@ -95,7 +95,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co

 		// Check the index of the value returned in the cache entry to be sure it
 		// changed
-		if index < meta.Index {
+		if index == 0 || index < meta.Index {
 			u := UpdateEvent{correlationID, res, meta, err}
 			select {
 			case ch <- u:
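The added index == 0 clause matters for the reworked registration flow: the notify loop previously dropped any result whose index was not strictly greater than the last one seen, so a first fetch that came back at index 0 never reached the subscriber, and a caller blocking on that first event (as the service manager now does) would hang. A minimal sketch of the comparison, using a simplified result and notify shape rather than Consul's actual cache types:

    package main

    // Sketch only (not Consul's cache code): why the notify loop needs the
    // "index == 0" clause. Without it, a first result that arrives with index 0
    // is never sent on the channel, and a subscriber waiting for the initial
    // event blocks forever.

    import "fmt"

    type result struct {
    	value string
    	index uint64
    }

    func notify(results []result, ch chan<- string) {
    	var index uint64 // last index we notified at; starts at zero
    	for _, res := range results {
    		// Old check: only strictly newer indexes fire. New check: also fire
    		// when nothing has been delivered yet, so the first result always lands.
    		if index == 0 || index < res.index {
    			ch <- fmt.Sprintf("%s@%d", res.value, res.index)
    		}
    		index = res.index
    	}
    	close(ch)
    }

    func main() {
    	ch := make(chan string, 4)
    	// First fetch comes back at index 0 (e.g. the config entry does not exist
    	// yet), followed by a real update.
    	notify([]result{{"initial", 0}, {"update", 7}}, ch)
    	for ev := range ch {
    		fmt.Println(ev)
    	}
    }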
@@ -793,6 +793,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
 		DiscardCheckOutput:         b.boolVal(c.DiscardCheckOutput),
 		DiscoveryMaxStale:          b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
 		EnableAgentTLSForChecks:    b.boolVal(c.EnableAgentTLSForChecks),
+		EnableCentralServiceConfig: b.boolVal(c.EnableCentralServiceConfig),
 		EnableDebug:                b.boolVal(c.EnableDebug),
 		EnableRemoteScriptChecks:   enableRemoteScriptChecks,
 		EnableLocalScriptChecks:    enableLocalScriptChecks,
@@ -201,6 +201,7 @@ type Config struct {
 	DiscoveryMaxStale          *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
 	EnableACLReplication       *bool   `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
 	EnableAgentTLSForChecks    *bool   `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
+	EnableCentralServiceConfig *bool   `json:"enable_central_service_config,omitempty" hcl:"enable_central_service_config" mapstructure:"enable_central_service_config"`
 	EnableDebug                *bool   `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
 	EnableScriptChecks         *bool   `json:"enable_script_checks,omitempty" hcl:"enable_script_checks" mapstructure:"enable_script_checks"`
 	EnableLocalScriptChecks    *bool   `json:"enable_local_script_checks,omitempty" hcl:"enable_local_script_checks" mapstructure:"enable_local_script_checks"`
@@ -669,6 +669,12 @@ type RuntimeConfig struct {
 	// and key).
 	EnableAgentTLSForChecks bool

+	// EnableCentralServiceConfig controls whether the agent should incorporate
+	// centralized config such as service-defaults into local service registrations.
+	//
+	// hcl: enable_central_service_config = (true|false)
+	EnableCentralServiceConfig bool
+
 	// EnableDebug is used to enable various debugging features.
 	//
 	// hcl: enable_debug = (true|false)
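The flag defaults to false, so agents keep the direct local-state registration path unless they opt in. A hedged usage sketch in the style of the service_manager tests further down (NewTestAgent, ConfigSourceLocal and testrpc.WaitForLeader are the helpers those tests use; this test itself is illustrative and not part of the change):

    package agent

    import (
    	"testing"

    	"github.com/hashicorp/consul/agent/structs"
    	"github.com/hashicorp/consul/testrpc"
    )

    // Sketch: opt a test agent in to central service config and register a service
    // through the new ServiceManager path.
    func TestCentralServiceConfigOptIn_Sketch(t *testing.T) {
    	a := NewTestAgent(t, t.Name(), "enable_central_service_config = true")
    	defer a.Shutdown()

    	testrpc.WaitForLeader(t, a.RPC, "dc1")

    	svc := &structs.NodeService{
    		ID:      "web",
    		Service: "web",
    		Port:    8080,
    	}
    	// With the flag enabled, AddService routes through the service manager,
    	// which blocks until the initial resolved config has been merged and registered.
    	if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
    		t.Fatal(err)
    	}
    }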
@@ -3073,6 +3073,7 @@ func TestFullConfig(t *testing.T) {
 			},
 			"enable_acl_replication": true,
 			"enable_agent_tls_for_checks": true,
+			"enable_central_service_config": true,
 			"enable_debug": true,
 			"enable_script_checks": true,
 			"enable_local_script_checks": true,
@@ -3629,6 +3630,7 @@ func TestFullConfig(t *testing.T) {
 			}
 			enable_acl_replication = true
 			enable_agent_tls_for_checks = true
+			enable_central_service_config = true
 			enable_debug = true
 			enable_script_checks = true
 			enable_local_script_checks = true
@@ -4270,6 +4272,7 @@ func TestFullConfig(t *testing.T) {
 		DiscardCheckOutput:         true,
 		DiscoveryMaxStale:          5 * time.Second,
 		EnableAgentTLSForChecks:    true,
+		EnableCentralServiceConfig: true,
 		EnableDebug:                true,
 		EnableRemoteScriptChecks:   true,
 		EnableLocalScriptChecks:    true,
@@ -5067,6 +5070,7 @@ func TestSanitize(t *testing.T) {
 		"DiscoveryMaxStale": "0s",
 		"EnableAgentTLSForChecks": false,
+		"EnableCentralServiceConfig": false,
 		"EnableDebug": false,
 		"EnableLocalScriptChecks": false,
 		"EnableRemoteScriptChecks": false,
 		"EnableSyslog": false,
@@ -19,7 +19,7 @@ type ServiceManager struct {
 	services map[string]*serviceConfigWatch
 	agent    *Agent

-	sync.Mutex
+	lock sync.Mutex
 }

 func NewServiceManager(agent *Agent) *ServiceManager {
@@ -44,9 +44,9 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType

 	// If a service watch already exists, update the registration. Otherwise,
 	// start a new config watcher.
-	s.Lock()
+	s.lock.Lock()
 	watch, ok := s.services[service.ID]
-	s.Unlock()
+	s.lock.Unlock()
 	if ok {
 		if err := watch.updateRegistration(&reg); err != nil {
 			return err
@@ -55,46 +55,39 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType
 	} else {
 		// This is a new entry, so get the existing global config and do the initial
 		// registration with the merged config.
-		args := structs.ServiceConfigRequest{
-			Name:         service.Service,
-			Datacenter:   s.agent.config.Datacenter,
-			QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
-		}
-		if token != "" {
-			args.QueryOptions.Token = token
-		}
-		var resp structs.ServiceConfigResponse
-		if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil {
-			s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v",
-				service.Service, err)
-		}
-
 		watch := &serviceConfigWatch{
 			registration: &reg,
+			readyCh:      make(chan error),
 			updateCh:     make(chan cache.UpdateEvent, 1),
 			agent:        s.agent,
-			config:       &resp.Definition,
 		}

-		// Force an update/register immediately.
-		if err := watch.updateRegistration(&reg); err != nil {
-			return err
-		}
-
-		s.Lock()
-		s.services[service.ID] = watch
-		s.Unlock()
+		// Start the config watch, which starts a blocking query for the resolved service config
+		// in the background.
 		if err := watch.Start(); err != nil {
 			return err
 		}
-		s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID)
+
+		// Call ReadyWait to block until the cache has returned the initial config and the service
+		// has been registered.
+		if err := watch.ReadyWait(); err != nil {
+			watch.Stop()
+			return err
+		}
+
+		s.lock.Lock()
+		s.services[service.ID] = watch
+		s.lock.Unlock()
+
+		s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID)
 	}

 	return nil
 }

 func (s *ServiceManager) RemoveService(serviceID string) {
-	s.Lock()
-	defer s.Unlock()
+	s.lock.Lock()
+	defer s.lock.Unlock()

 	serviceWatch, ok := s.services[serviceID]
 	if !ok {
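The rework above turns the initial registration from a synchronous RPC into a cache-driven handshake: the watch is started first, and AddService blocks on ReadyWait until the first resolved-config update has been applied or has failed. A stripped-down sketch of that handshake, using stand-in types rather than the real serviceConfigWatch:

    package main

    import (
    	"errors"
    	"fmt"
    )

    // Sketch only: the Start/ReadyWait handshake in miniature. The caller blocks on
    // ReadyWait until the watch has processed its first update; an error from that
    // first update is handed back through readyCh, success closes the channel.
    type watch struct {
    	readyCh chan error
    	ready   bool
    }

    func newWatch() *watch {
    	return &watch{readyCh: make(chan error)}
    }

    // handleUpdate mirrors the shape of serviceConfigWatch.handleUpdate: a failing
    // first update reports its error on readyCh, a successful one closes it.
    // (The real code guards ready with a mutex; omitted here for brevity.)
    func (w *watch) handleUpdate(err error) {
    	if err != nil {
    		if !w.ready {
    			w.readyCh <- err
    		}
    		return
    	}
    	if !w.ready {
    		close(w.readyCh)
    		w.ready = true
    	}
    }

    // ReadyWait blocks until the first update has been handled.
    func (w *watch) ReadyWait() error {
    	return <-w.readyCh
    }

    func main() {
    	ok := newWatch()
    	go ok.handleUpdate(nil) // first update succeeds
    	fmt.Println("ready:", ok.ReadyWait())

    	failed := newWatch()
    	go failed.handleUpdate(errors.New("initial registration failed"))
    	fmt.Println("ready:", failed.ReadyWait())
    }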
@@ -123,13 +116,21 @@ type serviceConfigWatch struct {

 	agent *Agent

+	// readyCh is used for ReadyWait in order to block until the first update
+	// for the resolved service config is received from the cache. Both this
+	// and ready are protected by the lock.
+	readyCh chan error
+	ready   bool
+
 	updateCh   chan cache.UpdateEvent
 	ctx        context.Context
 	cancelFunc func()

-	sync.Mutex
+	lock sync.Mutex
 }

+// Start starts the config watch and a goroutine to handle updates over
+// the updateCh. This is not safe to call more than once.
 func (s *serviceConfigWatch) Start() error {
 	s.ctx, s.cancelFunc = context.WithCancel(context.Background())
 	if err := s.startConfigWatch(); err != nil {
@@ -144,6 +145,14 @@ func (s *serviceConfigWatch) Stop() {
 	s.cancelFunc()
 }

+// ReadyWait blocks until the readyCh is closed, which means the initial
+// registration of the service has been completed. If there was an error
+// with the initial registration, it will be returned.
+func (s *serviceConfigWatch) ReadyWait() error {
+	err := <-s.readyCh
+	return err
+}
+
 // runWatch handles any update events from the cache.Notify until the
 // config watch is shut down.
 func (s *serviceConfigWatch) runWatch() {
@@ -166,18 +175,27 @@
 func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error {
 	// Take the agent state lock if needed. This is done before the local config watch
 	// lock in order to prevent a race between this config watch and others - the config
-	// watch lock is the inner lock and the agent stateLock is the outer lock.
-	if !locked {
+	// watch lock is the inner lock and the agent stateLock is the outer lock. In the case
+	// where s.ready == false we assume the lock is already held, since this update is being
+	// waited on for the initial registration by a call from the agent that already holds
+	// the lock.
+	if !locked && s.ready {
 		s.agent.stateLock.Lock()
 		defer s.agent.stateLock.Unlock()
 	}
-	s.Lock()
-	defer s.Unlock()
+	s.lock.Lock()
+	defer s.lock.Unlock()

+	// If we got an error, log a warning if this is the first update; otherwise return the error.
+	// We want the initial update to cause a service registration no matter what.
 	if event.Err != nil {
+		if !s.ready {
+			s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v",
+				s.registration.service.ID, event.Err)
+		} else {
 			return fmt.Errorf("error watching service config: %v", event.Err)
 		}
-
+	} else {
 		switch event.Result.(type) {
 		case *serviceRegistration:
 			s.registration = event.Result.(*serviceRegistration)
@@ -187,13 +205,25 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool)
 		default:
 			return fmt.Errorf("unknown update event type: %T", event)
 		}
+	}

 	service := s.mergeServiceConfig()
 	err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
 	if err != nil {
+		// If this is the initial registration, return the error through the readyCh
+		// so it can be passed back to the original caller.
+		if !s.ready {
+			s.readyCh <- err
+		}
 		return fmt.Errorf("error updating service registration: %v", err)
 	}

+	// If this is the first registration, set the ready status by closing the channel.
+	if !s.ready {
+		close(s.readyCh)
+		s.ready = true
+	}
+
 	return nil
 }
@@ -216,7 +246,7 @@ func (s *serviceConfigWatch) startConfigWatch() error {
 }

 // updateRegistration does a synchronous update of the local service registration and
-// returns the result.
+// returns the result. The agent stateLock should be held when calling this function.
 func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
 	return s.handleUpdate(cache.UpdateEvent{
 		Result: registration,
@@ -11,7 +11,7 @@ import (
 func TestServiceManager_RegisterService(t *testing.T) {
 	require := require.New(t)

-	a := NewTestAgent(t, t.Name(), "")
+	a := NewTestAgent(t, t.Name(), "enable_central_service_config = true")
 	defer a.Shutdown()

 	testrpc.WaitForLeader(t, a.RPC, "dc1")
@@ -53,3 +53,46 @@ func TestServiceManager_RegisterService(t *testing.T) {
 		},
 	}, mergedService)
 }
+
+func TestServiceManager_Disabled(t *testing.T) {
+	require := require.New(t)
+
+	a := NewTestAgent(t, t.Name(), "enable_central_service_config = false")
+	defer a.Shutdown()
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+	// Register some global proxy config
+	args := &structs.ConfigEntryRequest{
+		Datacenter: "dc1",
+		Entry: &structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
+			},
+		},
+	}
+	var out struct{}
+	require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
+
+	// Now register a service locally and make sure the resulting State entry
+	// does not have the global config in it.
+	svc := &structs.NodeService{
+		ID:      "redis",
+		Service: "redis",
+		Port:    8000,
+	}
+	require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
+	mergedService := a.State.Service("redis")
+	require.NotNil(mergedService)
+	// The proxy config map shouldn't be present; the agent should ignore global
+	// defaults here.
+	require.Equal(&structs.NodeService{
+		ID:      "redis",
+		Service: "redis",
+		Port:    8000,
+		Weights: &structs.Weights{
+			Passing: 1,
+			Warning: 1,
+		},
+	}, mergedService)
+}
@@ -765,7 +765,9 @@ type ServiceConnect struct {
 	SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"`
 }

-// Merge overlays the given node's attributes onto the existing node.
+// Merge overlays any non-empty fields of other onto s. Tags, metadata and proxy
+// config are unioned together instead of overwritten. The Connect field and the
+// non-config proxy fields are taken from other.
 func (s *NodeService) Merge(other *NodeService) {
 	if other.Kind != "" {
 		s.Kind = other.Kind
@@ -776,9 +778,26 @@ func (s *NodeService) Merge(other *NodeService) {
 	if other.Service != "" {
 		s.Service = other.Service
 	}
-	for _, tag := range other.Tags {
-		s.Tags = append(s.Tags, tag)
+
+	if s.Tags == nil {
+		s.Tags = other.Tags
+	} else if other.Tags != nil {
+		// Both nodes have tags, so deduplicate and merge them.
+		tagSet := make(map[string]struct{})
+		for _, tag := range s.Tags {
+			tagSet[tag] = struct{}{}
+		}
+		for _, tag := range other.Tags {
+			tagSet[tag] = struct{}{}
+		}
+		tags := make([]string, 0, len(tagSet))
+		for tag, _ := range tagSet {
+			tags = append(tags, tag)
+		}
+		sort.Strings(tags)
+		s.Tags = tags
 	}
+
 	if other.Address != "" {
 		s.Address = other.Address
 	}
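The tag union above sorts the merged set so repeated merges stay deterministic. A small standalone sketch of the same set-union-then-sort idea (mergeTags is a hypothetical helper for illustration, not part of this change):

    package main

    import (
    	"fmt"
    	"sort"
    )

    // mergeTags mirrors the tag handling added to NodeService.Merge: nil slices
    // fall through untouched, otherwise both tag lists are deduplicated into a
    // set and returned sorted.
    func mergeTags(a, b []string) []string {
    	if a == nil {
    		return b
    	}
    	if b == nil {
    		return a
    	}
    	set := make(map[string]struct{})
    	for _, t := range a {
    		set[t] = struct{}{}
    	}
    	for _, t := range b {
    		set[t] = struct{}{}
    	}
    	out := make([]string, 0, len(set))
    	for t := range set {
    		out = append(out, t)
    	}
    	sort.Strings(out)
    	return out
    }

    func main() {
    	fmt.Println(mergeTags([]string{"a", "b"}, []string{"b", "c"})) // [a b c]
    }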
@@ -812,6 +831,8 @@ func (s *NodeService) Merge(other *NodeService) {
 	}
 	s.Proxy.Config = proxyConf

+	// Just take the entire Connect block from the other node.
+	// We can revisit this when adding more fields to centralized config.
 	s.Connect = other.Connect
 	s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar
 }
@@ -561,6 +561,103 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
 	}
 }

+func TestStructs_NodeService_Merge(t *testing.T) {
+	a := &NodeService{
+		Kind:              "service",
+		ID:                "foo:1",
+		Service:           "foo",
+		Tags:              []string{"a", "b"},
+		Address:           "127.0.0.1",
+		Meta:              map[string]string{"a": "b"},
+		Port:              1234,
+		Weights: &Weights{
+			Passing: 1,
+			Warning: 1,
+		},
+		EnableTagOverride: false,
+		ProxyDestination:  "asdf",
+		Proxy: ConnectProxyConfig{
+			DestinationServiceName: "baz",
+			DestinationServiceID:   "baz:1",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       2345,
+			Config: map[string]interface{}{
+				"foo": 1,
+			},
+		},
+		Connect: ServiceConnect{
+			Native: false,
+		},
+		LocallyRegisteredAsSidecar: false,
+	}
+
+	b := &NodeService{
+		Kind:              "other",
+		ID:                "bar:1",
+		Service:           "bar",
+		Tags:              []string{"c", "d"},
+		Address:           "127.0.0.2",
+		Meta:              map[string]string{"c": "d"},
+		Port:              4567,
+		Weights: &Weights{
+			Passing: 2,
+			Warning: 2,
+		},
+		EnableTagOverride: true,
+		ProxyDestination:  "qwer",
+		Proxy: ConnectProxyConfig{
+			DestinationServiceName: "zoo",
+			DestinationServiceID:   "zoo:1",
+			LocalServiceAddress:    "127.0.0.2",
+			LocalServicePort:       6789,
+			Config: map[string]interface{}{
+				"bar": 2,
+			},
+		},
+		Connect: ServiceConnect{
+			Native: true,
+		},
+		LocallyRegisteredAsSidecar: true,
+	}
+
+	expected := &NodeService{
+		Kind:    "other",
+		ID:      "bar:1",
+		Service: "bar",
+		Tags:    []string{"a", "b", "c", "d"},
+		Address: "127.0.0.2",
+		Meta: map[string]string{
+			"a": "b",
+			"c": "d",
+		},
+		Port: 4567,
+		Weights: &Weights{
+			Passing: 2,
+			Warning: 2,
+		},
+		EnableTagOverride: true,
+		ProxyDestination:  "qwer",
+		Proxy: ConnectProxyConfig{
+			DestinationServiceName: "zoo",
+			DestinationServiceID:   "zoo:1",
+			LocalServiceAddress:    "127.0.0.2",
+			LocalServicePort:       6789,
+			Config: map[string]interface{}{
+				"foo": 1,
+				"bar": 2,
+			},
+		},
+		Connect: ServiceConnect{
+			Native: true,
+		},
+		LocallyRegisteredAsSidecar: true,
+	}
+
+	a.Merge(b)
+
+	require.Equal(t, expected, a)
+}
+
 func TestStructs_HealthCheck_IsSame(t *testing.T) {
 	hc := &HealthCheck{
 		Node: "node1",
@@ -23,8 +23,8 @@ import (
 	"github.com/hashicorp/consul/agent/consul"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
-	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/logger"
+	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/sdk/testutil/retry"

 	"github.com/stretchr/testify/require"