agent: Refactor remote exec write code

pull/312/head
Armon Dadgar 10 years ago
parent a0c6dbfe2a
commit 9ba4f31fde

@ -264,70 +264,49 @@ QUERY:
// remoteExecWriteAck is used to write an ack. Returns if execution should // remoteExecWriteAck is used to write an ack. Returns if execution should
// continue. // continue.
func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool { func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool {
write := structs.KVSRequest{ if err := a.remoteExecWriteKey(event, remoteExecAckSuffix, nil); err != nil {
Datacenter: a.config.Datacenter,
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: path.Join(event.Prefix, event.Session,
a.config.NodeName, remoteExecAckSuffix),
Session: event.Session,
},
}
var success bool
if err := a.RPC("KVS.Apply", &write, &success); err != nil {
a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err) a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err)
return false return false
} }
if !success {
a.logger.Printf("[DEBUG] agent: remote exec aborted, ack failed")
return false
}
return true return true
} }
// remoteExecWriteOutput is used to write output // remoteExecWriteOutput is used to write output
func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool { func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool {
outputNum := fmt.Sprintf("%05x", num) suffix := path.Join(remoteExecOutputDivider, fmt.Sprintf("%05x", num))
key := path.Join(event.Prefix, event.Session, if err := a.remoteExecWriteKey(event, suffix, output); err != nil {
a.config.NodeName, remoteExecOutputDivider, outputNum)
write := structs.KVSRequest{
Datacenter: a.config.Datacenter,
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: key,
Value: output,
Session: event.Session,
},
}
var success bool
if err := a.RPC("KVS.Apply", &write, &success); err != nil {
a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err) a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err)
return false return false
} }
if !success {
a.logger.Printf("[DEBUG] agent: remote exec aborted, output write failed")
return false
}
return true return true
} }
// remoteExecWriteExitCode is used to write an exit code // remoteExecWriteExitCode is used to write an exit code
func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) { func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) {
val := []byte(strconv.FormatInt(int64(exitCode), 10))
if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil {
a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err)
}
}
// remoteExecWriteKey is used to write an output key for a remote exec job
func (a *Agent) remoteExecWriteKey(event *remoteExecEvent, suffix string, val []byte) error {
key := path.Join(event.Prefix, event.Session, a.config.NodeName, suffix)
write := structs.KVSRequest{ write := structs.KVSRequest{
Datacenter: a.config.Datacenter, Datacenter: a.config.Datacenter,
Op: structs.KVSLock, Op: structs.KVSLock,
DirEnt: structs.DirEntry{ DirEnt: structs.DirEntry{
Key: path.Join(event.Prefix, event.Session, Key: key,
a.config.NodeName, remoteExecExitSuffix), Value: val,
Value: []byte(strconv.FormatInt(int64(exitCode), 10)),
Session: event.Session, Session: event.Session,
}, },
} }
var success bool var success bool
if err := a.RPC("KVS.Apply", &write, &success); err != nil { if err := a.RPC("KVS.Apply", &write, &success); err != nil {
a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) return err
} }
if !success { if !success {
a.logger.Printf("[DEBUG] agent: remote exec aborted, exit code write failed") return fmt.Errorf("write failed")
} }
return nil
} }

Loading…
Cancel
Save