From e5850d8a2606da8abfaa5c75100b8285e198cb84 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 29 Aug 2016 19:09:57 -0700 Subject: [PATCH] Adds new consul operator endpoint, CLI, and ACL and some basic Raft commands. --- acl/acl.go | 45 +++- acl/acl_test.go | 49 +++- acl/policy.go | 6 + acl/policy_test.go | 29 ++- api/operator.go | 71 ++++++ api/operator_test.go | 38 +++ command/agent/http.go | 3 + command/agent/operator_endpoint.go | 57 +++++ command/agent/operator_endpoint_test.go | 58 +++++ command/operator.go | 180 ++++++++++++++ command/operator_test.go | 52 ++++ commands.go | 6 + consul/operator_endpoint.go | 120 +++++++++ consul/operator_endpoint_test.go | 229 ++++++++++++++++++ consul/server.go | 39 +-- consul/structs/operator.go | 42 ++++ .../docs/agent/http/operator.html.markdown | 48 ++++ .../source/docs/commands/index.html.markdown | 1 + .../docs/commands/operator.html.markdown | 100 ++++++++ website/source/layouts/docs.erb | 8 + 20 files changed, 1156 insertions(+), 25 deletions(-) create mode 100644 api/operator.go create mode 100644 api/operator_test.go create mode 100644 command/agent/operator_endpoint.go create mode 100644 command/agent/operator_endpoint_test.go create mode 100644 command/operator.go create mode 100644 command/operator_test.go create mode 100644 consul/operator_endpoint.go create mode 100644 consul/operator_endpoint_test.go create mode 100644 consul/structs/operator.go create mode 100644 website/source/docs/agent/http/operator.html.markdown create mode 100644 website/source/docs/commands/operator.html.markdown diff --git a/acl/acl.go b/acl/acl.go index 33f5e23372..f13dc5b569 100644 --- a/acl/acl.go +++ b/acl/acl.go @@ -73,6 +73,14 @@ type ACL interface { // KeyringWrite determines if the keyring can be manipulated KeyringWrite() bool + // OperatorRead determines if the read-only Consul operator functions + // can be used. 
+ OperatorRead() bool + + // OperatorWrite determines if the state-changing Consul operator + // functions can be used. + OperatorWrite() bool + // ACLList checks for permission to list all the ACLs ACLList() bool @@ -132,6 +140,14 @@ func (s *StaticACL) KeyringWrite() bool { return s.defaultAllow } +func (s *StaticACL) OperatorRead() bool { + return s.defaultAllow +} + +func (s *StaticACL) OperatorWrite() bool { + return s.defaultAllow +} + func (s *StaticACL) ACLList() bool { return s.allowManage } @@ -188,10 +204,13 @@ type PolicyACL struct { // preparedQueryRules contains the prepared query policies preparedQueryRules *radix.Tree - // keyringRules contains the keyring policies. The keyring has + // keyringRule contains the keyring policies. The keyring has // a very simple yes/no without prefix matching, so here we // don't need to use a radix tree. keyringRule string + + // operatorRule contains the operator policies. + operatorRule string } // New is used to construct a policy based ACL from a set of policies @@ -228,6 +247,9 @@ func New(parent ACL, policy *Policy) (*PolicyACL, error) { // Load the keyring policy p.keyringRule = policy.Keyring + // Load the operator policy + p.operatorRule = policy.Operator + return p, nil } @@ -422,6 +444,27 @@ func (p *PolicyACL) KeyringWrite() bool { return p.parent.KeyringWrite() } +// OperatorRead determines if the read-only operator functions are allowed. +func (p *PolicyACL) OperatorRead() bool { + switch p.operatorRule { + case PolicyRead, PolicyWrite: + return true + case PolicyDeny: + return false + default: + return p.parent.OperatorRead() + } +} + +// OperatorWrite determines if the state-changing operator functions are +// allowed. 
+func (p *PolicyACL) OperatorWrite() bool { + if p.operatorRule == PolicyWrite { + return true + } + return p.parent.OperatorWrite() +} + // ACLList checks if listing of ACLs is allowed func (p *PolicyACL) ACLList() bool { return p.parent.ACLList() diff --git a/acl/acl_test.go b/acl/acl_test.go index 69c4f0a1bf..aa1c7972d4 100644 --- a/acl/acl_test.go +++ b/acl/acl_test.go @@ -65,6 +65,12 @@ func TestStaticACL(t *testing.T) { if !all.KeyringWrite() { t.Fatalf("should allow") } + if !all.OperatorRead() { + t.Fatalf("should allow") + } + if !all.OperatorWrite() { + t.Fatalf("should allow") + } if all.ACLList() { t.Fatalf("should not allow") } @@ -108,6 +114,12 @@ func TestStaticACL(t *testing.T) { if none.KeyringWrite() { t.Fatalf("should not allow") } + if none.OperatorRead() { + t.Fatalf("should now allow") + } + if none.OperatorWrite() { + t.Fatalf("should not allow") + } if none.ACLList() { t.Fatalf("should not allow") } @@ -145,6 +157,12 @@ func TestStaticACL(t *testing.T) { if !manage.KeyringWrite() { t.Fatalf("should allow") } + if !manage.OperatorRead() { + t.Fatalf("should allow") + } + if !manage.OperatorWrite() { + t.Fatalf("should allow") + } if !manage.ACLList() { t.Fatalf("should allow") } @@ -480,19 +498,18 @@ func TestPolicyACL_Parent(t *testing.T) { } func TestPolicyACL_Keyring(t *testing.T) { - // Test keyring ACLs type keyringcase struct { inp string read bool write bool } - keyringcases := []keyringcase{ + cases := []keyringcase{ {"", false, false}, {PolicyRead, true, false}, {PolicyWrite, true, true}, {PolicyDeny, false, false}, } - for _, c := range keyringcases { + for _, c := range cases { acl, err := New(DenyAll(), &Policy{Keyring: c.inp}) if err != nil { t.Fatalf("bad: %s", err) @@ -505,3 +522,29 @@ func TestPolicyACL_Keyring(t *testing.T) { } } } + +func TestPolicyACL_Operator(t *testing.T) { + type operatorcase struct { + inp string + read bool + write bool + } + cases := []operatorcase{ + {"", false, false}, + {PolicyRead, true, false}, + 
{PolicyWrite, true, true}, + {PolicyDeny, false, false}, + } + for _, c := range cases { + acl, err := New(DenyAll(), &Policy{Operator: c.inp}) + if err != nil { + t.Fatalf("bad: %s", err) + } + if acl.OperatorRead() != c.read { + t.Fatalf("bad: %#v", c) + } + if acl.OperatorWrite() != c.write { + t.Fatalf("bad: %#v", c) + } + } +} diff --git a/acl/policy.go b/acl/policy.go index a0e56da425..ae69067fea 100644 --- a/acl/policy.go +++ b/acl/policy.go @@ -21,6 +21,7 @@ type Policy struct { Events []*EventPolicy `hcl:"event,expand"` PreparedQueries []*PreparedQueryPolicy `hcl:"query,expand"` Keyring string `hcl:"keyring"` + Operator string `hcl:"operator"` } // KeyPolicy represents a policy for a key @@ -125,5 +126,10 @@ func Parse(rules string) (*Policy, error) { return nil, fmt.Errorf("Invalid keyring policy: %#v", p.Keyring) } + // Validate the operator policy - this one is allowed to be empty + if p.Operator != "" && !isPolicyValid(p.Operator) { + return nil, fmt.Errorf("Invalid operator policy: %#v", p.Operator) + } + return p, nil } diff --git a/acl/policy_test.go b/acl/policy_test.go index c59a4e0146..7f31bf8608 100644 --- a/acl/policy_test.go +++ b/acl/policy_test.go @@ -45,6 +45,7 @@ query "bar" { policy = "deny" } keyring = "deny" +operator = "deny" ` exp := &Policy{ Keys: []*KeyPolicy{ @@ -103,7 +104,8 @@ keyring = "deny" Policy: PolicyDeny, }, }, - Keyring: PolicyDeny, + Keyring: PolicyDeny, + Operator: PolicyDeny, } out, err := Parse(inp) @@ -162,7 +164,8 @@ func TestACLPolicy_Parse_JSON(t *testing.T) { "policy": "deny" } }, - "keyring": "deny" + "keyring": "deny", + "operator": "deny" }` exp := &Policy{ Keys: []*KeyPolicy{ @@ -221,7 +224,8 @@ func TestACLPolicy_Parse_JSON(t *testing.T) { Policy: PolicyDeny, }, }, - Keyring: PolicyDeny, + Keyring: PolicyDeny, + Operator: PolicyDeny, } out, err := Parse(inp) @@ -252,6 +256,24 @@ keyring = "" } } +func TestACLPolicy_Operator_Empty(t *testing.T) { + inp := ` +operator = "" + ` + exp := &Policy{ + Operator: 
"", + } + + out, err := Parse(inp) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(out, exp) { + t.Fatalf("bad: %#v %#v", out, exp) + } +} + func TestACLPolicy_Bad_Policy(t *testing.T) { cases := []string{ `key "" { policy = "nope" }`, @@ -259,6 +281,7 @@ func TestACLPolicy_Bad_Policy(t *testing.T) { `event "" { policy = "nope" }`, `query "" { policy = "nope" }`, `keyring = "nope"`, + `operator = "nope"`, } for _, c := range cases { _, err := Parse(c) diff --git a/api/operator.go b/api/operator.go new file mode 100644 index 0000000000..b7389f0f59 --- /dev/null +++ b/api/operator.go @@ -0,0 +1,71 @@ +package api + +import ( + "github.com/hashicorp/raft" +) + +// Operator can be used to perform low-level operator tasks for Consul. +type Operator struct { + c *Client +} + +// Operator returns a handle to the operator endpoints. +func (c *Client) Operator() *Operator { + return &Operator{c} +} + +// RaftConfigration is returned when querying for the current Raft configuration. +// This has the low-level Raft structure, as well as some supplemental +// information from Consul. +type RaftConfiguration struct { + // Configuration is the low-level Raft configuration structure. + Configuration raft.Configuration + + // NodeMap maps IDs in the Raft configuration to node names known by + // Consul. It's possible that not all configuration entries may have + // an entry here if the node isn't known to Consul. Given how this is + // generated, this may also contain entries that aren't present in the + // Raft configuration. + NodeMap map[raft.ServerID]string + + // Leader is the ID of the current Raft leader. This may be blank if + // there isn't one. + Leader raft.ServerID +} + +// RaftGetConfiguration is used to query the current Raft peer set. 
+func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { + r := op.c.newRequest("GET", "/v1/operator/raft/configuration") + r.setQueryOptions(q) + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var out RaftConfiguration + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} + +// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft +// quorum but no longer known to Serf or the catalog) by address in the form of +// "IP:port". +func (op *Operator) RaftRemovePeerByAddress(address raft.ServerAddress, q *WriteOptions) error { + r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") + r.setWriteOptions(q) + + // TODO (slackpad) Currently we made address a query parameter. Once + // IDs are in place this will be DELETE /v1/raft-peer/. + r.params.Set("address", string(address)) + + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + + resp.Body.Close() + return nil +} diff --git a/api/operator_test.go b/api/operator_test.go new file mode 100644 index 0000000000..a0d8af69e2 --- /dev/null +++ b/api/operator_test.go @@ -0,0 +1,38 @@ +package api + +import ( + "strings" + "testing" +) + +func TestOperator_RaftGetConfiguration(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + operator := c.Operator() + out, err := operator.RaftGetConfiguration(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Configuration.Servers) != 1 || + len(out.NodeMap) != 1 || + len(out.Leader) == 0 { + t.Fatalf("bad: %v", out) + } +} + +func TestOperator_RaftRemovePeerByAddress(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + // If we get this error, it proves we sent the address all the way + // through. 
+ operator := c.Operator() + err := operator.RaftRemovePeerByAddress("nope", nil) + if err == nil || !strings.Contains(err.Error(), + "address \"nope\" was not found in the Raft configuration") { + t.Fatalf("err: %v", err) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 5d7dcce7c6..52ed69e8e1 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -230,6 +230,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) + s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) + s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) + s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go new file mode 100644 index 0000000000..cdab48c387 --- /dev/null +++ b/command/agent/operator_endpoint.go @@ -0,0 +1,57 @@ +package agent + +import ( + "net/http" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/raft" +) + +// OperatorRaftConfiguration is used to inspect the current Raft configuration. +// This supports the stale query mode in case the cluster doesn't have a leader. 
+func (s *HTTPServer) OperatorRaftConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } + + var args structs.DCSpecificRequest + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + var reply structs.RaftConfigurationResponse + if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil { + return nil, err + } + + return reply, nil +} + +// OperatorRaftPeer supports actions on Raft peers. Currently we only support +// removing peers by address. +func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "DELETE" { + resp.WriteHeader(http.StatusMethodNotAllowed) + return nil, nil + } + + var args structs.RaftPeerByAddressRequest + s.parseDC(req, &args.Datacenter) + s.parseToken(req, &args.Token) + + params := req.URL.Query() + if _, ok := params["address"]; ok { + args.Address = raft.ServerAddress(params.Get("address")) + } else { + resp.WriteHeader(http.StatusBadRequest) + resp.Write([]byte("Must specify ?address with IP:port of peer to remove")) + return nil, nil + } + + var reply struct{} + if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil { + return nil, err + } + return nil, nil +} diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go new file mode 100644 index 0000000000..8e3ebe7200 --- /dev/null +++ b/command/agent/operator_endpoint_test.go @@ -0,0 +1,58 @@ +package agent + +import ( + "bytes" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/hashicorp/consul/consul/structs" +) + +func TestOperator_OperatorRaftConfiguration(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/operator/raft/configuration", body) + if err != nil { + 
t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.OperatorRaftConfiguration(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("bad code: %d", resp.Code) + } + out, ok := obj.(structs.RaftConfigurationResponse) + if !ok { + t.Fatalf("unexpected: %T", obj) + } + if len(out.Configuration.Servers) != 1 || + len(out.NodeMap) != 1 || + len(out.Leader) == 0 { + t.Fatalf("bad: %v", out) + } + }) +} + +func TestOperator_OperatorRaftPeer(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("DELETE", "/v1/operator/raft/peer?address=nope", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + // If we get this error, it proves we sent the address all the + // way through. + resp := httptest.NewRecorder() + _, err = srv.OperatorRaftPeer(resp, req) + if err == nil || !strings.Contains(err.Error(), + "address \"nope\" was not found in the Raft configuration") { + t.Fatalf("err: %v", err) + } + }) +} diff --git a/command/operator.go b/command/operator.go new file mode 100644 index 0000000000..d1a6d8d3b3 --- /dev/null +++ b/command/operator.go @@ -0,0 +1,180 @@ +package command + +import ( + "flag" + "fmt" + "strings" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/raft" + "github.com/mitchellh/cli" + "github.com/ryanuber/columnize" +) + +// OperatorCommand is used to provide various low-level tools for Consul +// operators. +type OperatorCommand struct { + Ui cli.Ui +} + +func (c *OperatorCommand) Help() string { + helpText := ` +Usage: consul operator [common options] [action] [options] + + Provides cluster-level tools for Consul operators, such as interacting with + the Raft subsystem. NOTE: Use this command with extreme caution, as improper + use could lead to a Consul outage and even loss of data. + + If ACLs are enabled then a token with operator privileges may required in + order to use this command. 
Requests are forwarded internally to the leader + if required, so this can be run from any Consul node in a cluster. + + Run consul operator with no arguments for help on that + subcommand. + +Common Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -token="" ACL token to use. Defaults to that of agent. + +Subcommands: + + raft View and modify Consul's Raft configuration. +` + return strings.TrimSpace(helpText) +} + +func (c *OperatorCommand) Run(args []string) int { + if len(args) < 1 { + c.Ui.Error("A subcommand must be specified") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + var err error + subcommand := args[0] + switch subcommand { + case "raft": + err = c.raft(args[1:]) + default: + err = fmt.Errorf("unknown subcommand %q", subcommand) + } + + if err != nil { + c.Ui.Error(fmt.Sprintf("Operator %q subcommand failed: %v", subcommand, err)) + return 1 + } + return 0 +} + +// Synopsis returns a one-line description of this command. +func (c *OperatorCommand) Synopsis() string { + return "Provides cluster-level tools for Consul operators" +} + +const raftHelp = ` +Raft Subcommand Actions: + + raft -list-peers -stale=[true|false] + + Displays the current Raft peer configuration. + + The -stale argument defaults to "false" which means the leader provides the + result. If the cluster is in an outage state without a leader, you may need + to set -stale to "true" to get the configuration from a non-leader server. + + raft -remove-peer -address="IP:port" + + Removes Consul server with given -address from the Raft configuration. + + There are rare cases where a peer may be left behind in the Raft quorum even + though the server is no longer present and known to the cluster. This + command can be used to remove the failed server so that it is no longer + affects the Raft quorum. 
If the server still shows in the output of the + "consul members" command, it is preferable to clean up by simply running + "consul force-leave" instead of this command. +` + +// raft handles the raft subcommands. +func (c *OperatorCommand) raft(args []string) error { + cmdFlags := flag.NewFlagSet("raft", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + + // Parse verb arguments. + var listPeers, removePeer bool + cmdFlags.BoolVar(&listPeers, "list-peers", false, "") + cmdFlags.BoolVar(&removePeer, "remove-peer", false, "") + + // Parse other arguments. + var stale bool + var address, token string + cmdFlags.StringVar(&address, "address", "", "") + cmdFlags.BoolVar(&stale, "stale", false, "") + cmdFlags.StringVar(&token, "token", "", "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return err + } + + // Set up a client. + conf := api.DefaultConfig() + conf.Address = *httpAddr + client, err := api.NewClient(conf) + if err != nil { + return fmt.Errorf("error connecting to Consul agent: %s", err) + } + operator := client.Operator() + + // Dispatch based on the verb argument. + if listPeers { + // Fetch the current configuration. + q := &api.QueryOptions{ + AllowStale: stale, + Token: token, + } + reply, err := operator.RaftGetConfiguration(q) + if err != nil { + return err + } + + // Format it as a nice table. + result := []string{"Node|ID|Address|State|Voter"} + for _, s := range reply.Configuration.Servers { + node := "(unknown)" + if mappedNode, ok := reply.NodeMap[s.ID]; ok { + node = mappedNode + } + state := "follower" + if s.ID == reply.Leader { + state = "leader" + } + voter := s.Suffrage == raft.Voter + result = append(result, fmt.Sprintf("%s|%s|%s|%s|%v", + node, s.ID, s.Address, state, voter)) + } + c.Ui.Output(columnize.SimpleFormat(result)) + } else if removePeer { + // TODO (slackpad) Once we expose IDs, add support for removing + // by ID, add support for that. 
+ if len(address) == 0 { + return fmt.Errorf("an address is required for the peer to remove") + } + + // Try to kick the peer. + w := &api.WriteOptions{ + Token: token, + } + sa := raft.ServerAddress(address) + if err := operator.RaftRemovePeerByAddress(sa, w); err != nil { + return err + } + c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address)) + } else { + c.Ui.Output(c.Help()) + c.Ui.Output("") + c.Ui.Output(strings.TrimSpace(raftHelp)) + } + + return nil +} diff --git a/command/operator_test.go b/command/operator_test.go new file mode 100644 index 0000000000..e65434b75d --- /dev/null +++ b/command/operator_test.go @@ -0,0 +1,52 @@ +package command + +import ( + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func TestOperator_Implements(t *testing.T) { + var _ cli.Command = &OperatorCommand{} +} + +func TestOperator_Raft_ListPeers(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := &OperatorCommand{Ui: ui} + args := []string{"raft", "-http-addr=" + a1.httpAddr, "-list-peers"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + output := strings.TrimSpace(ui.OutputWriter.String()) + if !strings.Contains(output, "leader") { + t.Fatalf("bad: %s", output) + } +} + +func TestOperator_Raft_RemovePeer(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := &OperatorCommand{Ui: ui} + args := []string{"raft", "-http-addr=" + a1.httpAddr, "-remove-peer", "-address=nope"} + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + // If we get this error, it proves we sent the address all they through. 
+ output := strings.TrimSpace(ui.ErrorWriter.String()) + if !strings.Contains(output, "address \"nope\" was not found in the Raft configuration") { + t.Fatalf("bad: %s", output) + } +} diff --git a/commands.go b/commands.go index 84f0c07fe6..2a25c77f81 100644 --- a/commands.go +++ b/commands.go @@ -103,6 +103,12 @@ func init() { }, nil }, + "operator": func() (cli.Command, error) { + return &command.OperatorCommand{ + Ui: ui, + }, nil + }, + "info": func() (cli.Command, error) { return &command.InfoCommand{ Ui: ui, diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go new file mode 100644 index 0000000000..a33c07a781 --- /dev/null +++ b/consul/operator_endpoint.go @@ -0,0 +1,120 @@ +package consul + +import ( + "fmt" + "net" + + "github.com/hashicorp/consul/consul/agent" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/raft" +) + +// Operator endpoint is used to perform low-level operator tasks for Consul. +type Operator struct { + srv *Server +} + +// RaftGetConfiguration is used to retrieve the current Raft configuration. +func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { + if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorRead() { + return permissionDeniedErr + } + + // We can't fetch the leader and the configuration atomically with + // the current Raft API. + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + reply.Configuration = future.Configuration() + leader := op.srv.raft.Leader() + + // Index the configuration so we can easily look up IDs by address. 
+ idMap := make(map[raft.ServerAddress]raft.ServerID) + for _, s := range reply.Configuration.Servers { + idMap[s.Address] = s.ID + } + + // Fill out the node map and leader. + reply.NodeMap = make(map[raft.ServerID]string) + members := op.srv.serfLAN.Members() + for _, member := range members { + valid, parts := agent.IsConsulServer(member) + if !valid { + continue + } + + // TODO (slackpad) We need to add a Raft API to get the leader by + // ID so we don't have to do this mapping. + addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String() + if id, ok := idMap[raft.ServerAddress(addr)]; ok { + reply.NodeMap[id] = member.Name + if leader == raft.ServerAddress(addr) { + reply.Leader = id + } + } + } + return nil +} + +// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft +// quorum but no longer known to Serf or the catalog) by address in the form of +// "IP:port". The reply argument is not used, but it required to fulfill the RPC +// interface. +func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error { + if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { + return err + } + + // This is a super dangerous operation that requires operator write + // access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorWrite() { + return permissionDeniedErr + } + + // Since this is an operation designed for humans to use, we will return + // an error if the supplied address isn't among the peers since it's + // likely they screwed up. 
+ { + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + for _, s := range future.Configuration().Servers { + if s.Address == args.Address { + goto REMOVE + } + } + return fmt.Errorf("address %q was not found in the Raft configuration", + args.Address) + } + +REMOVE: + // The Raft library itself will prevent various forms of foot-shooting, + // like making a configuration with no voters. Some consideration was + // given here to adding more checks, but it was decided to make this as + // low-level and direct as possible. We've got ACL coverage to lock this + // down, and if you are an operator, it's assumed you know what you are + // doing if you are calling this. If you remove a peer that's known to + // Serf, for example, it will come back when the leader does a reconcile + // pass. + future := op.srv.raft.RemovePeer(args.Address) + if err := future.Error(); err != nil { + return err + } + + return nil +} diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go new file mode 100644 index 0000000000..c48ff83814 --- /dev/null +++ b/consul/operator_endpoint_test.go @@ -0,0 +1,229 @@ +package consul + +import ( + "fmt" + "os" + "reflect" + "strings" + "testing" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/raft" +) + +func TestOperator_RaftGetConfiguration(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.RaftConfigurationResponse + if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + t.Fatalf("err: %v", 
err) + } + + expected := structs.RaftConfigurationResponse{ + Configuration: future.Configuration(), + NodeMap: map[raft.ServerID]string{ + raft.ServerID(s1.config.RPCAddr.String()): s1.config.NodeName, + }, + Leader: raft.ServerID(s1.config.RPCAddr.String()), + } + if !reflect.DeepEqual(reply, expected) { + t.Fatalf("bad: %v", reply) + } +} + +func TestOperator_RaftGetConfiguration_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Make a request with no token to make sure it gets denied. + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var reply structs.RaftConfigurationResponse + err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator read permissions. + var token string + { + var rules = ` + operator = "read" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now it should go through. 
+ arg.Token = token + if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + + expected := structs.RaftConfigurationResponse{ + Configuration: future.Configuration(), + NodeMap: map[raft.ServerID]string{ + raft.ServerID(s1.config.RPCAddr.String()): s1.config.NodeName, + }, + Leader: raft.ServerID(s1.config.RPCAddr.String()), + } + if !reflect.DeepEqual(reply, expected) { + t.Fatalf("bad: %v", reply) + } +} + +func TestOperator_RaftRemovePeerByAddress(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Try to remove a peer that's not there. + arg := structs.RaftPeerByAddressRequest{ + Datacenter: "dc1", + Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())), + } + var reply struct{} + err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") { + t.Fatalf("err: %v", err) + } + + // Add it manually to Raft. + { + future := s1.raft.AddPeer(arg.Address) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Make sure it's there. + { + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + configuration := future.Configuration() + if len(configuration.Servers) != 2 { + t.Fatalf("bad: %v", configuration) + } + } + + // Remove it, now it should go through. + if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's not there. 
+ { + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + configuration := future.Configuration() + if len(configuration.Servers) != 1 { + t.Fatalf("bad: %v", configuration) + } + } +} + +func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Make a request with no token to make sure it gets denied. + arg := structs.RaftPeerByAddressRequest{ + Datacenter: "dc1", + Address: raft.ServerAddress(s1.config.RPCAddr.String()), + } + var reply struct{} + err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator write permissions. + var token string + { + var rules = ` + operator = "write" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now it should kick back for being an invalid config, which means it + // tried to do the operation. 
+ arg.Token = token + err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "at least one voter") { + t.Fatalf("err: %v", err) + } +} diff --git a/consul/server.go b/consul/server.go index ab240ce45f..509bc32944 100644 --- a/consul/server.go +++ b/consul/server.go @@ -162,15 +162,16 @@ type Server struct { // Holds the RPC endpoints type endpoints struct { - Catalog *Catalog - Health *Health - Status *Status - KVS *KVS - Session *Session - Internal *Internal ACL *ACL + Catalog *Catalog Coordinate *Coordinate + Health *Health + Internal *Internal + KVS *KVS + Operator *Operator PreparedQuery *PreparedQuery + Session *Session + Status *Status Txn *Txn } @@ -496,27 +497,29 @@ func (s *Server) setupRaft() error { // setupRPC is used to setup the RPC listener func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { // Create endpoints - s.endpoints.Status = &Status{s} - s.endpoints.Catalog = &Catalog{s} - s.endpoints.Health = &Health{s} - s.endpoints.KVS = &KVS{s} - s.endpoints.Session = &Session{s} - s.endpoints.Internal = &Internal{s} s.endpoints.ACL = &ACL{s} + s.endpoints.Catalog = &Catalog{s} s.endpoints.Coordinate = NewCoordinate(s) + s.endpoints.Health = &Health{s} + s.endpoints.Internal = &Internal{s} + s.endpoints.KVS = &KVS{s} + s.endpoints.Operator = &Operator{s} s.endpoints.PreparedQuery = &PreparedQuery{s} + s.endpoints.Session = &Session{s} + s.endpoints.Status = &Status{s} s.endpoints.Txn = &Txn{s} // Register the handlers - s.rpcServer.Register(s.endpoints.Status) - s.rpcServer.Register(s.endpoints.Catalog) - s.rpcServer.Register(s.endpoints.Health) - s.rpcServer.Register(s.endpoints.KVS) - s.rpcServer.Register(s.endpoints.Session) - s.rpcServer.Register(s.endpoints.Internal) s.rpcServer.Register(s.endpoints.ACL) + s.rpcServer.Register(s.endpoints.Catalog) s.rpcServer.Register(s.endpoints.Coordinate) + s.rpcServer.Register(s.endpoints.Health) + 
s.rpcServer.Register(s.endpoints.Internal)
+	s.rpcServer.Register(s.endpoints.KVS)
+	s.rpcServer.Register(s.endpoints.Operator)
 	s.rpcServer.Register(s.endpoints.PreparedQuery)
+	s.rpcServer.Register(s.endpoints.Session)
+	s.rpcServer.Register(s.endpoints.Status)
 	s.rpcServer.Register(s.endpoints.Txn)
 
 	list, err := net.ListenTCP("tcp", s.config.RPCAddr)
diff --git a/consul/structs/operator.go b/consul/structs/operator.go
new file mode 100644
index 0000000000..83372d1316
--- /dev/null
+++ b/consul/structs/operator.go
@@ -0,0 +1,42 @@
+package structs
+
+import (
+	"github.com/hashicorp/raft"
+)
+
+// RaftConfigurationResponse is returned when querying for the current Raft
+// configuration. This has the low-level Raft structure, as well as some
+// supplemental information from Consul.
+type RaftConfigurationResponse struct {
+	// Configuration is the low-level Raft configuration structure.
+	Configuration raft.Configuration
+
+	// NodeMap maps IDs in the Raft configuration to node names known by
+	// Consul. It's possible that not all configuration entries may have
+	// an entry here if the node isn't known to Consul. Given how this is
+	// generated, this may also contain entries that aren't present in the
+	// Raft configuration.
+	NodeMap map[raft.ServerID]string
+
+	// Leader is the ID of the current Raft leader. This may be blank if
+	// there isn't one.
+	Leader raft.ServerID
+}
+
+// RaftPeerByAddressRequest is used by the Operator endpoint to apply a Raft
+// operation on a specific Raft peer by address in the form of "IP:port".
+type RaftPeerByAddressRequest struct {
+	// Datacenter is the target this request is intended for.
+	Datacenter string
+
+	// Address is the peer to remove, in the form "IP:port".
+	Address raft.ServerAddress
+
+	// WriteRequest holds the ACL token to go along with this request.
+	WriteRequest
+}
+
+// RequestDatacenter returns the datacenter for a given request.
+func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
+	return op.Datacenter
+}
diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown
new file mode 100644
index 0000000000..f13e5c4b62
--- /dev/null
+++ b/website/source/docs/agent/http/operator.html.markdown
@@ -0,0 +1,48 @@
+---
+layout: "docs"
+page_title: "Operator (HTTP)"
+sidebar_current: "docs-agent-http-operator"
+description: >
+  The operator endpoint provides cluster-level tools for Consul operators.
+---
+
+# Operator HTTP Endpoint
+
+The Operator endpoints provide cluster-level tools for Consul operators, such
+as interacting with the Raft subsystem. This was added in Consul 0.7.
+
+~> Use this interface with extreme caution, as improper use could lead to a Consul
+   outage and even loss of data.
+
+If ACLs are enabled then a token with operator privileges may be required in
+order to use this interface. See the [ACL](/docs/internals/acl.html#operator)
+internals guide for more information.
+
+See the [Outage Recovery](/docs/guides/outage.html) guide for some examples of how
+these capabilities are used. For a CLI to perform these operations manually, please
+see the documentation for the [`consul operator`](/docs/commands/operator.html)
+command.
+
+The following endpoints are supported:
+
+* [`/v1/operator/raft/configuration`](#raft-configuration): Inspects the Raft configuration
+* [`/v1/operator/raft/peer`](#raft-peer): Operates on Raft peers
+
+Not all endpoints support blocking queries and all consistency modes,
+see details in the sections below.
+
+The operator endpoints support the use of ACL Tokens. See the
+[ACL](/docs/internals/acl.html#operator) internals guide for more information.
+
+### /v1/operator/raft/configuration
+
+The Raft configuration endpoint supports the `GET` method.
+
+#### GET Method
+
+### /v1/operator/raft/peer
+
+The Raft peer endpoint supports the `DELETE` method.
+
+#### DELETE Method
+
diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown
index e466a94b44..7e0302754d 100644
--- a/website/source/docs/commands/index.html.markdown
+++ b/website/source/docs/commands/index.html.markdown
@@ -38,6 +38,7 @@ Available commands are:
     lock           Execute a command holding a lock
     members        Lists the members of a Consul cluster
     monitor        Stream logs from a Consul agent
+    operator       Provides cluster-level tools for Consul operators
     reload         Triggers the agent to reload configuration files
     rtt            Estimates network round trip time between nodes
     version        Prints the Consul version
diff --git a/website/source/docs/commands/operator.html.markdown b/website/source/docs/commands/operator.html.markdown
new file mode 100644
index 0000000000..542cfee81f
--- /dev/null
+++ b/website/source/docs/commands/operator.html.markdown
@@ -0,0 +1,100 @@
+---
+layout: "docs"
+page_title: "Commands: Operator"
+sidebar_current: "docs-commands-operator"
+description: >
+  The operator command provides cluster-level tools for Consul operators.
+---
+
+# Consul Operator
+
+Command: `consul operator`
+
+The `operator` command provides cluster-level tools for Consul operators, such
+as interacting with the Raft subsystem. This was added in Consul 0.7.
+
+~> Use this command with extreme caution, as improper use could lead to a Consul
+   outage and even loss of data.
+
+If ACLs are enabled then a token with operator privileges may be required in
+order to use this command. Requests are forwarded internally to the leader
+if required, so this can be run from any Consul node in a cluster. See the
+[ACL](/docs/internals/acl.html#operator) internals guide for more information.
+
+See the [Outage Recovery](/docs/guides/outage.html) guide for some examples of how
+this command is used. For an API to perform these operations programmatically,
+please see the documentation for the [Operator](/docs/agent/http/operator.html)
+endpoint.
+
+## Usage
+
+Usage: `consul operator [common options] [action] [options]`
+
+Run `consul operator <subcommand>` with no arguments for help on that
+subcommand. The following subcommands are available:
+
+* `raft` - View and modify Consul's Raft configuration.
+
+Options common to all subcommands include:
+
+* `-http-addr` - Address to the HTTP server of the agent you want to contact
+  to send this command. If this isn't specified, the command will contact
+  "127.0.0.1:8500" which is the default HTTP address of a Consul agent.
+
+* `-token` - ACL token to use. Defaults to that of the agent.
+
+## Raft Operations
+
+The `raft` subcommand is used to view and modify Consul's Raft configuration.
+Two actions are available, as detailed in this section.
+
+
+`raft -list-peers -stale=[true|false]`
+
+This action displays the current Raft peer configuration.
+
+The `-stale` argument defaults to "false" which means the leader provides the
+result. If the cluster is in an outage state without a leader, you may need
+to set `-stale` to "true" to get the configuration from a non-leader server.
+
+The output looks like this:
+
+```
+Node  ID              Address         State     Voter
+alice 127.0.0.1:8300  127.0.0.1:8300  follower  true
+bob   127.0.0.2:8300  127.0.0.2:8300  leader    true
+carol 127.0.0.3:8300  127.0.0.3:8300  follower  true
+```
+
+* `Node` is the node name of the server, as known to Consul, or "(unknown)" if
+  the node is stale and not known.
+
+* `ID` is the ID of the server. This is the same as the `Address` in Consul 0.7
+  but may be upgraded to a GUID in a future version of Consul.
+
+* `Address` is the IP:port for the server.
+
+* `State` is either "follower" or "leader" depending on the server's role in the
+  Raft configuration.
+
+* `Voter` is "true" or "false", indicating if the server has a vote in the Raft
+  configuration. Future versions of Consul may add support for non-voting
+  servers.
+
+
+`raft -remove-peer -address="IP:port"`
+
+This command removes the Consul server with the given `-address` from the Raft
+configuration.
+
+The `-address` argument is required and is the "IP:port" for the server to
+remove. The port number is usually 8300, unless configured otherwise.
+
+There are rare cases where a peer may be left behind in the Raft quorum even
+though the server is no longer present and known to the cluster. This command
+can be used to remove the failed server so that it no longer affects the
+Raft quorum. If the server still shows in the output of the
+[`consul members`](/docs/commands/members.html) command, it is preferable to
+clean up by simply running
+[`consul force-leave`](/docs/commands/force-leave.html)
+instead of this command.
diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb
index 78475804d1..5c02bc7dea 100644
--- a/website/source/layouts/docs.erb
+++ b/website/source/layouts/docs.erb
@@ -118,6 +118,10 @@
            monitor
          
+          >
+            operator
+          
+
          >
            info
          
@@ -178,6 +182,10 @@
            Network Coordinates
          
+          >
+            Operator
+          
+
          >
            Prepared Queries