mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
382 lines
13 KiB
382 lines
13 KiB
package agent |
|
|
|
import ( |
|
"fmt" |
|
"sync" |
|
|
|
"github.com/hashicorp/consul/agent/cache" |
|
cachetype "github.com/hashicorp/consul/agent/cache-types" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/imdario/mergo" |
|
"github.com/mitchellh/copystructure" |
|
"golang.org/x/net/context" |
|
) |
|
|
|
// The ServiceManager is a layer for service registration in between the agent |
|
// and the local state. Any services must be registered with the ServiceManager, |
|
// which then maintains a long-running watch of any globally-set service or proxy |
|
// configuration that applies to the service in order to register the final, merged |
|
// service configuration locally in the agent state. |
|
type ServiceManager struct { |
|
services map[string]*serviceConfigWatch |
|
agent *Agent |
|
|
|
lock sync.Mutex |
|
} |
|
|
|
func NewServiceManager(agent *Agent) *ServiceManager { |
|
return &ServiceManager{ |
|
services: make(map[string]*serviceConfigWatch), |
|
agent: agent, |
|
} |
|
} |
|
|
|
// AddService starts a new serviceConfigWatch if the service has not been registered, and |
|
// updates the existing registration if it has. For a new service, a call will also be made |
|
// to fetch the merged global defaults that apply to the service in order to compose the |
|
// initial registration. |
|
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { |
|
// For now only sidecar proxies have anything that can be configured |
|
// centrally. So bypass the whole manager for regular services. |
|
if !service.IsSidecarProxy() { |
|
return s.agent.addServiceInternal(service, chkTypes, persist, token, source) |
|
} |
|
|
|
s.lock.Lock() |
|
defer s.lock.Unlock() |
|
|
|
reg := serviceRegistration{ |
|
service: service, |
|
chkTypes: chkTypes, |
|
persist: persist, |
|
token: token, |
|
source: source, |
|
} |
|
|
|
// If a service watch already exists, update the registration. Otherwise, |
|
// start a new config watcher. |
|
watch, ok := s.services[service.ID] |
|
if ok { |
|
if err := watch.updateRegistration(®); err != nil { |
|
return err |
|
} |
|
s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID) |
|
} else { |
|
// This is a new entry, so get the existing global config and do the initial |
|
// registration with the merged config. |
|
watch := &serviceConfigWatch{ |
|
registration: ®, |
|
readyCh: make(chan error), |
|
updateCh: make(chan cache.UpdateEvent, 1), |
|
agent: s.agent, |
|
} |
|
|
|
// Start the config watch, which starts a blocking query for the resolved service config |
|
// in the background. |
|
if err := watch.Start(); err != nil { |
|
return err |
|
} |
|
|
|
// Call ReadyWait to block until the cache has returned the initial config and the service |
|
// has been registered. |
|
if err := watch.ReadyWait(); err != nil { |
|
watch.Stop() |
|
return err |
|
} |
|
|
|
s.services[service.ID] = watch |
|
|
|
s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (s *ServiceManager) RemoveService(serviceID string) { |
|
s.lock.Lock() |
|
defer s.lock.Unlock() |
|
|
|
serviceWatch, ok := s.services[serviceID] |
|
if !ok { |
|
return |
|
} |
|
|
|
serviceWatch.Stop() |
|
delete(s.services, serviceID) |
|
} |
|
|
|
// serviceRegistration represents a locally registered service. |
|
type serviceRegistration struct { |
|
service *structs.NodeService |
|
chkTypes []*structs.CheckType |
|
persist bool |
|
token string |
|
source configSource |
|
} |
|
|
|
// serviceConfigWatch is a long running helper for composing the end config |
|
// for a given service from both the local registration and the global |
|
// service/proxy defaults. |
|
type serviceConfigWatch struct { |
|
registration *serviceRegistration |
|
defaults *structs.ServiceConfigResponse |
|
|
|
agent *Agent |
|
|
|
// readyCh is used for ReadyWait in order to block until the first update |
|
// for the resolved service config is received from the cache. |
|
readyCh chan error |
|
|
|
// ctx and cancelFunc store the overall context that lives as long as the |
|
// Watch instance is needed, possibly spanning multiple cache.Notify |
|
// lifetimes. |
|
ctx context.Context |
|
cancelFunc func() |
|
|
|
// cacheKey stores the key of the current request, when registration changes |
|
// we check to see if a new cache watch is needed. |
|
cacheKey string |
|
|
|
// updateCh receives changes from cache watchers or registration changes. |
|
updateCh chan cache.UpdateEvent |
|
|
|
// notifyCancel, if non-nil it the cancel func that will stop the currently |
|
// active Notify loop. It does not cancel ctx and is used when we need to |
|
// switch to a new Notify call because cache key changed. |
|
notifyCancel func() |
|
|
|
lock sync.Mutex |
|
} |
|
|
|
// Start starts the config watch and a goroutine to handle updates over |
|
// the updateCh. This is not safe to call more than once. |
|
func (s *serviceConfigWatch) Start() error { |
|
s.ctx, s.cancelFunc = context.WithCancel(context.Background()) |
|
if err := s.ensureConfigWatch(); err != nil { |
|
return err |
|
} |
|
go s.runWatch() |
|
|
|
return nil |
|
} |
|
|
|
func (s *serviceConfigWatch) Stop() { |
|
s.cancelFunc() |
|
} |
|
|
|
// ReadyWait blocks until the readyCh is closed, which means the initial |
|
// registration of the service has been completed. If there was an error |
|
// with the initial registration, it will be returned. |
|
func (s *serviceConfigWatch) ReadyWait() error { |
|
err := <-s.readyCh |
|
return err |
|
} |
|
|
|
// runWatch handles any update events from the cache.Notify until the |
|
// config watch is shut down. |
|
func (s *serviceConfigWatch) runWatch() { |
|
firstRun := true |
|
for { |
|
select { |
|
case <-s.ctx.Done(): |
|
return |
|
case event := <-s.updateCh: |
|
if err := s.handleUpdate(event, false, firstRun); err != nil { |
|
s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) |
|
} |
|
firstRun = false |
|
} |
|
} |
|
} |
|
|
|
// handleUpdate receives an update event about either the service registration or the |
|
// global config defaults, updates the local state and re-registers the service with |
|
// the newly merged config. This function takes the serviceConfigWatch lock to ensure |
|
// only one update can be happening at a time. |
|
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error { |
|
// Take the agent state lock if needed. This is done before the local config watch |
|
// lock in order to prevent a race between this config watch and others - the config |
|
// watch lock is the inner lock and the agent stateLock is the outer lock. If this is the |
|
// first run we also don't need to take the stateLock, as this is being waited on |
|
// synchronously by a caller that already holds it. |
|
if !locked && !firstRun { |
|
s.agent.stateLock.Lock() |
|
defer s.agent.stateLock.Unlock() |
|
} |
|
s.lock.Lock() |
|
defer s.lock.Unlock() |
|
|
|
// If we got an error, log a warning if this is the first update; otherwise return the error. |
|
// We want the initial update to cause a service registration no matter what. |
|
if event.Err != nil { |
|
if firstRun { |
|
s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", |
|
s.registration.service.ID, event.Err) |
|
} else { |
|
return fmt.Errorf("error watching service config: %v", event.Err) |
|
} |
|
} else { |
|
switch res := event.Result.(type) { |
|
case *serviceRegistration: |
|
s.registration = res |
|
// We may need to restart watch if upstreams changed |
|
if err := s.ensureConfigWatch(); err != nil { |
|
return err |
|
} |
|
case *structs.ServiceConfigResponse: |
|
// Sanity check this even came from the currently active watch to ignore |
|
// rare races when switching cache keys |
|
if event.CorrelationID != s.cacheKey { |
|
// It's a no-op. The new watcher will deliver (or may have already |
|
// delivered) the correct config so just ignore this old message. |
|
return nil |
|
} |
|
s.defaults = res |
|
default: |
|
return fmt.Errorf("unknown update event type: %T", event) |
|
} |
|
} |
|
|
|
// Merge the local registration with the central defaults and update this service |
|
// in the local state. |
|
service, err := s.mergeServiceConfig() |
|
if err != nil { |
|
return err |
|
} |
|
if err := s.updateAgentRegistration(service); err != nil { |
|
// If this is the initial registration, return the error through the readyCh |
|
// so it can be passed back to the original caller. |
|
if firstRun { |
|
s.readyCh <- err |
|
} |
|
return fmt.Errorf("error updating service registration: %v", err) |
|
} |
|
|
|
// If this is the first registration, set the ready status by closing the channel. |
|
if firstRun { |
|
close(s.readyCh) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// updateAgentRegistration updates the service (and its sidecar, if applicable) in the |
|
// local state. |
|
func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error { |
|
return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) |
|
} |
|
|
|
// ensureConfigWatch starts a cache.Notify goroutine to run a continuous |
|
// blocking query on the resolved service config for this service. If the |
|
// registration has changed in a way that requires a new blocking query, it will |
|
// cancel any current watch and start a new one. It is a no-op if there is an |
|
// existing watch that is sufficient for the current registration. It is not |
|
// thread-safe and must only be called from the Start method (which is only safe |
|
// to call once as documented) or from inside the run loop. |
|
func (s *serviceConfigWatch) ensureConfigWatch() error { |
|
ns := s.registration.service |
|
name := ns.Service |
|
var upstreams []string |
|
|
|
// Note that only sidecar proxies should even make it here for now although |
|
// later that will change to add the condition. |
|
if ns.IsSidecarProxy() { |
|
// This is a sidecar proxy, ignore the proxy service's config since we are |
|
// managed by the target service config. |
|
name = ns.Proxy.DestinationServiceName |
|
|
|
// Also if we have any upstreams defined, add them to the request so we can |
|
// learn about their configs. |
|
for _, us := range ns.Proxy.Upstreams { |
|
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService { |
|
upstreams = append(upstreams, us.DestinationName) |
|
} |
|
} |
|
} |
|
|
|
req := &structs.ServiceConfigRequest{ |
|
Name: name, |
|
Datacenter: s.agent.config.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, |
|
Upstreams: upstreams, |
|
} |
|
if s.registration.token != "" { |
|
req.QueryOptions.Token = s.registration.token |
|
} |
|
|
|
// See if this request is different from the current one |
|
cacheKey := req.CacheInfo().Key |
|
if cacheKey == s.cacheKey { |
|
return nil |
|
} |
|
|
|
// If there is an existing notify running, stop it first. This may leave a |
|
// blocking query running in the background but the Notify loop will swallow |
|
// the response and exit when it next unblocks so we can consider it stopped. |
|
if s.notifyCancel != nil { |
|
s.notifyCancel() |
|
} |
|
|
|
// Make a new context just for this Notify call |
|
ctx, cancel := context.WithCancel(s.ctx) |
|
s.notifyCancel = cancel |
|
s.cacheKey = cacheKey |
|
// We use the cache key as the correlationID here. Notify in general will not |
|
// respond on the updateCh after the context is cancelled however there could |
|
// possible be a race where it has only just got an update and checked the |
|
// context before we cancel and so might still deliver the old event. Using |
|
// the cacheKey allows us to ignore updates from the old cache watch and makes |
|
// even this rare edge case safe. |
|
err := s.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req, |
|
s.cacheKey, s.updateCh) |
|
|
|
return err |
|
} |
|
|
|
// updateRegistration does a synchronous update of the local service registration and |
|
// returns the result. The agent stateLock should be held when calling this function. |
|
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { |
|
return s.handleUpdate(cache.UpdateEvent{ |
|
Result: registration, |
|
}, true, false) |
|
} |
|
|
|
// mergeServiceConfig returns the final effective config for the watched service, |
|
// including the latest known global defaults from the servers. |
|
func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) { |
|
if s.defaults == nil || !s.registration.service.IsSidecarProxy() { |
|
return s.registration.service, nil |
|
} |
|
|
|
// We don't want to change s.registration in place since it is our source of |
|
// truth about what was actually registered before defaults applied. So copy |
|
// it first. |
|
nsRaw, err := copystructure.Copy(s.registration.service) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Merge proxy defaults |
|
ns := nsRaw.(*structs.NodeService) |
|
|
|
if err := mergo.Merge(&ns.Proxy.Config, s.defaults.ProxyConfig); err != nil { |
|
return nil, err |
|
} |
|
// Merge upstream defaults if there were any returned |
|
for i := range ns.Proxy.Upstreams { |
|
// Get a pointer not a value copy of the upstream struct |
|
us := &ns.Proxy.Upstreams[i] |
|
if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService { |
|
continue |
|
} |
|
usCfg, ok := s.defaults.UpstreamConfigs[us.DestinationName] |
|
if !ok { |
|
// No config defaults to merge |
|
continue |
|
} |
|
if err := mergo.Merge(&us.Config, usCfg); err != nil { |
|
return nil, err |
|
} |
|
} |
|
return ns, err |
|
}
|
|
|