diff --git a/agent/agent.go b/agent/agent.go index 74c63c8933..2526f89ffd 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -53,7 +53,8 @@ import ( const ( // Path to save agent service definitions - servicesDir = "services" + servicesDir = "services" + serviceConfigDir = "services/configs" // Path to save agent proxy definitions proxyDir = "proxies" @@ -85,6 +86,28 @@ const ( ConfigSourceRemote ) +var configSourceToName = map[configSource]string{ + ConfigSourceLocal: "local", + ConfigSourceRemote: "remote", +} +var configSourceFromName = map[string]configSource{ + "local": ConfigSourceLocal, + "remote": ConfigSourceRemote, + // If the value is not found in the persisted config file, then use the + // former default. + "": ConfigSourceLocal, +} + +func (s configSource) String() string { + return configSourceToName[s] +} + +// ConfigSourceFromName will unmarshal the string form of a configSource. +func ConfigSourceFromName(name string) (configSource, bool) { + s, ok := configSourceFromName[name] + return s, ok +} + // delegate defines the interface shared by both // consul.Client and consul.Server. type delegate interface { @@ -419,6 +442,8 @@ func (a *Agent) Start() error { a.logger.Printf("[INFO] AutoEncrypt: upgraded to TLS") } + a.serviceManager.Start() + // Load checks/services/metadata. if err := a.loadServices(c); err != nil { return err @@ -1582,6 +1607,11 @@ func (a *Agent) ShutdownAgent() error { } a.logger.Println("[INFO] agent: Requesting shutdown") + // Stop the service manager (must happen before we take the stateLock to avoid deadlock) + if a.serviceManager != nil { + a.serviceManager.Stop() + } + // Stop all the checks a.stateLock.Lock() defer a.stateLock.Unlock() @@ -1898,7 +1928,7 @@ func (a *Agent) reapServicesInternal() { // this is so that we won't try to remove it again. 
if timeout > 0 && cs.CriticalFor() > timeout { reaped[serviceID] = true - if err := a.RemoveService(serviceID, true); err != nil { + if err := a.RemoveService(serviceID); err != nil { a.logger.Printf("[ERR] agent: unable to deregister service %q after check %q has been critical for too long: %s", serviceID, checkID, err) } else { @@ -1929,15 +1959,17 @@ func (a *Agent) reapServices() { type persistedService struct { Token string Service *structs.NodeService + Source string } // persistService saves a service definition to a JSON file in the data dir -func (a *Agent) persistService(service *structs.NodeService) error { +func (a *Agent) persistService(service *structs.NodeService, source configSource) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) wrapped := persistedService{ Token: a.State.ServiceToken(service.ID), Service: service, + Source: source.String(), } encoded, err := json.Marshal(wrapped) if err != nil { @@ -1957,7 +1989,7 @@ func (a *Agent) purgeService(serviceID string) error { } // persistCheck saves a check definition to the local agent's state directory -func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckType) error { +func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckType, source configSource) error { checkPath := filepath.Join(a.config.DataDir, checksDir, checkIDHash(check.CheckID)) // Create the persisted check @@ -1965,6 +1997,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT Check: check, ChkType: chkType, Token: a.State.CheckToken(check.CheckID), + Source: source.String(), } encoded, err := json.Marshal(wrapped) @@ -1984,13 +2017,105 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { return nil } +// persistedServiceConfig is used to serialize the resolved service config that +// feeds into the ServiceManager at registration time so that it may be +// restored later on. +type persistedServiceConfig struct { + ServiceID string + Defaults *structs.ServiceConfigResponse +} + +func (a *Agent) persistServiceConfig(serviceID string, defaults *structs.ServiceConfigResponse) error { + // Create the persisted config. 
+ wrapped := persistedServiceConfig{ + ServiceID: serviceID, + Defaults: defaults, + } + + encoded, err := json.Marshal(wrapped) + if err != nil { + return err + } + + dir := filepath.Join(a.config.DataDir, serviceConfigDir) + configPath := filepath.Join(dir, stringHash(serviceID)) + + // Create the config dir if it doesn't exist + if err := os.MkdirAll(dir, 0700); err != nil { + return fmt.Errorf("failed creating service configs dir %q: %s", dir, err) + } + + return file.WriteAtomic(configPath, encoded) +} + +func (a *Agent) purgeServiceConfig(serviceID string) error { + configPath := filepath.Join(a.config.DataDir, serviceConfigDir, stringHash(serviceID)) + if _, err := os.Stat(configPath); err == nil { + return os.Remove(configPath) + } + return nil +} + +func (a *Agent) readPersistedServiceConfigs() (map[string]*structs.ServiceConfigResponse, error) { + out := make(map[string]*structs.ServiceConfigResponse) + + configDir := filepath.Join(a.config.DataDir, serviceConfigDir) + files, err := ioutil.ReadDir(configDir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("failed reading service configs dir %q: %s", configDir, err) + } + + for _, fi := range files { + // Skip all dirs + if fi.IsDir() { + continue + } + + // Skip all partially written temporary files + if strings.HasSuffix(fi.Name(), "tmp") { + a.logger.Printf("[WARN] agent: Ignoring temporary service config file %v", fi.Name()) + continue + } + + // Read the contents into a buffer + file := filepath.Join(configDir, fi.Name()) + buf, err := ioutil.ReadFile(file) + if err != nil { + return nil, fmt.Errorf("failed reading service config file %q: %s", file, err) + } + + // Try decoding the service config definition + var p persistedServiceConfig + if err := json.Unmarshal(buf, &p); err != nil { + a.logger.Printf("[ERR] agent: Failed decoding service config file %q: %s", file, err) + continue + } + out[p.ServiceID] = p.Defaults + } + + return out, nil +} + // AddServiceAndReplaceChecks is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted. // This entry is persistent and the agent will make a best effort to // ensure it is registered func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() - return a.addServiceLocked(service, chkTypes, persist, token, true, source) + return a.addServiceLocked(&addServiceRequest{ + service: service, + chkTypes: chkTypes, + previousDefaults: nil, + waitForCentralConfig: true, + persist: persist, + persistServiceConfig: true, + token: token, + replaceExistingChecks: true, + source: source, + }) } // AddService is used to add a service entry.
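The three helpers above (persistServiceConfig, purgeServiceConfig, readPersistedServiceConfigs) share one crash-safety idea: marshal a small wrapper struct to JSON, write it atomically under a hash-named file, and make the reader tolerate missing directories and partial writes. Below is a minimal standalone sketch of that round-trip; file.WriteAtomic is Consul-internal, so the sketch approximates it with a temp-file write plus rename, and all names (persistedConfig, hashName, and so on) are illustrative rather than Consul's.

package main

import (
	"crypto/md5"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
)

type persistedConfig struct {
	ServiceID string
	Defaults  map[string]interface{}
}

// hashName stands in for a stable, filename-safe hash of the service ID.
func hashName(s string) string {
	return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}

// persist writes atomically: write a ".tmp" sibling first, then rename it
// into place. Readers that skip "tmp"-suffixed names never see a partial write.
func persist(dir, serviceID string, defaults map[string]interface{}) error {
	if err := os.MkdirAll(dir, 0700); err != nil {
		return err
	}
	encoded, err := json.Marshal(persistedConfig{ServiceID: serviceID, Defaults: defaults})
	if err != nil {
		return err
	}
	path := filepath.Join(dir, hashName(serviceID))
	tmp := path + ".tmp"
	if err := ioutil.WriteFile(tmp, encoded, 0600); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// restore tolerates a missing directory, temp files, and corrupt entries.
func restore(dir string) (map[string]map[string]interface{}, error) {
	out := make(map[string]map[string]interface{})
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		if os.IsNotExist(err) {
			return out, nil
		}
		return nil, err
	}
	for _, fi := range files {
		if fi.IsDir() || strings.HasSuffix(fi.Name(), "tmp") {
			continue
		}
		buf, err := ioutil.ReadFile(filepath.Join(dir, fi.Name()))
		if err != nil {
			return nil, err
		}
		var p persistedConfig
		if err := json.Unmarshal(buf, &p); err != nil {
			continue // skip a corrupt entry instead of failing the whole load
		}
		out[p.ServiceID] = p.Defaults
	}
	return out, nil
}

func main() {
	dir := filepath.Join(os.TempDir(), "service-configs-demo")
	if err := persist(dir, "web-sidecar-proxy", map[string]interface{}{"protocol": "http"}); err != nil {
		panic(err)
	}
	configs, err := restore(dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(configs["web-sidecar-proxy"]) // map[protocol:http]
}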
@@ -1999,25 +2124,89 @@ func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkType func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() - return a.addServiceLocked(service, chkTypes, persist, token, false, source) + return a.addServiceLocked(&addServiceRequest{ + service: service, + chkTypes: chkTypes, + previousDefaults: nil, + waitForCentralConfig: true, + persist: persist, + persistServiceConfig: true, + token: token, + replaceExistingChecks: false, + source: source, + }) } // addServiceLocked adds a service entry to the service manager if enabled, or directly // to the local state if it is not. This function assumes the state lock is already held. -func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error { - if err := a.validateService(service, chkTypes); err != nil { +func (a *Agent) addServiceLocked(req *addServiceRequest) error { + req.fixupForAddServiceLocked() + + if err := a.validateService(req.service, req.chkTypes); err != nil { return err } if a.config.EnableCentralServiceConfig { - return a.serviceManager.AddService(service, chkTypes, persist, token, source) + return a.serviceManager.AddService(req) } - return a.addServiceInternal(service, chkTypes, persist, token, replaceExistingChecks, source) + // previousDefaults are ignored here because they are only relevant for central config. + req.persistService = nil + req.persistDefaults = nil + req.persistServiceConfig = false + + return a.addServiceInternal(req) +} + +// addServiceRequest is the union of arguments for calling both +// addServiceLocked and addServiceInternal. The overlap was significant enough +// to warrant merging them and indicating which fields are meant to be set only +// in one of the two contexts. +// +// Before using the request struct one of the fixupFor*() methods should be +// invoked to clear irrelevant fields. +// +// The ServiceManager.AddService signature is largely just a passthrough for +// addServiceLocked and should be treated as such. +type addServiceRequest struct { + service *structs.NodeService + chkTypes []*structs.CheckType + previousDefaults *structs.ServiceConfigResponse // just for: addServiceLocked + waitForCentralConfig bool // just for: addServiceLocked + persistService *structs.NodeService // just for: addServiceInternal + persistDefaults *structs.ServiceConfigResponse // just for: addServiceInternal + persist bool + persistServiceConfig bool + token string + replaceExistingChecks bool + source configSource +} + +func (r *addServiceRequest) fixupForAddServiceLocked() { + r.persistService = nil + r.persistDefaults = nil +} + +func (r *addServiceRequest) fixupForAddServiceInternal() { + r.previousDefaults = nil + r.waitForCentralConfig = false } // addServiceInternal adds the given service and checks to the local state. 
-func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error { +func (a *Agent) addServiceInternal(req *addServiceRequest) error { + req.fixupForAddServiceInternal() + var ( + service = req.service + chkTypes = req.chkTypes + persistService = req.persistService + persistDefaults = req.persistDefaults + persist = req.persist + persistServiceConfig = req.persistServiceConfig + token = req.token + replaceExistingChecks = req.replaceExistingChecks + source = req.source + ) + // Pause the service syncs during modification a.PauseSync() defer a.ResumeSync() @@ -2104,7 +2293,7 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str } if persist && a.config.DataDir != "" { - if err := a.persistCheck(checks[i], chkTypes[i]); err != nil { + if err := a.persistCheck(checks[i], chkTypes[i], source); err != nil { a.cleanupRegistration(cleanupServices, cleanupChecks) return err @@ -2112,9 +2301,27 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str } } + if persistServiceConfig && a.config.DataDir != "" { + var err error + if persistDefaults != nil { + err = a.persistServiceConfig(service.ID, persistDefaults) + } else { + err = a.purgeServiceConfig(service.ID) + } + + if err != nil { + a.cleanupRegistration(cleanupServices, cleanupChecks) + return err + } + } + // Persist the service to a file if persist && a.config.DataDir != "" { - if err := a.persistService(service); err != nil { + if persistService == nil { + persistService = service + } + + if err := a.persistService(persistService, source); err != nil { a.cleanupRegistration(cleanupServices, cleanupChecks) return err } @@ -2189,6 +2396,9 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check if err := a.purgeService(s); err != nil { a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service %s file: %s", s, err) } + if err := a.purgeServiceConfig(s); err != nil { + a.logger.Printf("[ERR] consul: service registration: cleanup: failed to purge service config %s file: %s", s, err) + } } for _, c := range checksIDs { @@ -2204,7 +2414,11 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check // RemoveService is used to remove a service entry. // The agent will make a best effort to ensure it is deregistered -func (a *Agent) RemoveService(serviceID string, persist bool) error { +func (a *Agent) RemoveService(serviceID string) error { + return a.removeService(serviceID, true) +} + +func (a *Agent) removeService(serviceID string, persist bool) error { a.stateLock.Lock() defer a.stateLock.Unlock() return a.removeServiceLocked(serviceID, persist) @@ -2243,6 +2457,9 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error { if err := a.purgeService(serviceID); err != nil { return err } + if err := a.purgeServiceConfig(serviceID); err != nil { + return err + } } // Deregister any associated health checks @@ -2315,7 +2532,7 @@ func (a *Agent) addCheckLocked(check *structs.HealthCheck, chkType *structs.Chec // Persist the check if persist && a.config.DataDir != "" { - return a.persistCheck(check, chkType) + return a.persistCheck(check, chkType, source) } return nil @@ -2853,6 +3070,13 @@ func (a *Agent) deletePid() error { // loadServices will load service definitions from configuration and persisted // definitions on disk, and load them into the local agent. 
func (a *Agent) loadServices(conf *config.RuntimeConfig) error { + // Load any persisted service configs so we can feed those into the initial + // registrations below. + persistedServiceConfigs, err := a.readPersistedServiceConfigs() + if err != nil { + return err + } + // Register the services from config for _, service := range conf.Services { ns := service.NodeService() @@ -2871,13 +3095,37 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { // syntax sugar and shouldn't be persisted in local or server state. ns.Connect.SidecarService = nil - if err := a.addServiceLocked(ns, chkTypes, false, service.Token, false, ConfigSourceLocal); err != nil { + serviceID := defaultIfEmpty(ns.ID, ns.Service) + err = a.addServiceLocked(&addServiceRequest{ + service: ns, + chkTypes: chkTypes, + previousDefaults: persistedServiceConfigs[serviceID], + waitForCentralConfig: false, // exclusively use cached values + persist: false, // don't rewrite the file with the same data we just read + persistServiceConfig: false, // don't rewrite the file with the same data we just read + token: service.Token, + replaceExistingChecks: false, // do default behavior + source: ConfigSourceLocal, + }) + if err != nil { return fmt.Errorf("Failed to register service %q: %v", service.Name, err) } // If there is a sidecar service, register that too. if sidecar != nil { - if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, false, ConfigSourceLocal); err != nil { + sidecarServiceID := defaultIfEmpty(sidecar.ID, sidecar.Service) + err = a.addServiceLocked(&addServiceRequest{ + service: sidecar, + chkTypes: sidecarChecks, + previousDefaults: persistedServiceConfigs[sidecarServiceID], + waitForCentralConfig: false, // exclusively use cached values + persist: false, // don't rewrite the file with the same data we just read + persistServiceConfig: false, // don't rewrite the file with the same data we just read + token: sidecarToken, + replaceExistingChecks: false, // do default behavior + source: ConfigSourceLocal, + }) + if err != nil { return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err) } } @@ -2904,16 +3152,9 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { continue } - // Open the file for reading - file := filepath.Join(svcDir, fi.Name()) - fh, err := os.Open(file) - if err != nil { - return fmt.Errorf("failed opening service file %q: %s", file, err) - } - // Read the contents into a buffer - buf, err := ioutil.ReadAll(fh) - fh.Close() + file := filepath.Join(svcDir, fi.Name()) + buf, err := ioutil.ReadFile(file) if err != nil { return fmt.Errorf("failed reading service file %q: %s", file, err) } @@ -2929,6 +3170,18 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } serviceID := p.Service.ID + source, ok := ConfigSourceFromName(p.Source) + if !ok { + a.logger.Printf("[WARN] agent: service %q exists with invalid source %q, purging", serviceID, p.Source) + if err := a.purgeService(serviceID); err != nil { + return fmt.Errorf("failed purging service %q: %s", serviceID, err) + } + if err := a.purgeServiceConfig(serviceID); err != nil { + return fmt.Errorf("failed purging service config %q: %s", serviceID, err) + } + continue + } + if a.State.Service(serviceID) != nil { // Purge previously persisted service. This allows config to be // preferred over services persisted from the API. 
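The purge-on-invalid-source branch above is the read side of the String()/ConfigSourceFromName pair introduced earlier: sources persist as strings, the empty string maps to the old default for files written before the field existed, and anything that fails to round-trip is purged rather than guessed at. A standalone sketch of that map-based enum round-trip, with illustrative names rather than Consul's:

package main

import "fmt"

type source int

const (
	sourceLocal source = iota
	sourceRemote
)

var sourceToName = map[source]string{
	sourceLocal:  "local",
	sourceRemote: "remote",
}

var sourceFromName = map[string]source{
	"local":  sourceLocal,
	"remote": sourceRemote,
	// Files written before the Source field existed decode to "", which
	// maps to the former default.
	"": sourceLocal,
}

func (s source) String() string { return sourceToName[s] }

// fromName reports ok=false for unknown names so the caller can purge the
// stale file rather than guess at its origin.
func fromName(name string) (source, bool) {
	s, ok := sourceFromName[name]
	return s, ok
}

func main() {
	// Persisting marshals the enum to a string; loading recovers it.
	s, ok := fromName(sourceRemote.String())
	fmt.Println(s, ok) // remote true

	// Unknown values fail closed, mirroring the purge-and-skip path.
	if _, ok := fromName("garbage"); !ok {
		fmt.Println("invalid source: purge persisted definition and continue")
	}
}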
@@ -2937,15 +3190,39 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { if err := a.purgeService(serviceID); err != nil { return fmt.Errorf("failed purging service %q: %s", serviceID, err) } + if err := a.purgeServiceConfig(serviceID); err != nil { + return fmt.Errorf("failed purging service config %q: %s", serviceID, err) + } } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", serviceID, file) - if err := a.addServiceLocked(p.Service, nil, false, p.Token, false, ConfigSourceLocal); err != nil { + err = a.addServiceLocked(&addServiceRequest{ + service: p.Service, + chkTypes: nil, + previousDefaults: persistedServiceConfigs[serviceID], + waitForCentralConfig: false, // exclusively use cached values + persist: false, // don't rewrite the file with the same data we just read + persistServiceConfig: false, // don't rewrite the file with the same data we just read + token: p.Token, + replaceExistingChecks: false, // do default behavior + source: source, + }, + ) + if err != nil { return fmt.Errorf("failed adding service %q: %s", serviceID, err) } } } + for serviceID := range persistedServiceConfigs { + if a.State.Service(serviceID) == nil { + // This can be cleaned up now. + if err := a.purgeServiceConfig(serviceID); err != nil { + return fmt.Errorf("failed purging service config %q: %s", serviceID, err) + } + } + } + return nil } @@ -2993,16 +3270,9 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*s continue } - // Open the file for reading - file := filepath.Join(checkDir, fi.Name()) - fh, err := os.Open(file) - if err != nil { - return fmt.Errorf("Failed opening check file %q: %s", file, err) - } - // Read the contents into a buffer - buf, err := ioutil.ReadAll(fh) - fh.Close() + file := filepath.Join(checkDir, fi.Name()) + buf, err := ioutil.ReadFile(file) if err != nil { return fmt.Errorf("failed reading check file %q: %s", file, err) } @@ -3015,6 +3285,15 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*s } checkID := p.Check.CheckID + source, ok := ConfigSourceFromName(p.Source) + if !ok { + a.logger.Printf("[WARN] agent: check %q exists with invalid source %q, purging", checkID, p.Source) + if err := a.purgeCheck(checkID); err != nil { + return fmt.Errorf("failed purging check %q: %s", checkID, err) + } + continue + } + if a.State.Check(checkID) != nil { // Purge previously persisted check. This allows config to be // preferred over persisted checks from the API. @@ -3034,7 +3313,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig, snap map[types.CheckID]*s p.Check.Status = prev.Status } - if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, ConfigSourceLocal); err != nil { + if err := a.addCheckLocked(p.Check, p.ChkType, false, p.Token, source); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", checkID, err) @@ -3466,3 +3745,11 @@ func (a *Agent) registerCache() { RefreshTimeout: 10 * time.Minute, }) } + +// defaultIfEmpty returns the value if not empty, otherwise the default value. +func defaultIfEmpty(val, defaultVal string) string { + if val != "" { + return val + } + return defaultVal +} diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 6653704025..678dc4cab2 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -933,8 +933,14 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re } // Add sidecar.
if sidecar != nil { - if err := s.agent.AddService(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil { - return nil, err + if replaceExistingChecks { + if err := s.agent.AddServiceAndReplaceChecks(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil { + return nil, err + } + } else { + if err := s.agent.AddService(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil { + return nil, err + } } } s.syncChanges() @@ -951,7 +957,7 @@ func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http. return nil, err } - if err := s.agent.RemoveService(serviceID, true); err != nil { + if err := s.agent.RemoveService(serviceID); err != nil { return nil, err } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index cb8ceec536..bbdc689574 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -2349,8 +2349,20 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { } func TestAgent_RegisterService(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -2416,8 +2428,20 @@ func TestAgent_RegisterService(t *testing.T) { } func TestAgent_RegisterService_ReRegister(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ReRegister(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ReRegister(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_ReRegister(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -2481,8 +2505,19 @@ func TestAgent_RegisterService_ReRegister(t *testing.T) { } func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ReRegister_ReplaceExistingChecks(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ReRegister_ReplaceExistingChecks(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -2546,7 +2581,19 @@ func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) { } func TestAgent_RegisterService_TranslateKeys(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_TranslateKeys(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_TranslateKeys(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_TranslateKeys(t *testing.T, extraHCL string) { + t.Helper() + tests := []struct { ip string expectedTCPCheckStart string @@ -2558,7 +2605,7 @@ func TestAgent_RegisterService_TranslateKeys(t 
*testing.T) { t.Run(tt.ip, func(t *testing.T) { a := NewTestAgent(t, t.Name(), ` connect {} -`) +`+extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -2750,8 +2797,20 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) { } func TestAgent_RegisterService_ACLDeny(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), TestACLConfig()) + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ACLDeny(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ACLDeny(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_ACLDeny(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), TestACLConfig()+" "+extraHCL) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -2788,8 +2847,20 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) { } func TestAgent_RegisterService_InvalidAddress(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_InvalidAddress(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_InvalidAddress(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_InvalidAddress(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -2820,10 +2891,21 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) { // This verifies that it is put in the local state store properly for syncing // later. func TestAgent_RegisterService_UnmanagedConnectProxy(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_UnmanagedConnectProxy(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_UnmanagedConnectProxy(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_UnmanagedConnectProxy(t *testing.T, extraHCL string) { + t.Helper() assert := assert.New(t) - a := NewTestAgent(t, t.Name(), "") + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -2941,7 +3023,18 @@ func testCreatePolicy(t *testing.T, a *TestAgent, name, rules string) string { // TestAgent_sidecarServiceFromNodeService. Note it also tests Deregister // explicitly too since setup is identical. func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterServiceDeregisterService_Sidecar(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterServiceDeregisterService_Sidecar(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T, extraHCL string) { + t.Helper() tests := []struct { name string @@ -3324,7 +3417,7 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) { hcl = hcl + TestACLConfig() } - a := NewTestAgent(t, t.Name(), hcl) + a := NewTestAgent(t, t.Name(), hcl+" "+extraHCL) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -3422,10 +3515,21 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) { // registration. This doesn't need to test validation exhaustively since // that is done via a table test in the structs package.
func TestAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_UnmanagedConnectProxyInvalid(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_UnmanagedConnectProxyInvalid(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T, extraHCL string) { + t.Helper() assert := assert.New(t) - a := NewTestAgent(t, t.Name(), "") + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -3455,10 +3559,21 @@ func TestAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T) { // Tests agent registration of a service that is connect native. func TestAgent_RegisterService_ConnectNative(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ConnectNative(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ConnectNative(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_ConnectNative(t *testing.T, extraHCL string) { + t.Helper() assert := assert.New(t) - a := NewTestAgent(t, t.Name(), "") + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -3489,8 +3604,20 @@ func TestAgent_RegisterService_ConnectNative(t *testing.T) { } func TestAgent_RegisterService_ScriptCheck_ExecDisable(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ScriptCheck_ExecDisable(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ScriptCheck_ExecDisable(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_ScriptCheck_ExecDisable(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -3525,10 +3652,22 @@ func TestAgent_RegisterService_ScriptCheck_ExecDisable(t *testing.T) { } func TestAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t, "enable_central_service_config = true") + }) +} + +func testAgent_RegisterService_ScriptCheck_ExecRemoteDisable(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` enable_local_script_checks = true - `) + `+extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") diff --git a/agent/agent_test.go b/agent/agent_test.go index 887ba9b5f1..8be15fa854 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -332,10 +332,22 @@ func TestAgent_makeNodeID(t *testing.T) { } func TestAgent_AddService(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_AddService(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_AddService(t, "enable_central_service_config = true") + }) +} + +func testAgent_AddService(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` node_name = "node1" - `) + `+extraHCL) defer a.Shutdown() tests := []struct { @@ 
-504,10 +516,22 @@ func TestAgent_AddService(t *testing.T) { } func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_AddServices_AliasUpdateCheckNotReverted(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_AddServices_AliasUpdateCheckNotReverted(t, "enable_central_service_config = true") + }) +} + +func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` node_name = "node1" - `) + `+extraHCL) defer a.Shutdown() // It's tricky to get an UpdateCheck call to be timed properly so it lands @@ -560,10 +584,22 @@ func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) { } func TestAgent_AddServiceNoExec(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_AddServiceNoExec(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_AddServiceNoExec(t, "enable_central_service_config = true") + }) +} + +func testAgent_AddServiceNoExec(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` node_name = "node1" - `) + `+extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -590,11 +626,23 @@ func TestAgent_AddServiceNoExec(t *testing.T) { } func TestAgent_AddServiceNoRemoteExec(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_AddServiceNoRemoteExec(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_AddServiceNoRemoteExec(t, "enable_central_service_config = true") + }) +} + +func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` node_name = "node1" enable_local_script_checks = true - `) + `+extraHCL) defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -616,17 +664,29 @@ func TestAgent_AddServiceNoRemoteExec(t *testing.T) { } func TestAgent_RemoveService(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_RemoveService(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RemoveService(t, "enable_central_service_config = true") + }) +} + +func testAgent_RemoveService(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() // Remove a service that doesn't exist - if err := a.RemoveService("redis", false); err != nil { + if err := a.RemoveService("redis"); err != nil { t.Fatalf("err: %v", err) } // Remove without an ID - if err := a.RemoveService("", false); err == nil { + if err := a.RemoveService(""); err == nil { t.Fatalf("should have errored") } @@ -655,7 +715,7 @@ func TestAgent_RemoveService(t *testing.T) { t.Fatalf("err: %s", err) } - if err := a.RemoveService("memcache", false); err != nil { + if err := a.RemoveService("memcache"); err != nil { t.Fatalf("err: %s", err) } if _, ok := a.State.Checks()["service:memcache"]; ok { @@ -697,7 +757,7 @@ func TestAgent_RemoveService(t *testing.T) { } // Remove the service - if err := a.RemoveService("redis", false); err != nil { + if err := a.RemoveService("redis"); err != nil { t.Fatalf("err: %v", err) } @@ -745,10 +805,22 @@ func TestAgent_RemoveService(t *testing.T) { } func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + 
t.Parallel() + testAgent_RemoveServiceRemovesAllChecks(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_RemoveServiceRemovesAllChecks(t, "enable_central_service_config = true") + }) +} + +func testAgent_RemoveServiceRemovesAllChecks(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` node_name = "node1" - `) + `+extraHCL) defer a.Shutdown() svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} @@ -781,7 +853,7 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // Remove service - if err := a.RemoveService("redis", false); err != nil { + if err := a.RemoveService("redis"); err != nil { t.Fatal("Failed to remove service", err) } @@ -1688,15 +1760,28 @@ func TestAgent_updateTTLCheck(t *testing.T) { } func TestAgent_PersistService(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_PersistService(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_PersistService(t, "enable_central_service_config = true") + }) +} + +func testAgent_PersistService(t *testing.T, extraHCL string) { + t.Helper() + dataDir := testutil.TempDir(t, "agent") // we manage the data dir + defer os.RemoveAll(dataDir) + cfg := ` server = false bootstrap = false data_dir = "` + dataDir + `" - ` + ` + extraHCL a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir}) - defer os.RemoveAll(dataDir) defer a.Shutdown() svc := &structs.NodeService{ @@ -1726,6 +1811,7 @@ func TestAgent_PersistService(t *testing.T) { expected, err := json.Marshal(persistedService{ Token: "mytoken", Service: svc, + Source: "local", }) if err != nil { t.Fatalf("err: %s", err) @@ -1746,6 +1832,7 @@ func TestAgent_PersistService(t *testing.T) { expected, err = json.Marshal(persistedService{ Token: "mytoken", Service: svc, + Source: "local", }) if err != nil { t.Fatalf("err: %s", err) @@ -1776,9 +1863,21 @@ func TestAgent_PersistService(t *testing.T) { } func TestAgent_persistedService_compat(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_persistedService_compat(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_persistedService_compat(t, "enable_central_service_config = true") + }) +} + +func testAgent_persistedService_compat(t *testing.T, extraHCL string) { + t.Helper() + // Tests backwards compatibility of persisted services from pre-0.5.1 - a := NewTestAgent(t, t.Name(), "") + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() svc := &structs.NodeService{ @@ -1820,8 +1919,20 @@ func TestAgent_persistedService_compat(t *testing.T) { } func TestAgent_PurgeService(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_PurgeService(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_PurgeService(t, "enable_central_service_config = true") + }) +} + +func testAgent_PurgeService(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() svc := &structs.NodeService{ @@ -1835,9 +1946,13 @@ func TestAgent_PurgeService(t *testing.T) { if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil { t.Fatalf("err: %v", err) } + // Exists + if _, err := os.Stat(file); err != nil { + t.Fatalf("err: %s", err) + } // Not removed - if err := a.RemoveService(svc.ID, false); err != nil { + if err := 
a.removeService(svc.ID, false); err != nil { t.Fatalf("err: %s", err) } if _, err := os.Stat(file); err != nil { @@ -1850,7 +1965,7 @@ func TestAgent_PurgeService(t *testing.T) { } // Removed - if err := a.RemoveService(svc.ID, true); err != nil { + if err := a.removeService(svc.ID, true); err != nil { t.Fatalf("err: %s", err) } if _, err := os.Stat(file); !os.IsNotExist(err) { @@ -1859,16 +1974,29 @@ func TestAgent_PurgeService(t *testing.T) { } func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_PurgeServiceOnDuplicate(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_PurgeServiceOnDuplicate(t, "enable_central_service_config = true") + }) +} + +func testAgent_PurgeServiceOnDuplicate(t *testing.T, extraHCL string) { + t.Helper() + dataDir := testutil.TempDir(t, "agent") // we manage the data dir + defer os.RemoveAll(dataDir) + cfg := ` data_dir = "` + dataDir + `" server = false bootstrap = false - ` + ` + extraHCL a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir}) defer a.Shutdown() - defer os.RemoveAll(dataDir) svc1 := &structs.NodeService{ ID: "redis", @@ -1953,6 +2081,7 @@ func TestAgent_PersistCheck(t *testing.T) { Check: check, ChkType: chkType, Token: "mytoken", + Source: "local", }) if err != nil { t.Fatalf("err: %s", err) @@ -1974,6 +2103,7 @@ func TestAgent_PersistCheck(t *testing.T) { Check: check, ChkType: chkType, Token: "mytoken", + Source: "local", }) if err != nil { t.Fatalf("err: %s", err) @@ -2185,7 +2315,19 @@ func TestAgent_unloadChecks(t *testing.T) { } func TestAgent_loadServices_token(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_token(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_token(t, "enable_central_service_config = true") + }) +} + +func testAgent_loadServices_token(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` service = { id = "rabbitmq" @@ -2193,7 +2335,7 @@ func TestAgent_loadServices_token(t *testing.T) { port = 5672 token = "abc123" } - `) + `+extraHCL) defer a.Shutdown() services := a.State.Services() @@ -2206,7 +2348,19 @@ func TestAgent_loadServices_token(t *testing.T) { } func TestAgent_loadServices_sidecar(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecar(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecar(t, "enable_central_service_config = true") + }) +} + +func testAgent_loadServices_sidecar(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` service = { id = "rabbitmq" @@ -2217,7 +2371,7 @@ func TestAgent_loadServices_sidecar(t *testing.T) { sidecar_service {} } } - `) + `+extraHCL) defer a.Shutdown() services := a.State.Services() @@ -2240,7 +2394,19 @@ func TestAgent_loadServices_sidecar(t *testing.T) { } func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecarSeparateToken(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecarSeparateToken(t, "enable_central_service_config = true") + }) +} + +func testAgent_loadServices_sidecarSeparateToken(t *testing.T, extraHCL string) { + t.Helper() + a := NewTestAgent(t, t.Name(), ` service = { id = 
"rabbitmq" @@ -2253,7 +2419,7 @@ func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) { } } } - `) + `+extraHCL) defer a.Shutdown() services := a.State.Services() @@ -2272,7 +2438,18 @@ func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) { } func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecarInheritMeta(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecarInheritMeta(t, "enable_central_service_config = true") + }) +} + +func testAgent_loadServices_sidecarInheritMeta(t *testing.T, extraHCL string) { + t.Helper() a := NewTestAgent(t, t.Name(), ` service = { @@ -2289,7 +2466,7 @@ func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) { } } } - `) + `+extraHCL) defer a.Shutdown() services := a.State.Services() @@ -2309,7 +2486,18 @@ func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) { } func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) { - t.Parallel() + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecarOverrideMeta(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_loadServices_sidecarOverrideMeta(t, "enable_central_service_config = true") + }) +} + +func testAgent_loadServices_sidecarOverrideMeta(t *testing.T, extraHCL string) { + t.Helper() a := NewTestAgent(t, t.Name(), ` service = { @@ -2329,7 +2517,7 @@ func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) { } } } - `) + `+extraHCL) defer a.Shutdown() services := a.State.Services() @@ -2350,8 +2538,20 @@ func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) { } func TestAgent_unloadServices(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_unloadServices(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_unloadServices(t, "enable_central_service_config = true") + }) +} + +func testAgent_unloadServices(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() svc := &structs.NodeService{ @@ -2578,8 +2778,20 @@ func TestAgent_Service_NoReap(t *testing.T) { } func TestAgent_AddService_restoresSnapshot(t *testing.T) { - t.Parallel() - a := NewTestAgent(t, t.Name(), "") + t.Run("normal", func(t *testing.T) { + t.Parallel() + testAgent_AddService_restoresSnapshot(t, "") + }) + t.Run("service manager", func(t *testing.T) { + t.Parallel() + testAgent_AddService_restoresSnapshot(t, "enable_central_service_config = true") + }) +} + +func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) { + t.Helper() + + a := NewTestAgent(t, t.Name(), extraHCL) defer a.Shutdown() // First register a service @@ -2775,7 +2987,7 @@ func TestAgent_loadChecks_checkFails(t *testing.T) { Status: api.HealthPassing, ServiceID: "nope", } - if err := a.persistCheck(check, nil); err != nil { + if err := a.persistCheck(check, nil, ConfigSourceLocal); err != nil { t.Fatalf("err: %s", err) } @@ -3325,3 +3537,40 @@ func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) { defer a.Shutdown() require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs) } + +func TestDefaultIfEmpty(t *testing.T) { + require.Equal(t, "", defaultIfEmpty("", "")) + require.Equal(t, "foo", defaultIfEmpty("", "foo")) + require.Equal(t, "bar", defaultIfEmpty("bar", "foo")) + require.Equal(t, "bar", 
defaultIfEmpty("bar", "")) +} + +func TestConfigSourceFromName(t *testing.T) { + cases := []struct { + in string + expect configSource + bad bool + }{ + {in: "local", expect: ConfigSourceLocal}, + {in: "remote", expect: ConfigSourceRemote}, + {in: "", expect: ConfigSourceLocal}, + {in: "LOCAL", bad: true}, + {in: "REMOTE", bad: true}, + {in: "garbage", bad: true}, + {in: " ", bad: true}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.in, func(t *testing.T) { + got, ok := ConfigSourceFromName(tc.in) + if tc.bad { + require.False(t, ok) + require.Empty(t, got) + } else { + require.True(t, ok) + require.Equal(t, tc.expect, got) + } + }) + } +} diff --git a/agent/check.go b/agent/check.go index 0a3ce6b48d..05257fe061 100644 --- a/agent/check.go +++ b/agent/check.go @@ -11,6 +11,7 @@ type persistedCheck struct { Check *structs.HealthCheck ChkType *structs.CheckType Token string + Source string } // persistedCheckState is used to persist the current state of a given diff --git a/agent/service_manager.go b/agent/service_manager.go index 001d125da6..52319f9eeb 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -18,100 +18,197 @@ import ( // configuration that applies to the service in order to register the final, merged // service configuration locally in the agent state. type ServiceManager struct { - services map[string]*serviceConfigWatch - agent *Agent + agent *Agent - lock sync.Mutex + // servicesLock guards the services map, but not the watches contained + // therein + servicesLock sync.Mutex + + // services tracks all active watches for registered services + services map[string]*serviceConfigWatch + + // registerCh is a channel for processing service registrations in the + // background when watches are notified of changes. All sends and receives + // must also obey the ctx.Done() channel to avoid a deadlock during + // shutdown. + registerCh chan *asyncRegisterRequest + + // ctx is the shared context for all goroutines launched + ctx context.Context + + // cancel can be used to stop all goroutines launched + cancel context.CancelFunc + + // running keeps track of live goroutines (worker and watcher) + running sync.WaitGroup } func NewServiceManager(agent *Agent) *ServiceManager { + ctx, cancel := context.WithCancel(context.Background()) return &ServiceManager{ - services: make(map[string]*serviceConfigWatch), - agent: agent, + agent: agent, + services: make(map[string]*serviceConfigWatch), + registerCh: make(chan *asyncRegisterRequest), // must be unbuffered + ctx: ctx, + cancel: cancel, } } -// AddService starts a new serviceConfigWatch if the service has not been registered, and -// updates the existing registration if it has. For a new service, a call will also be made -// to fetch the merged global defaults that apply to the service in order to compose the -// initial registration. -func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { +// Stop forces all background goroutines to terminate and blocks until they complete. +// +// NOTE: the caller must NOT hold the Agent.stateLock! +func (s *ServiceManager) Stop() { + s.cancel() + s.running.Wait() +} + +// Start starts a background worker goroutine that writes back into the Agent +// state. This only exists to keep the need to lock the agent state lock out of +// the main AddService/RemoveService codepaths to avoid deadlocks. 
+func (s *ServiceManager) Start() { + s.running.Add(1) + + go func() { + defer s.running.Done() + for { + select { + case <-s.ctx.Done(): + return + case req := <-s.registerCh: + req.Reply <- s.registerOnce(req.Args) + } + } + }() +} + +// registerOnce will process a single registration request +func (s *ServiceManager) registerOnce(args *addServiceRequest) error { + s.agent.stateLock.Lock() + defer s.agent.stateLock.Unlock() + + err := s.agent.addServiceInternal(args) + if err != nil { + return fmt.Errorf("error updating service registration: %v", err) + } + return nil +} + +// AddService will (re)create a serviceConfigWatch on the given service. For +// each call of this function the first registration will happen inline and +// will read the merged global defaults for the service through the agent cache +// (regardless of whether or not the service was already registered). This +// lets validation or authorization related errors bubble back up to the +// caller's RPC inline with their request. Upon success a goroutine will keep +// this updated in the background. +// +// If waitForCentralConfig=true is used, the initial registration blocks on +// fetching the merged global config through the cache. If false, no such RPC +// occurs and only the previousDefaults are used. +// +// persistServiceConfig controls whether the INITIAL registration will result in +// persisting the service config to disk again. All background updates will +// always persist. +// +// service, chkTypes, persist, token, replaceExistingChecks, and source are +// basically pass-through arguments to Agent.addServiceInternal that follow the +// semantics there. The one key difference is that the service provided will be +// merged with the global defaults before registration. +// +// NOTE: the caller must hold the Agent.stateLock! +func (s *ServiceManager) AddService(req *addServiceRequest) error { + req.fixupForAddServiceLocked() + // For now only sidecar proxies have anything that can be configured // centrally. So bypass the whole manager for regular services. - if !service.IsSidecarProxy() && !service.IsMeshGateway() { - return s.agent.addServiceInternal(service, chkTypes, persist, token, false, source) + if !req.service.IsSidecarProxy() && !req.service.IsMeshGateway() { + // previousDefaults are ignored here because they are only relevant for central config. + req.persistService = nil + req.persistDefaults = nil + req.persistServiceConfig = false + return s.agent.addServiceInternal(req) } - s.lock.Lock() - defer s.lock.Unlock() + var ( + service = req.service + chkTypes = req.chkTypes + previousDefaults = req.previousDefaults + waitForCentralConfig = req.waitForCentralConfig + persist = req.persist + persistServiceConfig = req.persistServiceConfig + token = req.token + replaceExistingChecks = req.replaceExistingChecks + source = req.source + ) - reg := serviceRegistration{ - service: service, - chkTypes: chkTypes, - persist: persist, - token: token, - source: source, + reg := &serviceRegistration{ + service: service, + chkTypes: chkTypes, + persist: persist, + token: token, + replaceExistingChecks: replaceExistingChecks, + source: source, } - // If a service watch already exists, update the registration. Otherwise, - // start a new config watcher. - watch, ok := s.services[service.ID] - if ok { - if err := watch.updateRegistration(&reg); err != nil { - return err - } + s.servicesLock.Lock() + defer s.servicesLock.Unlock() + + // If a service watch already exists, shut it down and replace it.
+ oldWatch, updating := s.services[service.ID] + if updating { + oldWatch.Stop() + delete(s.services, service.ID) + } + + // Get the existing global config and do the initial registration with the + // merged config. + watch := &serviceConfigWatch{ + registration: reg, + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, + registerCh: s.registerCh, + } + + err := watch.RegisterAndStart( + previousDefaults, + waitForCentralConfig, + persistServiceConfig, + s.ctx, + &s.running, + ) + if err != nil { + return err + } + + s.services[service.ID] = watch + + if updating { s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID) } else { - // This is a new entry, so get the existing global config and do the initial - // registration with the merged config. - watch := &serviceConfigWatch{ - registration: ®, - readyCh: make(chan error), - updateCh: make(chan cache.UpdateEvent, 1), - agent: s.agent, - } - - // Start the config watch, which starts a blocking query for the resolved service config - // in the background. - if err := watch.Start(); err != nil { - return err - } - - // Call ReadyWait to block until the cache has returned the initial config and the service - // has been registered. - if err := watch.ReadyWait(); err != nil { - watch.Stop() - return err - } - - s.services[service.ID] = watch - s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) } return nil } +// NOTE: the caller must hold the Agent.stateLock! func (s *ServiceManager) RemoveService(serviceID string) { - s.lock.Lock() - defer s.lock.Unlock() + s.servicesLock.Lock() + defer s.servicesLock.Unlock() - serviceWatch, ok := s.services[serviceID] - if !ok { - return + if oldWatch, exists := s.services[serviceID]; exists { + oldWatch.Stop() + delete(s.services, serviceID) } - - serviceWatch.Stop() - delete(s.services, serviceID) } // serviceRegistration represents a locally registered service. type serviceRegistration struct { - service *structs.NodeService - chkTypes []*structs.CheckType - persist bool - token string - source configSource + service *structs.NodeService + chkTypes []*structs.CheckType + persist bool + token string + replaceExistingChecks bool + source configSource } // serviceConfigWatch is a long running helper for composing the end config @@ -121,160 +218,235 @@ type serviceConfigWatch struct { registration *serviceRegistration defaults *structs.ServiceConfigResponse - agent *Agent - - // readyCh is used for ReadyWait in order to block until the first update - // for the resolved service config is received from the cache. - readyCh chan error - - // ctx and cancelFunc store the overall context that lives as long as the - // Watch instance is needed, possibly spanning multiple cache.Notify - // lifetimes. - ctx context.Context - cancelFunc func() + agent *Agent + registerCh chan<- *asyncRegisterRequest // cacheKey stores the key of the current request, when registration changes // we check to see if a new cache watch is needed. cacheKey string - // updateCh receives changes from cache watchers or registration changes. + // updateCh receives changes from cache watchers updateCh chan cache.UpdateEvent - // notifyCancel, if non-nil it the cancel func that will stop the currently - // active Notify loop. It does not cancel ctx and is used when we need to - // switch to a new Notify call because cache key changed. 
- notifyCancel func() - - lock sync.Mutex + ctx context.Context + cancelFunc func() + running sync.WaitGroup } -// Start starts the config watch and a goroutine to handle updates over -// the updateCh. This is not safe to call more than once. -func (s *serviceConfigWatch) Start() error { - s.ctx, s.cancelFunc = context.WithCancel(context.Background()) - if err := s.ensureConfigWatch(); err != nil { - return err - } - go s.runWatch() +// NOTE: this is called while holding the Agent.stateLock +func (w *serviceConfigWatch) RegisterAndStart( + previousDefaults *structs.ServiceConfigResponse, + waitForCentralConfig bool, + persistServiceConfig bool, + ctx context.Context, + wg *sync.WaitGroup, +) error { + service := w.registration.service - return nil -} - -func (s *serviceConfigWatch) Stop() { - s.cancelFunc() -} - -// ReadyWait blocks until the readyCh is closed, which means the initial -// registration of the service has been completed. If there was an error -// with the initial registration, it will be returned. -func (s *serviceConfigWatch) ReadyWait() error { - err := <-s.readyCh - return err -} - -// runWatch handles any update events from the cache.Notify until the -// config watch is shut down. -func (s *serviceConfigWatch) runWatch() { - firstRun := true - for { - select { - case <-s.ctx.Done(): - return - case event := <-s.updateCh: - if err := s.handleUpdate(event, false, firstRun); err != nil { - s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) - } - firstRun = false - } - } -} - -// handleUpdate receives an update event about either the service registration or the -// global config defaults, updates the local state and re-registers the service with -// the newly merged config. This function takes the serviceConfigWatch lock to ensure -// only one update can be happening at a time. -func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error { - // Take the agent state lock if needed. This is done before the local config watch - // lock in order to prevent a race between this config watch and others - the config - // watch lock is the inner lock and the agent stateLock is the outer lock. If this is the - // first run we also don't need to take the stateLock, as this is being waited on - // synchronously by a caller that already holds it. - if !locked && !firstRun { - s.agent.stateLock.Lock() - defer s.agent.stateLock.Unlock() - } - s.lock.Lock() - defer s.lock.Unlock() - - // If we got an error, log a warning if this is the first update; otherwise return the error. - // We want the initial update to cause a service registration no matter what. - if event.Err != nil { - if firstRun { - s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", - s.registration.service.ID, event.Err) - } else { - return fmt.Errorf("error watching service config: %v", event.Err) + // Either we explicitly block waiting for defaults before registering, + // or we feed it some seed data (or NO data) and bypass the blocking + // operation. Either way the watcher will end up with something flagged + // as defaults even if they don't actually reflect actual defaults. 
+	if waitForCentralConfig {
+		if err := w.fetchDefaults(); err != nil {
+			return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err)
 		}
 	} else {
-		switch res := event.Result.(type) {
-		case *serviceRegistration:
-			s.registration = res
-			// We may need to restart watch if upstreams changed
-			if err := s.ensureConfigWatch(); err != nil {
-				return err
-			}
-		case *structs.ServiceConfigResponse:
-			// Sanity check this even came from the currently active watch to ignore
-			// rare races when switching cache keys
-			if event.CorrelationID != s.cacheKey {
-				// It's a no-op. The new watcher will deliver (or may have already
-				// delivered) the correct config so just ignore this old message.
-				return nil
-			}
-			s.defaults = res
-		default:
-			return fmt.Errorf("unknown update event type: %T", event)
-		}
+		w.defaults = previousDefaults
 	}

 	// Merge the local registration with the central defaults and update this service
 	// in the local state.
-	service, err := s.mergeServiceConfig()
+	merged, err := w.mergeServiceConfig()
 	if err != nil {
 		return err
 	}
-	if err := s.updateAgentRegistration(service); err != nil {
-		// If this is the initial registration, return the error through the readyCh
-		// so it can be passed back to the original caller.
-		if firstRun {
-			s.readyCh <- err
-		}
+
+	// The first time we do this interactively, we need to know if it
+	// failed for validation reasons which we only get back from the
+	// initial underlying add service call.
+	err = w.agent.addServiceInternal(&addServiceRequest{
+		service:               merged,
+		chkTypes:              w.registration.chkTypes,
+		persistService:        w.registration.service,
+		persistDefaults:       w.defaults,
+		persist:               w.registration.persist,
+		persistServiceConfig:  persistServiceConfig,
+		token:                 w.registration.token,
+		replaceExistingChecks: w.registration.replaceExistingChecks,
+		source:                w.registration.source,
+	})
+	if err != nil {
 		return fmt.Errorf("error updating service registration: %v", err)
 	}

-	// If this is the first registration, set the ready status by closing the channel.
-	if firstRun {
-		close(s.readyCh)
+	// Start the config watch, which starts a blocking query for the
+	// resolved service config in the background.
+	return w.start(ctx, wg)
+}
+
+// NOTE: this is called while holding the Agent.stateLock
+func (w *serviceConfigWatch) fetchDefaults() error {
+	req := makeConfigRequest(w.agent, w.registration)
+
+	raw, _, err := w.agent.cache.Get(cachetype.ResolvedServiceConfigName, req)
+	if err != nil {
+		return err
 	}

+	reply, ok := raw.(*structs.ServiceConfigResponse)
+	if !ok {
+		// This should never happen, but we want to protect against panics
+		return fmt.Errorf("internal error: response type not correct")
+	}
+
+	w.defaults = reply
+	return nil
+}
+
+// start starts the config watch and a goroutine to handle updates over the
+// updateCh. It is safe to call more than once, provided Stop has been called
+// after each previous start.
+//
+// NOTE: this is called while holding the Agent.stateLock
+func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error {
+	w.ctx, w.cancelFunc = context.WithCancel(ctx)
+
+	// Configure and start a cache.Notify goroutine to run a continuous
+	// blocking query on the resolved service config for this service.
+	req := makeConfigRequest(w.agent, w.registration)
+	w.cacheKey = req.CacheInfo().Key
+
+	// We use the cache key as the correlationID here. Notify in general will
+	// not respond on the updateCh after the context is cancelled; however,
+	// there could possibly be a race where it has only just got an update and
+	// checked the context before we cancel, and so it might still deliver the
+	// old event. Using the cacheKey allows us to ignore updates from the old
+	// cache watch and makes even this rare edge case safe.
+	err := w.agent.cache.Notify(
+		w.ctx,
+		cachetype.ResolvedServiceConfigName,
+		req,
+		w.cacheKey,
+		w.updateCh,
+	)
+	if err != nil {
+		w.cancelFunc()
+		return err
+	}
+
+	w.running.Add(1)
+	wg.Add(1)
+	go w.runWatch(wg)
+
 	return nil
 }

-// updateAgentRegistration updates the service (and its sidecar, if applicable) in the
-// local state.
-func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error {
-	return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, false, s.registration.source)
+func (w *serviceConfigWatch) Stop() {
+	w.cancelFunc()
+	w.running.Wait()
 }
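Aside: the cache-key-as-correlation-ID trick used in start() above is a general pattern for any watch that can be torn down and replaced while a late event is still in flight. A minimal, self-contained sketch of the idea, with purely illustrative names (nothing here is from the Consul codebase):

```go
package main

import "fmt"

type update struct {
	CorrelationID string
	Result        string
}

func main() {
	// The key of the currently active watch doubles as its correlation ID.
	activeKey := "service-config/v2"

	// A late event from the old, cancelled watch and a fresh one from the
	// replacement watch arrive on the same channel.
	events := []update{
		{CorrelationID: "service-config/v1", Result: "stale defaults"},
		{CorrelationID: "service-config/v2", Result: "current defaults"},
	}

	for _, ev := range events {
		if ev.CorrelationID != activeKey {
			continue // delivered by the old watch after cancellation: ignore
		}
		fmt.Println("applying:", ev.Result)
	}
}
```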

-// ensureConfigWatch starts a cache.Notify goroutine to run a continuous
-// blocking query on the resolved service config for this service. If the
-// registration has changed in a way that requires a new blocking query, it will
-// cancel any current watch and start a new one. It is a no-op if there is an
-// existing watch that is sufficient for the current registration. It is not
-// thread-safe and must only be called from the Start method (which is only safe
-// to call once as documented) or from inside the run loop.
-func (s *serviceConfigWatch) ensureConfigWatch() error {
-	ns := s.registration.service
+// runWatch handles any update events from the cache.Notify until the
+// config watch is shut down.
+//
+// NOTE: the caller must NOT hold the Agent.stateLock!
+func (w *serviceConfigWatch) runWatch(wg *sync.WaitGroup) {
+	defer wg.Done()
+	defer w.running.Done()
+
+	for {
+		select {
+		case <-w.ctx.Done():
+			return
+		case event := <-w.updateCh:
+			if err := w.handleUpdate(event); err != nil {
+				w.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err)
+			}
+		}
+	}
+}
+
+// handleUpdate receives an update event for the global config defaults, updates
+// the local state and re-registers the service with the newly merged config.
+//
+// NOTE: the caller must NOT hold the Agent.stateLock!
+func (w *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) error {
+	// If we got an error from the watch, just return it; the initial
+	// registration already happened synchronously in RegisterAndStart, so
+	// there is no first-update special case to handle here.
+	if event.Err != nil {
+		return fmt.Errorf("error watching service config: %v", event.Err)
+	}
+
+	res, ok := event.Result.(*structs.ServiceConfigResponse)
+	if !ok {
+		return fmt.Errorf("unknown update event type: %T", event)
+	}
+
+	// Sanity check this even came from the currently active watch to ignore
+	// rare races when switching cache keys
+	if event.CorrelationID != w.cacheKey {
+		// It's a no-op. The new watcher will deliver (or may have already
+		// delivered) the correct config so just ignore this old message.
+		return nil
+	}
+	w.defaults = res
+
+	// Merge the local registration with the central defaults and update this service
+	// in the local state.
+	merged, err := w.mergeServiceConfig()
+	if err != nil {
+		return err
+	}
+
+	// While we were waiting on the agent state lock we may have been shut
+	// down, so avoid doing a registration in that case.
+	select {
+	case <-w.ctx.Done():
+		return nil
+	default:
+	}
+
+	registerReq := &asyncRegisterRequest{
+		Args: &addServiceRequest{
+			service:               merged,
+			chkTypes:              w.registration.chkTypes,
+			persistService:        w.registration.service,
+			persistDefaults:       w.defaults,
+			persist:               w.registration.persist,
+			persistServiceConfig:  true,
+			token:                 w.registration.token,
+			replaceExistingChecks: w.registration.replaceExistingChecks,
+			source:                w.registration.source,
+		},
+		Reply: make(chan error, 1),
+	}
+
+	select {
+	case <-w.ctx.Done():
+		return nil
+	case w.registerCh <- registerReq:
+	}
+
+	select {
+	case <-w.ctx.Done():
+		return nil
+
+	case err := <-registerReq.Reply:
+		if err != nil {
+			return fmt.Errorf("error updating service registration: %v", err)
+		}
+		return nil
+	}
+}
+
+type asyncRegisterRequest struct {
+	Args  *addServiceRequest
+	Reply chan error
+}
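Aside: asyncRegisterRequest is a request/reply handoff over channels: the watcher goroutine must not touch agent state itself, so it ships the arguments to the ServiceManager loop and waits for the outcome on a buffered Reply channel, selecting on ctx.Done() at every blocking step so a shutdown can never deadlock it against a stopped manager. A minimal sketch of the same shape, under hypothetical names:

```go
package main

import (
	"context"
	"fmt"
)

// request mirrors the shape of asyncRegisterRequest: arguments in, and a
// buffered channel for the single error reply.
type request struct {
	args  string
	reply chan error
}

// serve stands in for the manager loop that owns the shared state: it
// drains requests and answers each one on its reply channel.
func serve(ctx context.Context, ch <-chan *request) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-ch:
			// ... perform the registration under the state lock ...
			req.reply <- nil
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan *request)
	go serve(ctx, ch)

	req := &request{args: "web-sidecar-proxy", reply: make(chan error, 1)}

	// Send, but never block past a shutdown.
	select {
	case <-ctx.Done():
		return
	case ch <- req:
	}

	// Likewise for receiving the reply.
	select {
	case <-ctx.Done():
	case err := <-req.reply:
		fmt.Println("registered, err =", err)
	}
}
```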
+
+func makeConfigRequest(agent *Agent, registration *serviceRegistration) *structs.ServiceConfigRequest {
+	ns := registration.service
 	name := ns.Service

 	var upstreams []string
@@ -296,62 +468,29 @@ func (s *serviceConfigWatch) ensureConfigWatch() error {
 	req := &structs.ServiceConfigRequest{
 		Name:         name,
-		Datacenter:   s.agent.config.Datacenter,
-		QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken},
+		Datacenter:   agent.config.Datacenter,
+		QueryOptions: structs.QueryOptions{Token: agent.tokens.AgentToken()},
 		Upstreams:    upstreams,
 	}
-	if s.registration.token != "" {
-		req.QueryOptions.Token = s.registration.token
+	if registration.token != "" {
+		req.QueryOptions.Token = registration.token
 	}
-
-	// See if this request is different from the current one
-	cacheKey := req.CacheInfo().Key
-	if cacheKey == s.cacheKey {
-		return nil
-	}
-
-	// If there is an existing notify running, stop it first. This may leave a
-	// blocking query running in the background but the Notify loop will swallow
-	// the response and exit when it next unblocks so we can consider it stopped.
-	if s.notifyCancel != nil {
-		s.notifyCancel()
-	}
-
-	// Make a new context just for this Notify call
-	ctx, cancel := context.WithCancel(s.ctx)
-	s.notifyCancel = cancel
-	s.cacheKey = cacheKey
-	// We use the cache key as the correlationID here. Notify in general will not
-	// respond on the updateCh after the context is cancelled however there could
-	// possible be a race where it has only just got an update and checked the
-	// context before we cancel and so might still deliver the old event. Using
-	// the cacheKey allows us to ignore updates from the old cache watch and makes
-	// even this rare edge case safe.
-	err := s.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req,
-		s.cacheKey, s.updateCh)
-
-	return err
-}
-
-// updateRegistration does a synchronous update of the local service registration and
-// returns the result. The agent stateLock should be held when calling this function.
-func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error {
-	return s.handleUpdate(cache.UpdateEvent{
-		Result: registration,
-	}, true, false)
+	return req
 }

 // mergeServiceConfig returns the final effective config for the watched service,
 // including the latest known global defaults from the servers.
-func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) {
-	if s.defaults == nil || (!s.registration.service.IsSidecarProxy() && !s.registration.service.IsMeshGateway()) {
-		return s.registration.service, nil
+//
+// NOTE: this is called while holding the Agent.stateLock
+func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error) {
+	if w.defaults == nil {
+		return w.registration.service, nil
 	}

 	// We don't want to change s.registration in place since it is our source of
 	// truth about what was actually registered before defaults applied. So copy
 	// it first.
-	nsRaw, err := copystructure.Copy(s.registration.service)
+	nsRaw, err := copystructure.Copy(w.registration.service)
 	if err != nil {
 		return nil, err
 	}
@@ -359,12 +498,12 @@ func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)

 	// Merge proxy defaults
 	ns := nsRaw.(*structs.NodeService)

-	if err := mergo.Merge(&ns.Proxy.Config, s.defaults.ProxyConfig); err != nil {
+	if err := mergo.Merge(&ns.Proxy.Config, w.defaults.ProxyConfig); err != nil {
 		return nil, err
 	}

 	if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
-		ns.Proxy.MeshGateway.Mode = s.defaults.MeshGateway.Mode
+		ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode
 	}

 	// Merge upstream defaults if there were any returned
@@ -380,7 +519,7 @@ func (s *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)
 			us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode
 		}

-		usCfg, ok := s.defaults.UpstreamConfigs[us.DestinationName]
+		usCfg, ok := w.defaults.UpstreamConfigs[us.DestinationName]
 		if !ok {
 			// No config defaults to merge
 			continue
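Aside: the merge direction in mergeServiceConfig above is what guarantees that operator-supplied values win: mergo.Merge without the WithOverride option only fills in keys that are missing from the destination. A standalone illustration (assuming the github.com/imdario/mergo module used by the code above, with its documented default behavior):

```go
package main

import (
	"fmt"

	"github.com/imdario/mergo"
)

func main() {
	// Proxy config set explicitly in the local registration.
	local := map[string]interface{}{
		"protocol": "grpc",
	}
	// Central defaults resolved from the servers.
	defaults := map[string]interface{}{
		"protocol": "http",
		"foo":      1,
	}

	// Without mergo.WithOverride, existing destination keys are preserved:
	// the local "grpc" survives and only the missing "foo" is filled in.
	if err := mergo.Merge(&local, defaults); err != nil {
		panic(err)
	}
	fmt.Println(local) // map[foo:1 protocol:grpc]
}
```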
diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go
index f3afab6a46..892f2f542c 100644
--- a/agent/service_manager_test.go
+++ b/agent/service_manager_test.go
@@ -1,9 +1,17 @@
 package agent

 import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"reflect"
 	"testing"

 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/sdk/testutil"
+	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
 	"github.com/stretchr/testify/require"
 )
@@ -17,30 +25,18 @@ func TestServiceManager_RegisterService(t *testing.T) {
 	testrpc.WaitForLeader(t, a.RPC, "dc1")

 	// Register a global proxy and service config
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ProxyConfigEntry{
-				Config: map[string]interface{}{
-					"foo": 1,
-				},
+	testApplyConfigEntries(t, a,
+		&structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
 			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ServiceConfigEntry{
-				Kind:     structs.ServiceDefaults,
-				Name:     "redis",
-				Protocol: "tcp",
-			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "redis",
+			Protocol: "tcp",
+		},
+	)

 	// Now register a service locally with no sidecar, it should be a no-op.
 	svc := &structs.NodeService{
@@ -73,42 +69,23 @@ func TestServiceManager_RegisterSidecar(t *testing.T) {
 	testrpc.WaitForLeader(t, a.RPC, "dc1")

 	// Register a global proxy and service config
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ProxyConfigEntry{
-				Config: map[string]interface{}{
-					"foo": 1,
-				},
+	testApplyConfigEntries(t, a,
+		&structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
 			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ServiceConfigEntry{
-				Kind:     structs.ServiceDefaults,
-				Name:     "web",
-				Protocol: "http",
-			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ServiceConfigEntry{
-				Kind:     structs.ServiceDefaults,
-				Name:     "redis",
-				Protocol: "tcp",
-			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "web",
+			Protocol: "http",
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "redis",
+			Protocol: "tcp",
+		},
+	)

 	// Now register a sidecar proxy. Note we don't use SidecarService here because
 	// that gets resolved earlier in config handling than the AddService call
@@ -176,30 +153,18 @@ func TestServiceManager_RegisterMeshGateway(t *testing.T) {
 	testrpc.WaitForLeader(t, a.RPC, "dc1")

 	// Register a global proxy and service config
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ProxyConfigEntry{
-				Config: map[string]interface{}{
-					"foo": 1,
-				},
+	testApplyConfigEntries(t, a,
+		&structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
 			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ServiceConfigEntry{
-				Kind:     structs.ServiceDefaults,
-				Name:     "mesh-gateway",
-				Protocol: "http",
-			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "mesh-gateway",
+			Protocol: "http",
+		},
+	)

 	// Now register a mesh-gateway.
 	svc := &structs.NodeService{
@@ -232,6 +197,382 @@ func TestServiceManager_RegisterMeshGateway(t *testing.T) {
 	}, gateway)
 }

+func TestServiceManager_PersistService_API(t *testing.T) {
+	// This is the ServiceManager version of TestAgent_PersistService and
+	// TestAgent_PurgeService.
+	t.Parallel()
+
+	require := require.New(t)
+
+	// Launch a server to manage the config entries.
+	serverAgent := NewTestAgent(t, t.Name(), `enable_central_service_config = true`)
+	defer serverAgent.Shutdown()
+	testrpc.WaitForLeader(t, serverAgent.RPC, "dc1")
+
+	// Register a global proxy and service config
+	testApplyConfigEntries(t, serverAgent,
+		&structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
+			},
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "web",
+			Protocol: "http",
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "redis",
+			Protocol: "tcp",
+		},
+	)
+
+	// Now launch a single client agent
+	dataDir := testutil.TempDir(t, "agent") // we manage the data dir
+	defer os.RemoveAll(dataDir)
+
+	cfg := `
+	  enable_central_service_config = true
+	  server = false
+	  bootstrap = false
+	  data_dir = "` + dataDir + `"
+	`
+	a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
+	defer a.Shutdown()
+
+	// Join first
+	_, err := a.JoinLAN([]string{
+		fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN),
+	})
+	require.NoError(err)
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+	// Now register a sidecar proxy via the API.
+	svc := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-sidecar-proxy",
+		Service: "web-sidecar-proxy",
+		Port:    21000,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "web",
+			DestinationServiceID:   "web",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       8000,
+			Upstreams: structs.Upstreams{
+				{
+					DestinationName: "redis",
+					LocalBindPort:   5000,
+				},
+			},
+		},
+	}
+
+	expectState := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-sidecar-proxy",
+		Service: "web-sidecar-proxy",
+		Port:    21000,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "web",
+			DestinationServiceID:   "web",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       8000,
+			Config: map[string]interface{}{
+				"foo":      int64(1),
+				"protocol": "http",
+			},
+			Upstreams: structs.Upstreams{
+				{
+					DestinationName: "redis",
+					LocalBindPort:   5000,
+					Config: map[string]interface{}{
+						"protocol": "tcp",
+					},
+				},
+			},
+		},
+		Weights: &structs.Weights{
+			Passing: 1,
+			Warning: 1,
+		},
+	}
+
+	svcFile := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
+	configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, stringHash(svc.ID))
+
+	// Service is not persisted unless requested, but we always persist service configs.
+	require.NoError(a.AddService(svc, nil, false, "", ConfigSourceRemote))
+	requireFileIsAbsent(t, svcFile)
+	requireFileIsPresent(t, configFile)
+
+	// Persists to file if requested
+	require.NoError(a.AddService(svc, nil, true, "mytoken", ConfigSourceRemote))
+	requireFileIsPresent(t, svcFile)
+	requireFileIsPresent(t, configFile)
+
+	// Service definition file is sane.
+	expectJSONFile(t, svcFile, persistedService{
+		Token:   "mytoken",
+		Service: svc,
+		Source:  "remote",
+	}, nil)
+
+	// Service config file is sane.
+	expectJSONFile(t, configFile, persistedServiceConfig{
+		ServiceID: "web-sidecar-proxy",
+		Defaults: &structs.ServiceConfigResponse{
+			ProxyConfig: map[string]interface{}{
+				"foo":      1,
+				"protocol": "http",
+			},
+			UpstreamConfigs: map[string]map[string]interface{}{
+				"redis": map[string]interface{}{
+					"protocol": "tcp",
+				},
+			},
+		},
+	}, resetDefaultsQueryMeta)
+
+	// Verify in memory state.
+	{
+		sidecarService := a.State.Service("web-sidecar-proxy")
+		require.NotNil(sidecarService)
+		require.Equal(expectState, sidecarService)
+	}
+
+	// Updates service definition on disk
+	svc.Proxy.LocalServicePort = 8001
+	require.NoError(a.AddService(svc, nil, true, "mytoken", ConfigSourceRemote))
+	requireFileIsPresent(t, svcFile)
+	requireFileIsPresent(t, configFile)
+
+	// Service definition file is updated.
+	expectJSONFile(t, svcFile, persistedService{
+		Token:   "mytoken",
+		Service: svc,
+		Source:  "remote",
+	}, nil)
+
+	// Service config file is the same.
+	expectJSONFile(t, configFile, persistedServiceConfig{
+		ServiceID: "web-sidecar-proxy",
+		Defaults: &structs.ServiceConfigResponse{
+			ProxyConfig: map[string]interface{}{
+				"foo":      1,
+				"protocol": "http",
+			},
+			UpstreamConfigs: map[string]map[string]interface{}{
+				"redis": map[string]interface{}{
+					"protocol": "tcp",
+				},
+			},
+		},
+	}, resetDefaultsQueryMeta)
+
+	// Verify in memory state.
+	expectState.Proxy.LocalServicePort = 8001
+	{
+		sidecarService := a.State.Service("web-sidecar-proxy")
+		require.NotNil(sidecarService)
+		require.Equal(expectState, sidecarService)
+	}
+
+	// Kill the agent to restart it.
+	a.Shutdown()
+
+	// Kill the server so that it can't phone home and must rely upon the persisted defaults.
+	serverAgent.Shutdown()
+
+	// Should load it back during later start.
+	a2 := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
+	defer a2.Shutdown()
+
+	{
+		restored := a2.State.Service("web-sidecar-proxy")
+		require.NotNil(restored)
+		require.Equal(expectState, restored)
+	}
+
+	// Now remove it.
+	require.NoError(a2.RemoveService("web-sidecar-proxy"))
+	requireFileIsAbsent(t, svcFile)
+	requireFileIsAbsent(t, configFile)
+}
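Aside: per the expectations asserted above, the file under services/configs/<hash> is plain JSON of persistedServiceConfig. A small sketch of reading one back, with Defaults simplified to a raw map instead of *structs.ServiceConfigResponse:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// persistedServiceConfig mirrors the on-disk shape; Defaults is flattened
// to a generic map purely for illustration.
type persistedServiceConfig struct {
	ServiceID string
	Defaults  map[string]interface{}
}

func main() {
	raw := []byte(`{
	  "ServiceID": "web-sidecar-proxy",
	  "Defaults": {"ProxyConfig": {"foo": 1, "protocol": "http"}}
	}`)

	var p persistedServiceConfig
	if err := json.Unmarshal(raw, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.ServiceID, "->", p.Defaults["ProxyConfig"])
}
```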
+
+func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
+	// This is the ServiceManager version of TestAgent_PersistService and
+	// TestAgent_PurgeService but for config files.
+	t.Parallel()
+
+	require := require.New(t)
+
+	// Launch a server to manage the config entries.
+	serverAgent := NewTestAgent(t, t.Name(), `enable_central_service_config = true`)
+	defer serverAgent.Shutdown()
+	testrpc.WaitForLeader(t, serverAgent.RPC, "dc1")
+
+	// Register a global proxy and service config
+	testApplyConfigEntries(t, serverAgent,
+		&structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
+			},
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "web",
+			Protocol: "http",
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "redis",
+			Protocol: "tcp",
+		},
+	)
+
+	// Now launch a single client agent
+	dataDir := testutil.TempDir(t, "agent") // we manage the data dir
+	defer os.RemoveAll(dataDir)
+
+	serviceSnippet := `
+	  service = {
+	    kind = "connect-proxy"
+	    id   = "web-sidecar-proxy"
+	    name = "web-sidecar-proxy"
+	    port = 21000
+	    token = "mytoken"
+	    proxy {
+	      destination_service_name = "web"
+	      destination_service_id   = "web"
+	      local_service_address    = "127.0.0.1"
+	      local_service_port       = 8000
+	      upstreams = [{
+	        destination_name = "redis"
+	        local_bind_port  = 5000
+	      }]
+	    }
+	  }
+	`
+
+	cfg := `
+	  enable_central_service_config = true
+	  data_dir = "` + dataDir + `"
+	  server = false
+	  bootstrap = false
+	` + serviceSnippet
+
+	a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
+	defer a.Shutdown()
+
+	// Join first
+	_, err := a.JoinLAN([]string{
+		fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN),
+	})
+	require.NoError(err)
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+	// The sidecar proxy is registered via the service stanza in the config
+	// file loaded at startup, not via the API.
+	svcID := "web-sidecar-proxy"
+
+	expectState := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-sidecar-proxy",
+		Service: "web-sidecar-proxy",
+		Port:    21000,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "web",
+			DestinationServiceID:   "web",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       8000,
+			Config: map[string]interface{}{
+				"foo":      int64(1),
+				"protocol": "http",
+			},
+			Upstreams: structs.Upstreams{
+				{
+					DestinationType: "service",
+					DestinationName: "redis",
+					LocalBindPort:   5000,
+					Config: map[string]interface{}{
+						"protocol": "tcp",
+					},
+				},
+			},
+		},
+		Weights: &structs.Weights{
+			Passing: 1,
+			Warning: 1,
+		},
+	}
+
+	// Now wait until we've re-registered using central config updated data.
+	retry.Run(t, func(r *retry.R) {
+		a.stateLock.Lock()
+		defer a.stateLock.Unlock()
+		current := a.State.Service("web-sidecar-proxy")
+		if current == nil {
+			r.Fatalf("service is missing")
+		}
+		if !reflect.DeepEqual(expectState, current) {
+			r.Fatalf("expected: %#v\nactual: %#v", expectState, current)
+		}
+	})
+
+	svcFile := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svcID))
+	configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, stringHash(svcID))
+
+	// Service is never persisted, but we always persist service configs.
+	requireFileIsAbsent(t, svcFile)
+	requireFileIsPresent(t, configFile)
+
+	// Service config file is sane.
+	expectJSONFile(t, configFile, persistedServiceConfig{
+		ServiceID: "web-sidecar-proxy",
+		Defaults: &structs.ServiceConfigResponse{
+			ProxyConfig: map[string]interface{}{
+				"foo":      1,
+				"protocol": "http",
+			},
+			UpstreamConfigs: map[string]map[string]interface{}{
+				"redis": map[string]interface{}{
+					"protocol": "tcp",
+				},
+			},
+		},
+	}, resetDefaultsQueryMeta)
+
+	// Verify in memory state.
+	{
+		sidecarService := a.State.Service("web-sidecar-proxy")
+		require.NotNil(sidecarService)
+		require.Equal(expectState, sidecarService)
+	}
+
+	// Kill the agent to restart it.
+	a.Shutdown()
+
+	// Kill the server so that it can't phone home and must rely upon the persisted defaults.
+	serverAgent.Shutdown()
+
+	// Should load it back during later start.
+	a2 := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
+	defer a2.Shutdown()
+
+	{
+		restored := a2.State.Service("web-sidecar-proxy")
+		require.NotNil(restored)
+		require.Equal(expectState, restored)
+	}
+
+	// Now remove it.
+	require.NoError(a2.RemoveService("web-sidecar-proxy"))
+	requireFileIsAbsent(t, svcFile)
+	requireFileIsAbsent(t, configFile)
+}
+
 func TestServiceManager_Disabled(t *testing.T) {
 	require := require.New(t)

@@ -241,42 +582,23 @@ func TestServiceManager_Disabled(t *testing.T) {
 	testrpc.WaitForLeader(t, a.RPC, "dc1")

 	// Register a global proxy and service config
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ProxyConfigEntry{
-				Config: map[string]interface{}{
-					"foo": 1,
-				},
+	testApplyConfigEntries(t, a,
+		&structs.ProxyConfigEntry{
+			Config: map[string]interface{}{
+				"foo": 1,
 			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ServiceConfigEntry{
-				Kind:     structs.ServiceDefaults,
-				Name:     "web",
-				Protocol: "http",
-			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
-	{
-		args := &structs.ConfigEntryRequest{
-			Datacenter: "dc1",
-			Entry: &structs.ServiceConfigEntry{
-				Kind:     structs.ServiceDefaults,
-				Name:     "redis",
-				Protocol: "tcp",
-			},
-		}
-		var out bool
-		require.NoError(a.RPC("ConfigEntry.Apply", args, &out))
-	}
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "web",
+			Protocol: "http",
+		},
+		&structs.ServiceConfigEntry{
+			Kind:     structs.ServiceDefaults,
+			Name:     "redis",
+			Protocol: "tcp",
+		},
+	)

 	// Now register a sidecar proxy. Note we don't use SidecarService here because
 	// that gets resolved earlier in config handling than the AddService call
@@ -329,3 +651,91 @@ func TestServiceManager_Disabled(t *testing.T) {
 		},
 	}, sidecarService)
 }
+
+func testApplyConfigEntries(t *testing.T, a *TestAgent, entries ...structs.ConfigEntry) {
+	t.Helper()
+	for _, entry := range entries {
+		args := &structs.ConfigEntryRequest{
+			Datacenter: "dc1",
+			Entry:      entry,
+		}
+		var out bool
+		require.NoError(t, a.RPC("ConfigEntry.Apply", args, &out))
+	}
+}
+
+func requireFileIsAbsent(t *testing.T, file string) {
+	t.Helper()
+	if _, err := os.Stat(file); !os.IsNotExist(err) {
+		t.Fatalf("should not persist")
+	}
+}
+
+func requireFileIsPresent(t *testing.T, file string) {
+	t.Helper()
+	if _, err := os.Stat(file); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+}
+
+func expectJSONFile(t *testing.T, file string, expect interface{}, fixupContentBeforeCompareFn func([]byte) ([]byte, error)) {
+	t.Helper()
+
+	expected, err := json.Marshal(expect)
+	require.NoError(t, err)
+
+	content, err := ioutil.ReadFile(file)
+	require.NoError(t, err)
+
+	if fixupContentBeforeCompareFn != nil {
+		content, err = fixupContentBeforeCompareFn(content)
+		require.NoError(t, err)
+	}
+
+	require.JSONEq(t, string(expected), string(content))
+}
+
+// resetDefaultsQueryMeta will reset the embedded fields from structs.QueryMeta
+// to their zero values in the json object keyed under 'Defaults'.
+func resetDefaultsQueryMeta(content []byte) ([]byte, error) {
+	var raw map[string]interface{}
+	if err := json.Unmarshal(content, &raw); err != nil {
+		return nil, err
+	}
+	def, ok := raw["Defaults"]
+	if !ok {
+		return content, nil
+	}
+
+	rawDef, ok := def.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("unexpected structure found in 'Defaults' key")
+	}
+
+	qmZero, err := convertToMap(structs.QueryMeta{})
+	if err != nil {
+		return nil, err
+	}
+
+	for k, v := range qmZero {
+		rawDef[k] = v
+	}
+
+	raw["Defaults"] = rawDef
+
+	return json.Marshal(raw)
+}
+
+func convertToMap(v interface{}) (map[string]interface{}, error) {
+	b, err := json.Marshal(v)
+	if err != nil {
+		return nil, err
+	}
+
+	var raw map[string]interface{}
+	if err := json.Unmarshal(b, &raw); err != nil {
+		return nil, err
+	}
+
+	return raw, nil
+}
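Aside: the pair of helpers above normalizes volatile response metadata by overlaying the zero value of the embedded struct onto the decoded JSON before comparing. A self-contained sketch of the same trick; queryMeta here is a hypothetical two-field stand-in for structs.QueryMeta:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queryMeta stands in for the embedded structs.QueryMeta fields that vary
// between test runs.
type queryMeta struct {
	LastIndex   uint64
	KnownLeader bool
}

func main() {
	// Pretend this came off disk with live query metadata embedded in it.
	content := []byte(`{"ServiceID":"web-sidecar-proxy","Defaults":{"ProxyConfig":{"protocol":"http"},"LastIndex":42,"KnownLeader":true}}`)

	var raw map[string]interface{}
	if err := json.Unmarshal(content, &raw); err != nil {
		panic(err)
	}

	// Marshal the zero value into a map and overlay its keys, resetting the
	// volatile fields so a deep JSON comparison can succeed.
	zeroBytes, _ := json.Marshal(queryMeta{})
	var zero map[string]interface{}
	_ = json.Unmarshal(zeroBytes, &zero)

	def := raw["Defaults"].(map[string]interface{})
	for k, v := range zero {
		def[k] = v
	}

	out, _ := json.Marshal(raw)
	fmt.Println(string(out))
	// {"Defaults":{"KnownLeader":false,"LastIndex":0,"ProxyConfig":{"protocol":"http"}},"ServiceID":"web-sidecar-proxy"}
}
```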