You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/service_manager.go

238 lines
7.0 KiB

package agent
import (
"fmt"
"sync"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"golang.org/x/net/context"
)
// The ServiceManager is a layer for service registration in between the agent
// and the local state. Any services must be registered with the ServiceManager,
// which then maintains a long-running watch of any globally-set service or proxy
// configuration that applies to the service in order to register the final, merged
// service configuration locally in the agent state.
type ServiceManager struct {
services map[string]*serviceConfigWatch
agent *Agent
sync.Mutex
}
func NewServiceManager(agent *Agent) *ServiceManager {
return &ServiceManager{
services: make(map[string]*serviceConfigWatch),
agent: agent,
}
}
// AddService starts a new serviceConfigWatch if the service has not been registered, and
// updates the existing registration if it has. For a new service, a call will also be made
// to fetch the merged global defaults that apply to the service in order to compose the
// initial registration.
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
reg := serviceRegistration{
service: service,
chkTypes: chkTypes,
persist: persist,
token: token,
source: source,
}
// If a service watch already exists, update the registration. Otherwise,
// start a new config watcher.
s.Lock()
watch, ok := s.services[service.ID]
s.Unlock()
if ok {
if err := watch.updateRegistration(&reg); err != nil {
return err
}
s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID)
} else {
// This is a new entry, so get the existing global config and do the initial
// registration with the merged config.
args := structs.ServiceConfigRequest{
Name: service.Service,
Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
}
if token != "" {
args.QueryOptions.Token = token
}
var resp structs.ServiceConfigResponse
if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil {
s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v",
service.Service, err)
}
watch := &serviceConfigWatch{
updateCh: make(chan cache.UpdateEvent, 1),
agent: s.agent,
config: &resp.Definition,
}
// Force an update/register immediately.
if err := watch.updateRegistration(&reg); err != nil {
return err
}
s.Lock()
s.services[service.ID] = watch
s.Unlock()
if err := watch.Start(); err != nil {
return err
}
s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID)
}
return nil
}
func (s *ServiceManager) RemoveService(serviceID string) {
s.Lock()
defer s.Unlock()
serviceWatch, ok := s.services[serviceID]
if !ok {
return
}
serviceWatch.Stop()
delete(s.services, serviceID)
}
// serviceRegistration represents a locally registered service.
type serviceRegistration struct {
service *structs.NodeService
chkTypes []*structs.CheckType
persist bool
token string
source configSource
}
// serviceConfigWatch is a long running helper for composing the end config
// for a given service from both the local registration and the global
// service/proxy defaults.
type serviceConfigWatch struct {
registration *serviceRegistration
config *structs.ServiceDefinition
agent *Agent
updateCh chan cache.UpdateEvent
ctx context.Context
cancelFunc func()
sync.Mutex
}
func (s *serviceConfigWatch) Start() error {
s.ctx, s.cancelFunc = context.WithCancel(context.Background())
if err := s.startConfigWatch(); err != nil {
return err
}
go s.runWatch()
return nil
}
func (s *serviceConfigWatch) Stop() {
s.cancelFunc()
}
// runWatch handles any update events from the cache.Notify until the
// config watch is shut down.
func (s *serviceConfigWatch) runWatch() {
for {
select {
case <-s.ctx.Done():
return
case event := <-s.updateCh:
if err := s.handleUpdate(event, false); err != nil {
s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err)
}
}
}
}
// handleUpdate receives an update event about either the service registration or the
// global config defaults, updates the local state and re-registers the service with
// the newly merged config. This function takes the serviceConfigWatch lock to ensure
// only one update can be happening at a time.
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error {
// Take the agent state lock if needed. This is done before the local config watch
// lock in order to prevent a race between this config watch and others - the config
// watch lock is the inner lock and the agent stateLock is the outer lock.
if !locked {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
}
s.Lock()
defer s.Unlock()
if event.Err != nil {
return fmt.Errorf("error watching service config: %v", event.Err)
}
switch event.Result.(type) {
case *serviceRegistration:
s.registration = event.Result.(*serviceRegistration)
case *structs.ServiceConfigResponse:
resp := event.Result.(*structs.ServiceConfigResponse)
s.config = &resp.Definition
default:
return fmt.Errorf("unknown update event type: %T", event)
}
service := s.mergeServiceConfig()
err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
// startConfigWatch starts a cache.Notify goroutine to run a continuous blocking query
// on the resolved service config for this service.
func (s *serviceConfigWatch) startConfigWatch() error {
name := s.registration.service.Service
req := &structs.ServiceConfigRequest{
Name: name,
Datacenter: s.agent.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
}
if s.registration.token != "" {
req.QueryOptions.Token = s.registration.token
}
err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh)
return err
}
// updateRegistration does a synchronous update of the local service registration and
// returns the result.
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
return s.handleUpdate(cache.UpdateEvent{
Result: registration,
}, true)
}
// mergeServiceConfig returns the final effective config for the watched service,
// including the latest known global defaults from the servers.
func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService {
if s.config == nil {
return s.registration.service
}
svc := s.config.NodeService()
svc.Merge(s.registration.service)
return svc
}