mirror of https://github.com/hashicorp/consul
669 lines
17 KiB
Go
669 lines
17 KiB
Go
package command
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"unicode"
|
|
|
|
consulapi "github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/command/base"
|
|
"github.com/mitchellh/cli"
|
|
)
|
|
|
|
const (
|
|
// rExecPrefix is the prefix in the KV store used to
|
|
// store the remote exec data
|
|
rExecPrefix = "_rexec"
|
|
|
|
// rExecFileName is the name of the file we append to
|
|
// the path, e.g. _rexec/session_id/job
|
|
rExecFileName = "job"
|
|
|
|
// rExecAck is the suffix added to an ack path
|
|
rExecAckSuffix = "/ack"
|
|
|
|
// rExecAck is the suffix added to an exit code
|
|
rExecExitSuffix = "/exit"
|
|
|
|
// rExecOutputDivider is used to namespace the output
|
|
rExecOutputDivider = "/out/"
|
|
|
|
// rExecReplicationWait is how long we wait for replication
|
|
rExecReplicationWait = 200 * time.Millisecond
|
|
|
|
// rExecQuietWait is how long we wait for no responses
|
|
// before assuming the job is done.
|
|
rExecQuietWait = 2 * time.Second
|
|
|
|
// rExecTTL is how long we default the session TTL to
|
|
rExecTTL = "15s"
|
|
|
|
// rExecRenewInterval is how often we renew the session TTL
|
|
// when doing an exec in a foreign DC.
|
|
rExecRenewInterval = 5 * time.Second
|
|
)
|
|
|
|
// rExecConf is used to pass around configuration
|
|
type rExecConf struct {
|
|
prefix string
|
|
|
|
foreignDC bool
|
|
localDC string
|
|
localNode string
|
|
|
|
node string
|
|
service string
|
|
tag string
|
|
|
|
wait time.Duration
|
|
replWait time.Duration
|
|
|
|
cmd string
|
|
script []byte
|
|
|
|
verbose bool
|
|
}
|
|
|
|
// rExecEvent is the event we broadcast using a user-event
|
|
type rExecEvent struct {
|
|
Prefix string
|
|
Session string
|
|
}
|
|
|
|
// rExecSpec is the file we upload to specify the parameters
|
|
// of the remote execution.
|
|
type rExecSpec struct {
|
|
// Command is a single command to run directly in the shell
|
|
Command string `json:",omitempty"`
|
|
|
|
// Script should be spilled to a file and executed
|
|
Script []byte `json:",omitempty"`
|
|
|
|
// Wait is how long we are waiting on a quiet period to terminate
|
|
Wait time.Duration
|
|
}
|
|
|
|
// rExecAck is used to transmit an acknowledgement
|
|
type rExecAck struct {
|
|
Node string
|
|
}
|
|
|
|
// rExecHeart is used to transmit a heartbeat
|
|
type rExecHeart struct {
|
|
Node string
|
|
}
|
|
|
|
// rExecOutput is used to transmit a chunk of output
|
|
type rExecOutput struct {
|
|
Node string
|
|
Output []byte
|
|
}
|
|
|
|
// rExecExit is used to transmit an exit code
|
|
type rExecExit struct {
|
|
Node string
|
|
Code int
|
|
}
|
|
|
|
// ExecCommand is a Command implementation that is used to
|
|
// do remote execution of commands
|
|
type ExecCommand struct {
|
|
base.Command
|
|
|
|
ShutdownCh <-chan struct{}
|
|
conf rExecConf
|
|
client *consulapi.Client
|
|
sessionID string
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
func (c *ExecCommand) Run(args []string) int {
|
|
f := c.Command.NewFlagSet(c)
|
|
f.StringVar(&c.conf.node, "node", "",
|
|
"Regular expression to filter on node names.")
|
|
f.StringVar(&c.conf.service, "service", "",
|
|
"Regular expression to filter on service instances.")
|
|
f.StringVar(&c.conf.tag, "tag", "",
|
|
"Regular expression to filter on service tags. Must be used with -service.")
|
|
f.StringVar(&c.conf.prefix, "prefix", rExecPrefix,
|
|
"Prefix in the KV store to use for request data.")
|
|
f.DurationVar(&c.conf.wait, "wait", rExecQuietWait,
|
|
"Period to wait with no responses before terminating execution.")
|
|
f.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait,
|
|
"Period to wait for replication before firing event. This is an "+
|
|
"optimization to allow stale reads to be performed.")
|
|
f.BoolVar(&c.conf.verbose, "verbose", false,
|
|
"Enables verbose output.")
|
|
|
|
if err := c.Command.Parse(args); err != nil {
|
|
return 1
|
|
}
|
|
|
|
// Join the commands to execute
|
|
c.conf.cmd = strings.Join(f.Args(), " ")
|
|
|
|
// If there is no command, read stdin for a script input
|
|
if c.conf.cmd == "-" {
|
|
c.conf.cmd = ""
|
|
var buf bytes.Buffer
|
|
_, err := io.Copy(&buf, os.Stdin)
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to read stdin: %v", err))
|
|
c.UI.Error("")
|
|
c.UI.Error(c.Help())
|
|
return 1
|
|
}
|
|
c.conf.script = buf.Bytes()
|
|
}
|
|
|
|
// Ensure we have a command or script
|
|
if c.conf.cmd == "" && len(c.conf.script) == 0 {
|
|
c.UI.Error("Must specify a command to execute")
|
|
c.UI.Error("")
|
|
c.UI.Error(c.Help())
|
|
return 1
|
|
}
|
|
|
|
// Validate the configuration
|
|
if err := c.conf.validate(); err != nil {
|
|
c.UI.Error(err.Error())
|
|
return 1
|
|
}
|
|
|
|
// Create and test the HTTP client
|
|
client, err := c.Command.HTTPClient()
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
|
|
return 1
|
|
}
|
|
info, err := client.Agent().Self()
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
|
|
return 1
|
|
}
|
|
c.client = client
|
|
|
|
// Check if this is a foreign datacenter
|
|
if c.Command.HTTPDatacenter() != "" && c.Command.HTTPDatacenter() != info["Config"]["Datacenter"] {
|
|
if c.conf.verbose {
|
|
c.UI.Info("Remote exec in foreign datacenter, using Session TTL")
|
|
}
|
|
c.conf.foreignDC = true
|
|
c.conf.localDC = info["Config"]["Datacenter"].(string)
|
|
c.conf.localNode = info["Config"]["NodeName"].(string)
|
|
}
|
|
|
|
// Create the job spec
|
|
spec, err := c.makeRExecSpec()
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to create job spec: %s", err))
|
|
return 1
|
|
}
|
|
|
|
// Create a session for this
|
|
c.sessionID, err = c.createSession()
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to create session: %s", err))
|
|
return 1
|
|
}
|
|
defer c.destroySession()
|
|
if c.conf.verbose {
|
|
c.UI.Info(fmt.Sprintf("Created remote execution session: %s", c.sessionID))
|
|
}
|
|
|
|
// Upload the payload
|
|
if err := c.uploadPayload(spec); err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to create job file: %s", err))
|
|
return 1
|
|
}
|
|
defer c.destroyData()
|
|
if c.conf.verbose {
|
|
c.UI.Info(fmt.Sprintf("Uploaded remote execution spec"))
|
|
}
|
|
|
|
// Wait for replication. This is done so that when the event is
|
|
// received, the job file can be read using a stale read. If the
|
|
// stale read fails, we expect a consistent read to be done, so
|
|
// largely this is a heuristic.
|
|
select {
|
|
case <-time.After(c.conf.replWait):
|
|
case <-c.ShutdownCh:
|
|
return 1
|
|
}
|
|
|
|
// Fire the event
|
|
id, err := c.fireEvent()
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to fire event: %s", err))
|
|
return 1
|
|
}
|
|
if c.conf.verbose {
|
|
c.UI.Info(fmt.Sprintf("Fired remote execution event: %s", id))
|
|
}
|
|
|
|
// Wait for the job to finish now
|
|
return c.waitForJob()
|
|
}
|
|
|
|
// waitForJob is used to poll for results and wait until the job is terminated
|
|
func (c *ExecCommand) waitForJob() int {
|
|
// Although the session destroy is already deferred, we do it again here,
|
|
// because invalidation of the session before destroyData() ensures there is
|
|
// no race condition allowing an agent to upload data (the acquire will fail).
|
|
defer c.destroySession()
|
|
start := time.Now()
|
|
ackCh := make(chan rExecAck, 128)
|
|
heartCh := make(chan rExecHeart, 128)
|
|
outputCh := make(chan rExecOutput, 128)
|
|
exitCh := make(chan rExecExit, 128)
|
|
doneCh := make(chan struct{})
|
|
errCh := make(chan struct{}, 1)
|
|
defer close(doneCh)
|
|
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
|
|
target := &TargetedUI{UI: c.UI}
|
|
|
|
var ackCount, exitCount, badExit int
|
|
OUTER:
|
|
for {
|
|
// Determine wait time. We provide a larger window if we know about
|
|
// nodes which are still working.
|
|
waitIntv := c.conf.wait
|
|
if ackCount > exitCount {
|
|
waitIntv *= 2
|
|
}
|
|
|
|
select {
|
|
case e := <-ackCh:
|
|
ackCount++
|
|
if c.conf.verbose {
|
|
target.Target = e.Node
|
|
target.Info("acknowledged")
|
|
}
|
|
|
|
case h := <-heartCh:
|
|
if c.conf.verbose {
|
|
target.Target = h.Node
|
|
target.Info("heartbeat received")
|
|
}
|
|
|
|
case e := <-outputCh:
|
|
target.Target = e.Node
|
|
target.Output(string(e.Output))
|
|
|
|
case e := <-exitCh:
|
|
exitCount++
|
|
target.Target = e.Node
|
|
target.Info(fmt.Sprintf("finished with exit code %d", e.Code))
|
|
if e.Code != 0 {
|
|
badExit++
|
|
}
|
|
|
|
case <-time.After(waitIntv):
|
|
c.UI.Info(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount))
|
|
if c.conf.verbose {
|
|
c.UI.Info(fmt.Sprintf("Completed in %0.2f seconds",
|
|
float64(time.Now().Sub(start))/float64(time.Second)))
|
|
}
|
|
if exitCount < ackCount {
|
|
badExit++
|
|
}
|
|
break OUTER
|
|
|
|
case <-errCh:
|
|
return 1
|
|
|
|
case <-c.ShutdownCh:
|
|
return 1
|
|
}
|
|
}
|
|
|
|
if badExit > 0 {
|
|
return 2
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// streamResults is used to perform blocking queries against the KV endpoint and stream in
|
|
// notice of various events into waitForJob
|
|
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
|
|
outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
|
|
kv := c.client.KV()
|
|
opts := consulapi.QueryOptions{WaitTime: c.conf.wait}
|
|
dir := path.Join(c.conf.prefix, c.sessionID) + "/"
|
|
seen := make(map[string]struct{})
|
|
|
|
for {
|
|
// Check if we've been signaled to exit
|
|
select {
|
|
case <-doneCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Block on waiting for new keys
|
|
keys, qm, err := kv.Keys(dir, "", &opts)
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to read results: %s", err))
|
|
goto ERR_EXIT
|
|
}
|
|
|
|
// Fast-path the no-change case
|
|
if qm.LastIndex == opts.WaitIndex {
|
|
continue
|
|
}
|
|
opts.WaitIndex = qm.LastIndex
|
|
|
|
// Handle each key
|
|
for _, key := range keys {
|
|
// Ignore if we've seen it
|
|
if _, ok := seen[key]; ok {
|
|
continue
|
|
}
|
|
seen[key] = struct{}{}
|
|
|
|
// Trim the directory
|
|
full := key
|
|
key = strings.TrimPrefix(key, dir)
|
|
|
|
// Handle the key type
|
|
switch {
|
|
case key == rExecFileName:
|
|
continue
|
|
case strings.HasSuffix(key, rExecAckSuffix):
|
|
ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)}
|
|
|
|
case strings.HasSuffix(key, rExecExitSuffix):
|
|
pair, _, err := kv.Get(full, nil)
|
|
if err != nil || pair == nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
|
|
continue
|
|
}
|
|
code, err := strconv.ParseInt(string(pair.Value), 10, 32)
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", pair.Value, err))
|
|
continue
|
|
}
|
|
exitCh <- rExecExit{
|
|
Node: strings.TrimSuffix(key, rExecExitSuffix),
|
|
Code: int(code),
|
|
}
|
|
|
|
case strings.LastIndex(key, rExecOutputDivider) != -1:
|
|
pair, _, err := kv.Get(full, nil)
|
|
if err != nil || pair == nil {
|
|
c.UI.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
|
|
continue
|
|
}
|
|
idx := strings.LastIndex(key, rExecOutputDivider)
|
|
node := key[:idx]
|
|
if len(pair.Value) == 0 {
|
|
heartCh <- rExecHeart{Node: node}
|
|
} else {
|
|
outputCh <- rExecOutput{Node: node, Output: pair.Value}
|
|
}
|
|
|
|
default:
|
|
c.UI.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key))
|
|
}
|
|
}
|
|
}
|
|
|
|
ERR_EXIT:
|
|
select {
|
|
case errCh <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// validate checks that the configuration is sane
|
|
func (conf *rExecConf) validate() error {
|
|
// Validate the filters
|
|
if conf.node != "" {
|
|
if _, err := regexp.Compile(conf.node); err != nil {
|
|
return fmt.Errorf("Failed to compile node filter regexp: %v", err)
|
|
}
|
|
}
|
|
if conf.service != "" {
|
|
if _, err := regexp.Compile(conf.service); err != nil {
|
|
return fmt.Errorf("Failed to compile service filter regexp: %v", err)
|
|
}
|
|
}
|
|
if conf.tag != "" {
|
|
if _, err := regexp.Compile(conf.tag); err != nil {
|
|
return fmt.Errorf("Failed to compile tag filter regexp: %v", err)
|
|
}
|
|
}
|
|
if conf.tag != "" && conf.service == "" {
|
|
return fmt.Errorf("Cannot provide tag filter without service filter.")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// createSession is used to create a new session for this command
|
|
func (c *ExecCommand) createSession() (string, error) {
|
|
var id string
|
|
var err error
|
|
if c.conf.foreignDC {
|
|
id, err = c.createSessionForeign()
|
|
} else {
|
|
id, err = c.createSessionLocal()
|
|
}
|
|
if err == nil {
|
|
c.stopCh = make(chan struct{})
|
|
go c.renewSession(id, c.stopCh)
|
|
}
|
|
return id, err
|
|
}
|
|
|
|
// createSessionLocal is used to create a new session in a local datacenter
|
|
// This is simpler since we can use the local agent to create the session.
|
|
func (c *ExecCommand) createSessionLocal() (string, error) {
|
|
session := c.client.Session()
|
|
se := consulapi.SessionEntry{
|
|
Name: "Remote Exec",
|
|
Behavior: consulapi.SessionBehaviorDelete,
|
|
TTL: rExecTTL,
|
|
}
|
|
id, _, err := session.Create(&se, nil)
|
|
return id, err
|
|
}
|
|
|
|
// createSessionLocal is used to create a new session in a foreign datacenter
|
|
// This is more complex since the local agent cannot be used to create
|
|
// a session, and we must associate with a node in the remote datacenter.
|
|
func (c *ExecCommand) createSessionForeign() (string, error) {
|
|
// Look for a remote node to bind to
|
|
health := c.client.Health()
|
|
services, _, err := health.Service("consul", "", true, nil)
|
|
if err != nil {
|
|
return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err)
|
|
}
|
|
if len(services) == 0 {
|
|
return "", fmt.Errorf("Failed to find Consul server in remote datacenter")
|
|
}
|
|
node := services[0].Node.Node
|
|
if c.conf.verbose {
|
|
c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s",
|
|
node, c.Command.HTTPDatacenter()))
|
|
}
|
|
|
|
session := c.client.Session()
|
|
se := consulapi.SessionEntry{
|
|
Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC),
|
|
Node: node,
|
|
Checks: []string{},
|
|
Behavior: consulapi.SessionBehaviorDelete,
|
|
TTL: rExecTTL,
|
|
}
|
|
id, _, err := session.CreateNoChecks(&se, nil)
|
|
return id, err
|
|
}
|
|
|
|
// renewSession is a long running routine that periodically renews
|
|
// the session TTL. This is used for foreign sessions where we depend
|
|
// on TTLs.
|
|
func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) {
|
|
session := c.client.Session()
|
|
for {
|
|
select {
|
|
case <-time.After(rExecRenewInterval):
|
|
_, _, err := session.Renew(id, nil)
|
|
if err != nil {
|
|
c.UI.Error(fmt.Sprintf("Session renew failed: %v", err))
|
|
return
|
|
}
|
|
case <-stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// destroySession is used to destroy the associated session
|
|
func (c *ExecCommand) destroySession() error {
|
|
// Stop the session renew if any
|
|
if c.stopCh != nil {
|
|
close(c.stopCh)
|
|
c.stopCh = nil
|
|
}
|
|
|
|
// Destroy the session explicitly
|
|
session := c.client.Session()
|
|
_, err := session.Destroy(c.sessionID, nil)
|
|
return err
|
|
}
|
|
|
|
// makeRExecSpec creates a serialized job specification
|
|
// that can be uploaded which will be parsed by agents to
|
|
// determine what to do.
|
|
func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
|
|
spec := &rExecSpec{
|
|
Command: c.conf.cmd,
|
|
Script: c.conf.script,
|
|
Wait: c.conf.wait,
|
|
}
|
|
return json.Marshal(spec)
|
|
}
|
|
|
|
// uploadPayload is used to upload the request payload
|
|
func (c *ExecCommand) uploadPayload(payload []byte) error {
|
|
kv := c.client.KV()
|
|
pair := consulapi.KVPair{
|
|
Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName),
|
|
Value: payload,
|
|
Session: c.sessionID,
|
|
}
|
|
ok, _, err := kv.Acquire(&pair, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !ok {
|
|
return fmt.Errorf("failed to acquire key %s", pair.Key)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// destroyData is used to nuke all the data associated with
|
|
// this remote exec. We just do a recursive delete of our
|
|
// data directory.
|
|
func (c *ExecCommand) destroyData() error {
|
|
kv := c.client.KV()
|
|
dir := path.Join(c.conf.prefix, c.sessionID)
|
|
_, err := kv.DeleteTree(dir, nil)
|
|
return err
|
|
}
|
|
|
|
// fireEvent is used to fire the event that will notify nodes
|
|
// about the remote execution. Returns the event ID or error
|
|
func (c *ExecCommand) fireEvent() (string, error) {
|
|
// Create the user event payload
|
|
msg := &rExecEvent{
|
|
Prefix: c.conf.prefix,
|
|
Session: c.sessionID,
|
|
}
|
|
buf, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Format the user event
|
|
event := c.client.Event()
|
|
params := &consulapi.UserEvent{
|
|
Name: "_rexec",
|
|
Payload: buf,
|
|
NodeFilter: c.conf.node,
|
|
ServiceFilter: c.conf.service,
|
|
TagFilter: c.conf.tag,
|
|
}
|
|
|
|
// Fire the event
|
|
id, _, err := event.Fire(params, nil)
|
|
return id, err
|
|
}
|
|
|
|
func (c *ExecCommand) Synopsis() string {
|
|
return "Executes a command on Consul nodes"
|
|
}
|
|
|
|
func (c *ExecCommand) Help() string {
|
|
helpText := `
|
|
Usage: consul exec [options] [-|command...]
|
|
|
|
Evaluates a command on remote Consul nodes. The nodes responding can
|
|
be filtered using regular expressions on node name, service, and tag
|
|
definitions. If a command is '-', stdin will be read until EOF
|
|
and used as a script input.
|
|
|
|
` + c.Command.Help()
|
|
|
|
return strings.TrimSpace(helpText)
|
|
}
|
|
|
|
// TargetedUI is a UI that wraps another UI implementation and modifies
|
|
// the output to indicate a specific target. Specifically, all Say output
|
|
// is prefixed with the target name. Message output is not prefixed but
|
|
// is offset by the length of the target so that output is lined up properly
|
|
// with Say output. Machine-readable output has the proper target set.
|
|
type TargetedUI struct {
|
|
Target string
|
|
UI cli.Ui
|
|
}
|
|
|
|
func (u *TargetedUI) Ask(query string) (string, error) {
|
|
return u.UI.Ask(u.prefixLines(true, query))
|
|
}
|
|
|
|
func (u *TargetedUI) Info(message string) {
|
|
u.UI.Info(u.prefixLines(true, message))
|
|
}
|
|
|
|
func (u *TargetedUI) Output(message string) {
|
|
u.UI.Output(u.prefixLines(false, message))
|
|
}
|
|
|
|
func (u *TargetedUI) Error(message string) {
|
|
u.UI.Error(u.prefixLines(true, message))
|
|
}
|
|
|
|
func (u *TargetedUI) prefixLines(arrow bool, message string) string {
|
|
arrowText := "==>"
|
|
if !arrow {
|
|
arrowText = strings.Repeat(" ", len(arrowText))
|
|
}
|
|
|
|
var result bytes.Buffer
|
|
|
|
for _, line := range strings.Split(message, "\n") {
|
|
result.WriteString(fmt.Sprintf("%s %s: %s\n", arrowText, u.Target, line))
|
|
}
|
|
|
|
return strings.TrimRightFunc(result.String(), unicode.IsSpace)
|
|
}
|