From da9c82559289dfe6fcddea69650152b71fddbe09 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 29 Mar 2017 18:09:41 -0700 Subject: [PATCH] Add CLI/API endpoints for removing peer by ID --- api/operator.go | 19 ++- command/agent/operator_endpoint.go | 27 +++- command/agent/operator_endpoint_test.go | 17 +++ command/operator_raft.go | 2 +- command/operator_raft_remove.go | 33 +++-- command/operator_raft_remove_test.go | 23 ++++ consul/autopilot_test.go | 2 +- consul/operator_endpoint.go | 82 +++++++++++- consul/operator_endpoint_test.go | 120 +++++++++++++++++- consul/structs/operator.go | 9 +- .../docs/agent/http/operator.html.markdown | 6 +- .../commands/operator/raft.html.markdown.erb | 2 + 12 files changed, 313 insertions(+), 29 deletions(-) diff --git a/api/operator.go b/api/operator.go index c5c13141a0..d6a1ff265d 100644 --- a/api/operator.go +++ b/api/operator.go @@ -227,8 +227,6 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") r.setWriteOptions(q) - // TODO (slackpad) Currently we made address a query parameter. Once - // IDs are in place this will be DELETE /v1/operator/raft/peer/. r.params.Set("address", string(address)) _, resp, err := requireOK(op.c.doRequest(r)) @@ -240,6 +238,23 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err return nil } +// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft +// quorum but no longer known to Serf or the catalog) by ID. 
+func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error { + r := op.c.newRequest("DELETE", "/v1/operator/raft/peer") + r.setWriteOptions(q) + + r.params.Set("id", string(id)) + + _, resp, err := requireOK(op.c.doRequest(r)) + if err != nil { + return err + } + + resp.Body.Close() + return nil +} + // KeyringInstall is used to install a new gossip encryption key into the cluster func (op *Operator) KeyringInstall(key string, q *WriteOptions) error { r := op.c.newRequest("POST", "/v1/operator/keyring") diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 1900a40389..2534c1261b 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -42,23 +42,40 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques return nil, nil } - var args structs.RaftPeerByAddressRequest + var args structs.RaftRemovePeerRequest s.parseDC(req, &args.Datacenter) s.parseToken(req, &args.Token) params := req.URL.Query() - if _, ok := params["address"]; ok { + _, hasID := params["id"] + if hasID { + args.ID = raft.ServerID(params.Get("id")) + } + _, hasAddress := params["address"] + if hasAddress { args.Address = raft.ServerAddress(params.Get("address")) - } else { + } + + if !hasID && !hasAddress { resp.WriteHeader(http.StatusBadRequest) - resp.Write([]byte("Must specify ?address with IP:port of peer to remove")) + resp.Write([]byte("Must specify either ?id with the server's ID or ?address with IP:port of peer to remove")) + return nil, nil + } + if hasID && hasAddress { + resp.WriteHeader(http.StatusBadRequest) + resp.Write([]byte("Must specify only one of ?id or ?address")) return nil, nil } var reply struct{} - if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil { + method := "Operator.RaftRemovePeerByID" + if hasAddress { + method = "Operator.RaftRemovePeerByAddress" + } + if err := s.agent.RPC(method, &args, &reply); err != nil { return nil, 
err } + return nil, nil } diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 0d976eaef8..6e6368f451 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -59,6 +59,23 @@ func TestOperator_RaftPeer(t *testing.T) { t.Fatalf("err: %v", err) } }) + + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("DELETE", "/v1/operator/raft/peer?id=nope", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + // If we get this error, it proves we sent the ID all the + // way through. + resp := httptest.NewRecorder() + _, err = srv.OperatorRaftPeer(resp, req) + if err == nil || !strings.Contains(err.Error(), + "id \"nope\" was not found in the Raft configuration") { + t.Fatalf("err: %v", err) + } + }) } func TestOperator_KeyringInstall(t *testing.T) { diff --git a/command/operator_raft.go b/command/operator_raft.go index 2f5ea26c3f..d40e4cc726 100644 --- a/command/operator_raft.go +++ b/command/operator_raft.go @@ -87,7 +87,7 @@ func (c *OperatorRaftCommand) raft(args []string) error { } c.Ui.Output(result) } else if removePeer { - if err := raftRemovePeers(address, operator); err != nil { + if err := raftRemovePeers(address, "", operator); err != nil { return fmt.Errorf("Error removing peer: %v", err) } c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address)) diff --git a/command/operator_raft_remove.go b/command/operator_raft_remove.go index 3ca2aaebee..fba1cebc4c 100644 --- a/command/operator_raft_remove.go +++ b/command/operator_raft_remove.go @@ -38,9 +38,11 @@ func (c *OperatorRaftRemoveCommand) Synopsis() string { func (c *OperatorRaftRemoveCommand) Run(args []string) int { f := c.Command.NewFlagSet(c) - var address string + var address, id string f.StringVar(&address, "address", "", "The address to remove from the Raft configuration.") + f.StringVar(&id, "id", "", + "The ID to remove from the Raft configuration.") if 
err := c.Command.Parse(args); err != nil { if err == flag.ErrHelp { @@ -58,25 +60,36 @@ func (c *OperatorRaftRemoveCommand) Run(args []string) int { } // Fetch the current configuration. - if err := raftRemovePeers(address, client.Operator()); err != nil { + if err := raftRemovePeers(address, id, client.Operator()); err != nil { c.Ui.Error(fmt.Sprintf("Error removing peer: %v", err)) return 1 } - c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address)) + if address != "" { + c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address)) + } else { + c.Ui.Output(fmt.Sprintf("Removed peer with id %q", id)) + } return 0 } -func raftRemovePeers(address string, operator *api.Operator) error { - // TODO (slackpad) Once we expose IDs, add support for removing - // by ID, add support for that. - if len(address) == 0 { - return fmt.Errorf("an address is required for the peer to remove") +func raftRemovePeers(address, id string, operator *api.Operator) error { + if len(address) == 0 && len(id) == 0 { + return fmt.Errorf("an address or id is required for the peer to remove") + } + if len(address) > 0 && len(id) > 0 { + return fmt.Errorf("cannot give both an address and id") } // Try to kick the peer. 
- if err := operator.RaftRemovePeerByAddress(address, nil); err != nil { - return err + if len(address) > 0 { + if err := operator.RaftRemovePeerByAddress(address, nil); err != nil { + return err + } + } else { + if err := operator.RaftRemovePeerByID(id, nil); err != nil { + return err + } } return nil diff --git a/command/operator_raft_remove_test.go b/command/operator_raft_remove_test.go index a549e895c4..1a039f171a 100644 --- a/command/operator_raft_remove_test.go +++ b/command/operator_raft_remove_test.go @@ -56,4 +56,27 @@ func TestOperator_Raft_RemovePeer(t *testing.T) { t.Fatalf("bad: %s", output) } } + + // Test the remove-peer subcommand with -id + { + ui := new(cli.MockUi) + c := OperatorRaftRemoveCommand{ + Command: base.Command{ + Ui: ui, + Flags: base.FlagSetHTTP, + }, + } + args := []string{"-http-addr=" + a1.httpAddr, "-id=nope"} + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + // If we get this error, it proves we sent the ID all the way through. 
+ output := strings.TrimSpace(ui.ErrorWriter.String()) + if !strings.Contains(output, "id \"nope\" was not found in the Raft configuration") { + t.Fatalf("bad: %s", output) + } + } } diff --git a/consul/autopilot_test.go b/consul/autopilot_test.go index e2811a49bb..5aed5719ba 100644 --- a/consul/autopilot_test.go +++ b/consul/autopilot_test.go @@ -194,7 +194,7 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) { // Add s4 to peers directly s4addr := fmt.Sprintf("127.0.0.1:%d", s4.config.SerfLANConfig.MemberlistConfig.BindPort) - s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr),0, 0) + s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(s4addr), 0, 0) // Verify we have 4 peers peers, err := s1.numPeers() diff --git a/consul/operator_endpoint.go b/consul/operator_endpoint.go index acbfb9ea3a..e77fae903e 100644 --- a/consul/operator_endpoint.go +++ b/consul/operator_endpoint.go @@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply // quorum but no longer known to Serf or the catalog) by address in the form of // "IP:port". The reply argument is not used, but it required to fulfill the RPC // interface. -func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error { +func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { return err } @@ -99,6 +99,7 @@ func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressReque } for _, s := range future.Configuration().Servers { if s.Address == args.Address { + args.ID = s.ID goto REMOVE } } @@ -115,7 +116,17 @@ REMOVE: // doing if you are calling this. If you remove a peer that's known to // Serf, for example, it will come back when the leader does a reconcile // pass. 
- future := op.srv.raft.RemovePeer(args.Address) + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) + if err != nil { + return err + } + + var future raft.Future + if minRaftProtocol >= 2 { + future = op.srv.raft.RemoveServer(args.ID, 0, 0) + } else { + future = op.srv.raft.RemovePeer(args.Address) + } if err := future.Error(); err != nil { op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v", args.Address, err) @@ -126,6 +137,73 @@ REMOVE: return nil } +// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft +// quorum but no longer known to Serf or the catalog) by ID. +// The reply argument is not used, but is required to fulfill the RPC +// interface. +func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { + if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done { + return err + } + + // This is a super dangerous operation that requires operator write + // access. + acl, err := op.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && !acl.OperatorWrite() { + return permissionDeniedErr + } + + // Since this is an operation designed for humans to use, we will return + // an error if the supplied id isn't among the peers since it's + // likely they screwed up. + { + future := op.srv.raft.GetConfiguration() + if err := future.Error(); err != nil { + return err + } + for _, s := range future.Configuration().Servers { + if s.ID == args.ID { + args.Address = s.Address + goto REMOVE + } + } + return fmt.Errorf("id %q was not found in the Raft configuration", + args.ID) + } + +REMOVE: + // The Raft library itself will prevent various forms of foot-shooting, + // like making a configuration with no voters. Some consideration was + // given here to adding more checks, but it was decided to make this as + // low-level and direct as possible. 
We've got ACL coverage to lock this + // down, and if you are an operator, it's assumed you know what you are + // doing if you are calling this. If you remove a peer that's known to + // Serf, for example, it will come back when the leader does a reconcile + // pass. + minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members()) + if err != nil { + return err + } + + var future raft.Future + if minRaftProtocol >= 2 { + future = op.srv.raft.RemoveServer(args.ID, 0, 0) + } else { + future = op.srv.raft.RemovePeer(args.Address) + } + if err := future.Error(); err != nil { + op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v", + args.ID, err) + return err + } + + op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID) + return nil +} + // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { diff --git a/consul/operator_endpoint_test.go b/consul/operator_endpoint_test.go index dcb761ff97..de9a060c86 100644 --- a/consul/operator_endpoint_test.go +++ b/consul/operator_endpoint_test.go @@ -143,7 +143,7 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Try to remove a peer that's not there. - arg := structs.RaftPeerByAddressRequest{ + arg := structs.RaftRemovePeerRequest{ Datacenter: "dc1", Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())), } @@ -205,7 +205,7 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") // Make a request with no token to make sure it gets denied. 
- arg := structs.RaftPeerByAddressRequest{ + arg := structs.RaftRemovePeerRequest{ Datacenter: "dc1", Address: raft.ServerAddress(s1.config.RPCAddr.String()), } @@ -246,6 +246,122 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) { } } +func TestOperator_RaftRemovePeerByID(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.RaftConfig.ProtocolVersion = 3 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Try to remove a peer that's not there. + arg := structs.RaftRemovePeerRequest{ + Datacenter: "dc1", + ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"), + } + var reply struct{} + err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") { + t.Fatalf("err: %v", err) + } + + // Add it manually to Raft. + { + future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())), 0, 0) + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Make sure it's there. + { + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + configuration := future.Configuration() + if len(configuration.Servers) != 2 { + t.Fatalf("bad: %v", configuration) + } + } + + // Remove it, now it should go through. + if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply); err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure it's not there. 
+ { + future := s1.raft.GetConfiguration() + if err := future.Error(); err != nil { + t.Fatalf("err: %v", err) + } + configuration := future.Configuration() + if len(configuration.Servers) != 1 { + t.Fatalf("bad: %v", configuration) + } + } +} + +func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.RaftConfig.ProtocolVersion = 3 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Make a request with no token to make sure it gets denied. + arg := structs.RaftRemovePeerRequest{ + Datacenter: "dc1", + ID: raft.ServerID(s1.config.NodeID), + } + var reply struct{} + err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL with operator write permissions. + var token string + { + var rules = ` + operator = "write" + ` + + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: rules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Now it should kick back for being an invalid config, which means it + // tried to do the operation. 
+ arg.Token = token + err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply) + if err == nil || !strings.Contains(err.Error(), "at least one voter") { + t.Fatalf("err: %v", err) + } +} + func TestOperator_Autopilot_GetConfiguration(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.AutopilotConfig.CleanupDeadServers = false diff --git a/consul/structs/operator.go b/consul/structs/operator.go index 00e6bd393f..332d9e556d 100644 --- a/consul/structs/operator.go +++ b/consul/structs/operator.go @@ -73,21 +73,24 @@ type RaftConfigurationResponse struct { Index uint64 } -// RaftPeerByAddressRequest is used by the Operator endpoint to apply a Raft +// RaftRemovePeerRequest is used by the Operator endpoint to apply a Raft // operation on a specific Raft peer by address in the form of "IP:port". -type RaftPeerByAddressRequest struct { +type RaftRemovePeerRequest struct { // Datacenter is the target this request is intended for. Datacenter string // Address is the peer to remove, in the form "IP:port". Address raft.ServerAddress + // ID is the peer ID to remove. + ID raft.ServerID + // WriteRequest holds the ACL token to go along with this request. WriteRequest } // RequestDatacenter returns the datacenter for a given request. -func (op *RaftPeerByAddressRequest) RequestDatacenter() string { +func (op *RaftRemovePeerRequest) RequestDatacenter() string { return op.Datacenter } diff --git a/website/source/docs/agent/http/operator.html.markdown b/website/source/docs/agent/http/operator.html.markdown index 1b8a6299d1..e7697683c0 100644 --- a/website/source/docs/agent/http/operator.html.markdown +++ b/website/source/docs/agent/http/operator.html.markdown @@ -688,9 +688,9 @@ even though the server is no longer present and known to the cluster. This endpoint can be used to remove the failed server so that it is no longer affects the Raft quorum. 
-An `?address=` query parameter is required and should be set to the -`IP:port` for the server to remove. The port number is usually 8300, unless -configured otherwise. Nothing is required in the body of the request. +Either an `?id=` or `?address=` query parameter is required and should be set to the +peer ID or `IP:port` respectively for the server to remove. The port number is usually +8300, unless configured otherwise. Nothing is required in the body of the request. By default, the datacenter of the agent is targeted; however, the `dc` can be provided using the `?dc=` query parameter. diff --git a/website/source/docs/commands/operator/raft.html.markdown.erb b/website/source/docs/commands/operator/raft.html.markdown.erb index b2b68f4bd0..c08465be5e 100644 --- a/website/source/docs/commands/operator/raft.html.markdown.erb +++ b/website/source/docs/commands/operator/raft.html.markdown.erb @@ -78,4 +78,6 @@ Usage: `consul operator raft remove-peer -address="IP:port"` * `-address` - "IP:port" for the server to remove. The port number is usually 8300, unless configured otherwise. +* `-id` - ID of the server to remove. + The return code will indicate success or failure.