package agent

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	osexec "os/exec"
	"path"
	"strconv"
	"sync"
	"syscall"
	"time"

	"github.com/hashicorp/consul/agent/exec"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
)

const (
	// remoteExecFileName is the name of the file we append to
	// the path, e.g. _rexec/session_id/job
	remoteExecFileName = "job"

	// rExecAck is the suffix added to an ack path
	remoteExecAckSuffix = "ack"

	// remoteExecAck is the suffix added to an exit code
	remoteExecExitSuffix = "exit"

	// remoteExecOutputDivider is used to namespace the output
	remoteExecOutputDivider = "out"

	// remoteExecOutputSize is the size we chunk output too
	remoteExecOutputSize = 4 * 1024

	// remoteExecOutputDeadline is how long we wait before uploading
	// less than the chunk size
	remoteExecOutputDeadline = 500 * time.Millisecond
)

// remoteExecEvent is used as the payload of the user event to transmit
// what we need to know about the event
type remoteExecEvent struct {
	Prefix  string
	Session string
}

// remoteExecSpec is used as the specification of the remote exec.
// It is stored in the KV store
type remoteExecSpec struct {
	Command string
	Args    []string
	Script  []byte
	Wait    time.Duration
}

type rexecWriter struct {
	BufCh    chan []byte
	BufSize  int
	BufIdle  time.Duration
	CancelCh chan struct{}

	buf     []byte
	bufLen  int
	bufLock sync.Mutex
	flush   *time.Timer
}

func (r *rexecWriter) Write(b []byte) (int, error) {
	r.bufLock.Lock()
	defer r.bufLock.Unlock()
	if r.flush != nil {
		r.flush.Stop()
		r.flush = nil
	}
	inpLen := len(b)
	if r.buf == nil {
		r.buf = make([]byte, r.BufSize)
	}

COPY:
	remain := len(r.buf) - r.bufLen
	if remain > len(b) {
		copy(r.buf[r.bufLen:], b)
		r.bufLen += len(b)
	} else {
		copy(r.buf[r.bufLen:], b[:remain])
		b = b[remain:]
		r.bufLen += remain
		r.bufLock.Unlock()
		r.Flush()
		r.bufLock.Lock()
		goto COPY
	}

	r.flush = time.AfterFunc(r.BufIdle, r.Flush)
	return inpLen, nil
}

func (r *rexecWriter) Flush() {
	r.bufLock.Lock()
	defer r.bufLock.Unlock()
	if r.flush != nil {
		r.flush.Stop()
		r.flush = nil
	}
	if r.bufLen == 0 {
		return
	}
	select {
	case r.BufCh <- r.buf[:r.bufLen]:
		r.buf = make([]byte, r.BufSize)
		r.bufLen = 0
	case <-r.CancelCh:
		r.bufLen = 0
	}
}

// handleRemoteExec is invoked when a new remote exec request is received
func (a *Agent) handleRemoteExec(msg *UserEvent) {
	a.logger.Debug("received remote exec event", "id", msg.ID)
	// Decode the event payload
	var event remoteExecEvent
	if err := json.Unmarshal(msg.Payload, &event); err != nil {
		a.logger.Error("failed to decode remote exec event", "error", err)
		return
	}

	// Read the job specification
	var spec remoteExecSpec
	if !a.remoteExecGetSpec(&event, &spec) {
		return
	}

	// Write the acknowledgement
	if !a.remoteExecWriteAck(&event) {
		return
	}

	// Ensure we write out an exit code
	exitCode := 0
	defer a.remoteExecWriteExitCode(&event, &exitCode)

	// Check if this is a script, we may need to spill to disk
	var script string
	if len(spec.Script) != 0 {
		tmpFile, err := ioutil.TempFile("", "rexec")
		if err != nil {
			a.logger.Debug("failed to make tmp file", "error", err)
			exitCode = 255
			return
		}
		defer os.Remove(tmpFile.Name())
		os.Chmod(tmpFile.Name(), 0750)
		tmpFile.Write(spec.Script)
		tmpFile.Close()
		script = tmpFile.Name()
	} else {
		script = spec.Command
	}

	// Create the exec.Cmd
	a.logger.Info("remote exec script", "script", script)
	var cmd *osexec.Cmd
	var err error
	if len(spec.Args) > 0 {
		cmd, err = exec.Subprocess(spec.Args)
	} else {
		cmd, err = exec.Script(script)
	}
	if err != nil {
		a.logger.Debug("failed to start remote exec", "error", err)
		exitCode = 255
		return
	}

	// Setup the output streaming
	writer := &rexecWriter{
		BufCh:    make(chan []byte, 16),
		BufSize:  remoteExecOutputSize,
		BufIdle:  remoteExecOutputDeadline,
		CancelCh: make(chan struct{}),
	}
	cmd.Stdout = writer
	cmd.Stderr = writer

	// Start execution
	if err := cmd.Start(); err != nil {
		a.logger.Debug("failed to start remote exec", "error", err)
		exitCode = 255
		return
	}

	// Wait for the process to exit
	exitCh := make(chan int, 1)
	go func() {
		err := cmd.Wait()
		writer.Flush()
		close(writer.BufCh)
		if err == nil {
			exitCh <- 0
			return
		}

		// Try to determine the exit code
		if exitErr, ok := err.(*osexec.ExitError); ok {
			if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
				exitCh <- status.ExitStatus()
				return
			}
		}
		exitCh <- 1
	}()

	// Wait until we are complete, uploading as we go
WAIT:
	for num := 0; ; num++ {
		select {
		case out := <-writer.BufCh:
			if out == nil {
				break WAIT
			}
			if !a.remoteExecWriteOutput(&event, num, out) {
				close(writer.CancelCh)
				exitCode = 255
				return
			}
		case <-time.After(spec.Wait):
			// Acts like a heartbeat, since there is no output
			if !a.remoteExecWriteOutput(&event, num, nil) {
				close(writer.CancelCh)
				exitCode = 255
				return
			}
		}
	}

	// Get the exit code
	exitCode = <-exitCh
}

// remoteExecGetSpec is used to get the exec specification.
// Returns if execution should continue
func (a *Agent) remoteExecGetSpec(event *remoteExecEvent, spec *remoteExecSpec) bool {
	get := structs.KeyRequest{
		Datacenter: a.config.Datacenter,
		Key:        path.Join(event.Prefix, event.Session, remoteExecFileName),
		QueryOptions: structs.QueryOptions{
			AllowStale: true, // Stale read for scale! Retry on failure.
		},
	}
	get.Token = a.tokens.AgentToken()
	var out structs.IndexedDirEntries
QUERY:
	if err := a.RPC("KVS.Get", &get, &out); err != nil {
		a.logger.Error("failed to get remote exec job", "error", err)
		return false
	}
	if len(out.Entries) == 0 {
		// If the initial read was stale and had no data, retry as a consistent read
		if get.QueryOptions.AllowStale {
			a.logger.Debug("trying consistent fetch of remote exec job spec")
			get.QueryOptions.AllowStale = false
			goto QUERY
		} else {
			a.logger.Debug("remote exec aborted, job spec missing")
			return false
		}
	}
	if err := json.Unmarshal(out.Entries[0].Value, &spec); err != nil {
		a.logger.Error("failed to decode remote exec spec", "error", err)
		return false
	}
	return true
}

// remoteExecWriteAck is used to write an ack. Returns if execution should
// continue.
func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool {
	if err := a.remoteExecWriteKey(event, remoteExecAckSuffix, nil); err != nil {
		a.logger.Error("failed to ack remote exec job", "error", err)
		return false
	}
	return true
}

// remoteExecWriteOutput is used to write output
func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool {
	suffix := path.Join(remoteExecOutputDivider, fmt.Sprintf("%05x", num))
	if err := a.remoteExecWriteKey(event, suffix, output); err != nil {
		a.logger.Error("failed to write output for remote exec job", "error", err)
		return false
	}
	return true
}

// remoteExecWriteExitCode is used to write an exit code
func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode *int) bool {
	val := []byte(strconv.FormatInt(int64(*exitCode), 10))
	if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil {
		a.logger.Error("failed to write exit code for remote exec job", "error", err)
		return false
	}
	return true
}

// remoteExecWriteKey is used to write an output key for a remote exec job
func (a *Agent) remoteExecWriteKey(event *remoteExecEvent, suffix string, val []byte) error {
	key := path.Join(event.Prefix, event.Session, a.config.NodeName, suffix)
	write := structs.KVSRequest{
		Datacenter: a.config.Datacenter,
		Op:         api.KVLock,
		DirEnt: structs.DirEntry{
			Key:     key,
			Value:   val,
			Session: event.Session,
		},
	}
	write.Token = a.tokens.AgentToken()
	var success bool
	if err := a.RPC("KVS.Apply", &write, &success); err != nil {
		return err
	}
	if !success {
		return fmt.Errorf("write failed")
	}
	return nil
}