mirror of https://github.com/hashicorp/consul
Add the service registration manager to the agent
parent
b186c3020c
commit
7c25869e67
|
@ -243,6 +243,8 @@ type Agent struct {
|
||||||
// directly.
|
// directly.
|
||||||
proxyConfig *proxycfg.Manager
|
proxyConfig *proxycfg.Manager
|
||||||
|
|
||||||
|
serviceManager *ServiceManager
|
||||||
|
|
||||||
// xdsServer is the Server instance that serves xDS gRPC API.
|
// xdsServer is the Server instance that serves xDS gRPC API.
|
||||||
xdsServer *xds.Server
|
xdsServer *xds.Server
|
||||||
|
|
||||||
|
@ -473,6 +475,9 @@ func (a *Agent) Start() error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Start the service registration manager.
|
||||||
|
a.serviceManager = NewServiceManager(a)
|
||||||
|
|
||||||
// Start watching for critical services to deregister, based on their
|
// Start watching for critical services to deregister, based on their
|
||||||
// checks.
|
// checks.
|
||||||
go a.reapServices()
|
go a.reapServices()
|
||||||
|
@ -1892,6 +1897,7 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
|
||||||
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
|
||||||
a.stateLock.Lock()
|
a.stateLock.Lock()
|
||||||
defer a.stateLock.Unlock()
|
defer a.stateLock.Unlock()
|
||||||
|
a.serviceManager.AddService(service, chkTypes, persist, token, source)
|
||||||
return a.addServiceLocked(service, chkTypes, persist, token, source)
|
return a.addServiceLocked(service, chkTypes, persist, token, source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2055,6 +2061,7 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check
|
||||||
func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
||||||
a.stateLock.Lock()
|
a.stateLock.Lock()
|
||||||
defer a.stateLock.Unlock()
|
defer a.stateLock.Unlock()
|
||||||
|
a.serviceManager.RemoveService(serviceID)
|
||||||
return a.removeServiceLocked(serviceID, persist)
|
return a.removeServiceLocked(serviceID, persist)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recommended name for registration.
|
||||||
|
const ResolvedServiceConfigName = "resolved-service-config"
|
||||||
|
|
||||||
|
// ResolvedServiceConfig supports fetching the config for a service resolved from
|
||||||
|
// the global proxy defaults and the centrally registered service config.
|
||||||
|
type ResolvedServiceConfig struct {
|
||||||
|
RPC RPC
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
|
var result cache.FetchResult
|
||||||
|
|
||||||
|
// The request should be a ServiceConfigRequest.
|
||||||
|
reqReal, ok := req.(*structs.ServiceConfigRequest)
|
||||||
|
if !ok {
|
||||||
|
return result, fmt.Errorf(
|
||||||
|
"Internal cache failure: request wrong type: %T", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the minimum query index to our current index so we block
|
||||||
|
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
|
||||||
|
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
|
||||||
|
|
||||||
|
// Allways allow stale - there's no point in hitting leader if the request is
|
||||||
|
// going to be served from cache and endup arbitrarily stale anyway. This
|
||||||
|
// allows cached service-discover to automatically read scale across all
|
||||||
|
// servers too.
|
||||||
|
reqReal.AllowStale = true
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
var reply structs.ServiceConfigResponse
|
||||||
|
if err := c.RPC.RPC("ConfigEntry.ResolveServiceConfig", reqReal, &reply); err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Value = &reply
|
||||||
|
result.Index = reply.QueryMeta.Index
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ResolvedServiceConfig) SupportsBlocking() bool {
|
||||||
|
return true
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestResolvedServiceConfig(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
rpc := TestRPC(t)
|
||||||
|
defer rpc.AssertExpectations(t)
|
||||||
|
typ := &ResolvedServiceConfig{RPC: rpc}
|
||||||
|
|
||||||
|
// Expect the proper RPC call. This also sets the expected value
|
||||||
|
// since that is return-by-pointer in the arguments.
|
||||||
|
var resp *structs.ServiceConfigResponse
|
||||||
|
rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil).
|
||||||
|
Run(func(args mock.Arguments) {
|
||||||
|
req := args.Get(1).(*structs.ServiceConfigRequest)
|
||||||
|
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
|
||||||
|
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
|
||||||
|
require.Equal("foo", req.Name)
|
||||||
|
require.True(req.AllowStale)
|
||||||
|
|
||||||
|
reply := args.Get(2).(*structs.ServiceConfigResponse)
|
||||||
|
reply.Definition = structs.ServiceDefinition{
|
||||||
|
ID: "1234",
|
||||||
|
Name: "foo",
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.QueryMeta.Index = 48
|
||||||
|
resp = reply
|
||||||
|
})
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
resultA, err := typ.Fetch(cache.FetchOptions{
|
||||||
|
MinIndex: 24,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}, &structs.ServiceConfigRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Name: "foo",
|
||||||
|
})
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(cache.FetchResult{
|
||||||
|
Value: resp,
|
||||||
|
Index: 48,
|
||||||
|
}, resultA)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolvedServiceConfig_badReqType(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
rpc := TestRPC(t)
|
||||||
|
defer rpc.AssertExpectations(t)
|
||||||
|
typ := &ResolvedServiceConfig{RPC: rpc}
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
||||||
|
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
|
||||||
|
require.Error(err)
|
||||||
|
require.Contains(err.Error(), "wrong type")
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,183 @@
|
||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServiceManager struct {
|
||||||
|
services map[string]*serviceConfigWatch
|
||||||
|
agent *Agent
|
||||||
|
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServiceManager(agent *Agent) *ServiceManager {
|
||||||
|
return &ServiceManager{
|
||||||
|
services: make(map[string]*serviceConfigWatch),
|
||||||
|
agent: agent,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
|
||||||
|
reg := serviceRegistration{
|
||||||
|
service: service,
|
||||||
|
chkTypes: chkTypes,
|
||||||
|
persist: persist,
|
||||||
|
token: token,
|
||||||
|
source: source,
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a service watch already exists, update the registration. Otherwise,
|
||||||
|
// start a new config watcher.
|
||||||
|
watch, ok := s.services[service.ID]
|
||||||
|
if ok {
|
||||||
|
watch.updateRegistration(®)
|
||||||
|
} else {
|
||||||
|
watch := &serviceConfigWatch{
|
||||||
|
registration: ®,
|
||||||
|
updateCh: make(chan cache.UpdateEvent, 1),
|
||||||
|
agent: s.agent,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.services[service.ID] = watch
|
||||||
|
watch.Start()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServiceManager) RemoveService(serviceID string) {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
|
||||||
|
serviceWatch, ok := s.services[serviceID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceWatch.Stop()
|
||||||
|
delete(s.services, serviceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
type serviceRegistration struct {
|
||||||
|
service *structs.NodeService
|
||||||
|
chkTypes []*structs.CheckType
|
||||||
|
persist bool
|
||||||
|
token string
|
||||||
|
source configSource
|
||||||
|
}
|
||||||
|
|
||||||
|
type serviceConfigWatch struct {
|
||||||
|
registration *serviceRegistration
|
||||||
|
config *structs.ServiceDefinition
|
||||||
|
|
||||||
|
agent *Agent
|
||||||
|
|
||||||
|
updateCh chan cache.UpdateEvent
|
||||||
|
ctx context.Context
|
||||||
|
cancelFunc func()
|
||||||
|
|
||||||
|
sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) Start() error {
|
||||||
|
s.ctx, s.cancelFunc = context.WithCancel(context.Background())
|
||||||
|
if err := s.startConfigWatch(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
go s.runWatch()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) runWatch() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case event := <-s.updateCh:
|
||||||
|
s.handleUpdate(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) {
|
||||||
|
switch event.Result.(type) {
|
||||||
|
case serviceRegistration:
|
||||||
|
s.Lock()
|
||||||
|
s.registration = event.Result.(*serviceRegistration)
|
||||||
|
s.Unlock()
|
||||||
|
case structs.ServiceConfigResponse:
|
||||||
|
s.Lock()
|
||||||
|
s.config = &event.Result.(*structs.ServiceConfigResponse).Definition
|
||||||
|
s.Unlock()
|
||||||
|
default:
|
||||||
|
s.agent.logger.Printf("[ERR] unknown update event type: %T", event)
|
||||||
|
}
|
||||||
|
|
||||||
|
service := s.mergeServiceConfig()
|
||||||
|
s.agent.logger.Printf("[INFO] updating service registration: %v, %v", service.ID, service.Meta)
|
||||||
|
/*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
|
||||||
|
if err != nil {
|
||||||
|
s.agent.logger.Printf("[ERR] error updating service registration: %v", err)
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) startConfigWatch() error {
|
||||||
|
s.RLock()
|
||||||
|
name := s.registration.service.Service
|
||||||
|
s.RUnlock()
|
||||||
|
|
||||||
|
req := &structs.ServiceConfigRequest{
|
||||||
|
Name: name,
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
}
|
||||||
|
err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) {
|
||||||
|
s.updateCh <- cache.UpdateEvent{
|
||||||
|
Result: registration,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceConfigWatch) Stop() {
|
||||||
|
s.cancelFunc()
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// Construct the service config request. This will be re-used with an updated
|
||||||
|
// index to watch for changes in the effective service config.
|
||||||
|
req := structs.ServiceConfigRequest{
|
||||||
|
Name: s.registration.service.Service,
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()},
|
||||||
|
}
|
||||||
|
|
||||||
|
consul.RetryLoopBackoff(s.shutdownCh, func() error {
|
||||||
|
var reply structs.ServiceConfigResponse
|
||||||
|
if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.updateConfig(&reply.Definition)
|
||||||
|
|
||||||
|
req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index
|
||||||
|
return nil
|
||||||
|
}, func(err error) {
|
||||||
|
s.agent.logger.Printf("[ERR] Error getting service config: %v", err)
|
||||||
|
})
|
||||||
|
*/
|
|
@ -2,10 +2,13 @@ package structs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
|
"github.com/mitchellh/hashstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -265,6 +268,30 @@ func (s *ServiceConfigRequest) RequestDatacenter() string {
|
||||||
return s.Datacenter
|
return s.Datacenter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
|
||||||
|
info := cache.RequestInfo{
|
||||||
|
Token: r.Token,
|
||||||
|
Datacenter: r.Datacenter,
|
||||||
|
MinIndex: r.MinQueryIndex,
|
||||||
|
Timeout: r.MaxQueryTime,
|
||||||
|
MaxAge: r.MaxAge,
|
||||||
|
MustRevalidate: r.MustRevalidate,
|
||||||
|
}
|
||||||
|
|
||||||
|
// To calculate the cache key we only hash the service name. The
|
||||||
|
// datacenter is handled by the cache framework. The other fields are
|
||||||
|
// not, but should not be used in any cache types.
|
||||||
|
v, err := hashstructure.Hash(r.Name, nil)
|
||||||
|
if err == nil {
|
||||||
|
// If there is an error, we don't set the key. A blank key forces
|
||||||
|
// no cache for this request so the request is forwarded directly
|
||||||
|
// to the server.
|
||||||
|
info.Key = strconv.FormatUint(v, 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceConfigResponse struct {
|
type ServiceConfigResponse struct {
|
||||||
Definition ServiceDefinition
|
Definition ServiceDefinition
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue