From 08f81ebddb75f3585306dc1107dbef8d0180a4b1 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 30 Mar 2017 12:35:50 -0700 Subject: [PATCH] Moves operator sub-functions into their own files. --- api/operator.go | 378 ------------------ api/operator_autopilot.go | 215 ++++++++++ api/operator_autopilot_test.go | 107 +++++ api/operator_keyring.go | 83 ++++ api/operator_keyring_test.go | 73 ++++ api/operator_raft.go | 86 ++++ api/operator_raft_test.go | 38 ++ api/operator_test.go | 206 ---------- consul/operator_autopilot_endpoint.go | 98 +++++ consul/operator_autopilot_endpoint_test.go | 285 +++++++++++++ consul/operator_endpoint.go | 290 -------------- consul/operator_raft_endpoint.go | 200 +++++++++ ...test.go => operator_raft_endpoint_test.go} | 273 ------------- 13 files changed, 1185 insertions(+), 1147 deletions(-) create mode 100644 api/operator_autopilot.go create mode 100644 api/operator_autopilot_test.go create mode 100644 api/operator_keyring.go create mode 100644 api/operator_keyring_test.go create mode 100644 api/operator_raft.go create mode 100644 api/operator_raft_test.go delete mode 100644 api/operator_test.go create mode 100644 consul/operator_autopilot_endpoint.go create mode 100644 consul/operator_autopilot_endpoint_test.go create mode 100644 consul/operator_raft_endpoint.go rename consul/{operator_endpoint_test.go => operator_raft_endpoint_test.go} (57%) diff --git a/api/operator.go b/api/operator.go index d6a1ff265d..079e224866 100644 --- a/api/operator.go +++ b/api/operator.go @@ -1,14 +1,5 @@ package api -import ( - "bytes" - "fmt" - "io" - "strconv" - "strings" - "time" -) - // Operator can be used to perform low-level operator tasks for Consul. type Operator struct { c *Client @@ -18,372 +9,3 @@ type Operator struct { func (c *Client) Operator() *Operator { return &Operator{c} } - -// RaftServer has information about a server in the Raft configuration. -type RaftServer struct { - // ID is the unique ID for the server. These are currently the same - // as the address, but they will be changed to a real GUID in a future - // release of Consul. - ID string - - // Node is the node name of the server, as known by Consul, or this - // will be set to "(unknown)" otherwise. - Node string - - // Address is the IP:port of the server, used for Raft communications. - Address string - - // Leader is true if this server is the current cluster leader. - Leader bool - - // Voter is true if this server has a vote in the cluster. This might - // be false if the server is staging and still coming online, or if - // it's a non-voting server, which will be added in a future release of - // Consul. - Voter bool -} - -// RaftConfigration is returned when querying for the current Raft configuration. -type RaftConfiguration struct { - // Servers has the list of servers in the Raft configuration. - Servers []*RaftServer - - // Index has the Raft index of this configuration. - Index uint64 -} - -// keyringRequest is used for performing Keyring operations -type keyringRequest struct { - Key string -} - -// KeyringResponse is returned when listing the gossip encryption keys -type KeyringResponse struct { - // Whether this response is for a WAN ring - WAN bool - - // The datacenter name this request corresponds to - Datacenter string - - // A map of the encryption keys to the number of nodes they're installed on - Keys map[string]int - - // The total number of nodes in this ring - NumNodes int -} - -// AutopilotConfiguration is used for querying/setting the Autopilot configuration. -// Autopilot helps manage operator tasks related to Consul servers like removing -// failed servers from the Raft quorum. -type AutopilotConfiguration struct { - // CleanupDeadServers controls whether to remove dead servers from the Raft - // peer list when a new server joins - CleanupDeadServers bool - - // LastContactThreshold is the limit on the amount of time a server can go - // without leader contact before being considered unhealthy. - LastContactThreshold *ReadableDuration - - // MaxTrailingLogs is the amount of entries in the Raft Log that a server can - // be behind before being considered unhealthy. - MaxTrailingLogs uint64 - - // ServerStabilizationTime is the minimum amount of time a server must be - // in a stable, healthy state before it can be added to the cluster. Only - // applicable with Raft protocol version 3 or higher. - ServerStabilizationTime *ReadableDuration - - // (Enterprise-only) RedundancyZoneTag is the node tag to use for separating - // servers into zones for redundancy. If left blank, this feature will be disabled. - RedundancyZoneTag string - - // (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration - // strategy of waiting until enough newer-versioned servers have been added to the - // cluster before promoting them to voters. - DisableUpgradeMigration bool - - // CreateIndex holds the index corresponding the creation of this configuration. - // This is a read-only field. - CreateIndex uint64 - - // ModifyIndex will be set to the index of the last update when retrieving the - // Autopilot configuration. Resubmitting a configuration with - // AutopilotCASConfiguration will perform a check-and-set operation which ensures - // there hasn't been a subsequent update since the configuration was retrieved. - ModifyIndex uint64 -} - -// ServerHealth is the health (from the leader's point of view) of a server. -type ServerHealth struct { - // ID is the raft ID of the server. - ID string - - // Name is the node name of the server. - Name string - - // Address is the address of the server. - Address string - - // The status of the SerfHealth check for the server. - SerfStatus string - - // Version is the Consul version of the server. - Version string - - // Leader is whether this server is currently the leader. - Leader bool - - // LastContact is the time since this node's last contact with the leader. - LastContact *ReadableDuration - - // LastTerm is the highest leader term this server has a record of in its Raft log. - LastTerm uint64 - - // LastIndex is the last log index this server has a record of in its Raft log. - LastIndex uint64 - - // Healthy is whether or not the server is healthy according to the current - // Autopilot config. - Healthy bool - - // Voter is whether this is a voting server. - Voter bool - - // StableSince is the last time this server's Healthy value changed. - StableSince time.Time -} - -// OperatorHealthReply is a representation of the overall health of the cluster -type OperatorHealthReply struct { - // Healthy is true if all the servers in the cluster are healthy. - Healthy bool - - // FailureTolerance is the number of healthy servers that could be lost without - // an outage occurring. - FailureTolerance int - - // Servers holds the health of each server. - Servers []ServerHealth -} - -// ReadableDuration is a duration type that is serialized to JSON in human readable format. -type ReadableDuration time.Duration - -func NewReadableDuration(dur time.Duration) *ReadableDuration { - d := ReadableDuration(dur) - return &d -} - -func (d *ReadableDuration) String() string { return d.Duration().String() } -func (d *ReadableDuration) Duration() time.Duration { - if d == nil { - return time.Duration(0) - } - return time.Duration(*d) -} - -func (d *ReadableDuration) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil -} - -func (d *ReadableDuration) UnmarshalJSON(raw []byte) error { - if d == nil { - return fmt.Errorf("cannot unmarshal to nil pointer") - } - - str := string(raw) - if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' { - return fmt.Errorf("must be enclosed with quotes: %s", str) - } - dur, err := time.ParseDuration(str[1 : len(str)-1]) - if err != nil { - return err - } - *d = ReadableDuration(dur) - return nil -} - -// RaftGetConfiguration is used to query the current Raft peer set. -func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { - r := op.c.newRequest("GET", "/v1/operator/raft/configuration") - r.setQueryOptions(q) - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var out RaftConfiguration - if err := decodeBody(resp, &out); err != nil { - return nil, err - } - return &out, nil -} - -// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft -// quorum but no longer known to Serf or the catalog) by address in the form of -// "IP:port". -func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error { - r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") - r.setWriteOptions(q) - - r.params.Set("address", string(address)) - - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return err - } - - resp.Body.Close() - return nil -} - -// RaftRemovePeerByID is used to kick a stale peer (one that it in the Raft -// quorum but no longer known to Serf or the catalog) by ID. -func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error { - r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") - r.setWriteOptions(q) - - r.params.Set("id", string(id)) - - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return err - } - - resp.Body.Close() - return nil -} - -// KeyringInstall is used to install a new gossip encryption key into the cluster -func (op *Operator) KeyringInstall(key string, q *WriteOptions) error { - r := op.c.newRequest("POST", "/v1/operator/keyring") - r.setWriteOptions(q) - r.obj = keyringRequest{ - Key: key, - } - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// KeyringList is used to list the gossip keys installed in the cluster -func (op *Operator) KeyringList(q *QueryOptions) ([]*KeyringResponse, error) { - r := op.c.newRequest("GET", "/v1/operator/keyring") - r.setQueryOptions(q) - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var out []*KeyringResponse - if err := decodeBody(resp, &out); err != nil { - return nil, err - } - return out, nil -} - -// KeyringRemove is used to remove a gossip encryption key from the cluster -func (op *Operator) KeyringRemove(key string, q *WriteOptions) error { - r := op.c.newRequest("DELETE", "/v1/operator/keyring") - r.setWriteOptions(q) - r.obj = keyringRequest{ - Key: key, - } - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// KeyringUse is used to change the active gossip encryption key -func (op *Operator) KeyringUse(key string, q *WriteOptions) error { - r := op.c.newRequest("PUT", "/v1/operator/keyring") - r.setWriteOptions(q) - r.obj = keyringRequest{ - Key: key, - } - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// AutopilotGetConfiguration is used to query the current Autopilot configuration. -func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) { - r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration") - r.setQueryOptions(q) - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var out AutopilotConfiguration - if err := decodeBody(resp, &out); err != nil { - return nil, err - } - - return &out, nil -} - -// AutopilotSetConfiguration is used to set the current Autopilot configuration. -func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error { - r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") - r.setWriteOptions(q) - r.obj = conf - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// AutopilotCASConfiguration is used to perform a Check-And-Set update on the -// Autopilot configuration. The ModifyIndex value will be respected. Returns -// true on success or false on failures. -func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) { - r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") - r.setWriteOptions(q) - r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10)) - r.obj = conf - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return false, err - } - defer resp.Body.Close() - - var buf bytes.Buffer - if _, err := io.Copy(&buf, resp.Body); err != nil { - return false, fmt.Errorf("Failed to read response: %v", err) - } - res := strings.Contains(string(buf.Bytes()), "true") - - return res, nil -} - -// AutopilotServerHealth -func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) { - r := op.c.newRequest("GET", "/v1/operator/autopilot/health") - r.setQueryOptions(q) - _, resp, err := requireOK(op.c.doRequest(r)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var out OperatorHealthReply - if err := decodeBody(resp, &out); err != nil { - return nil, err - } - return &out, nil -} diff --git a/api/operator_autopilot.go b/api/operator_autopilot.go new file mode 100644 index 0000000000..59471ecf99 --- /dev/null +++ b/api/operator_autopilot.go @@ -0,0 +1,215 @@ +package api + +import ( + "bytes" + "fmt" + "io" + "strconv" + "strings" + "time" +) + +// AutopilotConfiguration is used for querying/setting the Autopilot configuration. +// Autopilot helps manage operator tasks related to Consul servers like removing +// failed servers from the Raft quorum. +type AutopilotConfiguration struct { + // CleanupDeadServers controls whether to remove dead servers from the Raft + // peer list when a new server joins + CleanupDeadServers bool + + // LastContactThreshold is the limit on the amount of time a server can go + // without leader contact before being considered unhealthy. + LastContactThreshold *ReadableDuration + + // MaxTrailingLogs is the amount of entries in the Raft Log that a server can + // be behind before being considered unhealthy. + MaxTrailingLogs uint64 + + // ServerStabilizationTime is the minimum amount of time a server must be + // in a stable, healthy state before it can be added to the cluster. Only + // applicable with Raft protocol version 3 or higher. + ServerStabilizationTime *ReadableDuration + + // (Enterprise-only) RedundancyZoneTag is the node tag to use for separating + // servers into zones for redundancy. If left blank, this feature will be disabled. + RedundancyZoneTag string + + // (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration + // strategy of waiting until enough newer-versioned servers have been added to the + // cluster before promoting them to voters. + DisableUpgradeMigration bool + + // CreateIndex holds the index corresponding the creation of this configuration. + // This is a read-only field. + CreateIndex uint64 + + // ModifyIndex will be set to the index of the last update when retrieving the + // Autopilot configuration. Resubmitting a configuration with + // AutopilotCASConfiguration will perform a check-and-set operation which ensures + // there hasn't been a subsequent update since the configuration was retrieved. + ModifyIndex uint64 +} + +// ServerHealth is the health (from the leader's point of view) of a server. +type ServerHealth struct { + // ID is the raft ID of the server. + ID string + + // Name is the node name of the server. + Name string + + // Address is the address of the server. + Address string + + // The status of the SerfHealth check for the server. + SerfStatus string + + // Version is the Consul version of the server. + Version string + + // Leader is whether this server is currently the leader. + Leader bool + + // LastContact is the time since this node's last contact with the leader. + LastContact *ReadableDuration + + // LastTerm is the highest leader term this server has a record of in its Raft log. + LastTerm uint64 + + // LastIndex is the last log index this server has a record of in its Raft log. + LastIndex uint64 + + // Healthy is whether or not the server is healthy according to the current + // Autopilot config. + Healthy bool + + // Voter is whether this is a voting server. + Voter bool + + // StableSince is the last time this server's Healthy value changed. + StableSince time.Time +} + +// OperatorHealthReply is a representation of the overall health of the cluster +type OperatorHealthReply struct { + // Healthy is true if all the servers in the cluster are healthy. + Healthy bool + + // FailureTolerance is the number of healthy servers that could be lost without + // an outage occurring. + FailureTolerance int + + // Servers holds the health of each server. + Servers []ServerHealth +} + +// ReadableDuration is a duration type that is serialized to JSON in human readable format. +type ReadableDuration time.Duration + +func NewReadableDuration(dur time.Duration) *ReadableDuration { + d := ReadableDuration(dur) + return &d +} + +func (d *ReadableDuration) String() string { + return d.Duration().String() +} + +func (d *ReadableDuration) Duration() time.Duration { + if d == nil { + return time.Duration(0) + } + return time.Duration(*d) +} + +func (d *ReadableDuration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil +} + +func (d *ReadableDuration) UnmarshalJSON(raw []byte) error { + if d == nil { + return fmt.Errorf("cannot unmarshal to nil pointer") + } + + str := string(raw) + if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' { + return fmt.Errorf("must be enclosed with quotes: %s", str) + } + dur, err := time.ParseDuration(str[1 : len(str)-1]) + if err != nil { + return err + } + *d = ReadableDuration(dur) + return nil +} + +// AutopilotGetConfiguration is used to query the current Autopilot configuration. +func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) { + r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out AutopilotConfiguration + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + + return &out, nil +} + +// AutopilotSetConfiguration is used to set the current Autopilot configuration. +func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error { + r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") + r.setWriteOptions(q) + r.obj = conf + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + +// AutopilotCASConfiguration is used to perform a Check-And-Set update on the +// Autopilot configuration. The ModifyIndex value will be respected. Returns +// true on success or false on failures. +func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) { + r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration") + r.setWriteOptions(q) + r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10)) + r.obj = conf + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return false, err + } + defer resp.Body.Close() + + var buf bytes.Buffer + if _, err := io.Copy(&buf, resp.Body); err != nil { + return false, fmt.Errorf("Failed to read response: %v", err) + } + res := strings.Contains(string(buf.Bytes()), "true") + + return res, nil +} + +// AutopilotServerHealth +func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) { + r := op.c.newRequest("GET", "/v1/operator/autopilot/health") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out OperatorHealthReply + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} diff --git a/api/operator_autopilot_test.go b/api/operator_autopilot_test.go new file mode 100644 index 0000000000..3bc1b530f4 --- /dev/null +++ b/api/operator_autopilot_test.go @@ -0,0 +1,107 @@ +package api + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/testutil" +) + +func TestOperator_AutopilotGetSetConfiguration(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + operator := c.Operator() + config, err := operator.AutopilotGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !config.CleanupDeadServers { + t.Fatalf("bad: %v", config) + } + + // Change a config setting + newConf := &AutopilotConfiguration{CleanupDeadServers: false} + if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil { + t.Fatalf("err: %v", err) + } + + config, err = operator.AutopilotGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if config.CleanupDeadServers { + t.Fatalf("bad: %v", config) + } +} + +func TestOperator_AutopilotCASConfiguration(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + operator := c.Operator() + config, err := operator.AutopilotGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !config.CleanupDeadServers { + t.Fatalf("bad: %v", config) + } + + // Pass an invalid ModifyIndex + { + newConf := &AutopilotConfiguration{ + CleanupDeadServers: false, + ModifyIndex: config.ModifyIndex - 1, + } + resp, err := operator.AutopilotCASConfiguration(newConf, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp { + t.Fatalf("bad: %v", resp) + } + } + + // Pass a valid ModifyIndex + { + newConf := &AutopilotConfiguration{ + CleanupDeadServers: false, + ModifyIndex: config.ModifyIndex, + } + resp, err := operator.AutopilotCASConfiguration(newConf, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !resp { + t.Fatalf("bad: %v", resp) + } + } +} + +func TestOperator_AutopilotServerHealth(t *testing.T) { + t.Parallel() + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + c.RaftProtocol = 3 + }) + defer s.Stop() + + operator := c.Operator() + if err := testutil.WaitForResult(func() (bool, error) { + out, err := operator.AutopilotServerHealth(nil) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if len(out.Servers) != 1 || + !out.Servers[0].Healthy || + out.Servers[0].Name != s.Config.NodeName { + return false, fmt.Errorf("bad: %v", out) + } + + return true, nil + }); err != nil { + t.Fatal(err) + } +} diff --git a/api/operator_keyring.go b/api/operator_keyring.go new file mode 100644 index 0000000000..4f91c35432 --- /dev/null +++ b/api/operator_keyring.go @@ -0,0 +1,83 @@ +package api + +// keyringRequest is used for performing Keyring operations +type keyringRequest struct { + Key string +} + +// KeyringResponse is returned when listing the gossip encryption keys +type KeyringResponse struct { + // Whether this response is for a WAN ring + WAN bool + + // The datacenter name this request corresponds to + Datacenter string + + // A map of the encryption keys to the number of nodes they're installed on + Keys map[string]int + + // The total number of nodes in this ring + NumNodes int +} + +// KeyringInstall is used to install a new gossip encryption key into the cluster +func (op *Operator) KeyringInstall(key string, q *WriteOptions) error { + r := op.c.newRequest("POST", "/v1/operator/keyring") + r.setWriteOptions(q) + r.obj = keyringRequest{ + Key: key, + } + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + +// KeyringList is used to list the gossip keys installed in the cluster +func (op *Operator) KeyringList(q *QueryOptions) ([]*KeyringResponse, error) { + r := op.c.newRequest("GET", "/v1/operator/keyring") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out []*KeyringResponse + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return out, nil +} + +// KeyringRemove is used to remove a gossip encryption key from the cluster +func (op *Operator) KeyringRemove(key string, q *WriteOptions) error { + r := op.c.newRequest("DELETE", "/v1/operator/keyring") + r.setWriteOptions(q) + r.obj = keyringRequest{ + Key: key, + } + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + +// KeyringUse is used to change the active gossip encryption key +func (op *Operator) KeyringUse(key string, q *WriteOptions) error { + r := op.c.newRequest("PUT", "/v1/operator/keyring") + r.setWriteOptions(q) + r.obj = keyringRequest{ + Key: key, + } + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + resp.Body.Close() + return nil +} diff --git a/api/operator_keyring_test.go b/api/operator_keyring_test.go new file mode 100644 index 0000000000..cec6511f88 --- /dev/null +++ b/api/operator_keyring_test.go @@ -0,0 +1,73 @@ +package api + +import ( + "testing" + + "github.com/hashicorp/consul/testutil" +) + +func TestOperator_KeyringInstallListPutRemove(t *testing.T) { + oldKey := "d8wu8CSUrqgtjVsvcBPmhQ==" + newKey := "qxycTi/SsePj/TZzCBmNXw==" + t.Parallel() + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + c.Encrypt = oldKey + }) + defer s.Stop() + + operator := c.Operator() + if err := operator.KeyringInstall(newKey, nil); err != nil { + t.Fatalf("err: %v", err) + } + + listResponses, err := operator.KeyringList(nil) + if err != nil { + t.Fatalf("err %v", err) + } + + // Make sure the new key is installed + if len(listResponses) != 2 { + t.Fatalf("bad: %v", len(listResponses)) + } + for _, response := range listResponses { + if len(response.Keys) != 2 { + t.Fatalf("bad: %v", len(response.Keys)) + } + if _, ok := response.Keys[oldKey]; !ok { + t.Fatalf("bad: %v", ok) + } + if _, ok := response.Keys[newKey]; !ok { + t.Fatalf("bad: %v", ok) + } + } + + // Switch the primary to the new key + if err := operator.KeyringUse(newKey, nil); err != nil { + t.Fatalf("err: %v", err) + } + + if err := operator.KeyringRemove(oldKey, nil); err != nil { + t.Fatalf("err: %v", err) + } + + listResponses, err = operator.KeyringList(nil) + if err != nil { + t.Fatalf("err %v", err) + } + + // Make sure the old key is removed + if len(listResponses) != 2 { + t.Fatalf("bad: %v", len(listResponses)) + } + for _, response := range listResponses { + if len(response.Keys) != 1 { + t.Fatalf("bad: %v", len(response.Keys)) + } + if _, ok := response.Keys[oldKey]; ok { + t.Fatalf("bad: %v", ok) + } + if _, ok := response.Keys[newKey]; !ok { + t.Fatalf("bad: %v", ok) + } + } +} diff --git a/api/operator_raft.go b/api/operator_raft.go new file mode 100644 index 0000000000..5f3c25b131 --- /dev/null +++ b/api/operator_raft.go @@ -0,0 +1,86 @@ +package api + +// RaftServer has information about a server in the Raft configuration. +type RaftServer struct { + // ID is the unique ID for the server. These are currently the same + // as the address, but they will be changed to a real GUID in a future + // release of Consul. + ID string + + // Node is the node name of the server, as known by Consul, or this + // will be set to "(unknown)" otherwise. + Node string + + // Address is the IP:port of the server, used for Raft communications. + Address string + + // Leader is true if this server is the current cluster leader. + Leader bool + + // Voter is true if this server has a vote in the cluster. This might + // be false if the server is staging and still coming online, or if + // it's a non-voting server, which will be added in a future release of + // Consul. + Voter bool +} + +// RaftConfigration is returned when querying for the current Raft configuration. +type RaftConfiguration struct { + // Servers has the list of servers in the Raft configuration. + Servers []*RaftServer + + // Index has the Raft index of this configuration. + Index uint64 +} + +// RaftGetConfiguration is used to query the current Raft peer set. +func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { + r := op.c.newRequest("GET", "/v1/operator/raft/configuration") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out RaftConfiguration + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} + +// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft +// quorum but no longer known to Serf or the catalog) by address in the form of +// "IP:port". +func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error { + r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") + r.setWriteOptions(q) + + r.params.Set("address", string(address)) + + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + + resp.Body.Close() + return nil +} + +// RaftRemovePeerByID is used to kick a stale peer (one that it in the Raft +// quorum but no longer known to Serf or the catalog) by ID. +func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error { + r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") + r.setWriteOptions(q) + + r.params.Set("id", string(id)) + + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + + resp.Body.Close() + return nil +} diff --git a/api/operator_raft_test.go b/api/operator_raft_test.go new file mode 100644 index 0000000000..f9d242b810 --- /dev/null +++ b/api/operator_raft_test.go @@ -0,0 +1,38 @@ +package api + +import ( + "strings" + "testing" +) + +func TestOperator_RaftGetConfiguration(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + operator := c.Operator() + out, err := operator.RaftGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Servers) != 1 || + !out.Servers[0].Leader || + !out.Servers[0].Voter { + t.Fatalf("bad: %v", out) + } +} + +func TestOperator_RaftRemovePeerByAddress(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + // If we get this error, it proves we sent the address all the way + // through. + operator := c.Operator() + err := operator.RaftRemovePeerByAddress("nope", nil) + if err == nil || !strings.Contains(err.Error(), + "address \"nope\" was not found in the Raft configuration") { + t.Fatalf("err: %v", err) + } +} diff --git a/api/operator_test.go b/api/operator_test.go deleted file mode 100644 index 717f04e8f8..0000000000 --- a/api/operator_test.go +++ /dev/null @@ -1,206 +0,0 @@ -package api - -import ( - "fmt" - "strings" - "testing" - - "github.com/hashicorp/consul/testutil" -) - -func TestOperator_RaftGetConfiguration(t *testing.T) { - t.Parallel() - c, s := makeClient(t) - defer s.Stop() - - operator := c.Operator() - out, err := operator.RaftGetConfiguration(nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if len(out.Servers) != 1 || - !out.Servers[0].Leader || - !out.Servers[0].Voter { - t.Fatalf("bad: %v", out) - } -} - -func TestOperator_RaftRemovePeerByAddress(t *testing.T) { - t.Parallel() - c, s := makeClient(t) - defer s.Stop() - - // If we get this error, it proves we sent the address all the way - // through. - operator := c.Operator() - err := operator.RaftRemovePeerByAddress("nope", nil) - if err == nil || !strings.Contains(err.Error(), - "address \"nope\" was not found in the Raft configuration") { - t.Fatalf("err: %v", err) - } -} - -func TestOperator_KeyringInstallListPutRemove(t *testing.T) { - oldKey := "d8wu8CSUrqgtjVsvcBPmhQ==" - newKey := "qxycTi/SsePj/TZzCBmNXw==" - t.Parallel() - c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { - c.Encrypt = oldKey - }) - defer s.Stop() - - operator := c.Operator() - if err := operator.KeyringInstall(newKey, nil); err != nil { - t.Fatalf("err: %v", err) - } - - listResponses, err := operator.KeyringList(nil) - if err != nil { - t.Fatalf("err %v", err) - } - - // Make sure the new key is installed - if len(listResponses) != 2 { - t.Fatalf("bad: %v", len(listResponses)) - } - for _, response := range listResponses { - if len(response.Keys) != 2 { - t.Fatalf("bad: %v", len(response.Keys)) - } - if _, ok := response.Keys[oldKey]; !ok { - t.Fatalf("bad: %v", ok) - } - if _, ok := response.Keys[newKey]; !ok { - t.Fatalf("bad: %v", ok) - } - } - - // Switch the primary to the new key - if err := operator.KeyringUse(newKey, nil); err != nil { - t.Fatalf("err: %v", err) - } - - if err := operator.KeyringRemove(oldKey, nil); err != nil { - t.Fatalf("err: %v", err) - } - - listResponses, err = operator.KeyringList(nil) - if err != nil { - t.Fatalf("err %v", err) - } - - // Make sure the old key is removed - if len(listResponses) != 2 { - t.Fatalf("bad: %v", len(listResponses)) - } - for _, response := range listResponses { - if len(response.Keys) != 1 { - t.Fatalf("bad: %v", len(response.Keys)) - } - if _, ok := response.Keys[oldKey]; ok { - t.Fatalf("bad: %v", ok) - } - if _, ok := response.Keys[newKey]; !ok { - t.Fatalf("bad: %v", ok) - } - } -} - -func TestOperator_AutopilotGetSetConfiguration(t *testing.T) { - t.Parallel() - c, s := makeClient(t) - defer s.Stop() - - operator := c.Operator() - config, err := operator.AutopilotGetConfiguration(nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if !config.CleanupDeadServers { - t.Fatalf("bad: %v", config) - } - - // Change a config setting - newConf := &AutopilotConfiguration{CleanupDeadServers: false} - if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil { - t.Fatalf("err: %v", err) - } - - config, err = operator.AutopilotGetConfiguration(nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if config.CleanupDeadServers { - t.Fatalf("bad: %v", config) - } -} - -func TestOperator_AutopilotCASConfiguration(t *testing.T) { - t.Parallel() - c, s := makeClient(t) - defer s.Stop() - - operator := c.Operator() - config, err := operator.AutopilotGetConfiguration(nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if !config.CleanupDeadServers { - t.Fatalf("bad: %v", config) - } - - // Pass an invalid ModifyIndex - { - newConf := &AutopilotConfiguration{ - CleanupDeadServers: false, - ModifyIndex: config.ModifyIndex - 1, - } - resp, err := operator.AutopilotCASConfiguration(newConf, nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if resp { - t.Fatalf("bad: %v", resp) - } - } - - // Pass a valid ModifyIndex - { - newConf := &AutopilotConfiguration{ - CleanupDeadServers: false, - ModifyIndex: config.ModifyIndex, - } - resp, err := operator.AutopilotCASConfiguration(newConf, nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if !resp { - t.Fatalf("bad: %v", resp) - } - } -} - -func TestOperator_ServerHealth(t *testing.T) { - t.Parallel() - c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { - c.RaftProtocol = 3 - }) - defer s.Stop() - - operator := c.Operator() - if err := testutil.WaitForResult(func() (bool, error) { - out, err := operator.AutopilotServerHealth(nil) - if err != nil { - return false, fmt.Errorf("err: %v", err) - } - if len(out.Servers) != 1 || - !out.Servers[0].Healthy || - out.Servers[0].Name != s.Config.NodeName { - return false, fmt.Errorf("bad: %v", out) - } - - return true, nil - }); err != nil { - t.Fatal(err) - } -} diff --git a/consul/operator_autopilot_endpoint.go b/consul/operator_autopilot_endpoint.go new file mode 100644 index 0000000000..abe21dd895 --- /dev/null +++ b/consul/operator_autopilot_endpoint.go @@ -0,0 +1,98 @@ +package consul + +import ( + "fmt" + + "github.com/hashicorp/consul/consul/structs" +) + +// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. +func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { + if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorRead() { + return permissionDeniedErr + } + + state := op.srv.fsm.State() + _, config, err := state.AutopilotConfig() + if err != nil { + return err + } + + *reply = *config + + return nil +} + +// AutopilotSetConfiguration is used to set the current Autopilot configuration. +func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { + if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator write access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorWrite() { + return permissionDeniedErr + } + + // Apply the update + resp, err := op.srv.raftApply(structs.AutopilotRequestType, args) + if err != nil { + op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a bool. + if respBool, ok := resp.(bool); ok { + *reply = respBool + } + return nil +} + +// ServerHealth is used to get the current health of the servers. +func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error { + // This must be sent to the leader, so we fix the args since we are + // re-using a structure where we don't support all the options. + args.RequireConsistent = true + args.AllowStale = false + if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorRead() { + return permissionDeniedErr + } + + // Exit early if the min Raft version is too low + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers()) + if err != nil { + return fmt.Errorf("error getting server raft protocol versions: %s", err) + } + if minRaftProtocol < 3 { + return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") + } + + *reply = op.srv.getClusterHealth() + + return nil +} diff --git a/consul/operator_autopilot_endpoint_test.go b/consul/operator_autopilot_endpoint_test.go new file mode 100644 index 0000000000..963acc22ae --- /dev/null +++ b/consul/operator_autopilot_endpoint_test.go @@ -0,0 +1,285 @@ +package consul + +import ( + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/raft" +) + +func TestOperator_Autopilot_GetConfiguration(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.AutopilotConfig.CleanupDeadServers = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.AutopilotConfig + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + if reply.CleanupDeadServers { + t.Fatalf("bad: %#v", reply) + } +} + +func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.AutopilotConfig.CleanupDeadServers = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Try to get config without permissions + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.AutopilotConfig + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator read permissions. + var token string + { + var rules = ` + operator = "read" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now we can read and verify the config + arg.Token = token + err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + if reply.CleanupDeadServers { + t.Fatalf("bad: %#v", reply) + } +} + +func TestOperator_Autopilot_SetConfiguration(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.AutopilotConfig.CleanupDeadServers = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Change the autopilot config from the default + arg := structs.AutopilotSetConfigRequest{ + Datacenter: "dc1", + Config: structs.AutopilotConfig{ + CleanupDeadServers: true, + }, + } + var reply *bool + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's changed + state := s1.fsm.State() + _, config, err := state.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if !config.CleanupDeadServers { + t.Fatalf("bad: %#v", config) + } +} + +func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.AutopilotConfig.CleanupDeadServers = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Try to set config without permissions + arg := structs.AutopilotSetConfigRequest{ + Datacenter: "dc1", + Config: structs.AutopilotConfig{ + CleanupDeadServers: true, + }, + } + var reply *bool + err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator write permissions. + var token string + { + var rules = ` + operator = "write" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now we can update the config + arg.Token = token + err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's changed + state := s1.fsm.State() + _, config, err := state.AutopilotConfig() + if err != nil { + t.Fatal(err) + } + if !config.CleanupDeadServers { + t.Fatalf("bad: %#v", config) + } +} + +func TestOperator_ServerHealth(t *testing.T) { + conf := func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = false + c.BootstrapExpect = 3 + c.RaftConfig.ProtocolVersion = 3 + c.ServerHealthInterval = 100 * time.Millisecond + c.AutopilotInterval = 100 * time.Millisecond + } + dir1, s1 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + dir3, s3 := testServerWithConfig(t, conf) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + if err := testutil.WaitForResult(func() (bool, error) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.OperatorHealthReply + err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) + if err != nil { + return false, fmt.Errorf("err: %v", err) + } + if !reply.Healthy { + return false, fmt.Errorf("bad: %v", reply) + } + if reply.FailureTolerance != 1 { + return false, fmt.Errorf("bad: %v", reply) + } + if len(reply.Servers) != 3 { + return false, fmt.Errorf("bad: %v", reply) + } + // Leader should have LastContact == 0, others should be positive + for _, s := range reply.Servers { + isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address) + if isLeader && s.LastContact != 0 { + return false, fmt.Errorf("bad: %v", reply) + } + if !isLeader && s.LastContact <= 0 { + return false, fmt.Errorf("bad: %v", reply) + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.Bootstrap = true + c.RaftConfig.ProtocolVersion = 2 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.OperatorHealthReply + err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") { + t.Fatalf("bad: %v", err) + } +} diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index e77fae903e..477a1d85d1 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -1,296 +1,6 @@ package consul -import ( - "fmt" - "net" - - "github.com/hashicorp/consul/consul/agent" - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/raft" - "github.com/hashicorp/serf/serf" -) - // Operator endpoint is used to perform low-level operator tasks for Consul. type Operator struct { srv *Server } - -// RaftGetConfiguration is used to retrieve the current Raft configuration. -func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { - if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done { - return err - } - - // This action requires operator read access. - acl, err := op.srv.resolveToken(args.Token) - if err != nil { - return err - } - if acl != nil && !acl.OperatorRead() { - return permissionDeniedErr - } - - // We can't fetch the leader and the configuration atomically with - // the current Raft API. - future := op.srv.raft.GetConfiguration() - if err := future.Error(); err != nil { - return err - } - - // Index the Consul information about the servers. - serverMap := make(map[raft.ServerAddress]serf.Member) - for _, member := range op.srv.serfLAN.Members() { - valid, parts := agent.IsConsulServer(member) - if !valid { - continue - } - - addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String() - serverMap[raft.ServerAddress(addr)] = member - } - - // Fill out the reply. - leader := op.srv.raft.Leader() - reply.Index = future.Index() - for _, server := range future.Configuration().Servers { - node := "(unknown)" - if member, ok := serverMap[server.Address]; ok { - node = member.Name - } - - entry := &structs.RaftServer{ - ID: server.ID, - Node: node, - Address: server.Address, - Leader: server.Address == leader, - Voter: server.Suffrage == raft.Voter, - } - reply.Servers = append(reply.Servers, entry) - } - return nil -} - -// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft -// quorum but no longer known to Serf or the catalog) by address in the form of -// "IP:port". The reply argument is not used, but it required to fulfill the RPC -// interface. -func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { - return err - } - - // This is a super dangerous operation that requires operator write - // access. - acl, err := op.srv.resolveToken(args.Token) - if err != nil { - return err - } - if acl != nil && !acl.OperatorWrite() { - return permissionDeniedErr - } - - // Since this is an operation designed for humans to use, we will return - // an error if the supplied address isn't among the peers since it's - // likely they screwed up. - { - future := op.srv.raft.GetConfiguration() - if err := future.Error(); err != nil { - return err - } - for _, s := range future.Configuration().Servers { - if s.Address == args.Address { - args.ID = s.ID - goto REMOVE - } - } - return fmt.Errorf("address %q was not found in the Raft configuration", - args.Address) - } - -REMOVE: - // The Raft library itself will prevent various forms of foot-shooting, - // like making a configuration with no voters. Some consideration was - // given here to adding more checks, but it was decided to make this as - // low-level and direct as possible. We've got ACL coverage to lock this - // down, and if you are an operator, it's assumed you know what you are - // doing if you are calling this. If you remove a peer that's known to - // Serf, for example, it will come back when the leader does a reconcile - // pass. - minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) - if err != nil { - return err - } - - var future raft.Future - if minRaftProtocol >= 2 { - future = op.srv.raft.RemoveServer(args.ID, 0, 0) - } else { - future = op.srv.raft.RemovePeer(args.Address) - } - if err := future.Error(); err != nil { - op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v", - args.Address, err) - return err - } - - op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address) - return nil -} - -// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft -// quorum but no longer known to Serf or the catalog) by address in the form of -// "IP:port". The reply argument is not used, but is required to fulfill the RPC -// interface. -func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done { - return err - } - - // This is a super dangerous operation that requires operator write - // access. - acl, err := op.srv.resolveToken(args.Token) - if err != nil { - return err - } - if acl != nil && !acl.OperatorWrite() { - return permissionDeniedErr - } - - // Since this is an operation designed for humans to use, we will return - // an error if the supplied id isn't among the peers since it's - // likely they screwed up. - { - future := op.srv.raft.GetConfiguration() - if err := future.Error(); err != nil { - return err - } - for _, s := range future.Configuration().Servers { - if s.ID == args.ID { - args.Address = s.Address - goto REMOVE - } - } - return fmt.Errorf("id %q was not found in the Raft configuration", - args.ID) - } - -REMOVE: - // The Raft library itself will prevent various forms of foot-shooting, - // like making a configuration with no voters. Some consideration was - // given here to adding more checks, but it was decided to make this as - // low-level and direct as possible. We've got ACL coverage to lock this - // down, and if you are an operator, it's assumed you know what you are - // doing if you are calling this. If you remove a peer that's known to - // Serf, for example, it will come back when the leader does a reconcile - // pass. - minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) - if err != nil { - return err - } - - var future raft.Future - if minRaftProtocol >= 2 { - future = op.srv.raft.RemoveServer(args.ID, 0, 0) - } else { - future = op.srv.raft.RemovePeer(args.Address) - } - if err := future.Error(); err != nil { - op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v", - args.ID, err) - return err - } - - op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID) - return nil -} - -// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. -func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { - if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { - return err - } - - // This action requires operator read access. - acl, err := op.srv.resolveToken(args.Token) - if err != nil { - return err - } - if acl != nil && !acl.OperatorRead() { - return permissionDeniedErr - } - - state := op.srv.fsm.State() - _, config, err := state.AutopilotConfig() - if err != nil { - return err - } - - *reply = *config - - return nil -} - -// AutopilotSetConfiguration is used to set the current Autopilot configuration. -func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { - if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { - return err - } - - // This action requires operator write access. - acl, err := op.srv.resolveToken(args.Token) - if err != nil { - return err - } - if acl != nil && !acl.OperatorWrite() { - return permissionDeniedErr - } - - // Apply the update - resp, err := op.srv.raftApply(structs.AutopilotRequestType, args) - if err != nil { - op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err) - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - // Check if the return type is a bool. - if respBool, ok := resp.(bool); ok { - *reply = respBool - } - return nil -} - -// ServerHealth is used to get the current health of the servers. -func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error { - // This must be sent to the leader, so we fix the args since we are - // re-using a structure where we don't support all the options. - args.RequireConsistent = true - args.AllowStale = false - if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done { - return err - } - - // This action requires operator read access. - acl, err := op.srv.resolveToken(args.Token) - if err != nil { - return err - } - if acl != nil && !acl.OperatorRead() { - return permissionDeniedErr - } - - // Exit early if the min Raft version is too low - minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers()) - if err != nil { - return fmt.Errorf("error getting server raft protocol versions: %s", err) - } - if minRaftProtocol < 3 { - return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") - } - - *reply = op.srv.getClusterHealth() - - return nil -} diff --git a/consul/operator_raft_endpoint.go b/consul/operator_raft_endpoint.go new file mode 100644 index 0000000000..017a91bfe2 --- /dev/null +++ b/consul/operator_raft_endpoint.go @@ -0,0 +1,200 @@ +package consul + +import ( + "fmt" + "net" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" +) + +// RaftGetConfiguration is used to retrieve the current Raft configuration. +func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { + if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorRead() { + return permissionDeniedErr + } + + // We can't fetch the leader and the configuration atomically with + // the current Raft API. + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + + // Index the Consul information about the servers. + serverMap := make(map[raft.ServerAddress]serf.Member) + for _, member := range op.srv.serfLAN.Members() { + valid, parts := agent.IsConsulServer(member) + if !valid { + continue + } + + addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String() + serverMap[raft.ServerAddress(addr)] = member + } + + // Fill out the reply. + leader := op.srv.raft.Leader() + reply.Index = future.Index() + for _, server := range future.Configuration().Servers { + node := "(unknown)" + if member, ok := serverMap[server.Address]; ok { + node = member.Name + } + + entry := &structs.RaftServer{ + ID: server.ID, + Node: node, + Address: server.Address, + Leader: server.Address == leader, + Voter: server.Suffrage == raft.Voter, + } + reply.Servers = append(reply.Servers, entry) + } + return nil +} + +// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft +// quorum but no longer known to Serf or the catalog) by address in the form of +// "IP:port". The reply argument is not used, but it required to fulfill the RPC +// interface. +func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { + if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { + return err + } + + // This is a super dangerous operation that requires operator write + // access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorWrite() { + return permissionDeniedErr + } + + // Since this is an operation designed for humans to use, we will return + // an error if the supplied address isn't among the peers since it's + // likely they screwed up. + { + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + for _, s := range future.Configuration().Servers { + if s.Address == args.Address { + args.ID = s.ID + goto REMOVE + } + } + return fmt.Errorf("address %q was not found in the Raft configuration", + args.Address) + } + +REMOVE: + // The Raft library itself will prevent various forms of foot-shooting, + // like making a configuration with no voters. Some consideration was + // given here to adding more checks, but it was decided to make this as + // low-level and direct as possible. We've got ACL coverage to lock this + // down, and if you are an operator, it's assumed you know what you are + // doing if you are calling this. If you remove a peer that's known to + // Serf, for example, it will come back when the leader does a reconcile + // pass. + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) + if err != nil { + return err + } + + var future raft.Future + if minRaftProtocol >= 2 { + future = op.srv.raft.RemoveServer(args.ID, 0, 0) + } else { + future = op.srv.raft.RemovePeer(args.Address) + } + if err := future.Error(); err != nil { + op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v", + args.Address, err) + return err + } + + op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address) + return nil +} + +// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft +// quorum but no longer known to Serf or the catalog) by address in the form of +// "IP:port". The reply argument is not used, but is required to fulfill the RPC +// interface. +func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { + if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done { + return err + } + + // This is a super dangerous operation that requires operator write + // access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorWrite() { + return permissionDeniedErr + } + + // Since this is an operation designed for humans to use, we will return + // an error if the supplied id isn't among the peers since it's + // likely they screwed up. + { + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + for _, s := range future.Configuration().Servers { + if s.ID == args.ID { + args.Address = s.Address + goto REMOVE + } + } + return fmt.Errorf("id %q was not found in the Raft configuration", + args.ID) + } + +REMOVE: + // The Raft library itself will prevent various forms of foot-shooting, + // like making a configuration with no voters. Some consideration was + // given here to adding more checks, but it was decided to make this as + // low-level and direct as possible. We've got ACL coverage to lock this + // down, and if you are an operator, it's assumed you know what you are + // doing if you are calling this. If you remove a peer that's known to + // Serf, for example, it will come back when the leader does a reconcile + // pass. + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) + if err != nil { + return err + } + + var future raft.Future + if minRaftProtocol >= 2 { + future = op.srv.raft.RemoveServer(args.ID, 0, 0) + } else { + future = op.srv.raft.RemovePeer(args.Address) + } + if err := future.Error(); err != nil { + op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v", + args.ID, err) + return err + } + + op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID) + return nil +} diff --git a/consul/operator_endpoint_test.go b/consul/operator_raft_endpoint_test.go similarity index 57% rename from consul/operator_endpoint_test.go rename to consul/operator_raft_endpoint_test.go index de9a060c86..58676b6d9e 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_raft_endpoint_test.go @@ -7,8 +7,6 @@ import ( "strings" "testing" - "time" - "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/net-rpc-msgpackrpc" @@ -361,274 +359,3 @@ func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) { t.Fatalf("err: %v", err) } } - -func TestOperator_Autopilot_GetConfiguration(t *testing.T) { - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.AutopilotConfig.CleanupDeadServers = false - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - testutil.WaitForLeader(t, s1.RPC, "dc1") - - arg := structs.DCSpecificRequest{ - Datacenter: "dc1", - } - var reply structs.AutopilotConfig - err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) - if err != nil { - t.Fatalf("err: %v", err) - } - if reply.CleanupDeadServers { - t.Fatalf("bad: %#v", reply) - } -} - -func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) { - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.ACLDatacenter = "dc1" - c.ACLMasterToken = "root" - c.ACLDefaultPolicy = "deny" - c.AutopilotConfig.CleanupDeadServers = false - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - testutil.WaitForLeader(t, s1.RPC, "dc1") - - // Try to get config without permissions - arg := structs.DCSpecificRequest{ - Datacenter: "dc1", - } - var reply structs.AutopilotConfig - err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) - if err == nil || !strings.Contains(err.Error(), permissionDenied) { - t.Fatalf("err: %v", err) - } - - // Create an ACL with operator read permissions. - var token string - { - var rules = ` - operator = "read" - ` - - req := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLSet, - ACL: structs.ACL{ - Name: "User token", - Type: structs.ACLTypeClient, - Rules: rules, - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { - t.Fatalf("err: %v", err) - } - } - - // Now we can read and verify the config - arg.Token = token - err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply) - if err != nil { - t.Fatalf("err: %v", err) - } - if reply.CleanupDeadServers { - t.Fatalf("bad: %#v", reply) - } -} - -func TestOperator_Autopilot_SetConfiguration(t *testing.T) { - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.AutopilotConfig.CleanupDeadServers = false - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - testutil.WaitForLeader(t, s1.RPC, "dc1") - - // Change the autopilot config from the default - arg := structs.AutopilotSetConfigRequest{ - Datacenter: "dc1", - Config: structs.AutopilotConfig{ - CleanupDeadServers: true, - }, - } - var reply *bool - err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Make sure it's changed - state := s1.fsm.State() - _, config, err := state.AutopilotConfig() - if err != nil { - t.Fatal(err) - } - if !config.CleanupDeadServers { - t.Fatalf("bad: %#v", config) - } -} - -func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) { - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.ACLDatacenter = "dc1" - c.ACLMasterToken = "root" - c.ACLDefaultPolicy = "deny" - c.AutopilotConfig.CleanupDeadServers = false - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - testutil.WaitForLeader(t, s1.RPC, "dc1") - - // Try to set config without permissions - arg := structs.AutopilotSetConfigRequest{ - Datacenter: "dc1", - Config: structs.AutopilotConfig{ - CleanupDeadServers: true, - }, - } - var reply *bool - err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) - if err == nil || !strings.Contains(err.Error(), permissionDenied) { - t.Fatalf("err: %v", err) - } - - // Create an ACL with operator write permissions. - var token string - { - var rules = ` - operator = "write" - ` - - req := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLSet, - ACL: structs.ACL{ - Name: "User token", - Type: structs.ACLTypeClient, - Rules: rules, - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { - t.Fatalf("err: %v", err) - } - } - - // Now we can update the config - arg.Token = token - err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Make sure it's changed - state := s1.fsm.State() - _, config, err := state.AutopilotConfig() - if err != nil { - t.Fatal(err) - } - if !config.CleanupDeadServers { - t.Fatalf("bad: %#v", config) - } -} - -func TestOperator_ServerHealth(t *testing.T) { - conf := func(c *Config) { - c.Datacenter = "dc1" - c.Bootstrap = false - c.BootstrapExpect = 3 - c.RaftConfig.ProtocolVersion = 3 - c.ServerHealthInterval = 100 * time.Millisecond - c.AutopilotInterval = 100 * time.Millisecond - } - dir1, s1 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - dir2, s2 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - addr := fmt.Sprintf("127.0.0.1:%d", - s1.config.SerfLANConfig.MemberlistConfig.BindPort) - if _, err := s2.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - dir3, s3 := testServerWithConfig(t, conf) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - if _, err := s3.JoinLAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - testutil.WaitForLeader(t, s1.RPC, "dc1") - - if err := testutil.WaitForResult(func() (bool, error) { - arg := structs.DCSpecificRequest{ - Datacenter: "dc1", - } - var reply structs.OperatorHealthReply - err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) - if err != nil { - return false, fmt.Errorf("err: %v", err) - } - if !reply.Healthy { - return false, fmt.Errorf("bad: %v", reply) - } - if reply.FailureTolerance != 1 { - return false, fmt.Errorf("bad: %v", reply) - } - if len(reply.Servers) != 3 { - return false, fmt.Errorf("bad: %v", reply) - } - // Leader should have LastContact == 0, others should be positive - for _, s := range reply.Servers { - isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address) - if isLeader && s.LastContact != 0 { - return false, fmt.Errorf("bad: %v", reply) - } - if !isLeader && s.LastContact <= 0 { - return false, fmt.Errorf("bad: %v", reply) - } - } - return true, nil - }); err != nil { - t.Fatal(err) - } -} - -func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) { - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.Bootstrap = true - c.RaftConfig.ProtocolVersion = 2 - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - arg := structs.DCSpecificRequest{ - Datacenter: "dc1", - } - var reply structs.OperatorHealthReply - err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply) - if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") { - t.Fatalf("bad: %v", err) - } -}