command/exec: Fixing use of shutdown ch

pull/312/head
Armon Dadgar 2014-08-31 22:46:08 -07:00
parent 53777527e0
commit 61a2170b7d
1 changed files with 9 additions and 5 deletions

View File

@ -94,7 +94,7 @@ type rExecExit struct {
// ExecCommand is a Command implementation that is used to // ExecCommand is a Command implementation that is used to
// do remote execution of commands // do remote execution of commands
type ExecCommand struct { type ExecCommand struct {
ShutdownCh chan struct{} ShutdownCh <-chan struct{}
Ui cli.Ui Ui cli.Ui
conf rExecConf conf rExecConf
client *consulapi.Client client *consulapi.Client
@ -218,8 +218,9 @@ func (c *ExecCommand) waitForJob() int {
outputCh := make(chan rExecOutput, 128) outputCh := make(chan rExecOutput, 128)
exitCh := make(chan rExecExit, 128) exitCh := make(chan rExecExit, 128)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
errCh := make(chan struct{}, 1)
defer close(doneCh) defer close(doneCh)
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh) go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
var ackCount, exitCount int var ackCount, exitCount int
OUTER: OUTER:
for { for {
@ -243,11 +244,14 @@ OUTER:
c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code)) c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code))
case <-time.After(waitIntv): case <-time.After(waitIntv):
c.Ui.Output(fmt.Sprintf("%d nodes completed, %d nodes acknowledged", exitCount, ackCount)) c.Ui.Output(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount))
c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds", c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds",
float64(time.Now().Sub(start))/float64(time.Second))) float64(time.Now().Sub(start))/float64(time.Second)))
break OUTER break OUTER
case <-errCh:
return 1
case <-c.ShutdownCh: case <-c.ShutdownCh:
return 1 return 1
} }
@ -258,7 +262,7 @@ OUTER:
// streamResults is used to perform blocking queries against the KV endpoint and stream in // streamResults is used to perform blocking queries against the KV endpoint and stream in
// notice of various events into waitForJob // notice of various events into waitForJob
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
outputCh chan rExecOutput, exitCh chan rExecExit) { outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
kv := c.client.KV() kv := c.client.KV()
opts := consulapi.QueryOptions{WaitTime: c.conf.wait} opts := consulapi.QueryOptions{WaitTime: c.conf.wait}
dir := path.Join(c.conf.prefix, c.sessionID) + "/" dir := path.Join(c.conf.prefix, c.sessionID) + "/"
@ -343,7 +347,7 @@ func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, h
ERR_EXIT: ERR_EXIT:
select { select {
case c.ShutdownCh <- struct{}{}: case errCh <- struct{}{}:
default: default:
} }
} }