mirror of https://github.com/hashicorp/consul
Merge pull request #5700 from hashicorp/service-reg-manager
Use centralized service config on agent service registrationspull/5715/head
commit
d8f8400fe1
114
agent/agent.go
114
agent/agent.go
|
@ -243,6 +243,10 @@ type Agent struct {
|
||||||
// directly.
|
// directly.
|
||||||
proxyConfig *proxycfg.Manager
|
proxyConfig *proxycfg.Manager
|
||||||
|
|
||||||
|
// serviceManager is the manager for combining local service registrations with
|
||||||
|
// the centrally configured proxy/service defaults.
|
||||||
|
serviceManager *ServiceManager
|
||||||
|
|
||||||
// xdsServer is the Server instance that serves xDS gRPC API.
|
// xdsServer is the Server instance that serves xDS gRPC API.
|
||||||
xdsServer *xds.Server
|
xdsServer *xds.Server
|
||||||
|
|
||||||
|
@ -287,6 +291,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) {
|
||||||
endpoints: make(map[string]string),
|
endpoints: make(map[string]string),
|
||||||
tokens: new(token.Store),
|
tokens: new(token.Store),
|
||||||
}
|
}
|
||||||
|
a.serviceManager = NewServiceManager(a)
|
||||||
|
|
||||||
if err := a.initializeACLs(); err != nil {
|
if err := a.initializeACLs(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1895,49 +1900,22 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
|
||||||
return a.addServiceLocked(service, chkTypes, persist, token, source)
|
return a.addServiceLocked(service, chkTypes, persist, token, source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addServiceLocked adds a service entry to the service manager if enabled, or directly
|
||||||
|
// to the local state if it is not. This function assumes the state lock is already held.
|
||||||
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
||||||
if service.Service == "" {
|
if err := a.validateService(service, chkTypes); err != nil {
|
||||||
return fmt.Errorf("Service name missing")
|
return err
|
||||||
}
|
|
||||||
if service.ID == "" && service.Service != "" {
|
|
||||||
service.ID = service.Service
|
|
||||||
}
|
|
||||||
for _, check := range chkTypes {
|
|
||||||
if err := check.Validate(); err != nil {
|
|
||||||
return fmt.Errorf("Check is not valid: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set default weights if not specified. This is important as it ensures AE
|
if a.config.EnableCentralServiceConfig {
|
||||||
// doesn't consider the service different since it has nil weights.
|
return a.serviceManager.AddService(service, chkTypes, persist, token, source)
|
||||||
if service.Weights == nil {
|
|
||||||
service.Weights = &structs.Weights{Passing: 1, Warning: 1}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Warn if the service name is incompatible with DNS
|
return a.addServiceInternal(service, chkTypes, persist, token, source)
|
||||||
if InvalidDnsRe.MatchString(service.Service) {
|
}
|
||||||
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
|
|
||||||
"via DNS due to invalid characters. Valid characters include "+
|
|
||||||
"all alpha-numerics and dashes.", service.Service)
|
|
||||||
} else if len(service.Service) > MaxDNSLabelLength {
|
|
||||||
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
|
|
||||||
"via DNS due to it being too long. Valid lengths are between "+
|
|
||||||
"1 and 63 bytes.", service.Service)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Warn if any tags are incompatible with DNS
|
|
||||||
for _, tag := range service.Tags {
|
|
||||||
if InvalidDnsRe.MatchString(tag) {
|
|
||||||
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
|
|
||||||
"via DNS due to invalid characters. Valid characters include "+
|
|
||||||
"all alpha-numerics and dashes.", tag)
|
|
||||||
} else if len(tag) > MaxDNSLabelLength {
|
|
||||||
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
|
|
||||||
"via DNS due to it being too long. Valid lengths are between "+
|
|
||||||
"1 and 63 bytes.", tag)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// addServiceInternal adds the given service and checks to the local state.
|
||||||
|
func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
||||||
// Pause the service syncs during modification
|
// Pause the service syncs during modification
|
||||||
a.PauseSync()
|
a.PauseSync()
|
||||||
defer a.ResumeSync()
|
defer a.ResumeSync()
|
||||||
|
@ -2027,6 +2005,54 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validateService validates an service and its checks, either returning an error or emitting a
|
||||||
|
// warning based on the nature of the error.
|
||||||
|
func (a *Agent) validateService(service *structs.NodeService, chkTypes []*structs.CheckType) error {
|
||||||
|
if service.Service == "" {
|
||||||
|
return fmt.Errorf("Service name missing")
|
||||||
|
}
|
||||||
|
if service.ID == "" && service.Service != "" {
|
||||||
|
service.ID = service.Service
|
||||||
|
}
|
||||||
|
for _, check := range chkTypes {
|
||||||
|
if err := check.Validate(); err != nil {
|
||||||
|
return fmt.Errorf("Check is not valid: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set default weights if not specified. This is important as it ensures AE
|
||||||
|
// doesn't consider the service different since it has nil weights.
|
||||||
|
if service.Weights == nil {
|
||||||
|
service.Weights = &structs.Weights{Passing: 1, Warning: 1}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warn if the service name is incompatible with DNS
|
||||||
|
if InvalidDnsRe.MatchString(service.Service) {
|
||||||
|
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
|
||||||
|
"via DNS due to invalid characters. Valid characters include "+
|
||||||
|
"all alpha-numerics and dashes.", service.Service)
|
||||||
|
} else if len(service.Service) > MaxDNSLabelLength {
|
||||||
|
a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+
|
||||||
|
"via DNS due to it being too long. Valid lengths are between "+
|
||||||
|
"1 and 63 bytes.", service.Service)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warn if any tags are incompatible with DNS
|
||||||
|
for _, tag := range service.Tags {
|
||||||
|
if InvalidDnsRe.MatchString(tag) {
|
||||||
|
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
|
||||||
|
"via DNS due to invalid characters. Valid characters include "+
|
||||||
|
"all alpha-numerics and dashes.", tag)
|
||||||
|
} else if len(tag) > MaxDNSLabelLength {
|
||||||
|
a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+
|
||||||
|
"via DNS due to it being too long. Valid lengths are between "+
|
||||||
|
"1 and 63 bytes.", tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// cleanupRegistration is called on registration error to ensure no there are no
|
// cleanupRegistration is called on registration error to ensure no there are no
|
||||||
// leftovers after a partial failure
|
// leftovers after a partial failure
|
||||||
func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) {
|
func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) {
|
||||||
|
@ -2066,6 +2092,11 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
|
||||||
return fmt.Errorf("ServiceID missing")
|
return fmt.Errorf("ServiceID missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shut down the config watch in the service manager if enabled.
|
||||||
|
if a.config.EnableCentralServiceConfig {
|
||||||
|
a.serviceManager.RemoveService(serviceID)
|
||||||
|
}
|
||||||
|
|
||||||
checks := a.State.Checks()
|
checks := a.State.Checks()
|
||||||
var checkIDs []types.CheckID
|
var checkIDs []types.CheckID
|
||||||
for id, check := range checks {
|
for id, check := range checks {
|
||||||
|
@ -3676,6 +3707,15 @@ func (a *Agent) registerCache() {
|
||||||
RefreshTimer: 0 * time.Second,
|
RefreshTimer: 0 * time.Second,
|
||||||
RefreshTimeout: 10 * time.Minute,
|
RefreshTimeout: 10 * time.Minute,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{
|
||||||
|
RPC: a,
|
||||||
|
}, &cache.RegisterOptions{
|
||||||
|
// Maintain a blocking query, retry dropped connections quickly
|
||||||
|
Refresh: true,
|
||||||
|
RefreshTimer: 0 * time.Second,
|
||||||
|
RefreshTimeout: 10 * time.Minute,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaultProxyCommand returns the default Connect managed proxy command.
|
// defaultProxyCommand returns the default Connect managed proxy command.
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recommended name for registration.
|
||||||
|
const ResolvedServiceConfigName = "resolved-service-config"
|
||||||
|
|
||||||
|
// ResolvedServiceConfig supports fetching the config for a service resolved from
|
||||||
|
// the global proxy defaults and the centrally registered service config.
|
||||||
|
type ResolvedServiceConfig struct {
|
||||||
|
RPC RPC
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
|
var result cache.FetchResult
|
||||||
|
|
||||||
|
// The request should be a ServiceConfigRequest.
|
||||||
|
reqReal, ok := req.(*structs.ServiceConfigRequest)
|
||||||
|
if !ok {
|
||||||
|
return result, fmt.Errorf(
|
||||||
|
"Internal cache failure: request wrong type: %T", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the minimum query index to our current index so we block
|
||||||
|
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
|
||||||
|
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
|
||||||
|
|
||||||
|
// Always allow stale - there's no point in hitting leader if the request is
|
||||||
|
// going to be served from cache and endup arbitrarily stale anyway. This
|
||||||
|
// allows cached resolved-service-config to automatically read scale across all
|
||||||
|
// servers too.
|
||||||
|
reqReal.AllowStale = true
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
var reply structs.ServiceConfigResponse
|
||||||
|
if err := c.RPC.RPC("ConfigEntry.ResolveServiceConfig", reqReal, &reply); err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Value = &reply
|
||||||
|
result.Index = reply.QueryMeta.Index
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ResolvedServiceConfig) SupportsBlocking() bool {
|
||||||
|
return true
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestResolvedServiceConfig(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
rpc := TestRPC(t)
|
||||||
|
defer rpc.AssertExpectations(t)
|
||||||
|
typ := &ResolvedServiceConfig{RPC: rpc}
|
||||||
|
|
||||||
|
// Expect the proper RPC call. This also sets the expected value
|
||||||
|
// since that is return-by-pointer in the arguments.
|
||||||
|
var resp *structs.ServiceConfigResponse
|
||||||
|
rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil).
|
||||||
|
Run(func(args mock.Arguments) {
|
||||||
|
req := args.Get(1).(*structs.ServiceConfigRequest)
|
||||||
|
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
|
||||||
|
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
|
||||||
|
require.Equal("foo", req.Name)
|
||||||
|
require.True(req.AllowStale)
|
||||||
|
|
||||||
|
reply := args.Get(2).(*structs.ServiceConfigResponse)
|
||||||
|
reply.Definition = structs.ServiceDefinition{
|
||||||
|
ID: "1234",
|
||||||
|
Name: "foo",
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.QueryMeta.Index = 48
|
||||||
|
resp = reply
|
||||||
|
})
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
resultA, err := typ.Fetch(cache.FetchOptions{
|
||||||
|
MinIndex: 24,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}, &structs.ServiceConfigRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Name: "foo",
|
||||||
|
})
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(cache.FetchResult{
|
||||||
|
Value: resp,
|
||||||
|
Index: 48,
|
||||||
|
}, resultA)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolvedServiceConfig_badReqType(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
rpc := TestRPC(t)
|
||||||
|
defer rpc.AssertExpectations(t)
|
||||||
|
typ := &ResolvedServiceConfig{RPC: rpc}
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
||||||
|
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
|
||||||
|
require.Error(err)
|
||||||
|
require.Contains(err.Error(), "wrong type")
|
||||||
|
|
||||||
|
}
|
|
@ -95,7 +95,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co
|
||||||
|
|
||||||
// Check the index of the value returned in the cache entry to be sure it
|
// Check the index of the value returned in the cache entry to be sure it
|
||||||
// changed
|
// changed
|
||||||
if index < meta.Index {
|
if index == 0 || index < meta.Index {
|
||||||
u := UpdateEvent{correlationID, res, meta, err}
|
u := UpdateEvent{correlationID, res, meta, err}
|
||||||
select {
|
select {
|
||||||
case ch <- u:
|
case ch <- u:
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -23,11 +24,15 @@ func TestCacheNotify(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Setup triggers to control when "updates" should be delivered
|
// Setup triggers to control when "updates" should be delivered
|
||||||
trigger := make([]chan time.Time, 4)
|
trigger := make([]chan time.Time, 5)
|
||||||
for i := range trigger {
|
for i := range trigger {
|
||||||
trigger[i] = make(chan time.Time)
|
trigger[i] = make(chan time.Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send an error to fake a situation where the servers aren't reachable
|
||||||
|
// initially.
|
||||||
|
typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once()
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
|
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
|
||||||
// Assert the right request type - all real Fetch implementations do this so
|
// Assert the right request type - all real Fetch implementations do this so
|
||||||
|
@ -35,16 +40,16 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// break in real life (hint: it did on the first attempt)
|
// break in real life (hint: it did on the first attempt)
|
||||||
_, ok := args.Get(1).(*MockRequest)
|
_, ok := args.Get(1).(*MockRequest)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
})
|
}).WaitUntil(trigger[0])
|
||||||
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0])
|
|
||||||
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1])
|
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1])
|
||||||
typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2])
|
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[2])
|
||||||
|
typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[3])
|
||||||
// It's timing dependent whether the blocking loop manages to make another
|
// It's timing dependent whether the blocking loop manages to make another
|
||||||
// call before we cancel so don't require it. We need to have a higher index
|
// call before we cancel so don't require it. We need to have a higher index
|
||||||
// here because if the index is the same then the cache Get will not return
|
// here because if the index is the same then the cache Get will not return
|
||||||
// until the full 10 min timeout expires. This causes the last fetch to return
|
// until the full 10 min timeout expires. This causes the last fetch to return
|
||||||
// after cancellation as if it had timed out.
|
// after cancellation as if it had timed out.
|
||||||
typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3])
|
typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[4])
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
|
@ -56,12 +61,12 @@ func TestCacheNotify(t *testing.T) {
|
||||||
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
|
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
// Should receive the first result pretty soon
|
// Should receive the error with index == 0 first.
|
||||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
CorrelationID: "test",
|
CorrelationID: "test",
|
||||||
Result: 1,
|
Result: nil,
|
||||||
Meta: ResultMeta{Hit: false, Index: 4},
|
Meta: ResultMeta{Hit: false, Index: 0},
|
||||||
Err: nil,
|
Err: errors.New("no servers available"),
|
||||||
})
|
})
|
||||||
|
|
||||||
// There should be no more updates delivered yet
|
// There should be no more updates delivered yet
|
||||||
|
@ -70,6 +75,17 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// Trigger blocking query to return a "change"
|
// Trigger blocking query to return a "change"
|
||||||
close(trigger[0])
|
close(trigger[0])
|
||||||
|
|
||||||
|
// Should receive the first real update next.
|
||||||
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
|
CorrelationID: "test",
|
||||||
|
Result: 1,
|
||||||
|
Meta: ResultMeta{Hit: false, Index: 4},
|
||||||
|
Err: nil,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Trigger blocking query to return a "change"
|
||||||
|
close(trigger[1])
|
||||||
|
|
||||||
// Should receive the next result pretty soon
|
// Should receive the next result pretty soon
|
||||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
CorrelationID: "test",
|
CorrelationID: "test",
|
||||||
|
@ -99,7 +115,7 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// We could wait for a full timeout but we can't directly observe it so
|
// We could wait for a full timeout but we can't directly observe it so
|
||||||
// simulate the behavior by triggering a response with the same value and
|
// simulate the behavior by triggering a response with the same value and
|
||||||
// index as the last one.
|
// index as the last one.
|
||||||
close(trigger[1])
|
close(trigger[2])
|
||||||
|
|
||||||
// We should NOT be notified about that. Note this is timing dependent but
|
// We should NOT be notified about that. Note this is timing dependent but
|
||||||
// it's only a sanity check, if we somehow _do_ get the change delivered later
|
// it's only a sanity check, if we somehow _do_ get the change delivered later
|
||||||
|
@ -108,7 +124,7 @@ func TestCacheNotify(t *testing.T) {
|
||||||
require.Len(ch, 0)
|
require.Len(ch, 0)
|
||||||
|
|
||||||
// Trigger final update
|
// Trigger final update
|
||||||
close(trigger[2])
|
close(trigger[3])
|
||||||
|
|
||||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
CorrelationID: "test",
|
CorrelationID: "test",
|
||||||
|
@ -134,7 +150,7 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// have no way to interrupt a blocking query. In practice it's fine to know
|
// have no way to interrupt a blocking query. In practice it's fine to know
|
||||||
// that after 10 mins max the blocking query will return and the resources
|
// that after 10 mins max the blocking query will return and the resources
|
||||||
// will be cleaned.
|
// will be cleaned.
|
||||||
close(trigger[3])
|
close(trigger[4])
|
||||||
|
|
||||||
// I want to test that canceling the context cleans up goroutines (which it
|
// I want to test that canceling the context cleans up goroutines (which it
|
||||||
// does from manual verification with debugger etc). I had a check based on a
|
// does from manual verification with debugger etc). I had a check based on a
|
||||||
|
|
|
@ -793,6 +793,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
|
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
|
||||||
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
|
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
|
||||||
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
|
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
|
||||||
|
EnableCentralServiceConfig: b.boolVal(c.EnableCentralServiceConfig),
|
||||||
EnableDebug: b.boolVal(c.EnableDebug),
|
EnableDebug: b.boolVal(c.EnableDebug),
|
||||||
EnableRemoteScriptChecks: enableRemoteScriptChecks,
|
EnableRemoteScriptChecks: enableRemoteScriptChecks,
|
||||||
EnableLocalScriptChecks: enableLocalScriptChecks,
|
EnableLocalScriptChecks: enableLocalScriptChecks,
|
||||||
|
|
|
@ -201,6 +201,7 @@ type Config struct {
|
||||||
DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
|
DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
|
||||||
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
|
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
|
||||||
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
|
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
|
||||||
|
EnableCentralServiceConfig *bool `json:"enable_central_service_config,omitempty" hcl:"enable_central_service_config" mapstructure:"enable_central_service_config"`
|
||||||
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
|
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
|
||||||
EnableScriptChecks *bool `json:"enable_script_checks,omitempty" hcl:"enable_script_checks" mapstructure:"enable_script_checks"`
|
EnableScriptChecks *bool `json:"enable_script_checks,omitempty" hcl:"enable_script_checks" mapstructure:"enable_script_checks"`
|
||||||
EnableLocalScriptChecks *bool `json:"enable_local_script_checks,omitempty" hcl:"enable_local_script_checks" mapstructure:"enable_local_script_checks"`
|
EnableLocalScriptChecks *bool `json:"enable_local_script_checks,omitempty" hcl:"enable_local_script_checks" mapstructure:"enable_local_script_checks"`
|
||||||
|
|
|
@ -669,6 +669,12 @@ type RuntimeConfig struct {
|
||||||
// and key).
|
// and key).
|
||||||
EnableAgentTLSForChecks bool
|
EnableAgentTLSForChecks bool
|
||||||
|
|
||||||
|
// EnableCentralServiceConfig controls whether the agent should incorporate
|
||||||
|
// centralized config such as service-defaults into local service registrations.
|
||||||
|
//
|
||||||
|
// hcl: enable_central_service_config = (true|false)
|
||||||
|
EnableCentralServiceConfig bool
|
||||||
|
|
||||||
// EnableDebug is used to enable various debugging features.
|
// EnableDebug is used to enable various debugging features.
|
||||||
//
|
//
|
||||||
// hcl: enable_debug = (true|false)
|
// hcl: enable_debug = (true|false)
|
||||||
|
|
|
@ -3073,6 +3073,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
},
|
},
|
||||||
"enable_acl_replication": true,
|
"enable_acl_replication": true,
|
||||||
"enable_agent_tls_for_checks": true,
|
"enable_agent_tls_for_checks": true,
|
||||||
|
"enable_central_service_config": true,
|
||||||
"enable_debug": true,
|
"enable_debug": true,
|
||||||
"enable_script_checks": true,
|
"enable_script_checks": true,
|
||||||
"enable_local_script_checks": true,
|
"enable_local_script_checks": true,
|
||||||
|
@ -3629,6 +3630,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
}
|
}
|
||||||
enable_acl_replication = true
|
enable_acl_replication = true
|
||||||
enable_agent_tls_for_checks = true
|
enable_agent_tls_for_checks = true
|
||||||
|
enable_central_service_config = true
|
||||||
enable_debug = true
|
enable_debug = true
|
||||||
enable_script_checks = true
|
enable_script_checks = true
|
||||||
enable_local_script_checks = true
|
enable_local_script_checks = true
|
||||||
|
@ -4270,6 +4272,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
DiscardCheckOutput: true,
|
DiscardCheckOutput: true,
|
||||||
DiscoveryMaxStale: 5 * time.Second,
|
DiscoveryMaxStale: 5 * time.Second,
|
||||||
EnableAgentTLSForChecks: true,
|
EnableAgentTLSForChecks: true,
|
||||||
|
EnableCentralServiceConfig: true,
|
||||||
EnableDebug: true,
|
EnableDebug: true,
|
||||||
EnableRemoteScriptChecks: true,
|
EnableRemoteScriptChecks: true,
|
||||||
EnableLocalScriptChecks: true,
|
EnableLocalScriptChecks: true,
|
||||||
|
@ -5067,6 +5070,7 @@ func TestSanitize(t *testing.T) {
|
||||||
"DiscoveryMaxStale": "0s",
|
"DiscoveryMaxStale": "0s",
|
||||||
"EnableAgentTLSForChecks": false,
|
"EnableAgentTLSForChecks": false,
|
||||||
"EnableDebug": false,
|
"EnableDebug": false,
|
||||||
|
"EnableCentralServiceConfig": false,
|
||||||
"EnableLocalScriptChecks": false,
|
"EnableLocalScriptChecks": false,
|
||||||
"EnableRemoteScriptChecks": false,
|
"EnableRemoteScriptChecks": false,
|
||||||
"EnableSyslog": false,
|
"EnableSyslog": false,
|
||||||
|
|
|
@ -191,18 +191,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry)
|
var serviceConf *structs.ServiceConfigEntry
|
||||||
if !ok {
|
var ok bool
|
||||||
return fmt.Errorf("invalid service config type %T", serviceEntry)
|
if serviceEntry != nil {
|
||||||
|
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid service config type %T", serviceEntry)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal)
|
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry)
|
var proxyConf *structs.ProxyConfigEntry
|
||||||
if !ok {
|
if proxyEntry != nil {
|
||||||
return fmt.Errorf("invalid proxy config type %T", serviceEntry)
|
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid proxy config type %T", proxyEntry)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve the service definition by overlaying the service config onto the global
|
// Resolve the service definition by overlaying the service config onto the global
|
||||||
|
|
|
@ -0,0 +1,264 @@
|
||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The ServiceManager is a layer for service registration in between the agent
|
||||||
|
// and the local state. Any services must be registered with the ServiceManager,
|
||||||
|
// which then maintains a long-running watch of any globally-set service or proxy
|
||||||
|
// configuration that applies to the service in order to register the final, merged
|
||||||
|
// service configuration locally in the agent state.
|
||||||
|
type ServiceManager struct {
|
||||||
|
services map[string]*serviceConfigWatch
|
||||||
|
agent *Agent
|
||||||
|
|
||||||
|
lock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServiceManager(agent *Agent) *ServiceManager {
|
||||||
|
return &ServiceManager{
|
||||||
|
services: make(map[string]*serviceConfigWatch),
|
||||||
|
agent: agent,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddService starts a new serviceConfigWatch if the service has not been registered, and
|
||||||
|
// updates the existing registration if it has. For a new service, a call will also be made
|
||||||
|
// to fetch the merged global defaults that apply to the service in order to compose the
|
||||||
|
// initial registration.
|
||||||
|
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
reg := serviceRegistration{
|
||||||
|
service: service,
|
||||||
|
chkTypes: chkTypes,
|
||||||
|
persist: persist,
|
||||||
|
token: token,
|
||||||
|
source: source,
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a service watch already exists, update the registration. Otherwise,
|
||||||
|
// start a new config watcher.
|
||||||
|
watch, ok := s.services[service.ID]
|
||||||
|
if ok {
|
||||||
|
if err := watch.updateRegistration(®); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID)
|
||||||
|
} else {
|
||||||
|
// This is a new entry, so get the existing global config and do the initial
|
||||||
|
// registration with the merged config.
|
||||||
|
watch := &serviceConfigWatch{
|
||||||
|
registration: ®,
|
||||||
|
readyCh: make(chan error),
|
||||||
|
updateCh: make(chan cache.UpdateEvent, 1),
|
||||||
|
agent: s.agent,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the config watch, which starts a blocking query for the resolved service config
|
||||||
|
// in the background.
|
||||||
|
if err := watch.Start(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call ReadyWait to block until the cache has returned the initial config and the service
|
||||||
|
// has been registered.
|
||||||
|
if err := watch.ReadyWait(); err != nil {
|
||||||
|
watch.Stop()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.services[service.ID] = watch
|
||||||
|
|
||||||
|
s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServiceManager) RemoveService(serviceID string) {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
serviceWatch, ok := s.services[serviceID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceWatch.Stop()
|
||||||
|
delete(s.services, serviceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// serviceRegistration represents a locally registered service.
|
||||||
|
type serviceRegistration struct {
|
||||||
|
service *structs.NodeService
|
||||||
|
chkTypes []*structs.CheckType
|
||||||
|
persist bool
|
||||||
|
token string
|
||||||
|
source configSource
|
||||||
|
}
|
||||||
|
|
||||||
|
// serviceConfigWatch is a long running helper for composing the end config
|
||||||
|
// for a given service from both the local registration and the global
|
||||||
|
// service/proxy defaults.
|
||||||
|
type serviceConfigWatch struct {
|
||||||
|
registration *serviceRegistration
|
||||||
|
config *structs.ServiceDefinition
|
||||||
|
|
||||||
|
agent *Agent
|
||||||
|
|
||||||
|
// readyCh is used for ReadyWait in order to block until the first update
|
||||||
|
// for the resolved service config is received from the cache.
|
||||||
|
readyCh chan error
|
||||||
|
|
||||||
|
updateCh chan cache.UpdateEvent
|
||||||
|
ctx context.Context
|
||||||
|
cancelFunc func()
|
||||||
|
|
||||||
|
lock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the config watch and a goroutine to handle updates over
|
||||||
|
// the updateCh. This is not safe to call more than once.
|
||||||
|
func (s *serviceConfigWatch) Start() error {
|
||||||
|
s.ctx, s.cancelFunc = context.WithCancel(context.Background())
|
||||||
|
if err := s.startConfigWatch(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
go s.runWatch()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) Stop() {
|
||||||
|
s.cancelFunc()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadyWait blocks until the readyCh is closed, which means the initial
|
||||||
|
// registration of the service has been completed. If there was an error
|
||||||
|
// with the initial registration, it will be returned.
|
||||||
|
func (s *serviceConfigWatch) ReadyWait() error {
|
||||||
|
err := <-s.readyCh
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// runWatch handles any update events from the cache.Notify until the
|
||||||
|
// config watch is shut down.
|
||||||
|
func (s *serviceConfigWatch) runWatch() {
|
||||||
|
firstRun := true
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case event := <-s.updateCh:
|
||||||
|
if err := s.handleUpdate(event, false, firstRun); err != nil {
|
||||||
|
s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err)
|
||||||
|
}
|
||||||
|
firstRun = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleUpdate receives an update event about either the service registration or the
|
||||||
|
// global config defaults, updates the local state and re-registers the service with
|
||||||
|
// the newly merged config. This function takes the serviceConfigWatch lock to ensure
|
||||||
|
// only one update can be happening at a time.
|
||||||
|
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error {
|
||||||
|
// Take the agent state lock if needed. This is done before the local config watch
|
||||||
|
// lock in order to prevent a race between this config watch and others - the config
|
||||||
|
// watch lock is the inner lock and the agent stateLock is the outer lock. If this is the
|
||||||
|
// first run we also don't need to take the stateLock, as this is being waited on
|
||||||
|
// synchronously by a caller that already holds it.
|
||||||
|
if !locked && !firstRun {
|
||||||
|
s.agent.stateLock.Lock()
|
||||||
|
defer s.agent.stateLock.Unlock()
|
||||||
|
}
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
// If we got an error, log a warning if this is the first update; otherwise return the error.
|
||||||
|
// We want the initial update to cause a service registration no matter what.
|
||||||
|
if event.Err != nil {
|
||||||
|
if firstRun {
|
||||||
|
s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v",
|
||||||
|
s.registration.service.ID, event.Err)
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("error watching service config: %v", event.Err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch event.Result.(type) {
|
||||||
|
case *serviceRegistration:
|
||||||
|
s.registration = event.Result.(*serviceRegistration)
|
||||||
|
case *structs.ServiceConfigResponse:
|
||||||
|
resp := event.Result.(*structs.ServiceConfigResponse)
|
||||||
|
s.config = &resp.Definition
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown update event type: %T", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
service := s.mergeServiceConfig()
|
||||||
|
err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
|
||||||
|
if err != nil {
|
||||||
|
// If this is the initial registration, return the error through the readyCh
|
||||||
|
// so it can be passed back to the original caller.
|
||||||
|
if firstRun {
|
||||||
|
s.readyCh <- err
|
||||||
|
}
|
||||||
|
return fmt.Errorf("error updating service registration: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is the first registration, set the ready status by closing the channel.
|
||||||
|
if firstRun {
|
||||||
|
close(s.readyCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// startConfigWatch starts a cache.Notify goroutine to run a continuous blocking query
|
||||||
|
// on the resolved service config for this service.
|
||||||
|
func (s *serviceConfigWatch) startConfigWatch() error {
|
||||||
|
name := s.registration.service.Service
|
||||||
|
|
||||||
|
req := &structs.ServiceConfigRequest{
|
||||||
|
Name: name,
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
|
||||||
|
}
|
||||||
|
if s.registration.token != "" {
|
||||||
|
req.QueryOptions.Token = s.registration.token
|
||||||
|
}
|
||||||
|
err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateRegistration does a synchronous update of the local service registration and
|
||||||
|
// returns the result. The agent stateLock should be held when calling this function.
|
||||||
|
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
|
||||||
|
return s.handleUpdate(cache.UpdateEvent{
|
||||||
|
Result: registration,
|
||||||
|
}, true, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeServiceConfig returns the final effective config for the watched service,
|
||||||
|
// including the latest known global defaults from the servers.
|
||||||
|
func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService {
|
||||||
|
if s.config == nil {
|
||||||
|
return s.registration.service
|
||||||
|
}
|
||||||
|
|
||||||
|
svc := s.config.NodeService()
|
||||||
|
svc.Merge(s.registration.service)
|
||||||
|
|
||||||
|
return svc
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/testrpc"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServiceManager_RegisterService(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
a := NewTestAgent(t, t.Name(), "enable_central_service_config = true")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some global proxy config
|
||||||
|
args := &structs.ConfigEntryRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Entry: &structs.ProxyConfigEntry{
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
|
||||||
|
|
||||||
|
// Now register a service locally and make sure the resulting State entry
|
||||||
|
// has the global config in it.
|
||||||
|
svc := &structs.NodeService{
|
||||||
|
ID: "redis",
|
||||||
|
Service: "redis",
|
||||||
|
Port: 8000,
|
||||||
|
}
|
||||||
|
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
mergedService := a.State.Service("redis")
|
||||||
|
require.NotNil(mergedService)
|
||||||
|
require.Equal(&structs.NodeService{
|
||||||
|
ID: "redis",
|
||||||
|
Service: "redis",
|
||||||
|
Port: 8000,
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": int64(1),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
}, mergedService)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServiceManager_Disabled(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
a := NewTestAgent(t, t.Name(), "enable_central_service_config = false")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some global proxy config
|
||||||
|
args := &structs.ConfigEntryRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Entry: &structs.ProxyConfigEntry{
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
|
||||||
|
|
||||||
|
// Now register a service locally and make sure the resulting State entry
|
||||||
|
// has the global config in it.
|
||||||
|
svc := &structs.NodeService{
|
||||||
|
ID: "redis",
|
||||||
|
Service: "redis",
|
||||||
|
Port: 8000,
|
||||||
|
}
|
||||||
|
require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal))
|
||||||
|
mergedService := a.State.Service("redis")
|
||||||
|
require.NotNil(mergedService)
|
||||||
|
// The proxy config map shouldn't be present; the agent should ignore global
|
||||||
|
// defaults here.
|
||||||
|
require.Equal(&structs.NodeService{
|
||||||
|
ID: "redis",
|
||||||
|
Service: "redis",
|
||||||
|
Port: 8000,
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
}, mergedService)
|
||||||
|
}
|
|
@ -2,10 +2,13 @@ package structs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
|
"github.com/mitchellh/hashstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -265,6 +268,30 @@ func (s *ServiceConfigRequest) RequestDatacenter() string {
|
||||||
return s.Datacenter
|
return s.Datacenter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
|
||||||
|
info := cache.RequestInfo{
|
||||||
|
Token: r.Token,
|
||||||
|
Datacenter: r.Datacenter,
|
||||||
|
MinIndex: r.MinQueryIndex,
|
||||||
|
Timeout: r.MaxQueryTime,
|
||||||
|
MaxAge: r.MaxAge,
|
||||||
|
MustRevalidate: r.MustRevalidate,
|
||||||
|
}
|
||||||
|
|
||||||
|
// To calculate the cache key we only hash the service name. The
|
||||||
|
// datacenter is handled by the cache framework. The other fields are
|
||||||
|
// not, but should not be used in any cache types.
|
||||||
|
v, err := hashstructure.Hash(r.Name, nil)
|
||||||
|
if err == nil {
|
||||||
|
// If there is an error, we don't set the key. A blank key forces
|
||||||
|
// no cache for this request so the request is forwarded directly
|
||||||
|
// to the server.
|
||||||
|
info.Key = strconv.FormatUint(v, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceConfigResponse struct {
|
type ServiceConfigResponse struct {
|
||||||
Definition ServiceDefinition
|
Definition ServiceDefinition
|
||||||
|
|
||||||
|
|
|
@ -765,6 +765,78 @@ type ServiceConnect struct {
|
||||||
SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"`
|
SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merge overlays any non-empty fields of other onto s. Tags, metadata and proxy
|
||||||
|
// config are unioned together instead of overwritten. The Connect field and the
|
||||||
|
// non-config proxy fields are taken from other.
|
||||||
|
func (s *NodeService) Merge(other *NodeService) {
|
||||||
|
if other.Kind != "" {
|
||||||
|
s.Kind = other.Kind
|
||||||
|
}
|
||||||
|
if other.ID != "" {
|
||||||
|
s.ID = other.ID
|
||||||
|
}
|
||||||
|
if other.Service != "" {
|
||||||
|
s.Service = other.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.Tags == nil {
|
||||||
|
s.Tags = other.Tags
|
||||||
|
} else if other.Tags != nil {
|
||||||
|
// Both nodes have tags, so deduplicate and merge them.
|
||||||
|
tagSet := make(map[string]struct{})
|
||||||
|
for _, tag := range s.Tags {
|
||||||
|
tagSet[tag] = struct{}{}
|
||||||
|
}
|
||||||
|
for _, tag := range other.Tags {
|
||||||
|
tagSet[tag] = struct{}{}
|
||||||
|
}
|
||||||
|
tags := make([]string, 0, len(tagSet))
|
||||||
|
for tag, _ := range tagSet {
|
||||||
|
tags = append(tags, tag)
|
||||||
|
}
|
||||||
|
sort.Strings(tags)
|
||||||
|
s.Tags = tags
|
||||||
|
}
|
||||||
|
|
||||||
|
if other.Address != "" {
|
||||||
|
s.Address = other.Address
|
||||||
|
}
|
||||||
|
if s.Meta == nil {
|
||||||
|
s.Meta = other.Meta
|
||||||
|
} else {
|
||||||
|
for k, v := range other.Meta {
|
||||||
|
s.Meta[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if other.Port != 0 {
|
||||||
|
s.Port = other.Port
|
||||||
|
}
|
||||||
|
if other.Weights != nil {
|
||||||
|
s.Weights = other.Weights
|
||||||
|
}
|
||||||
|
s.EnableTagOverride = other.EnableTagOverride
|
||||||
|
if other.ProxyDestination != "" {
|
||||||
|
s.ProxyDestination = other.ProxyDestination
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take the incoming service's proxy fields and merge the config map.
|
||||||
|
proxyConf := s.Proxy.Config
|
||||||
|
s.Proxy = other.Proxy
|
||||||
|
if proxyConf == nil {
|
||||||
|
proxyConf = other.Proxy.Config
|
||||||
|
} else {
|
||||||
|
for k, v := range other.Proxy.Config {
|
||||||
|
proxyConf[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.Proxy.Config = proxyConf
|
||||||
|
|
||||||
|
// Just take the entire Connect block from the other node.
|
||||||
|
// We can revisit this when adding more fields to centralized config.
|
||||||
|
s.Connect = other.Connect
|
||||||
|
s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar
|
||||||
|
}
|
||||||
|
|
||||||
// Validate validates the node service configuration.
|
// Validate validates the node service configuration.
|
||||||
//
|
//
|
||||||
// NOTE(mitchellh): This currently only validates fields for a ConnectProxy.
|
// NOTE(mitchellh): This currently only validates fields for a ConnectProxy.
|
||||||
|
|
|
@ -561,6 +561,103 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStructs_NodeService_Merge(t *testing.T) {
|
||||||
|
a := &NodeService{
|
||||||
|
Kind: "service",
|
||||||
|
ID: "foo:1",
|
||||||
|
Service: "foo",
|
||||||
|
Tags: []string{"a", "b"},
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Meta: map[string]string{"a": "b"},
|
||||||
|
Port: 1234,
|
||||||
|
Weights: &Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
EnableTagOverride: false,
|
||||||
|
ProxyDestination: "asdf",
|
||||||
|
Proxy: ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "baz",
|
||||||
|
DestinationServiceID: "baz:1",
|
||||||
|
LocalServiceAddress: "127.0.0.1",
|
||||||
|
LocalServicePort: 2345,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Connect: ServiceConnect{
|
||||||
|
Native: false,
|
||||||
|
},
|
||||||
|
LocallyRegisteredAsSidecar: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
b := &NodeService{
|
||||||
|
Kind: "other",
|
||||||
|
ID: "bar:1",
|
||||||
|
Service: "bar",
|
||||||
|
Tags: []string{"c", "d"},
|
||||||
|
Address: "127.0.0.2",
|
||||||
|
Meta: map[string]string{"c": "d"},
|
||||||
|
Port: 4567,
|
||||||
|
Weights: &Weights{
|
||||||
|
Passing: 2,
|
||||||
|
Warning: 2,
|
||||||
|
},
|
||||||
|
EnableTagOverride: true,
|
||||||
|
ProxyDestination: "qwer",
|
||||||
|
Proxy: ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "zoo",
|
||||||
|
DestinationServiceID: "zoo:1",
|
||||||
|
LocalServiceAddress: "127.0.0.2",
|
||||||
|
LocalServicePort: 6789,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"bar": 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Connect: ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
LocallyRegisteredAsSidecar: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := &NodeService{
|
||||||
|
Kind: "other",
|
||||||
|
ID: "bar:1",
|
||||||
|
Service: "bar",
|
||||||
|
Tags: []string{"a", "b", "c", "d"},
|
||||||
|
Address: "127.0.0.2",
|
||||||
|
Meta: map[string]string{
|
||||||
|
"a": "b",
|
||||||
|
"c": "d",
|
||||||
|
},
|
||||||
|
Port: 4567,
|
||||||
|
Weights: &Weights{
|
||||||
|
Passing: 2,
|
||||||
|
Warning: 2,
|
||||||
|
},
|
||||||
|
EnableTagOverride: true,
|
||||||
|
ProxyDestination: "qwer",
|
||||||
|
Proxy: ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "zoo",
|
||||||
|
DestinationServiceID: "zoo:1",
|
||||||
|
LocalServiceAddress: "127.0.0.2",
|
||||||
|
LocalServicePort: 6789,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": 1,
|
||||||
|
"bar": 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Connect: ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
LocallyRegisteredAsSidecar: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
a.Merge(b)
|
||||||
|
|
||||||
|
require.Equal(t, expected, a)
|
||||||
|
}
|
||||||
|
|
||||||
func TestStructs_HealthCheck_IsSame(t *testing.T) {
|
func TestStructs_HealthCheck_IsSame(t *testing.T) {
|
||||||
hc := &HealthCheck{
|
hc := &HealthCheck{
|
||||||
Node: "node1",
|
Node: "node1",
|
||||||
|
|
|
@ -23,8 +23,8 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/freeport"
|
|
||||||
"github.com/hashicorp/consul/logger"
|
"github.com/hashicorp/consul/logger"
|
||||||
|
"github.com/hashicorp/consul/sdk/freeport"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
Loading…
Reference in New Issue