package command
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"path"
"regexp"
"strconv"
"strings"
"time"
"github.com/armon/consul-api"
"github.com/mitchellh/cli"
)
const (
// rExecFileName is the name of the file we append to
// the path, e.g. _rexec/session_id/job. The key holds the
// serialized job specification produced by makeRExecSpec.
rExecFileName = "job"
// rExecAckSuffix is the suffix of the key that marks a node's
// acknowledgement of the event, e.g. <node>/ack
rExecAckSuffix = "/ack"
// rExecExitSuffix is the suffix of the key holding a node's
// exit code (a base-10 integer), e.g. <node>/exit
rExecExitSuffix = "/exit"
// rExecOutputDivider separates the node name from the rest of an
// output key, e.g. <node>/out/...; the node name is everything before
// the last occurrence of the divider.
rExecOutputDivider = "/out/"
)
// rExecConf holds the parsed command-line configuration of a single remote
// execution and is shared by the helper methods of ExecCommand.
type rExecConf struct {
	// Where to dispatch and where in the KV store to keep request data.
	datacenter, prefix string

	// Regular-expression filters selecting the target nodes.
	node, service, tag string

	// wait is the quiet period that ends the job; replWait is the pause
	// before firing the event to let the job file replicate.
	wait, replWait time.Duration

	// The job itself: either a command line or a script read from stdin.
	cmd    string
	script []byte

	verbose bool
}
// rExecEvent is the JSON payload of the user event that announces a remote
// execution: it tells receivers where the job data lives in the KV store.
type rExecEvent struct {
	Prefix, Session string
}
// rExecSpec is the job description uploaded to the KV store; agents read it
// to learn what to run for this remote execution.
type rExecSpec struct {
	Command string `json:",omitempty"` // run directly in the shell
	Script  []byte `json:",omitempty"` // spilled to a file and executed
	// Wait is how long we are waiting on a quiet period to terminate.
	Wait time.Duration
}
// rExecAck reports that Node acknowledged the execution event.
type rExecAck struct{ Node string }
// rExecHeart reports a heartbeat (an empty output chunk) from Node.
type rExecHeart struct{ Node string }
// rExecOutput carries one non-empty chunk of output produced by Node.
type rExecOutput struct {
	Node   string // node that produced the chunk
	Output []byte // raw bytes of the chunk
}
// rExecExit carries the exit code Node reported for the job.
type rExecExit struct {
	Node string // node that finished
	Code int    // parsed exit status
}
// ExecCommand is a Command implementation that is used to
// do remote execution of commands.
type ExecCommand struct {
	// ShutdownCh, when it delivers a value, makes Run and waitForJob
	// return 1 early.
	ShutdownCh <-chan struct{}
	// Ui receives all user-facing output and errors.
	Ui cli.Ui

	// Per-invocation state filled in by Run.
	conf      rExecConf
	client    *consulapi.Client
	sessionID string
}
// Run implements "consul exec". It parses the flags, builds the job
// specification (the remaining arguments as a command line, or a script read
// from stdin when there are none), stores it in the KV store under a freshly
// created session, fires the remote-execution user event and finally waits
// for results. It returns 1 on any setup failure or shutdown request and
// otherwise the result of waitForJob.
func (c *ExecCommand) Run(args []string) int {
	conf := &c.conf

	flags := flag.NewFlagSet("exec", flag.ContinueOnError)
	flags.Usage = func() { c.Ui.Output(c.Help()) }
	flags.StringVar(&conf.datacenter, "datacenter", "", "")
	flags.StringVar(&conf.node, "node", "", "")
	flags.StringVar(&conf.service, "service", "", "")
	flags.StringVar(&conf.tag, "tag", "", "")
	flags.StringVar(&conf.prefix, "prefix", "_rexec", "")
	flags.DurationVar(&conf.replWait, "wait-repl", 100*time.Millisecond, "")
	flags.DurationVar(&conf.wait, "wait", time.Second, "")
	flags.BoolVar(&conf.verbose, "v", false, "")
	httpAddr := HTTPAddrFlag(flags)
	if flags.Parse(args) != nil {
		return 1
	}

	// usageError prints msg followed by the full help text.
	usageError := func(msg string) int {
		c.Ui.Error(msg)
		c.Ui.Error("")
		c.Ui.Error(c.Help())
		return 1
	}
	// report prints a single formatted error line.
	report := func(format string, err error) int {
		c.Ui.Error(fmt.Sprintf(format, err))
		return 1
	}

	// Positional arguments form the command; without any, the script is
	// taken from stdin instead.
	conf.cmd = strings.Join(flags.Args(), " ")
	if conf.cmd == "" {
		var stdin bytes.Buffer
		if _, readErr := io.Copy(&stdin, os.Stdin); readErr != nil {
			return usageError(fmt.Sprintf("Failed to read stdin: %v", readErr))
		}
		conf.script = stdin.Bytes()
	}
	if conf.cmd == "" && len(conf.script) == 0 {
		return usageError("Must specify a command to execute")
	}

	if verr := conf.validate(); verr != nil {
		c.Ui.Error(verr.Error())
		return 1
	}

	// Connect, and make sure the agent actually answers.
	client, err := HTTPClientDC(*httpAddr, conf.datacenter)
	if err != nil {
		return report("Error connecting to Consul agent: %s", err)
	}
	if _, err = client.Agent().NodeName(); err != nil {
		return report("Error querying Consul agent: %s", err)
	}
	c.client = client

	payload, err := c.makeRExecSpec()
	if err != nil {
		return report("Failed to create job spec: %s", err)
	}

	if c.sessionID, err = c.createSession(); err != nil {
		return report("Failed to create session: %s", err)
	}
	defer c.destroySession()

	if err = c.uploadPayload(payload); err != nil {
		return report("Failed to create job file: %s", err)
	}
	defer c.destroyData()

	// Wait for replication. This is done so that when the event is
	// received, the job file can be read using a stale read. If the
	// stale read fails, we expect a consistent read to be done, so
	// largely this is a heuristic.
	select {
	case <-c.ShutdownCh:
		return 1
	case <-time.After(conf.replWait):
	}

	eventID, err := c.fireEvent()
	if err != nil {
		return report("Failed to fire event: %s", err)
	}
	if conf.verbose {
		c.Ui.Output(fmt.Sprintf("Fired remote execution event. ID: %s", eventID))
	}

	return c.waitForJob()
}
// waitForJob prints the results streamed in by streamResults until the job
// goes quiet: no acknowledgement, heartbeat, output or exit arrives within
// the wait interval. The interval is c.conf.wait, stretched 4x while some
// nodes have acknowledged but not yet reported an exit code. Every received
// message restarts the interval.
//
// It returns 0 when the job goes quiet, and 1 if streaming failed or a
// shutdown was requested.
func (c *ExecCommand) waitForJob() int {
	// Although the session destroy is already deferred, we do it again here,
	// because invalidation of the session before destroyData() ensures there is
	// no race condition allowing an agent to upload data (the acquire will fail).
	defer c.destroySession()
	start := time.Now()
	ackCh := make(chan rExecAck, 128)
	heartCh := make(chan rExecHeart, 128)
	outputCh := make(chan rExecOutput, 128)
	exitCh := make(chan rExecExit, 128)
	doneCh := make(chan struct{})
	errCh := make(chan struct{}, 1)
	// Closing doneCh stops streamResults. Defers run LIFO, so this happens
	// before the deferred destroySession above.
	defer close(doneCh)
	go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)

	var ackCount, exitCount int
	for {
		// Determine wait time. We provide a larger window if we know about
		// nodes which are still working.
		waitIntv := c.conf.wait
		if ackCount > exitCount {
			waitIntv *= 4
		}

		select {
		case e := <-ackCh:
			ackCount++
			c.Ui.Output(fmt.Sprintf("Node %s: acknowledged event", e.Node))
		case e := <-heartCh:
			// Heartbeats carry no output, but they show a node is still
			// working and so restart the quiet period. They must be drained
			// here: otherwise streamResults blocks once the buffer is full
			// and no further results are delivered.
			if c.conf.verbose {
				c.Ui.Output(fmt.Sprintf("Node %s: heartbeat", e.Node))
			}
		case e := <-outputCh:
			c.Ui.Output(fmt.Sprintf("Node %s: %s", e.Node, e.Output))
		case e := <-exitCh:
			exitCount++
			c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code))
		case <-time.After(waitIntv):
			c.Ui.Output(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount))
			c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds", time.Since(start).Seconds()))
			return 0
		case <-errCh:
			return 1
		case <-c.ShutdownCh:
			return 1
		}
	}
}
// streamResults is used to perform blocking queries against the KV endpoint
// and stream notice of the various events into waitForJob. It watches every
// key under <prefix>/<session>/ and, for each key not seen before:
//
//   - ignores the job file itself;
//   - sends <node>/ack keys to ackCh;
//   - reads <node>/exit keys, parses the value as a base-10 exit code and
//     sends it to exitCh;
//   - reads <node>/out/... keys and sends them to heartCh when the value is
//     empty, or to outputCh otherwise.
//
// It returns once doneCh is closed. Every send also watches doneCh, so the
// goroutine cannot leak on a full channel after waitForJob has stopped
// reading. If listing the keys fails, the error is reported and errCh is
// signalled without blocking.
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
	outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
	kv := c.client.KV()
	opts := consulapi.QueryOptions{WaitTime: c.conf.wait}
	dir := path.Join(c.conf.prefix, c.sessionID) + "/"
	seen := make(map[string]struct{})

	// readValue fetches one key. It reports read errors, and keys that
	// vanished between the listing and the read (Get returns a nil pair),
	// instead of dereferencing a nil pair.
	readValue := func(full string) ([]byte, bool) {
		pair, _, err := kv.Get(full, nil)
		if err != nil {
			c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
			return nil, false
		}
		if pair == nil {
			c.Ui.Error(fmt.Sprintf("Failed to read key '%s': key not found", full))
			return nil, false
		}
		return pair.Value, true
	}

	for {
		// Check if we've been signaled to exit
		select {
		case <-doneCh:
			return
		default:
		}

		// Block on waiting for new keys
		keys, qm, err := kv.Keys(dir, "", &opts)
		if err != nil {
			// If waitForJob already finished, nobody cares about this error.
			select {
			case <-doneCh:
				return
			default:
			}
			c.Ui.Error(fmt.Sprintf("Failed to read results: %s", err))
			select {
			case errCh <- struct{}{}:
			default:
			}
			return
		}

		// Fast-path the no-change case
		if qm.LastIndex == opts.WaitIndex {
			continue
		}
		opts.WaitIndex = qm.LastIndex

		for _, full := range keys {
			// Ignore if we've seen it
			if _, ok := seen[full]; ok {
				continue
			}
			seen[full] = struct{}{}
			key := strings.TrimPrefix(full, dir)

			switch {
			case key == rExecFileName:
				continue
			case strings.HasSuffix(key, rExecAckSuffix):
				select {
				case ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)}:
				case <-doneCh:
					return
				}
			case strings.HasSuffix(key, rExecExitSuffix):
				value, ok := readValue(full)
				if !ok {
					continue
				}
				code, err := strconv.ParseInt(string(value), 10, 32)
				if err != nil {
					c.Ui.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", value, err))
					continue
				}
				select {
				case exitCh <- rExecExit{Node: strings.TrimSuffix(key, rExecExitSuffix), Code: int(code)}:
				case <-doneCh:
					return
				}
			case strings.Contains(key, rExecOutputDivider):
				value, ok := readValue(full)
				if !ok {
					continue
				}
				node := key[:strings.LastIndex(key, rExecOutputDivider)]
				if len(value) == 0 {
					select {
					case heartCh <- rExecHeart{Node: node}:
					case <-doneCh:
						return
					}
				} else {
					select {
					case outputCh <- rExecOutput{Node: node, Output: value}:
					case <-doneCh:
						return
					}
				}
			default:
				c.Ui.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key))
			}
		}
	}
}
// validate reports whether the filter configuration is usable. Each
// non-empty node, service and tag filter must compile as a regular
// expression; these are checked in that order. A tag filter is only accepted
// together with a service filter.
func (conf *rExecConf) validate() error {
	filters := [...]struct {
		expr   string
		format string
	}{
		{conf.node, "Failed to compile node filter regexp: %v"},
		{conf.service, "Failed to compile service filter regexp: %v"},
		{conf.tag, "Failed to compile tag filter regexp: %v"},
	}
	for _, f := range filters {
		if f.expr == "" {
			continue
		}
		if _, err := regexp.Compile(f.expr); err != nil {
			return fmt.Errorf(f.format, err)
		}
	}

	if conf.service == "" && conf.tag != "" {
		return fmt.Errorf("Cannot provide tag filter without service filter.")
	}
	return nil
}
// createSession registers a new session named "Remote Exec" and returns its
// ID. The job file is later acquired under this session.
func (c *ExecCommand) createSession() (string, error) {
	entry := &consulapi.SessionEntry{Name: "Remote Exec"}
	sessionID, _, err := c.client.Session().Create(entry, nil)
	return sessionID, err
}
// destroySession destroys the session identified by c.sessionID and returns
// any error from the request. Both Run and waitForJob defer it, so it can
// run twice for one execution.
func (c *ExecCommand) destroySession() error {
	if _, err := c.client.Session().Destroy(c.sessionID, nil); err != nil {
		return err
	}
	return nil
}
// makeRExecSpec serializes the command, script and wait period from the
// configuration into the JSON job specification that is uploaded for the
// agents to read.
func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
	return json.Marshal(rExecSpec{
		Command: c.conf.cmd,
		Script:  c.conf.script,
		Wait:    c.conf.wait,
	})
}
// uploadPayload writes the job specification to <prefix>/<session>/job and
// acquires that key under the command's session. It fails if the request
// errors or the lock is not granted.
func (c *ExecCommand) uploadPayload(payload []byte) error {
	jobKey := path.Join(c.conf.prefix, c.sessionID, rExecFileName)
	acquired, _, err := c.client.KV().Acquire(&consulapi.KVPair{
		Key:     jobKey,
		Value:   payload,
		Session: c.sessionID,
	}, nil)
	switch {
	case err != nil:
		return err
	case !acquired:
		return fmt.Errorf("failed to acquire key %s", jobKey)
	default:
		return nil
	}
}
// destroyData removes everything this execution stored in the KV store by
// recursively deleting the <prefix>/<session> tree.
func (c *ExecCommand) destroyData() error {
	_, err := c.client.KV().DeleteTree(path.Join(c.conf.prefix, c.sessionID), nil)
	return err
}
// fireEvent broadcasts the "_rexec" user event announcing this execution.
// Its payload points at the KV prefix and session; the configured node,
// service and tag filters select the recipients. It returns the event ID.
func (c *ExecCommand) fireEvent() (string, error) {
	payload, err := json.Marshal(&rExecEvent{Prefix: c.conf.prefix, Session: c.sessionID})
	if err != nil {
		return "", err
	}

	eventID, _, err := c.client.Event().Fire(&consulapi.UserEvent{
		Name:          "_rexec",
		Payload:       payload,
		NodeFilter:    c.conf.node,
		ServiceFilter: c.conf.service,
		TagFilter:     c.conf.tag,
	}, nil)
	return eventID, err
}
// Synopsis returns the one-line summary of the exec command.
func (c *ExecCommand) Synopsis() string {
	const synopsis = "Executes a command on Consul nodes"
	return synopsis
}
// Help returns the long-form usage text for "consul exec". It lists every
// flag registered in Run, spelled exactly as it must be passed on the
// command line, together with its default value.
func (c *ExecCommand) Help() string {
	helpText := `
Usage: consul exec [options] [command...]

  Evaluates a command on remote Consul nodes. The nodes responding can
  be filtered using regular expressions on node name, service, and tag
  definitions. If a command is not provided, stdin will be read until EOF
  and used as a script input.

Options:

  -http-addr=127.0.0.1:8500  HTTP address of the Consul agent.
  -datacenter=""             Datacenter to dispatch in. Defaults to that of agent.
  -prefix="_rexec"           Prefix in the KV store to use for request data
  -node=""                   Regular expression to filter on node names
  -service=""                Regular expression to filter on service instances
  -tag=""                    Regular expression to filter on service tags. Must be used
                             with -service.
  -wait=1s                   Period to wait with no responses before terminating execution.
  -wait-repl=100ms           Period to wait for replication before firing event. This is an
                             optimization to allow stale reads to be performed.
  -v                         Enables verbose output
`
	return strings.TrimSpace(helpText)
}