Merge pull request #7675 from hashicorp/dnephin/add-service

service_manager: small changes
pull/8120/head
Daniel Nephin 2020-06-16 12:40:11 -04:00 committed by GitHub
commit bb512f5c07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 58 deletions

View File

@ -27,10 +27,11 @@ type ServiceManager struct {
// services tracks all active watches for registered services // services tracks all active watches for registered services
services map[structs.ServiceID]*serviceConfigWatch services map[structs.ServiceID]*serviceConfigWatch
// registerCh is a channel for processing service registrations in the // registerCh is a channel for receiving service registration requests from
// background when watches are notified of changes. All sends and receives // from serviceConfigWatchers.
// must also obey the ctx.Done() channel to avoid a deadlock during // The registrations are handled in the background when watches are notified of
// shutdown. // changes. All sends and receives must also obey the ctx.Done() channel to
// avoid a deadlock during shutdown.
registerCh chan *asyncRegisterRequest registerCh chan *asyncRegisterRequest
// ctx is the shared context for all goroutines launched // ctx is the shared context for all goroutines launched
@ -167,16 +168,15 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
// merged config. // merged config.
watch := &serviceConfigWatch{ watch := &serviceConfigWatch{
registration: reg, registration: reg,
updateCh: make(chan cache.UpdateEvent, 1),
agent: s.agent, agent: s.agent,
registerCh: s.registerCh, registerCh: s.registerCh,
} }
err := watch.RegisterAndStart( err := watch.RegisterAndStart(
s.ctx,
previousDefaults, previousDefaults,
waitForCentralConfig, waitForCentralConfig,
persistServiceConfig, persistServiceConfig,
s.ctx,
&s.running, &s.running,
) )
if err != nil { if err != nil {
@ -220,7 +220,6 @@ type serviceRegistration struct {
// service/proxy defaults. // service/proxy defaults.
type serviceConfigWatch struct { type serviceConfigWatch struct {
registration *serviceRegistration registration *serviceRegistration
defaults *structs.ServiceConfigResponse
agent *Agent agent *Agent
registerCh chan<- *asyncRegisterRequest registerCh chan<- *asyncRegisterRequest
@ -229,39 +228,34 @@ type serviceConfigWatch struct {
// we check to see if a new cache watch is needed. // we check to see if a new cache watch is needed.
cacheKey string cacheKey string
// updateCh receives changes from cache watchers
updateCh chan cache.UpdateEvent
ctx context.Context
cancelFunc func() cancelFunc func()
running sync.WaitGroup running sync.WaitGroup
} }
// NOTE: this is called while holding the Agent.stateLock // NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) RegisterAndStart( func (w *serviceConfigWatch) RegisterAndStart(
previousDefaults *structs.ServiceConfigResponse, ctx context.Context,
serviceDefaults *structs.ServiceConfigResponse,
waitForCentralConfig bool, waitForCentralConfig bool,
persistServiceConfig bool, persistServiceConfig bool,
ctx context.Context,
wg *sync.WaitGroup, wg *sync.WaitGroup,
) error { ) error {
service := w.registration.service
// Either we explicitly block waiting for defaults before registering, // Either we explicitly block waiting for defaults before registering,
// or we feed it some seed data (or NO data) and bypass the blocking // or we feed it some seed data (or NO data) and bypass the blocking
// operation. Either way the watcher will end up with something flagged // operation. Either way the watcher will end up with something flagged
// as defaults even if they don't actually reflect actual defaults. // as defaults even if they don't actually reflect actual defaults.
if waitForCentralConfig { if waitForCentralConfig {
if err := w.fetchDefaults(ctx); err != nil { var err error
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) serviceDefaults, err = w.fetchDefaults(ctx)
if err != nil {
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
w.registration.service.ID, err)
} }
} else {
w.defaults = previousDefaults
} }
// Merge the local registration with the central defaults and update this service // Merge the local registration with the central defaults and update this service
// in the local state. // in the local state.
merged, err := w.mergeServiceConfig() merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
if err != nil { if err != nil {
return err return err
} }
@ -273,7 +267,7 @@ func (w *serviceConfigWatch) RegisterAndStart(
service: merged, service: merged,
chkTypes: w.registration.chkTypes, chkTypes: w.registration.chkTypes,
persistService: w.registration.service, persistService: w.registration.service,
persistDefaults: w.defaults, persistDefaults: serviceDefaults,
persist: w.registration.persist, persist: w.registration.persist,
persistServiceConfig: persistServiceConfig, persistServiceConfig: persistServiceConfig,
token: w.registration.token, token: w.registration.token,
@ -290,22 +284,20 @@ func (w *serviceConfigWatch) RegisterAndStart(
} }
// NOTE: this is called while holding the Agent.stateLock // NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) (*structs.ServiceConfigResponse, error) {
req := makeConfigRequest(w.agent, w.registration) req := makeConfigRequest(w.agent, w.registration)
raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req)
if err != nil { if err != nil {
return err return nil, err
} }
reply, ok := raw.(*structs.ServiceConfigResponse) serviceConfig, ok := raw.(*structs.ServiceConfigResponse)
if !ok { if !ok {
// This should never happen, but we want to protect against panics // This should never happen, but we want to protect against panics
return fmt.Errorf("internal error: response type not correct") return nil, fmt.Errorf("internal error: response type not correct")
} }
return serviceConfig, nil
w.defaults = reply
return nil
} }
// Start starts the config watch and a goroutine to handle updates over the // Start starts the config watch and a goroutine to handle updates over the
@ -314,13 +306,15 @@ func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error {
// //
// NOTE: this is called while holding the Agent.stateLock // NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error { func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error {
w.ctx, w.cancelFunc = context.WithCancel(ctx) ctx, w.cancelFunc = context.WithCancel(ctx)
// Configure and start a cache.Notify goroutine to run a continuous // Configure and start a cache.Notify goroutine to run a continuous
// blocking query on the resolved service config for this service. // blocking query on the resolved service config for this service.
req := makeConfigRequest(w.agent, w.registration) req := makeConfigRequest(w.agent, w.registration)
w.cacheKey = req.CacheInfo().Key w.cacheKey = req.CacheInfo().Key
updateCh := make(chan cache.UpdateEvent, 1)
// We use the cache key as the correlationID here. Notify in general will not // We use the cache key as the correlationID here. Notify in general will not
// respond on the updateCh after the context is cancelled however there could // respond on the updateCh after the context is cancelled however there could
// possible be a race where it has only just got an update and checked the // possible be a race where it has only just got an update and checked the
@ -328,11 +322,11 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
// the cacheKey allows us to ignore updates from the old cache watch and makes // the cacheKey allows us to ignore updates from the old cache watch and makes
// even this rare edge case safe. // even this rare edge case safe.
err := w.agent.cache.Notify( err := w.agent.cache.Notify(
w.ctx, ctx,
cachetype.ResolvedServiceConfigName, cachetype.ResolvedServiceConfigName,
req, req,
w.cacheKey, w.cacheKey,
w.updateCh, updateCh,
) )
if err != nil { if err != nil {
w.cancelFunc() w.cancelFunc()
@ -341,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
w.running.Add(1) w.running.Add(1)
wg.Add(1) wg.Add(1)
go w.runWatch(wg) go w.runWatch(ctx, wg, updateCh)
return nil return nil
} }
@ -355,16 +349,16 @@ func (w *serviceConfigWatch) Stop() {
// config watch is shut down. // config watch is shut down.
// //
// NOTE: the caller must NOT hold the Agent.stateLock! // NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) { func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) {
defer wg.Done() defer wg.Done()
defer w.running.Done() defer w.running.Done()
for { for {
select { select {
case <-w.ctx.Done(): case <-ctx.Done():
return return
case event := <-w.updateCh: case event := <-updateCh:
if err := w.handleUpdate(event); err != nil { if err := w.handleUpdate(ctx, event); err != nil {
w.agent.logger.Error("error handling service update", "error", err) w.agent.logger.Error("error handling service update", "error", err)
} }
} }
@ -375,14 +369,14 @@ func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) {
// the local state and re-registers the service with the newly merged config. // the local state and re-registers the service with the newly merged config.
// //
// NOTE: the caller must NOT hold the Agent.stateLock! // NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error { func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.UpdateEvent) error {
// If we got an error, log a warning if this is the first update; otherwise return the error. // If we got an error, log a warning if this is the first update; otherwise return the error.
// We want the initial update to cause a service registration no matter what. // We want the initial update to cause a service registration no matter what.
if event.Err != nil { if event.Err != nil {
return fmt.Errorf("error watching service config: %v", event.Err) return fmt.Errorf("error watching service config: %v", event.Err)
} }
res, ok := event.Result.(*structs.ServiceConfigResponse) serviceDefaults, ok := event.Result.(*structs.ServiceConfigResponse)
if !ok { if !ok {
return fmt.Errorf("unknown update event type: %T", event) return fmt.Errorf("unknown update event type: %T", event)
} }
@ -394,21 +388,18 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
// delivered) the correct config so just ignore this old message. // delivered) the correct config so just ignore this old message.
return nil return nil
} }
w.defaults = res
// Merge the local registration with the central defaults and update this service // Merge the local registration with the central defaults and update this service
// in the local state. // in the local state.
merged, err := w.mergeServiceConfig() merged, err := mergeServiceConfig(serviceDefaults, w.registration.service)
if err != nil { if err != nil {
return err return err
} }
// While we were waiting on the agent state lock we may have been shutdown. // While we were waiting on the agent state lock we may have been shutdown.
// So avoid doing a registration in that case. // So avoid doing a registration in that case.
select { if err := ctx.Err(); err != nil {
case <-w.ctx.Done():
return nil return nil
default:
} }
registerReq := &asyncRegisterRequest{ registerReq := &asyncRegisterRequest{
@ -416,7 +407,7 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
service: merged, service: merged,
chkTypes: w.registration.chkTypes, chkTypes: w.registration.chkTypes,
persistService: w.registration.service, persistService: w.registration.service,
persistDefaults: w.defaults, persistDefaults: serviceDefaults,
persist: w.registration.persist, persist: w.registration.persist,
persistServiceConfig: true, persistServiceConfig: true,
token: w.registration.token, token: w.registration.token,
@ -427,13 +418,13 @@ func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
} }
select { select {
case <-w.ctx.Done(): case <-ctx.Done():
return nil return nil
case w.registerCh <- registerReq: case w.registerCh <- registerReq:
} }
select { select {
case <-w.ctx.Done(): case <-ctx.Done():
return nil return nil
case err := <-registerReq.Reply: case err := <-registerReq.Reply:
@ -485,19 +476,17 @@ func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs
return req return req
} }
// mergeServiceConfig returns the final effective config for the watched service, // mergeServiceConfig from service into defaults to produce the final effective
// including the latest known global defaults from the servers. // config for the watched service.
// func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) {
// NOTE: this is called while holding the Agent.stateLock if defaults == nil {
func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) { return service, nil
if w.defaults == nil {
return w.registration.service, nil
} }
// We don't want to change s.registration in place since it is our source of // We don't want to change s.registration in place since it is our source of
// truth about what was actually registered before defaults applied. So copy // truth about what was actually registered before defaults applied. So copy
// it first. // it first.
nsRaw, err := copystructure.Copy(w.registration.service) nsRaw, err := copystructure.Copy(service)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -505,16 +494,16 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)
// Merge proxy defaults // Merge proxy defaults
ns := nsRaw.(*structs.NodeService) ns := nsRaw.(*structs.NodeService)
if err := mergo.Merge(&ns.Proxy.Config, w.defaults.ProxyConfig); err != nil { if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
return nil, err return nil, err
} }
if err := mergo.Merge(&ns.Proxy.Expose, w.defaults.Expose); err != nil { if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
return nil, err return nil, err
} }
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode
} }
// Merge upstream defaults if there were any returned // Merge upstream defaults if there were any returned
@ -530,7 +519,7 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)
us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode
} }
usCfg, ok := w.defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID())
if !ok { if !ok {
// No config defaults to merge // No config defaults to merge
continue continue