mirror of https://github.com/hashicorp/consul
Moves operator sub-functions into their own files.
parent
1767fa87a7
commit
08f81ebddb
378
api/operator.go
378
api/operator.go
|
@ -1,14 +1,5 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Operator can be used to perform low-level operator tasks for Consul.
|
// Operator can be used to perform low-level operator tasks for Consul.
|
||||||
type Operator struct {
|
type Operator struct {
|
||||||
c *Client
|
c *Client
|
||||||
|
@ -18,372 +9,3 @@ type Operator struct {
|
||||||
func (c *Client) Operator() *Operator {
|
func (c *Client) Operator() *Operator {
|
||||||
return &Operator{c}
|
return &Operator{c}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RaftServer has information about a server in the Raft configuration.
|
|
||||||
type RaftServer struct {
|
|
||||||
// ID is the unique ID for the server. These are currently the same
|
|
||||||
// as the address, but they will be changed to a real GUID in a future
|
|
||||||
// release of Consul.
|
|
||||||
ID string
|
|
||||||
|
|
||||||
// Node is the node name of the server, as known by Consul, or this
|
|
||||||
// will be set to "(unknown)" otherwise.
|
|
||||||
Node string
|
|
||||||
|
|
||||||
// Address is the IP:port of the server, used for Raft communications.
|
|
||||||
Address string
|
|
||||||
|
|
||||||
// Leader is true if this server is the current cluster leader.
|
|
||||||
Leader bool
|
|
||||||
|
|
||||||
// Voter is true if this server has a vote in the cluster. This might
|
|
||||||
// be false if the server is staging and still coming online, or if
|
|
||||||
// it's a non-voting server, which will be added in a future release of
|
|
||||||
// Consul.
|
|
||||||
Voter bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftConfigration is returned when querying for the current Raft configuration.
|
|
||||||
type RaftConfiguration struct {
|
|
||||||
// Servers has the list of servers in the Raft configuration.
|
|
||||||
Servers []*RaftServer
|
|
||||||
|
|
||||||
// Index has the Raft index of this configuration.
|
|
||||||
Index uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// keyringRequest is used for performing Keyring operations
|
|
||||||
type keyringRequest struct {
|
|
||||||
Key string
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringResponse is returned when listing the gossip encryption keys
|
|
||||||
type KeyringResponse struct {
|
|
||||||
// Whether this response is for a WAN ring
|
|
||||||
WAN bool
|
|
||||||
|
|
||||||
// The datacenter name this request corresponds to
|
|
||||||
Datacenter string
|
|
||||||
|
|
||||||
// A map of the encryption keys to the number of nodes they're installed on
|
|
||||||
Keys map[string]int
|
|
||||||
|
|
||||||
// The total number of nodes in this ring
|
|
||||||
NumNodes int
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
|
|
||||||
// Autopilot helps manage operator tasks related to Consul servers like removing
|
|
||||||
// failed servers from the Raft quorum.
|
|
||||||
type AutopilotConfiguration struct {
|
|
||||||
// CleanupDeadServers controls whether to remove dead servers from the Raft
|
|
||||||
// peer list when a new server joins
|
|
||||||
CleanupDeadServers bool
|
|
||||||
|
|
||||||
// LastContactThreshold is the limit on the amount of time a server can go
|
|
||||||
// without leader contact before being considered unhealthy.
|
|
||||||
LastContactThreshold *ReadableDuration
|
|
||||||
|
|
||||||
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
|
|
||||||
// be behind before being considered unhealthy.
|
|
||||||
MaxTrailingLogs uint64
|
|
||||||
|
|
||||||
// ServerStabilizationTime is the minimum amount of time a server must be
|
|
||||||
// in a stable, healthy state before it can be added to the cluster. Only
|
|
||||||
// applicable with Raft protocol version 3 or higher.
|
|
||||||
ServerStabilizationTime *ReadableDuration
|
|
||||||
|
|
||||||
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
|
|
||||||
// servers into zones for redundancy. If left blank, this feature will be disabled.
|
|
||||||
RedundancyZoneTag string
|
|
||||||
|
|
||||||
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
|
|
||||||
// strategy of waiting until enough newer-versioned servers have been added to the
|
|
||||||
// cluster before promoting them to voters.
|
|
||||||
DisableUpgradeMigration bool
|
|
||||||
|
|
||||||
// CreateIndex holds the index corresponding the creation of this configuration.
|
|
||||||
// This is a read-only field.
|
|
||||||
CreateIndex uint64
|
|
||||||
|
|
||||||
// ModifyIndex will be set to the index of the last update when retrieving the
|
|
||||||
// Autopilot configuration. Resubmitting a configuration with
|
|
||||||
// AutopilotCASConfiguration will perform a check-and-set operation which ensures
|
|
||||||
// there hasn't been a subsequent update since the configuration was retrieved.
|
|
||||||
ModifyIndex uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerHealth is the health (from the leader's point of view) of a server.
|
|
||||||
type ServerHealth struct {
|
|
||||||
// ID is the raft ID of the server.
|
|
||||||
ID string
|
|
||||||
|
|
||||||
// Name is the node name of the server.
|
|
||||||
Name string
|
|
||||||
|
|
||||||
// Address is the address of the server.
|
|
||||||
Address string
|
|
||||||
|
|
||||||
// The status of the SerfHealth check for the server.
|
|
||||||
SerfStatus string
|
|
||||||
|
|
||||||
// Version is the Consul version of the server.
|
|
||||||
Version string
|
|
||||||
|
|
||||||
// Leader is whether this server is currently the leader.
|
|
||||||
Leader bool
|
|
||||||
|
|
||||||
// LastContact is the time since this node's last contact with the leader.
|
|
||||||
LastContact *ReadableDuration
|
|
||||||
|
|
||||||
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
|
||||||
LastTerm uint64
|
|
||||||
|
|
||||||
// LastIndex is the last log index this server has a record of in its Raft log.
|
|
||||||
LastIndex uint64
|
|
||||||
|
|
||||||
// Healthy is whether or not the server is healthy according to the current
|
|
||||||
// Autopilot config.
|
|
||||||
Healthy bool
|
|
||||||
|
|
||||||
// Voter is whether this is a voting server.
|
|
||||||
Voter bool
|
|
||||||
|
|
||||||
// StableSince is the last time this server's Healthy value changed.
|
|
||||||
StableSince time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// OperatorHealthReply is a representation of the overall health of the cluster
|
|
||||||
type OperatorHealthReply struct {
|
|
||||||
// Healthy is true if all the servers in the cluster are healthy.
|
|
||||||
Healthy bool
|
|
||||||
|
|
||||||
// FailureTolerance is the number of healthy servers that could be lost without
|
|
||||||
// an outage occurring.
|
|
||||||
FailureTolerance int
|
|
||||||
|
|
||||||
// Servers holds the health of each server.
|
|
||||||
Servers []ServerHealth
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadableDuration is a duration type that is serialized to JSON in human readable format.
|
|
||||||
type ReadableDuration time.Duration
|
|
||||||
|
|
||||||
func NewReadableDuration(dur time.Duration) *ReadableDuration {
|
|
||||||
d := ReadableDuration(dur)
|
|
||||||
return &d
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *ReadableDuration) String() string { return d.Duration().String() }
|
|
||||||
func (d *ReadableDuration) Duration() time.Duration {
|
|
||||||
if d == nil {
|
|
||||||
return time.Duration(0)
|
|
||||||
}
|
|
||||||
return time.Duration(*d)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
|
|
||||||
return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
|
|
||||||
if d == nil {
|
|
||||||
return fmt.Errorf("cannot unmarshal to nil pointer")
|
|
||||||
}
|
|
||||||
|
|
||||||
str := string(raw)
|
|
||||||
if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
|
|
||||||
return fmt.Errorf("must be enclosed with quotes: %s", str)
|
|
||||||
}
|
|
||||||
dur, err := time.ParseDuration(str[1 : len(str)-1])
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
*d = ReadableDuration(dur)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftGetConfiguration is used to query the current Raft peer set.
|
|
||||||
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
|
|
||||||
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
|
|
||||||
r.setQueryOptions(q)
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
var out RaftConfiguration
|
|
||||||
if err := decodeBody(resp, &out); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
|
||||||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
|
||||||
// "IP:port".
|
|
||||||
func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error {
|
|
||||||
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
|
|
||||||
r.params.Set("address", string(address))
|
|
||||||
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftRemovePeerByID is used to kick a stale peer (one that it in the Raft
|
|
||||||
// quorum but no longer known to Serf or the catalog) by ID.
|
|
||||||
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
|
|
||||||
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
|
|
||||||
r.params.Set("id", string(id))
|
|
||||||
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringInstall is used to install a new gossip encryption key into the cluster
|
|
||||||
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
|
|
||||||
r := op.c.newRequest("POST", "/v1/operator/keyring")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
r.obj = keyringRequest{
|
|
||||||
Key: key,
|
|
||||||
}
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringList is used to list the gossip keys installed in the cluster
|
|
||||||
func (op *Operator) KeyringList(q *QueryOptions) ([]*KeyringResponse, error) {
|
|
||||||
r := op.c.newRequest("GET", "/v1/operator/keyring")
|
|
||||||
r.setQueryOptions(q)
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
var out []*KeyringResponse
|
|
||||||
if err := decodeBody(resp, &out); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringRemove is used to remove a gossip encryption key from the cluster
|
|
||||||
func (op *Operator) KeyringRemove(key string, q *WriteOptions) error {
|
|
||||||
r := op.c.newRequest("DELETE", "/v1/operator/keyring")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
r.obj = keyringRequest{
|
|
||||||
Key: key,
|
|
||||||
}
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringUse is used to change the active gossip encryption key
|
|
||||||
func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
|
|
||||||
r := op.c.newRequest("PUT", "/v1/operator/keyring")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
r.obj = keyringRequest{
|
|
||||||
Key: key,
|
|
||||||
}
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotGetConfiguration is used to query the current Autopilot configuration.
|
|
||||||
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
|
|
||||||
r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
|
|
||||||
r.setQueryOptions(q)
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
var out AutopilotConfiguration
|
|
||||||
if err := decodeBody(resp, &out); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
|
||||||
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
|
|
||||||
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
r.obj = conf
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
|
|
||||||
// Autopilot configuration. The ModifyIndex value will be respected. Returns
|
|
||||||
// true on success or false on failures.
|
|
||||||
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
|
|
||||||
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
|
|
||||||
r.setWriteOptions(q)
|
|
||||||
r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
|
|
||||||
r.obj = conf
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
var buf bytes.Buffer
|
|
||||||
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
|
||||||
return false, fmt.Errorf("Failed to read response: %v", err)
|
|
||||||
}
|
|
||||||
res := strings.Contains(string(buf.Bytes()), "true")
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotServerHealth
|
|
||||||
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
|
|
||||||
r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
|
|
||||||
r.setQueryOptions(q)
|
|
||||||
_, resp, err := requireOK(op.c.doRequest(r))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
var out OperatorHealthReply
|
|
||||||
if err := decodeBody(resp, &out); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,215 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AutopilotConfiguration is used for querying/setting the Autopilot configuration.
|
||||||
|
// Autopilot helps manage operator tasks related to Consul servers like removing
|
||||||
|
// failed servers from the Raft quorum.
|
||||||
|
type AutopilotConfiguration struct {
|
||||||
|
// CleanupDeadServers controls whether to remove dead servers from the Raft
|
||||||
|
// peer list when a new server joins
|
||||||
|
CleanupDeadServers bool
|
||||||
|
|
||||||
|
// LastContactThreshold is the limit on the amount of time a server can go
|
||||||
|
// without leader contact before being considered unhealthy.
|
||||||
|
LastContactThreshold *ReadableDuration
|
||||||
|
|
||||||
|
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
|
||||||
|
// be behind before being considered unhealthy.
|
||||||
|
MaxTrailingLogs uint64
|
||||||
|
|
||||||
|
// ServerStabilizationTime is the minimum amount of time a server must be
|
||||||
|
// in a stable, healthy state before it can be added to the cluster. Only
|
||||||
|
// applicable with Raft protocol version 3 or higher.
|
||||||
|
ServerStabilizationTime *ReadableDuration
|
||||||
|
|
||||||
|
// (Enterprise-only) RedundancyZoneTag is the node tag to use for separating
|
||||||
|
// servers into zones for redundancy. If left blank, this feature will be disabled.
|
||||||
|
RedundancyZoneTag string
|
||||||
|
|
||||||
|
// (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration
|
||||||
|
// strategy of waiting until enough newer-versioned servers have been added to the
|
||||||
|
// cluster before promoting them to voters.
|
||||||
|
DisableUpgradeMigration bool
|
||||||
|
|
||||||
|
// CreateIndex holds the index corresponding the creation of this configuration.
|
||||||
|
// This is a read-only field.
|
||||||
|
CreateIndex uint64
|
||||||
|
|
||||||
|
// ModifyIndex will be set to the index of the last update when retrieving the
|
||||||
|
// Autopilot configuration. Resubmitting a configuration with
|
||||||
|
// AutopilotCASConfiguration will perform a check-and-set operation which ensures
|
||||||
|
// there hasn't been a subsequent update since the configuration was retrieved.
|
||||||
|
ModifyIndex uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerHealth is the health (from the leader's point of view) of a server.
|
||||||
|
type ServerHealth struct {
|
||||||
|
// ID is the raft ID of the server.
|
||||||
|
ID string
|
||||||
|
|
||||||
|
// Name is the node name of the server.
|
||||||
|
Name string
|
||||||
|
|
||||||
|
// Address is the address of the server.
|
||||||
|
Address string
|
||||||
|
|
||||||
|
// The status of the SerfHealth check for the server.
|
||||||
|
SerfStatus string
|
||||||
|
|
||||||
|
// Version is the Consul version of the server.
|
||||||
|
Version string
|
||||||
|
|
||||||
|
// Leader is whether this server is currently the leader.
|
||||||
|
Leader bool
|
||||||
|
|
||||||
|
// LastContact is the time since this node's last contact with the leader.
|
||||||
|
LastContact *ReadableDuration
|
||||||
|
|
||||||
|
// LastTerm is the highest leader term this server has a record of in its Raft log.
|
||||||
|
LastTerm uint64
|
||||||
|
|
||||||
|
// LastIndex is the last log index this server has a record of in its Raft log.
|
||||||
|
LastIndex uint64
|
||||||
|
|
||||||
|
// Healthy is whether or not the server is healthy according to the current
|
||||||
|
// Autopilot config.
|
||||||
|
Healthy bool
|
||||||
|
|
||||||
|
// Voter is whether this is a voting server.
|
||||||
|
Voter bool
|
||||||
|
|
||||||
|
// StableSince is the last time this server's Healthy value changed.
|
||||||
|
StableSince time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperatorHealthReply is a representation of the overall health of the cluster
|
||||||
|
type OperatorHealthReply struct {
|
||||||
|
// Healthy is true if all the servers in the cluster are healthy.
|
||||||
|
Healthy bool
|
||||||
|
|
||||||
|
// FailureTolerance is the number of healthy servers that could be lost without
|
||||||
|
// an outage occurring.
|
||||||
|
FailureTolerance int
|
||||||
|
|
||||||
|
// Servers holds the health of each server.
|
||||||
|
Servers []ServerHealth
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadableDuration is a duration type that is serialized to JSON in human readable format.
|
||||||
|
type ReadableDuration time.Duration
|
||||||
|
|
||||||
|
func NewReadableDuration(dur time.Duration) *ReadableDuration {
|
||||||
|
d := ReadableDuration(dur)
|
||||||
|
return &d
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *ReadableDuration) String() string {
|
||||||
|
return d.Duration().String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *ReadableDuration) Duration() time.Duration {
|
||||||
|
if d == nil {
|
||||||
|
return time.Duration(0)
|
||||||
|
}
|
||||||
|
return time.Duration(*d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
|
||||||
|
return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
|
||||||
|
if d == nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal to nil pointer")
|
||||||
|
}
|
||||||
|
|
||||||
|
str := string(raw)
|
||||||
|
if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
|
||||||
|
return fmt.Errorf("must be enclosed with quotes: %s", str)
|
||||||
|
}
|
||||||
|
dur, err := time.ParseDuration(str[1 : len(str)-1])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*d = ReadableDuration(dur)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AutopilotGetConfiguration is used to query the current Autopilot configuration.
|
||||||
|
func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfiguration, error) {
|
||||||
|
r := op.c.newRequest("GET", "/v1/operator/autopilot/configuration")
|
||||||
|
r.setQueryOptions(q)
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var out AutopilotConfiguration
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
||||||
|
func (op *Operator) AutopilotSetConfiguration(conf *AutopilotConfiguration, q *WriteOptions) error {
|
||||||
|
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
r.obj = conf
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AutopilotCASConfiguration is used to perform a Check-And-Set update on the
|
||||||
|
// Autopilot configuration. The ModifyIndex value will be respected. Returns
|
||||||
|
// true on success or false on failures.
|
||||||
|
func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *WriteOptions) (bool, error) {
|
||||||
|
r := op.c.newRequest("PUT", "/v1/operator/autopilot/configuration")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
r.params.Set("cas", strconv.FormatUint(conf.ModifyIndex, 10))
|
||||||
|
r.obj = conf
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||||||
|
return false, fmt.Errorf("Failed to read response: %v", err)
|
||||||
|
}
|
||||||
|
res := strings.Contains(string(buf.Bytes()), "true")
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AutopilotServerHealth
|
||||||
|
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
|
||||||
|
r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
|
||||||
|
r.setQueryOptions(q)
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var out OperatorHealthReply
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
operator := c.Operator()
|
||||||
|
config, err := operator.AutopilotGetConfiguration(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !config.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %v", config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change a config setting
|
||||||
|
newConf := &AutopilotConfiguration{CleanupDeadServers: false}
|
||||||
|
if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
config, err = operator.AutopilotGetConfiguration(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if config.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %v", config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
operator := c.Operator()
|
||||||
|
config, err := operator.AutopilotGetConfiguration(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !config.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %v", config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pass an invalid ModifyIndex
|
||||||
|
{
|
||||||
|
newConf := &AutopilotConfiguration{
|
||||||
|
CleanupDeadServers: false,
|
||||||
|
ModifyIndex: config.ModifyIndex - 1,
|
||||||
|
}
|
||||||
|
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if resp {
|
||||||
|
t.Fatalf("bad: %v", resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pass a valid ModifyIndex
|
||||||
|
{
|
||||||
|
newConf := &AutopilotConfiguration{
|
||||||
|
CleanupDeadServers: false,
|
||||||
|
ModifyIndex: config.ModifyIndex,
|
||||||
|
}
|
||||||
|
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !resp {
|
||||||
|
t.Fatalf("bad: %v", resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_AutopilotServerHealth(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
||||||
|
c.RaftProtocol = 3
|
||||||
|
})
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
operator := c.Operator()
|
||||||
|
if err := testutil.WaitForResult(func() (bool, error) {
|
||||||
|
out, err := operator.AutopilotServerHealth(nil)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Servers) != 1 ||
|
||||||
|
!out.Servers[0].Healthy ||
|
||||||
|
out.Servers[0].Name != s.Config.NodeName {
|
||||||
|
return false, fmt.Errorf("bad: %v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
// keyringRequest is used for performing Keyring operations
|
||||||
|
type keyringRequest struct {
|
||||||
|
Key string
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringResponse is returned when listing the gossip encryption keys
|
||||||
|
type KeyringResponse struct {
|
||||||
|
// Whether this response is for a WAN ring
|
||||||
|
WAN bool
|
||||||
|
|
||||||
|
// The datacenter name this request corresponds to
|
||||||
|
Datacenter string
|
||||||
|
|
||||||
|
// A map of the encryption keys to the number of nodes they're installed on
|
||||||
|
Keys map[string]int
|
||||||
|
|
||||||
|
// The total number of nodes in this ring
|
||||||
|
NumNodes int
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringInstall is used to install a new gossip encryption key into the cluster
|
||||||
|
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
|
||||||
|
r := op.c.newRequest("POST", "/v1/operator/keyring")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
r.obj = keyringRequest{
|
||||||
|
Key: key,
|
||||||
|
}
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringList is used to list the gossip keys installed in the cluster
|
||||||
|
func (op *Operator) KeyringList(q *QueryOptions) ([]*KeyringResponse, error) {
|
||||||
|
r := op.c.newRequest("GET", "/v1/operator/keyring")
|
||||||
|
r.setQueryOptions(q)
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var out []*KeyringResponse
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringRemove is used to remove a gossip encryption key from the cluster
|
||||||
|
func (op *Operator) KeyringRemove(key string, q *WriteOptions) error {
|
||||||
|
r := op.c.newRequest("DELETE", "/v1/operator/keyring")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
r.obj = keyringRequest{
|
||||||
|
Key: key,
|
||||||
|
}
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringUse is used to change the active gossip encryption key
|
||||||
|
func (op *Operator) KeyringUse(key string, q *WriteOptions) error {
|
||||||
|
r := op.c.newRequest("PUT", "/v1/operator/keyring")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
r.obj = keyringRequest{
|
||||||
|
Key: key,
|
||||||
|
}
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOperator_KeyringInstallListPutRemove(t *testing.T) {
|
||||||
|
oldKey := "d8wu8CSUrqgtjVsvcBPmhQ=="
|
||||||
|
newKey := "qxycTi/SsePj/TZzCBmNXw=="
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
||||||
|
c.Encrypt = oldKey
|
||||||
|
})
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
operator := c.Operator()
|
||||||
|
if err := operator.KeyringInstall(newKey, nil); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
listResponses, err := operator.KeyringList(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the new key is installed
|
||||||
|
if len(listResponses) != 2 {
|
||||||
|
t.Fatalf("bad: %v", len(listResponses))
|
||||||
|
}
|
||||||
|
for _, response := range listResponses {
|
||||||
|
if len(response.Keys) != 2 {
|
||||||
|
t.Fatalf("bad: %v", len(response.Keys))
|
||||||
|
}
|
||||||
|
if _, ok := response.Keys[oldKey]; !ok {
|
||||||
|
t.Fatalf("bad: %v", ok)
|
||||||
|
}
|
||||||
|
if _, ok := response.Keys[newKey]; !ok {
|
||||||
|
t.Fatalf("bad: %v", ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switch the primary to the new key
|
||||||
|
if err := operator.KeyringUse(newKey, nil); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := operator.KeyringRemove(oldKey, nil); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
listResponses, err = operator.KeyringList(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the old key is removed
|
||||||
|
if len(listResponses) != 2 {
|
||||||
|
t.Fatalf("bad: %v", len(listResponses))
|
||||||
|
}
|
||||||
|
for _, response := range listResponses {
|
||||||
|
if len(response.Keys) != 1 {
|
||||||
|
t.Fatalf("bad: %v", len(response.Keys))
|
||||||
|
}
|
||||||
|
if _, ok := response.Keys[oldKey]; ok {
|
||||||
|
t.Fatalf("bad: %v", ok)
|
||||||
|
}
|
||||||
|
if _, ok := response.Keys[newKey]; !ok {
|
||||||
|
t.Fatalf("bad: %v", ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
// RaftServer has information about a server in the Raft configuration.
|
||||||
|
type RaftServer struct {
|
||||||
|
// ID is the unique ID for the server. These are currently the same
|
||||||
|
// as the address, but they will be changed to a real GUID in a future
|
||||||
|
// release of Consul.
|
||||||
|
ID string
|
||||||
|
|
||||||
|
// Node is the node name of the server, as known by Consul, or this
|
||||||
|
// will be set to "(unknown)" otherwise.
|
||||||
|
Node string
|
||||||
|
|
||||||
|
// Address is the IP:port of the server, used for Raft communications.
|
||||||
|
Address string
|
||||||
|
|
||||||
|
// Leader is true if this server is the current cluster leader.
|
||||||
|
Leader bool
|
||||||
|
|
||||||
|
// Voter is true if this server has a vote in the cluster. This might
|
||||||
|
// be false if the server is staging and still coming online, or if
|
||||||
|
// it's a non-voting server, which will be added in a future release of
|
||||||
|
// Consul.
|
||||||
|
Voter bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaftConfigration is returned when querying for the current Raft configuration.
|
||||||
|
type RaftConfiguration struct {
|
||||||
|
// Servers has the list of servers in the Raft configuration.
|
||||||
|
Servers []*RaftServer
|
||||||
|
|
||||||
|
// Index has the Raft index of this configuration.
|
||||||
|
Index uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaftGetConfiguration is used to query the current Raft peer set.
|
||||||
|
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
|
||||||
|
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
|
||||||
|
r.setQueryOptions(q)
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var out RaftConfiguration
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
||||||
|
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||||
|
// "IP:port".
|
||||||
|
func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error {
|
||||||
|
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
|
||||||
|
r.params.Set("address", string(address))
|
||||||
|
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaftRemovePeerByID is used to kick a stale peer (one that it in the Raft
|
||||||
|
// quorum but no longer known to Serf or the catalog) by ID.
|
||||||
|
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
|
||||||
|
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
|
||||||
|
r.setWriteOptions(q)
|
||||||
|
|
||||||
|
r.params.Set("id", string(id))
|
||||||
|
|
||||||
|
_, resp, err := requireOK(op.c.doRequest(r))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Body.Close()
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOperator_RaftGetConfiguration(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
operator := c.Operator()
|
||||||
|
out, err := operator.RaftGetConfiguration(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Servers) != 1 ||
|
||||||
|
!out.Servers[0].Leader ||
|
||||||
|
!out.Servers[0].Voter {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
// If we get this error, it proves we sent the address all the way
|
||||||
|
// through.
|
||||||
|
operator := c.Operator()
|
||||||
|
err := operator.RaftRemovePeerByAddress("nope", nil)
|
||||||
|
if err == nil || !strings.Contains(err.Error(),
|
||||||
|
"address \"nope\" was not found in the Raft configuration") {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,206 +0,0 @@
|
||||||
package api
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/testutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestOperator_RaftGetConfiguration(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
c, s := makeClient(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
operator := c.Operator()
|
|
||||||
out, err := operator.RaftGetConfiguration(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if len(out.Servers) != 1 ||
|
|
||||||
!out.Servers[0].Leader ||
|
|
||||||
!out.Servers[0].Voter {
|
|
||||||
t.Fatalf("bad: %v", out)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
c, s := makeClient(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
// If we get this error, it proves we sent the address all the way
|
|
||||||
// through.
|
|
||||||
operator := c.Operator()
|
|
||||||
err := operator.RaftRemovePeerByAddress("nope", nil)
|
|
||||||
if err == nil || !strings.Contains(err.Error(),
|
|
||||||
"address \"nope\" was not found in the Raft configuration") {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_KeyringInstallListPutRemove(t *testing.T) {
|
|
||||||
oldKey := "d8wu8CSUrqgtjVsvcBPmhQ=="
|
|
||||||
newKey := "qxycTi/SsePj/TZzCBmNXw=="
|
|
||||||
t.Parallel()
|
|
||||||
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
|
||||||
c.Encrypt = oldKey
|
|
||||||
})
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
operator := c.Operator()
|
|
||||||
if err := operator.KeyringInstall(newKey, nil); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
listResponses, err := operator.KeyringList(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the new key is installed
|
|
||||||
if len(listResponses) != 2 {
|
|
||||||
t.Fatalf("bad: %v", len(listResponses))
|
|
||||||
}
|
|
||||||
for _, response := range listResponses {
|
|
||||||
if len(response.Keys) != 2 {
|
|
||||||
t.Fatalf("bad: %v", len(response.Keys))
|
|
||||||
}
|
|
||||||
if _, ok := response.Keys[oldKey]; !ok {
|
|
||||||
t.Fatalf("bad: %v", ok)
|
|
||||||
}
|
|
||||||
if _, ok := response.Keys[newKey]; !ok {
|
|
||||||
t.Fatalf("bad: %v", ok)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Switch the primary to the new key
|
|
||||||
if err := operator.KeyringUse(newKey, nil); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := operator.KeyringRemove(oldKey, nil); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
listResponses, err = operator.KeyringList(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the old key is removed
|
|
||||||
if len(listResponses) != 2 {
|
|
||||||
t.Fatalf("bad: %v", len(listResponses))
|
|
||||||
}
|
|
||||||
for _, response := range listResponses {
|
|
||||||
if len(response.Keys) != 1 {
|
|
||||||
t.Fatalf("bad: %v", len(response.Keys))
|
|
||||||
}
|
|
||||||
if _, ok := response.Keys[oldKey]; ok {
|
|
||||||
t.Fatalf("bad: %v", ok)
|
|
||||||
}
|
|
||||||
if _, ok := response.Keys[newKey]; !ok {
|
|
||||||
t.Fatalf("bad: %v", ok)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_AutopilotGetSetConfiguration(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
c, s := makeClient(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
operator := c.Operator()
|
|
||||||
config, err := operator.AutopilotGetConfiguration(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if !config.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %v", config)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Change a config setting
|
|
||||||
newConf := &AutopilotConfiguration{CleanupDeadServers: false}
|
|
||||||
if err := operator.AutopilotSetConfiguration(newConf, nil); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
config, err = operator.AutopilotGetConfiguration(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if config.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %v", config)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_AutopilotCASConfiguration(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
c, s := makeClient(t)
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
operator := c.Operator()
|
|
||||||
config, err := operator.AutopilotGetConfiguration(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if !config.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %v", config)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pass an invalid ModifyIndex
|
|
||||||
{
|
|
||||||
newConf := &AutopilotConfiguration{
|
|
||||||
CleanupDeadServers: false,
|
|
||||||
ModifyIndex: config.ModifyIndex - 1,
|
|
||||||
}
|
|
||||||
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if resp {
|
|
||||||
t.Fatalf("bad: %v", resp)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pass a valid ModifyIndex
|
|
||||||
{
|
|
||||||
newConf := &AutopilotConfiguration{
|
|
||||||
CleanupDeadServers: false,
|
|
||||||
ModifyIndex: config.ModifyIndex,
|
|
||||||
}
|
|
||||||
resp, err := operator.AutopilotCASConfiguration(newConf, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if !resp {
|
|
||||||
t.Fatalf("bad: %v", resp)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_ServerHealth(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
|
||||||
c.RaftProtocol = 3
|
|
||||||
})
|
|
||||||
defer s.Stop()
|
|
||||||
|
|
||||||
operator := c.Operator()
|
|
||||||
if err := testutil.WaitForResult(func() (bool, error) {
|
|
||||||
out, err := operator.AutopilotServerHealth(nil)
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("err: %v", err)
|
|
||||||
}
|
|
||||||
if len(out.Servers) != 1 ||
|
|
||||||
!out.Servers[0].Healthy ||
|
|
||||||
out.Servers[0].Name != s.Config.NodeName {
|
|
||||||
return false, fmt.Errorf("bad: %v", out)
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
||||||
|
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
||||||
|
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This action requires operator read access.
|
||||||
|
acl, err := op.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.OperatorRead() {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
state := op.srv.fsm.State()
|
||||||
|
_, config, err := state.AutopilotConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = *config
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
||||||
|
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
|
||||||
|
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This action requires operator write access.
|
||||||
|
acl, err := op.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.OperatorWrite() {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the update
|
||||||
|
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
|
||||||
|
if err != nil {
|
||||||
|
op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if respErr, ok := resp.(error); ok {
|
||||||
|
return respErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the return type is a bool.
|
||||||
|
if respBool, ok := resp.(bool); ok {
|
||||||
|
*reply = respBool
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerHealth is used to get the current health of the servers.
|
||||||
|
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
|
||||||
|
// This must be sent to the leader, so we fix the args since we are
|
||||||
|
// re-using a structure where we don't support all the options.
|
||||||
|
args.RequireConsistent = true
|
||||||
|
args.AllowStale = false
|
||||||
|
if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This action requires operator read access.
|
||||||
|
acl, err := op.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.OperatorRead() {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exit early if the min Raft version is too low
|
||||||
|
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
||||||
|
}
|
||||||
|
if minRaftProtocol < 3 {
|
||||||
|
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = op.srv.getClusterHealth()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,285 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/consul/testutil"
|
||||||
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.AutopilotConfig.CleanupDeadServers = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
arg := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
var reply structs.AutopilotConfig
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if reply.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %#v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.AutopilotConfig.CleanupDeadServers = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Try to get config without permissions
|
||||||
|
arg := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
var reply structs.AutopilotConfig
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an ACL with operator read permissions.
|
||||||
|
var token string
|
||||||
|
{
|
||||||
|
var rules = `
|
||||||
|
operator = "read"
|
||||||
|
`
|
||||||
|
|
||||||
|
req := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: rules,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we can read and verify the config
|
||||||
|
arg.Token = token
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if reply.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %#v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.AutopilotConfig.CleanupDeadServers = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Change the autopilot config from the default
|
||||||
|
arg := structs.AutopilotSetConfigRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Config: structs.AutopilotConfig{
|
||||||
|
CleanupDeadServers: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var reply *bool
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure it's changed
|
||||||
|
state := s1.fsm.State()
|
||||||
|
_, config, err := state.AutopilotConfig()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !config.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %#v", config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.AutopilotConfig.CleanupDeadServers = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Try to set config without permissions
|
||||||
|
arg := structs.AutopilotSetConfigRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Config: structs.AutopilotConfig{
|
||||||
|
CleanupDeadServers: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var reply *bool
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an ACL with operator write permissions.
|
||||||
|
var token string
|
||||||
|
{
|
||||||
|
var rules = `
|
||||||
|
operator = "write"
|
||||||
|
`
|
||||||
|
|
||||||
|
req := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: rules,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we can update the config
|
||||||
|
arg.Token = token
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure it's changed
|
||||||
|
state := s1.fsm.State()
|
||||||
|
_, config, err := state.AutopilotConfig()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !config.CleanupDeadServers {
|
||||||
|
t.Fatalf("bad: %#v", config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_ServerHealth(t *testing.T) {
|
||||||
|
conf := func(c *Config) {
|
||||||
|
c.Datacenter = "dc1"
|
||||||
|
c.Bootstrap = false
|
||||||
|
c.BootstrapExpect = 3
|
||||||
|
c.RaftConfig.ProtocolVersion = 3
|
||||||
|
c.ServerHealthInterval = 100 * time.Millisecond
|
||||||
|
c.AutopilotInterval = 100 * time.Millisecond
|
||||||
|
}
|
||||||
|
dir1, s1 := testServerWithConfig(t, conf)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServerWithConfig(t, conf)
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dir3, s3 := testServerWithConfig(t, conf)
|
||||||
|
defer os.RemoveAll(dir3)
|
||||||
|
defer s3.Shutdown()
|
||||||
|
if _, err := s3.JoinLAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
if err := testutil.WaitForResult(func() (bool, error) {
|
||||||
|
arg := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
var reply structs.OperatorHealthReply
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !reply.Healthy {
|
||||||
|
return false, fmt.Errorf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
if reply.FailureTolerance != 1 {
|
||||||
|
return false, fmt.Errorf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
if len(reply.Servers) != 3 {
|
||||||
|
return false, fmt.Errorf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
// Leader should have LastContact == 0, others should be positive
|
||||||
|
for _, s := range reply.Servers {
|
||||||
|
isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
|
||||||
|
if isLeader && s.LastContact != 0 {
|
||||||
|
return false, fmt.Errorf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
if !isLeader && s.LastContact <= 0 {
|
||||||
|
return false, fmt.Errorf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.Datacenter = "dc1"
|
||||||
|
c.Bootstrap = true
|
||||||
|
c.RaftConfig.ProtocolVersion = 2
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
arg := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
var reply structs.OperatorHealthReply
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,296 +1,6 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/agent"
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
|
||||||
"github.com/hashicorp/raft"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Operator endpoint is used to perform low-level operator tasks for Consul.
|
// Operator endpoint is used to perform low-level operator tasks for Consul.
|
||||||
type Operator struct {
|
type Operator struct {
|
||||||
srv *Server
|
srv *Server
|
||||||
}
|
}
|
||||||
|
|
||||||
// RaftGetConfiguration is used to retrieve the current Raft configuration.
|
|
||||||
func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error {
|
|
||||||
if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This action requires operator read access.
|
|
||||||
acl, err := op.srv.resolveToken(args.Token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if acl != nil && !acl.OperatorRead() {
|
|
||||||
return permissionDeniedErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// We can't fetch the leader and the configuration atomically with
|
|
||||||
// the current Raft API.
|
|
||||||
future := op.srv.raft.GetConfiguration()
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Index the Consul information about the servers.
|
|
||||||
serverMap := make(map[raft.ServerAddress]serf.Member)
|
|
||||||
for _, member := range op.srv.serfLAN.Members() {
|
|
||||||
valid, parts := agent.IsConsulServer(member)
|
|
||||||
if !valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
|
|
||||||
serverMap[raft.ServerAddress(addr)] = member
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fill out the reply.
|
|
||||||
leader := op.srv.raft.Leader()
|
|
||||||
reply.Index = future.Index()
|
|
||||||
for _, server := range future.Configuration().Servers {
|
|
||||||
node := "(unknown)"
|
|
||||||
if member, ok := serverMap[server.Address]; ok {
|
|
||||||
node = member.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
entry := &structs.RaftServer{
|
|
||||||
ID: server.ID,
|
|
||||||
Node: node,
|
|
||||||
Address: server.Address,
|
|
||||||
Leader: server.Address == leader,
|
|
||||||
Voter: server.Suffrage == raft.Voter,
|
|
||||||
}
|
|
||||||
reply.Servers = append(reply.Servers, entry)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
|
||||||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
|
||||||
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
|
|
||||||
// interface.
|
|
||||||
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
|
||||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is a super dangerous operation that requires operator write
|
|
||||||
// access.
|
|
||||||
acl, err := op.srv.resolveToken(args.Token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if acl != nil && !acl.OperatorWrite() {
|
|
||||||
return permissionDeniedErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Since this is an operation designed for humans to use, we will return
|
|
||||||
// an error if the supplied address isn't among the peers since it's
|
|
||||||
// likely they screwed up.
|
|
||||||
{
|
|
||||||
future := op.srv.raft.GetConfiguration()
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, s := range future.Configuration().Servers {
|
|
||||||
if s.Address == args.Address {
|
|
||||||
args.ID = s.ID
|
|
||||||
goto REMOVE
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Errorf("address %q was not found in the Raft configuration",
|
|
||||||
args.Address)
|
|
||||||
}
|
|
||||||
|
|
||||||
REMOVE:
|
|
||||||
// The Raft library itself will prevent various forms of foot-shooting,
|
|
||||||
// like making a configuration with no voters. Some consideration was
|
|
||||||
// given here to adding more checks, but it was decided to make this as
|
|
||||||
// low-level and direct as possible. We've got ACL coverage to lock this
|
|
||||||
// down, and if you are an operator, it's assumed you know what you are
|
|
||||||
// doing if you are calling this. If you remove a peer that's known to
|
|
||||||
// Serf, for example, it will come back when the leader does a reconcile
|
|
||||||
// pass.
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var future raft.Future
|
|
||||||
if minRaftProtocol >= 2 {
|
|
||||||
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
|
||||||
} else {
|
|
||||||
future = op.srv.raft.RemovePeer(args.Address)
|
|
||||||
}
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
|
|
||||||
args.Address, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
|
|
||||||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
|
||||||
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
|
|
||||||
// interface.
|
|
||||||
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
|
||||||
if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is a super dangerous operation that requires operator write
|
|
||||||
// access.
|
|
||||||
acl, err := op.srv.resolveToken(args.Token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if acl != nil && !acl.OperatorWrite() {
|
|
||||||
return permissionDeniedErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Since this is an operation designed for humans to use, we will return
|
|
||||||
// an error if the supplied id isn't among the peers since it's
|
|
||||||
// likely they screwed up.
|
|
||||||
{
|
|
||||||
future := op.srv.raft.GetConfiguration()
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, s := range future.Configuration().Servers {
|
|
||||||
if s.ID == args.ID {
|
|
||||||
args.Address = s.Address
|
|
||||||
goto REMOVE
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Errorf("id %q was not found in the Raft configuration",
|
|
||||||
args.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
REMOVE:
|
|
||||||
// The Raft library itself will prevent various forms of foot-shooting,
|
|
||||||
// like making a configuration with no voters. Some consideration was
|
|
||||||
// given here to adding more checks, but it was decided to make this as
|
|
||||||
// low-level and direct as possible. We've got ACL coverage to lock this
|
|
||||||
// down, and if you are an operator, it's assumed you know what you are
|
|
||||||
// doing if you are calling this. If you remove a peer that's known to
|
|
||||||
// Serf, for example, it will come back when the leader does a reconcile
|
|
||||||
// pass.
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var future raft.Future
|
|
||||||
if minRaftProtocol >= 2 {
|
|
||||||
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
|
||||||
} else {
|
|
||||||
future = op.srv.raft.RemovePeer(args.Address)
|
|
||||||
}
|
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
|
|
||||||
args.ID, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
|
|
||||||
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
|
|
||||||
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This action requires operator read access.
|
|
||||||
acl, err := op.srv.resolveToken(args.Token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if acl != nil && !acl.OperatorRead() {
|
|
||||||
return permissionDeniedErr
|
|
||||||
}
|
|
||||||
|
|
||||||
state := op.srv.fsm.State()
|
|
||||||
_, config, err := state.AutopilotConfig()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
*reply = *config
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutopilotSetConfiguration is used to set the current Autopilot configuration.
|
|
||||||
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
|
|
||||||
if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This action requires operator write access.
|
|
||||||
acl, err := op.srv.resolveToken(args.Token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if acl != nil && !acl.OperatorWrite() {
|
|
||||||
return permissionDeniedErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply the update
|
|
||||||
resp, err := op.srv.raftApply(structs.AutopilotRequestType, args)
|
|
||||||
if err != nil {
|
|
||||||
op.srv.logger.Printf("[ERR] consul.operator: Apply failed: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if respErr, ok := resp.(error); ok {
|
|
||||||
return respErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the return type is a bool.
|
|
||||||
if respBool, ok := resp.(bool); ok {
|
|
||||||
*reply = respBool
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerHealth is used to get the current health of the servers.
|
|
||||||
func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs.OperatorHealthReply) error {
|
|
||||||
// This must be sent to the leader, so we fix the args since we are
|
|
||||||
// re-using a structure where we don't support all the options.
|
|
||||||
args.RequireConsistent = true
|
|
||||||
args.AllowStale = false
|
|
||||||
if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// This action requires operator read access.
|
|
||||||
acl, err := op.srv.resolveToken(args.Token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if acl != nil && !acl.OperatorRead() {
|
|
||||||
return permissionDeniedErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exit early if the min Raft version is too low
|
|
||||||
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.LANMembers())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error getting server raft protocol versions: %s", err)
|
|
||||||
}
|
|
||||||
if minRaftProtocol < 3 {
|
|
||||||
return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
|
|
||||||
}
|
|
||||||
|
|
||||||
*reply = op.srv.getClusterHealth()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,200 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/agent"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RaftGetConfiguration is used to retrieve the current Raft configuration.
|
||||||
|
func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error {
|
||||||
|
if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This action requires operator read access.
|
||||||
|
acl, err := op.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.OperatorRead() {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can't fetch the leader and the configuration atomically with
|
||||||
|
// the current Raft API.
|
||||||
|
future := op.srv.raft.GetConfiguration()
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index the Consul information about the servers.
|
||||||
|
serverMap := make(map[raft.ServerAddress]serf.Member)
|
||||||
|
for _, member := range op.srv.serfLAN.Members() {
|
||||||
|
valid, parts := agent.IsConsulServer(member)
|
||||||
|
if !valid {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
|
||||||
|
serverMap[raft.ServerAddress(addr)] = member
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill out the reply.
|
||||||
|
leader := op.srv.raft.Leader()
|
||||||
|
reply.Index = future.Index()
|
||||||
|
for _, server := range future.Configuration().Servers {
|
||||||
|
node := "(unknown)"
|
||||||
|
if member, ok := serverMap[server.Address]; ok {
|
||||||
|
node = member.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := &structs.RaftServer{
|
||||||
|
ID: server.ID,
|
||||||
|
Node: node,
|
||||||
|
Address: server.Address,
|
||||||
|
Leader: server.Address == leader,
|
||||||
|
Voter: server.Suffrage == raft.Voter,
|
||||||
|
}
|
||||||
|
reply.Servers = append(reply.Servers, entry)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
||||||
|
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||||
|
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
|
||||||
|
// interface.
|
||||||
|
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||||
|
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a super dangerous operation that requires operator write
|
||||||
|
// access.
|
||||||
|
acl, err := op.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.OperatorWrite() {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since this is an operation designed for humans to use, we will return
|
||||||
|
// an error if the supplied address isn't among the peers since it's
|
||||||
|
// likely they screwed up.
|
||||||
|
{
|
||||||
|
future := op.srv.raft.GetConfiguration()
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, s := range future.Configuration().Servers {
|
||||||
|
if s.Address == args.Address {
|
||||||
|
args.ID = s.ID
|
||||||
|
goto REMOVE
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("address %q was not found in the Raft configuration",
|
||||||
|
args.Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
REMOVE:
|
||||||
|
// The Raft library itself will prevent various forms of foot-shooting,
|
||||||
|
// like making a configuration with no voters. Some consideration was
|
||||||
|
// given here to adding more checks, but it was decided to make this as
|
||||||
|
// low-level and direct as possible. We've got ACL coverage to lock this
|
||||||
|
// down, and if you are an operator, it's assumed you know what you are
|
||||||
|
// doing if you are calling this. If you remove a peer that's known to
|
||||||
|
// Serf, for example, it will come back when the leader does a reconcile
|
||||||
|
// pass.
|
||||||
|
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var future raft.Future
|
||||||
|
if minRaftProtocol >= 2 {
|
||||||
|
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
||||||
|
} else {
|
||||||
|
future = op.srv.raft.RemovePeer(args.Address)
|
||||||
|
}
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
|
||||||
|
args.Address, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer %q", args.Address)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
|
||||||
|
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||||
|
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
|
||||||
|
// interface.
|
||||||
|
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
|
||||||
|
if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a super dangerous operation that requires operator write
|
||||||
|
// access.
|
||||||
|
acl, err := op.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.OperatorWrite() {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since this is an operation designed for humans to use, we will return
|
||||||
|
// an error if the supplied id isn't among the peers since it's
|
||||||
|
// likely they screwed up.
|
||||||
|
{
|
||||||
|
future := op.srv.raft.GetConfiguration()
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, s := range future.Configuration().Servers {
|
||||||
|
if s.ID == args.ID {
|
||||||
|
args.Address = s.Address
|
||||||
|
goto REMOVE
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("id %q was not found in the Raft configuration",
|
||||||
|
args.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
REMOVE:
|
||||||
|
// The Raft library itself will prevent various forms of foot-shooting,
|
||||||
|
// like making a configuration with no voters. Some consideration was
|
||||||
|
// given here to adding more checks, but it was decided to make this as
|
||||||
|
// low-level and direct as possible. We've got ACL coverage to lock this
|
||||||
|
// down, and if you are an operator, it's assumed you know what you are
|
||||||
|
// doing if you are calling this. If you remove a peer that's known to
|
||||||
|
// Serf, for example, it will come back when the leader does a reconcile
|
||||||
|
// pass.
|
||||||
|
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var future raft.Future
|
||||||
|
if minRaftProtocol >= 2 {
|
||||||
|
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
|
||||||
|
} else {
|
||||||
|
future = op.srv.raft.RemovePeer(args.Address)
|
||||||
|
}
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
|
||||||
|
args.ID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -7,8 +7,6 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
|
@ -361,274 +359,3 @@ func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.AutopilotConfig.CleanupDeadServers = false
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
arg := structs.DCSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
}
|
|
||||||
var reply structs.AutopilotConfig
|
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if reply.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %#v", reply)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_Autopilot_GetConfiguration_ACLDeny(t *testing.T) {
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.ACLDatacenter = "dc1"
|
|
||||||
c.ACLMasterToken = "root"
|
|
||||||
c.ACLDefaultPolicy = "deny"
|
|
||||||
c.AutopilotConfig.CleanupDeadServers = false
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
// Try to get config without permissions
|
|
||||||
arg := structs.DCSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
}
|
|
||||||
var reply structs.AutopilotConfig
|
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
|
||||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create an ACL with operator read permissions.
|
|
||||||
var token string
|
|
||||||
{
|
|
||||||
var rules = `
|
|
||||||
operator = "read"
|
|
||||||
`
|
|
||||||
|
|
||||||
req := structs.ACLRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
Name: "User token",
|
|
||||||
Type: structs.ACLTypeClient,
|
|
||||||
Rules: rules,
|
|
||||||
},
|
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
||||||
}
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now we can read and verify the config
|
|
||||||
arg.Token = token
|
|
||||||
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotGetConfiguration", &arg, &reply)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if reply.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %#v", reply)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_Autopilot_SetConfiguration(t *testing.T) {
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.AutopilotConfig.CleanupDeadServers = false
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
// Change the autopilot config from the default
|
|
||||||
arg := structs.AutopilotSetConfigRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Config: structs.AutopilotConfig{
|
|
||||||
CleanupDeadServers: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
var reply *bool
|
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure it's changed
|
|
||||||
state := s1.fsm.State()
|
|
||||||
_, config, err := state.AutopilotConfig()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if !config.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %#v", config)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_Autopilot_SetConfiguration_ACLDeny(t *testing.T) {
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.ACLDatacenter = "dc1"
|
|
||||||
c.ACLMasterToken = "root"
|
|
||||||
c.ACLDefaultPolicy = "deny"
|
|
||||||
c.AutopilotConfig.CleanupDeadServers = false
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
// Try to set config without permissions
|
|
||||||
arg := structs.AutopilotSetConfigRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Config: structs.AutopilotConfig{
|
|
||||||
CleanupDeadServers: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
var reply *bool
|
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
|
||||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create an ACL with operator write permissions.
|
|
||||||
var token string
|
|
||||||
{
|
|
||||||
var rules = `
|
|
||||||
operator = "write"
|
|
||||||
`
|
|
||||||
|
|
||||||
req := structs.ACLRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
Name: "User token",
|
|
||||||
Type: structs.ACLTypeClient,
|
|
||||||
Rules: rules,
|
|
||||||
},
|
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
||||||
}
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now we can update the config
|
|
||||||
arg.Token = token
|
|
||||||
err = msgpackrpc.CallWithCodec(codec, "Operator.AutopilotSetConfiguration", &arg, &reply)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure it's changed
|
|
||||||
state := s1.fsm.State()
|
|
||||||
_, config, err := state.AutopilotConfig()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if !config.CleanupDeadServers {
|
|
||||||
t.Fatalf("bad: %#v", config)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_ServerHealth(t *testing.T) {
|
|
||||||
conf := func(c *Config) {
|
|
||||||
c.Datacenter = "dc1"
|
|
||||||
c.Bootstrap = false
|
|
||||||
c.BootstrapExpect = 3
|
|
||||||
c.RaftConfig.ProtocolVersion = 3
|
|
||||||
c.ServerHealthInterval = 100 * time.Millisecond
|
|
||||||
c.AutopilotInterval = 100 * time.Millisecond
|
|
||||||
}
|
|
||||||
dir1, s1 := testServerWithConfig(t, conf)
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
dir2, s2 := testServerWithConfig(t, conf)
|
|
||||||
defer os.RemoveAll(dir2)
|
|
||||||
defer s2.Shutdown()
|
|
||||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
|
||||||
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
|
||||||
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
dir3, s3 := testServerWithConfig(t, conf)
|
|
||||||
defer os.RemoveAll(dir3)
|
|
||||||
defer s3.Shutdown()
|
|
||||||
if _, err := s3.JoinLAN([]string{addr}); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
|
|
||||||
if err := testutil.WaitForResult(func() (bool, error) {
|
|
||||||
arg := structs.DCSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
}
|
|
||||||
var reply structs.OperatorHealthReply
|
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("err: %v", err)
|
|
||||||
}
|
|
||||||
if !reply.Healthy {
|
|
||||||
return false, fmt.Errorf("bad: %v", reply)
|
|
||||||
}
|
|
||||||
if reply.FailureTolerance != 1 {
|
|
||||||
return false, fmt.Errorf("bad: %v", reply)
|
|
||||||
}
|
|
||||||
if len(reply.Servers) != 3 {
|
|
||||||
return false, fmt.Errorf("bad: %v", reply)
|
|
||||||
}
|
|
||||||
// Leader should have LastContact == 0, others should be positive
|
|
||||||
for _, s := range reply.Servers {
|
|
||||||
isLeader := s1.raft.Leader() == raft.ServerAddress(s.Address)
|
|
||||||
if isLeader && s.LastContact != 0 {
|
|
||||||
return false, fmt.Errorf("bad: %v", reply)
|
|
||||||
}
|
|
||||||
if !isLeader && s.LastContact <= 0 {
|
|
||||||
return false, fmt.Errorf("bad: %v", reply)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
}); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOperator_ServerHealth_UnsupportedRaftVersion(t *testing.T) {
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.Datacenter = "dc1"
|
|
||||||
c.Bootstrap = true
|
|
||||||
c.RaftConfig.ProtocolVersion = 2
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
codec := rpcClient(t, s1)
|
|
||||||
defer codec.Close()
|
|
||||||
|
|
||||||
arg := structs.DCSpecificRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
}
|
|
||||||
var reply structs.OperatorHealthReply
|
|
||||||
err := msgpackrpc.CallWithCodec(codec, "Operator.ServerHealth", &arg, &reply)
|
|
||||||
if err == nil || !strings.Contains(err.Error(), "raft_protocol set to 3 or higher") {
|
|
||||||
t.Fatalf("bad: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue