diff --git a/agent/agent.go b/agent/agent.go index 0dadd7025f..32720d6c29 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1407,7 +1407,7 @@ func (a *Agent) reapServicesInternal() { } // reapServices is a long running goroutine that looks for checks that have been -// critical too long and dregisters their associated services. +// critical too long and deregisters their associated services. func (a *Agent) reapServices() { for { select { diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index 175b0d4a26..1eb30fc3a6 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -8,13 +8,15 @@ import ( "github.com/hashicorp/consul/agent/structs" ) +var durations = NewDurationFixer("interval", "timeout", "deregistercriticalserviceafter") + func (s *HTTPServer) CatalogRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "PUT" { return nil, MethodNotAllowedError{req.Method, []string{"PUT"}} } var args structs.RegisterRequest - if err := decodeBody(req, &args, nil); err != nil { + if err := decodeBody(req, &args, durations.FixupDurations); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Request decode failed: %v", err) return nil, nil diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index c6997fa702..0d6fef4ee9 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -28,8 +28,11 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) // Verify the args. - if args.Node == "" || args.Address == "" { - return fmt.Errorf("Must provide node and address") + if args.Node == "" { + return fmt.Errorf("Must provide node") + } + if args.Address == "" && !args.SkipNodeUpdate { + return fmt.Errorf("Must provide address if SkipNodeUpdate is not set") } if args.ID != "" { if _, err := uuid.ParseUUID(string(args.ID)); err != nil { diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index dd2d496088..f6825f990a 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -78,6 +78,45 @@ func TestCatalog_RegisterService_InvalidAddress(t *testing.T) { } } +func TestCatalog_RegisterService_SkipNodeUpdate(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Register a node + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + } + var out struct{} + err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) + if err != nil { + t.Fatal(err) + } + + // Update it with a blank address, should fail. + arg.Address = "" + arg.Service = &structs.NodeService{ + Service: "db", + Port: 8000, + } + err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) + if err == nil || err.Error() != "Must provide address if SkipNodeUpdate is not set" { + t.Fatalf("got error %v want 'Must provide address...'", err) + } + + // Set SkipNodeUpdate, should succeed + arg.SkipNodeUpdate = true + err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out) + if err != nil { + t.Fatal(err) + } +} + func TestCatalog_Register_NodeID(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) diff --git a/agent/coordinate_endpoint.go b/agent/coordinate_endpoint.go index 4e4ea12960..1a16964fe2 100644 --- a/agent/coordinate_endpoint.go +++ b/agent/coordinate_endpoint.go @@ -138,3 +138,25 @@ func filterCoordinates(req *http.Request, in structs.Coordinates) structs.Coordi } return out } + +// CoordinateUpdate inserts or updates the LAN coordinate of a node. +func (s *HTTPServer) CoordinateUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" { + return nil, MethodNotAllowedError{req.Method, []string{"PUT"}} + } + + args := structs.CoordinateUpdateRequest{} + if err := decodeBody(req, &args, nil); err != nil { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(resp, "Request decode failed: %v", err) + return nil, nil + } + s.parseDC(req, &args.Datacenter) + + var reply struct{} + if err := s.agent.RPC("Coordinate.Update", &args, &reply); err != nil { + return nil, err + } + + return nil, nil +} diff --git a/agent/coordinate_endpoint_test.go b/agent/coordinate_endpoint_test.go index b34197c4a9..09001ccabe 100644 --- a/agent/coordinate_endpoint_test.go +++ b/agent/coordinate_endpoint_test.go @@ -243,3 +243,49 @@ func TestCoordinate_Node(t *testing.T) { t.Fatalf("bad: %v", resp.Code) } } + +func TestCoordinate_Update(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + // Register the node. + reg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + } + var reply struct{} + if err := a.RPC("Catalog.Register", ®, &reply); err != nil { + t.Fatalf("err: %s", err) + } + + // Update the coordinates and wait for it to complete. + coord := coordinate.NewCoordinate(coordinate.DefaultConfig()) + coord.Height = -5.0 + body := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "foo", + Coord: coord, + } + req, _ := http.NewRequest("PUT", "/v1/coordinate/update", jsonReader(body)) + resp := httptest.NewRecorder() + _, err := a.srv.CoordinateUpdate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(300 * time.Millisecond) + + // Query back and check the coordinates are present. + args := structs.NodeSpecificRequest{Node: "foo", Datacenter: "dc1"} + var coords structs.IndexedCoordinates + if err := a.RPC("Coordinate.Node", &args, &coords); err != nil { + t.Fatalf("err: %s", err) + } + + coordinates := coords.Coordinates + if len(coordinates) != 1 || + coordinates[0].Node != "foo" { + t.Fatalf("bad: %v", coordinates) + } +} diff --git a/agent/http.go b/agent/http.go index 0abaf42346..d14858d969 100644 --- a/agent/http.go +++ b/agent/http.go @@ -140,10 +140,12 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler { handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) handleFuncMetrics("/v1/coordinate/node/", s.wrap(s.CoordinateNode)) + handleFuncMetrics("/v1/coordinate/update", s.wrap(s.CoordinateUpdate)) } else { handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) handleFuncMetrics("/v1/coordinate/node/", s.wrap(coordinateDisabled)) + handleFuncMetrics("/v1/coordinate/update", s.wrap(coordinateDisabled)) } handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) diff --git a/agent/operator_endpoint.go b/agent/operator_endpoint.go index 5a3c0d38a9..7ae505b194 100644 --- a/agent/operator_endpoint.go +++ b/agent/operator_endpoint.go @@ -4,7 +4,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "time" "github.com/hashicorp/consul/agent/structs" @@ -220,7 +219,8 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re s.parseToken(req, &args.Token) var conf api.AutopilotConfiguration - if err := decodeBody(req, &conf, FixupConfigDurations); err != nil { + durations := NewDurationFixer("lastcontactthreshold", "serverstabilizationtime") + if err := decodeBody(req, &conf, durations.FixupDurations); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Error parsing autopilot config: %v", err) return nil, nil @@ -265,29 +265,6 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re } } -// FixupConfigDurations is used to handle parsing the duration fields in -// the Autopilot config struct -func FixupConfigDurations(raw interface{}) error { - rawMap, ok := raw.(map[string]interface{}) - if !ok { - return nil - } - for key, val := range rawMap { - if strings.ToLower(key) == "lastcontactthreshold" || - strings.ToLower(key) == "serverstabilizationtime" { - // Convert a string value into an integer - if vStr, ok := val.(string); ok { - dur, err := time.ParseDuration(vStr) - if err != nil { - return err - } - rawMap[key] = dur - } - } - } - return nil -} - // OperatorServerHealth is used to get the health of the servers in the local DC func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "GET" { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index ca6628d130..b50f912f86 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -482,9 +482,22 @@ type HealthCheck struct { ServiceName string // optional service name ServiceTags []string // optional service tags + Definition HealthCheckDefinition + RaftIndex } +type HealthCheckDefinition struct { + HTTP string `json:",omitempty"` + TLSSkipVerify bool `json:",omitempty"` + Header map[string][]string `json:",omitempty"` + Method string `json:",omitempty"` + TCP string `json:",omitempty"` + Interval api.ReadableDuration `json:",omitempty"` + Timeout api.ReadableDuration `json:",omitempty"` + DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"` +} + // IsSame checks if one HealthCheck is the same as another, without looking // at the Raft information (that's why we didn't call it IsEqual). This is // useful for seeing if an update would be idempotent for all the functional diff --git a/agent/util.go b/agent/util.go index 1044a7742e..0ab2fb6f24 100644 --- a/agent/util.go +++ b/agent/util.go @@ -9,6 +9,8 @@ import ( "os/signal" osuser "os/user" "strconv" + "strings" + "time" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-msgpack/codec" @@ -113,3 +115,56 @@ func ForwardSignals(cmd *exec.Cmd, logFn func(error), shutdownCh <-chan struct{} } }() } + +type durationFixer map[string]bool + +func NewDurationFixer(fields ...string) durationFixer { + d := make(map[string]bool) + for _, field := range fields { + d[field] = true + } + return d +} + +// FixupDurations is used to handle parsing any field names in the map to time.Durations +func (d durationFixer) FixupDurations(raw interface{}) error { + rawMap, ok := raw.(map[string]interface{}) + if !ok { + return nil + } + for key, val := range rawMap { + switch val.(type) { + case map[string]interface{}: + if err := d.FixupDurations(val); err != nil { + return err + } + + case []interface{}: + for _, v := range val.([]interface{}) { + if err := d.FixupDurations(v); err != nil { + return err + } + } + + case []map[string]interface{}: + for _, v := range val.([]map[string]interface{}) { + if err := d.FixupDurations(v); err != nil { + return err + } + } + + default: + if d[strings.ToLower(key)] { + // Convert a string value into an integer + if vStr, ok := val.(string); ok { + dur, err := time.ParseDuration(vStr) + if err != nil { + return err + } + rawMap[key] = dur + } + } + } + } + return nil +} diff --git a/agent/util_test.go b/agent/util_test.go index b10f274c02..f90589782e 100644 --- a/agent/util_test.go +++ b/agent/util_test.go @@ -4,8 +4,10 @@ import ( "os" "runtime" "testing" + "time" "github.com/hashicorp/consul/testutil" + "github.com/pascaldekloe/goe/verify" ) func TestStringHash(t *testing.T) { @@ -74,3 +76,46 @@ func TestSetFilePermissions(t *testing.T) { t.Fatalf("bad: %s", fi.Mode()) } } + +func TestDurationFixer(t *testing.T) { + obj := map[string]interface{}{ + "key1": []map[string]interface{}{ + { + "subkey1": "10s", + }, + { + "subkey2": "5d", + }, + }, + "key2": map[string]interface{}{ + "subkey3": "30s", + "subkey4": "20m", + }, + "key3": "11s", + "key4": "49h", + } + expected := map[string]interface{}{ + "key1": []map[string]interface{}{ + { + "subkey1": 10 * time.Second, + }, + { + "subkey2": "5d", + }, + }, + "key2": map[string]interface{}{ + "subkey3": "30s", + "subkey4": 20 * time.Minute, + }, + "key3": "11s", + "key4": 49 * time.Hour, + } + + fixer := NewDurationFixer("key4", "subkey1", "subkey4") + if err := fixer.FixupDurations(obj); err != nil { + t.Fatal(err) + } + + // Ensure we only processed the intended fieldnames + verify.Values(t, "", obj, expected) +} diff --git a/api/agent.go b/api/agent.go index 533b245578..2992791b3c 100644 --- a/api/agent.go +++ b/api/agent.go @@ -15,6 +15,7 @@ type AgentCheck struct { Output string ServiceID string ServiceName string + Definition HealthCheckDefinition } // AgentService represents a service known to the agent diff --git a/api/catalog.go b/api/catalog.go index babfc9a1df..08da6e16ea 100644 --- a/api/catalog.go +++ b/api/catalog.go @@ -42,6 +42,7 @@ type CatalogRegistration struct { Datacenter string Service *AgentService Check *AgentCheck + SkipNodeUpdate bool } type CatalogDeregistration struct { diff --git a/api/coordinate.go b/api/coordinate.go index 42df0decaf..53318f11dd 100644 --- a/api/coordinate.go +++ b/api/coordinate.go @@ -67,6 +67,23 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err return out, qm, nil } +// Update inserts or updates the LAN coordinate of a node. +func (c *Coordinate) Update(coord *CoordinateEntry, q *WriteOptions) (*WriteMeta, error) { + r := c.c.newRequest("PUT", "/v1/coordinate/update") + r.setWriteOptions(q) + r.obj = coord + rtt, resp, err := requireOK(c.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + wm := &WriteMeta{} + wm.RequestTime = rtt + + return wm, nil +} + // Node is used to return the coordinates of a single in the LAN pool. func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) { r := c.c.newRequest("GET", "/v1/coordinate/node/"+node) diff --git a/api/coordinate_test.go b/api/coordinate_test.go index cf0f82c331..3526201488 100644 --- a/api/coordinate_test.go +++ b/api/coordinate_test.go @@ -3,8 +3,11 @@ package api import ( "strings" "testing" + "time" "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/serf/coordinate" + "github.com/pascaldekloe/goe/verify" ) func TestAPI_CoordinateDatacenters(t *testing.T) { @@ -12,9 +15,9 @@ func TestAPI_CoordinateDatacenters(t *testing.T) { c, s := makeClient(t) defer s.Stop() - coordinate := c.Coordinate() + coord := c.Coordinate() retry.Run(t, func(r *retry.R) { - datacenters, err := coordinate.Datacenters() + datacenters, err := coord.Datacenters() if err != nil { r.Fatal(err) } @@ -30,9 +33,9 @@ func TestAPI_CoordinateNodes(t *testing.T) { c, s := makeClient(t) defer s.Stop() - coordinate := c.Coordinate() + coord := c.Coordinate() retry.Run(t, func(r *retry.R) { - _, _, err := coordinate.Nodes(nil) + _, _, err := coord.Nodes(nil) if err != nil { r.Fatal(err) } @@ -49,9 +52,9 @@ func TestAPI_CoordinateNode(t *testing.T) { c, s := makeClient(t) defer s.Stop() - coordinate := c.Coordinate() + coord := c.Coordinate() retry.Run(t, func(r *retry.R) { - _, _, err := coordinate.Node(s.Config.NodeName, nil) + _, _, err := coord.Node(s.Config.NodeName, nil) if err != nil && !strings.Contains(err.Error(), "Unexpected response code: 404") { r.Fatal(err) } @@ -62,3 +65,42 @@ func TestAPI_CoordinateNode(t *testing.T) { // get an error. }) } + +func TestAPI_CoordinateUpdate(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + node := "foo" + _, err := c.Catalog().Register(&CatalogRegistration{ + Node: node, + Address: "1.1.1.1", + }, nil) + if err != nil { + t.Fatal(err) + } + + coord := c.Coordinate() + newCoord := coordinate.NewCoordinate(coordinate.DefaultConfig()) + newCoord.Height = 0.5 + entry := &CoordinateEntry{ + Node: node, + Coord: newCoord, + } + _, err = coord.Update(entry, nil) + if err != nil { + t.Fatal(err) + } + + retryer := &retry.Timer{Timeout: 5 * time.Second, Wait: 1 * time.Second} + retry.RunWith(retryer, t, func(r *retry.R) { + coords, _, err := coord.Node(node, nil) + if err != nil { + r.Fatal(err) + } + if len(coords) != 1 { + r.Fatalf("bad: %v", coords) + } + verify.Values(r, "", coords[0], entry) + }) +} diff --git a/api/health.go b/api/health.go index 38c105fdb9..53f3de4f79 100644 --- a/api/health.go +++ b/api/health.go @@ -34,6 +34,21 @@ type HealthCheck struct { ServiceID string ServiceName string ServiceTags []string + + Definition HealthCheckDefinition +} + +// HealthCheckDefinition is used to store the details about +// a health check's execution. +type HealthCheckDefinition struct { + HTTP string + Header map[string][]string + Method string + TLSSkipVerify bool + TCP string + Interval ReadableDuration + Timeout ReadableDuration + DeregisterCriticalServiceAfter ReadableDuration } // HealthChecks is a collection of HealthCheck structs. diff --git a/website/source/api/catalog.html.md b/website/source/api/catalog.html.md index dfb95cd0e7..bfee1d0f43 100644 --- a/website/source/api/catalog.html.md +++ b/website/source/api/catalog.html.md @@ -69,9 +69,16 @@ The table below shows this endpoint's support for treated as a service level health check, instead of a node level health check. The `Status` must be one of `passing`, `warning`, or `critical`. + The `Definition` field can be provided with details for a TCP or HTTP health + check. For more information, see the [Health Checks](/docs/agent/checks.html) page. + Multiple checks can be provided by replacing `Check` with `Checks` and sending an array of `Check` objects. +- `SkipNodeUpdate` `(bool: false)` - Specifies whether to skip updating the + node part of the registration. Useful in the case where only a health check + or service entry on a node needs to be updated. + It is important to note that `Check` does not have to be provided with `Service` and vice versa. A catalog entry can have either, neither, or both. @@ -106,8 +113,15 @@ and vice versa. A catalog entry can have either, neither, or both. "Name": "Redis health check", "Notes": "Script based health check", "Status": "passing", - "ServiceID": "redis1" - } + "ServiceID": "redis1", + "Definition": { + "TCP": "localhost:8888", + "Interval": "5s", + "Timeout": "1s", + "DeregisterCriticalServiceAfter": "30s" + } + }, + "SkipNodeUpdate": false } ``` diff --git a/website/source/api/coordinate.html.md b/website/source/api/coordinate.html.md index a1bfecb606..b8b6ac83eb 100644 --- a/website/source/api/coordinate.html.md +++ b/website/source/api/coordinate.html.md @@ -183,3 +183,51 @@ $ curl \ In **Consul Enterprise**, this may include multiple coordinates for the same node, each marked with a different `Segment`. Coordinates are only compatible within the same segment. + +## Update LAN Coordinates for a node + +This endpoint updates the LAN network coordinates for a node in a given +datacenter. + +| Method | Path | Produces | +| ------ | ---------------------------- | -------------------------- | +| `PUT` | `/coordinate/update` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api/index.html#blocking-queries), +[consistency modes](/api/index.html#consistency-modes), and +[required ACLs](/api/index.html#acls). + +| Blocking Queries | Consistency Modes | ACL Required | +| ---------------- | ----------------- | ------------ | +| `NO` | `none` | `node:write` | + +### Parameters + +- `dc` `(string: "")` - Specifies the datacenter to query. This will default to + the datacenter of the agent being queried. This is specified as part of the + URL as a query parameter. + +### Sample Payload + +```text +{ + "Node": "agent-one", + "Segment": "", + "Coord": { + "Adjustment": 0, + "Error": 1.5, + "Height": 0, + "Vec": [0, 0, 0, 0, 0, 0, 0, 0] + } +} +``` + +### Sample Request + +```text +$ curl \ + --request PUT \ + --data @payload.json \ + https://consul.rocks/v1/coordinate/update +```