diff --git a/agent/agent.go b/agent/agent.go index 063b120eb8..74c63c8933 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1984,18 +1984,27 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { return nil } +// AddServiceAndReplaceChecks is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted. +// This entry is persistent and the agent will make a best effort to +// ensure it is registered +func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { + a.stateLock.Lock() + defer a.stateLock.Unlock() + return a.addServiceLocked(service, chkTypes, persist, token, true, source) +} + // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() - return a.addServiceLocked(service, chkTypes, persist, token, source) + return a.addServiceLocked(service, chkTypes, persist, token, false, source) } // addServiceLocked adds a service entry to the service manager if enabled, or directly // to the local state if it is not. This function assumes the state lock is already held. -func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { +func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error { if err := a.validateService(service, chkTypes); err != nil { return err } @@ -2004,11 +2013,11 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc return a.serviceManager.AddService(service, chkTypes, persist, token, source) } - return a.addServiceInternal(service, chkTypes, persist, token, source) + return a.addServiceInternal(service, chkTypes, persist, token, replaceExistingChecks, source) } // addServiceInternal adds the given service and checks to the local state. -func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { +func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error { // Pause the service syncs during modification a.PauseSync() defer a.ResumeSync() @@ -2020,8 +2029,17 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str var checks []*structs.HealthCheck + existingChecks := map[types.CheckID]bool{} + for _, check := range a.State.Checks() { + if check.ServiceID == service.ID { + existingChecks[check.CheckID] = false + } + } + // Create an associated health check for i, chkType := range chkTypes { + existingChecks[chkType.CheckID] = true + checkID := string(chkType.CheckID) if checkID == "" { checkID = fmt.Sprintf("service:%s", service.ID) @@ -2102,6 +2120,14 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str } } + if replaceExistingChecks { + for checkID, keep := range existingChecks { + if !keep { + a.removeCheckLocked(checkID, persist) + } + } + } + return nil } @@ -2845,13 +2871,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { // syntax sugar and shouldn't be persisted in local or server state. ns.Connect.SidecarService = nil - if err := a.addServiceLocked(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil { + if err := a.addServiceLocked(ns, chkTypes, false, service.Token, false, ConfigSourceLocal); err != nil { return fmt.Errorf("Failed to register service %q: %v", service.Name, err) } // If there is a sidecar service, register that too. if sidecar != nil { - if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil { + if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, false, ConfigSourceLocal); err != nil { return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err) } } @@ -2914,7 +2940,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", serviceID, file) - if err := a.addServiceLocked(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil { + if err := a.addServiceLocked(p.Service, nil, false, p.Token, false, ConfigSourceLocal); err != nil { return fmt.Errorf("failed adding service %q: %s", serviceID, err) } } diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 6addef95bf..6653704025 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -915,8 +915,21 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re } // Add the service. - if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil { - return nil, err + replaceExistingChecks := false + + query := req.URL.Query() + if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") { + replaceExistingChecks = true + } + + if replaceExistingChecks { + if err := s.agent.AddServiceAndReplaceChecks(ns, chkTypes, true, token, ConfigSourceRemote); err != nil { + return nil, err + } + } else { + if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil { + return nil, err + } } // Add sidecar. if sidecar != nil { diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 34d54f6f32..944273a2d9 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "reflect" + "sort" "strings" "testing" "time" @@ -2414,6 +2415,136 @@ func TestAgent_RegisterService(t *testing.T) { } } +func TestAgent_RegisterService_ReRegister(t *testing.T) { + t.Parallel() + a := NewTestAgent(t, t.Name(), "") + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + args := &structs.ServiceDefinition{ + Name: "test", + Meta: map[string]string{"hello": "world"}, + Tags: []string{"master"}, + Port: 8000, + Checks: []*structs.CheckType{ + &structs.CheckType{ + CheckID: types.CheckID("check_1"), + TTL: 20 * time.Second, + }, + &structs.CheckType{ + CheckID: types.CheckID("check_2"), + TTL: 30 * time.Second, + }, + }, + Weights: &structs.Weights{ + Passing: 100, + Warning: 3, + }, + } + req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args)) + _, err := a.srv.AgentRegisterService(nil, req) + require.NoError(t, err) + + args = &structs.ServiceDefinition{ + Name: "test", + Meta: map[string]string{"hello": "world"}, + Tags: []string{"master"}, + Port: 8000, + Checks: []*structs.CheckType{ + &structs.CheckType{ + CheckID: types.CheckID("check_1"), + TTL: 20 * time.Second, + }, + &structs.CheckType{ + CheckID: types.CheckID("check_3"), + TTL: 30 * time.Second, + }, + }, + Weights: &structs.Weights{ + Passing: 100, + Warning: 3, + }, + } + req, _ = http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args)) + _, err = a.srv.AgentRegisterService(nil, req) + require.NoError(t, err) + + checks := a.State.Checks() + require.Equal(t, 3, len(checks)) + + checkIDs := []string{} + for id := range checks { + checkIDs = append(checkIDs, string(id)) + } + sort.Strings(checkIDs) + require.Equal(t, []string{"check_1", "check_2", "check_3"}, checkIDs) +} + +func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) { + t.Parallel() + a := NewTestAgent(t, t.Name(), "") + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + args := &structs.ServiceDefinition{ + Name: "test", + Meta: map[string]string{"hello": "world"}, + Tags: []string{"master"}, + Port: 8000, + Checks: []*structs.CheckType{ + &structs.CheckType{ + CheckID: types.CheckID("check_1"), + TTL: 20 * time.Second, + }, + &structs.CheckType{ + CheckID: types.CheckID("check_2"), + TTL: 30 * time.Second, + }, + }, + Weights: &structs.Weights{ + Passing: 100, + Warning: 3, + }, + } + req, _ := http.NewRequest("PUT", "/v1/agent/service/register?replace-existing-checks", jsonReader(args)) + _, err := a.srv.AgentRegisterService(nil, req) + require.NoError(t, err) + + args = &structs.ServiceDefinition{ + Name: "test", + Meta: map[string]string{"hello": "world"}, + Tags: []string{"master"}, + Port: 8000, + Checks: []*structs.CheckType{ + &structs.CheckType{ + CheckID: types.CheckID("check_1"), + TTL: 20 * time.Second, + }, + &structs.CheckType{ + CheckID: types.CheckID("check_3"), + TTL: 30 * time.Second, + }, + }, + Weights: &structs.Weights{ + Passing: 100, + Warning: 3, + }, + } + req, _ = http.NewRequest("PUT", "/v1/agent/service/register?replace-existing-checks", jsonReader(args)) + _, err = a.srv.AgentRegisterService(nil, req) + require.NoError(t, err) + + checks := a.State.Checks() + require.Equal(t, 2, len(checks)) + + checkIDs := []string{} + for id := range checks { + checkIDs = append(checkIDs, string(id)) + } + sort.Strings(checkIDs) + require.Equal(t, []string{"check_1", "check_3"}, checkIDs) +} + func TestAgent_RegisterService_TranslateKeys(t *testing.T) { t.Parallel() tests := []struct { diff --git a/agent/service_manager.go b/agent/service_manager.go index 0c8d65eb1f..001d125da6 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -39,7 +39,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // For now only sidecar proxies have anything that can be configured // centrally. So bypass the whole manager for regular services. if !service.IsSidecarProxy() && !service.IsMeshGateway() { - return s.agent.addServiceInternal(service, chkTypes, persist, token, source) + return s.agent.addServiceInternal(service, chkTypes, persist, token, false, source) } s.lock.Lock() @@ -263,7 +263,7 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, first // updateAgentRegistration updates the service (and its sidecar, if applicable) in the // local state. func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error { - return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) + return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, false, s.registration.source) } // ensureConfigWatch starts a cache.Notify goroutine to run a continuous diff --git a/website/source/api/agent/service.html.md b/website/source/api/agent/service.html.md index 8b97be9365..4dff0f7b76 100644 --- a/website/source/api/agent/service.html.md +++ b/website/source/api/agent/service.html.md @@ -461,7 +461,7 @@ Parameters and response format are the same as ## Register Service -This endpoint adds a new service, with an optional health check, to the local +This endpoint adds a new service, with optional health checks, to the local agent. The agent is responsible for managing the status of its local services, and for @@ -485,6 +485,10 @@ The table below shows this endpoint's support for | ---------------- | ----------------- | ------------- | --------------- | | `NO` | `none` | `none` | `service:write` | +### Query string parameters + +- `replace-existing-checks` - Missing healthchecks from the request will be deleted from the agent. Using this parameter allows to idempotently register a service and its checks whithout having to manually deregister checks. + ### Parameters Note that this endpoint, unlike most also [supports `snake_case`](/docs/agent/services.html#service-definition-parameter-case) @@ -623,7 +627,7 @@ For the `Connect` field, the parameters are: $ curl \ --request PUT \ --data @payload.json \ - http://127.0.0.1:8500/v1/agent/service/register + http://127.0.0.1:8500/v1/agent/service/register?replace-existing-checks=1 ``` ## Deregister Service