From 7c25869e6776d0e677d2b1630a9e34772af2033f Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 17 Apr 2019 21:35:19 -0700 Subject: [PATCH] Add the service registration manager to the agent --- agent/agent.go | 7 + agent/cache-types/resolved_service_config.go | 52 +++++ .../resolved_service_config_test.go | 67 +++++++ agent/service_manager.go | 183 ++++++++++++++++++ agent/structs/config_entry.go | 27 +++ 5 files changed, 336 insertions(+) create mode 100644 agent/cache-types/resolved_service_config.go create mode 100644 agent/cache-types/resolved_service_config_test.go create mode 100644 agent/service_manager.go diff --git a/agent/agent.go b/agent/agent.go index aa9de46bb4..771f8a4119 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -243,6 +243,8 @@ type Agent struct { // directly. proxyConfig *proxycfg.Manager + serviceManager *ServiceManager + // xdsServer is the Server instance that serves xDS gRPC API. xdsServer *xds.Server @@ -473,6 +475,9 @@ func (a *Agent) Start() error { } }() + // Start the service registration manager. + a.serviceManager = NewServiceManager(a) + // Start watching for critical services to deregister, based on their // checks. 
go a.reapServices() @@ -1892,6 +1897,7 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() + a.serviceManager.AddService(service, chkTypes, persist, token, source) return a.addServiceLocked(service, chkTypes, persist, token, source) } @@ -2055,6 +2061,7 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check func (a *Agent) RemoveService(serviceID string, persist bool) error { a.stateLock.Lock() defer a.stateLock.Unlock() + a.serviceManager.RemoveService(serviceID) return a.removeServiceLocked(serviceID, persist) } diff --git a/agent/cache-types/resolved_service_config.go b/agent/cache-types/resolved_service_config.go new file mode 100644 index 0000000000..d1038283fc --- /dev/null +++ b/agent/cache-types/resolved_service_config.go @@ -0,0 +1,52 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const ResolvedServiceConfigName = "resolved-service-config" + +// ResolvedServiceConfig supports fetching the config for a service resolved from +// the global proxy defaults and the centrally registered service config. +type ResolvedServiceConfig struct { + RPC RPC +} + +func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a ServiceConfigRequest. 
+ reqReal, ok := req.(*structs.ServiceConfigRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and end up arbitrarily stale anyway. This + // allows cached service discovery to automatically read scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.ServiceConfigResponse + if err := c.RPC.RPC("ConfigEntry.ResolveServiceConfig", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} + +func (c *ResolvedServiceConfig) SupportsBlocking() bool { + return true +} diff --git a/agent/cache-types/resolved_service_config_test.go b/agent/cache-types/resolved_service_config_test.go new file mode 100644 index 0000000000..f890023934 --- /dev/null +++ b/agent/cache-types/resolved_service_config_test.go @@ -0,0 +1,67 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestResolvedServiceConfig(t *testing.T) { + require := require.New(t) + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &ResolvedServiceConfig{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.ServiceConfigResponse + rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil). 
+ Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.ServiceConfigRequest) + require.Equal(uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime) + require.Equal("foo", req.Name) + require.True(req.AllowStale) + + reply := args.Get(2).(*structs.ServiceConfigResponse) + reply.Definition = structs.ServiceDefinition{ + ID: "1234", + Name: "foo", + } + + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.ServiceConfigRequest{ + Datacenter: "dc1", + Name: "foo", + }) + require.NoError(err) + require.Equal(cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) +} + +func TestResolvedServiceConfig_badReqType(t *testing.T) { + require := require.New(t) + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &ResolvedServiceConfig{RPC: rpc} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(err) + require.Contains(err.Error(), "wrong type") + +} diff --git a/agent/service_manager.go b/agent/service_manager.go new file mode 100644 index 0000000000..ea131dd105 --- /dev/null +++ b/agent/service_manager.go @@ -0,0 +1,183 @@ +package agent + +import ( + "fmt" + "sync" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/structs" + "golang.org/x/net/context" +) + +type ServiceManager struct { + services map[string]*serviceConfigWatch + agent *Agent + + sync.Mutex +} + +func NewServiceManager(agent *Agent) *ServiceManager { + return &ServiceManager{ + services: make(map[string]*serviceConfigWatch), + agent: agent, + } +} + +func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) { + s.Lock() + defer s.Unlock() + + reg := 
serviceRegistration{ + service: service, + chkTypes: chkTypes, + persist: persist, + token: token, + source: source, + } + + // If a service watch already exists, update the registration. Otherwise, + // start a new config watcher. + watch, ok := s.services[service.ID] + if ok { + watch.updateRegistration(&reg) + } else { + watch := &serviceConfigWatch{ + registration: &reg, + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, + } + + s.services[service.ID] = watch + watch.Start() + } +} + +func (s *ServiceManager) RemoveService(serviceID string) { + s.Lock() + defer s.Unlock() + + serviceWatch, ok := s.services[serviceID] + if !ok { + return + } + + serviceWatch.Stop() + delete(s.services, serviceID) +} + +type serviceRegistration struct { + service *structs.NodeService + chkTypes []*structs.CheckType + persist bool + token string + source configSource +} + +type serviceConfigWatch struct { + registration *serviceRegistration + config *structs.ServiceDefinition + + agent *Agent + + updateCh chan cache.UpdateEvent + ctx context.Context + cancelFunc func() + + sync.RWMutex +} + +func (s *serviceConfigWatch) Start() error { + s.ctx, s.cancelFunc = context.WithCancel(context.Background()) + if err := s.startConfigWatch(); err != nil { + return err + } + go s.runWatch() + + return nil +} + +func (s *serviceConfigWatch) runWatch() { + for { + select { + case <-s.ctx.Done(): + return + case event := <-s.updateCh: + s.handleUpdate(event) + } + } +} + +func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) { + switch event.Result.(type) { + case serviceRegistration: + s.Lock() + s.registration = event.Result.(*serviceRegistration) + s.Unlock() + case structs.ServiceConfigResponse: + s.Lock() + s.config = &event.Result.(*structs.ServiceConfigResponse).Definition + s.Unlock() + default: + s.agent.logger.Printf("[ERR] unknown update event type: %T", event) + } + + service := s.mergeServiceConfig() + s.agent.logger.Printf("[INFO] updating service registration: 
%v, %v", service.ID, service.Meta) + /*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) + if err != nil { + s.agent.logger.Printf("[ERR] error updating service registration: %v", err) + }*/ +} + +func (s *serviceConfigWatch) startConfigWatch() error { + s.RLock() + name := s.registration.service.Service + s.RUnlock() + + req := &structs.ServiceConfigRequest{ + Name: name, + Datacenter: s.agent.config.Datacenter, + } + err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh) + + return err +} + +func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) { + s.updateCh <- cache.UpdateEvent{ + Result: registration, + } +} + +func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { + return nil +} + +func (s *serviceConfigWatch) Stop() { + s.cancelFunc() +} + +/* +// Construct the service config request. This will be re-used with an updated + // index to watch for changes in the effective service config. 
+ req := structs.ServiceConfigRequest{ + Name: s.registration.service.Service, + Datacenter: s.agent.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()}, + } + + consul.RetryLoopBackoff(s.shutdownCh, func() error { + var reply structs.ServiceConfigResponse + if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil { + return err + } + + s.updateConfig(&reply.Definition) + + req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index + return nil + }, func(err error) { + s.agent.logger.Printf("[ERR] Error getting service config: %v", err) + }) +*/ diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 7f571109d9..e1ee3fd5cc 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -2,10 +2,13 @@ package structs import ( "fmt" + "strconv" "strings" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/go-msgpack/codec" + "github.com/mitchellh/hashstructure" ) const ( @@ -265,6 +268,30 @@ func (s *ServiceConfigRequest) RequestDatacenter() string { return s.Datacenter } +func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + } + + // To calculate the cache key we only hash the service name. The + // datacenter is handled by the cache framework. The other fields are + // not, but should not be used in any cache types. + v, err := hashstructure.Hash(r.Name, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + type ServiceConfigResponse struct { Definition ServiceDefinition