package cronsun

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	client "github.com/coreos/etcd/clientv3"

	"github.com/shunfei/cronsun/conf"
	"github.com/shunfei/cronsun/log"
)

var (
	// lID is the package-level lease holder. StartProc assigns it; Reload,
	// Exit and Process.put read it.
	lID *leaseID
)

// 维持 lease id 服务
func StartProc() error {
	lID = &leaseID{
		ttl:  conf.Config.ProcTtl,
		lk:   new(sync.RWMutex),
		done: make(chan struct{}),
	}

	if lID.ttl == 0 {
		return nil
	}

	err := lID.set()
	go lID.keepAlive()
	return err
}

func Reload(i interface{}) {
	if lID.ttl == conf.Config.ProcTtl {
		return
	}

	close(lID.done)
	lID.done, lID.ttl = make(chan struct{}), conf.Config.ProcTtl
	if conf.Config.ProcTtl == 0 {
		return
	}

	if err := lID.set(); err != nil {
		log.Warnf("proc lease id set err: %s", err.Error())
	}
	go lID.keepAlive()
}

// Exit signals the keep-alive goroutine to stop by closing the lease holder's
// done channel. It is a no-op when the lease holder has not been created yet
// (StartProc was never called) or has no done channel. The argument i is not
// used.
func Exit(i interface{}) {
	// Guard the holder itself, not just its field: lID stays nil until
	// StartProc runs, and dereferencing it would panic.
	if lID == nil || lID.done == nil {
		return
	}
	close(lID.done)
}

// leaseID holds the lease used for process keys together with the state needed
// to keep it alive.
type leaseID struct {
	// ttl is taken from conf.Config.ProcTtl; zero means no lease is used
	// (get returns -1 in that case).
	ttl int64
	// ID is the current lease ID, or -1 when the last request for a lease
	// failed. get and set access it under lk.
	ID  client.LeaseID
	// lk guards ID.
	lk  *sync.RWMutex

	// done is closed to make keepAlive return.
	done chan struct{}
}

// get returns the current lease ID, read under the read lock. It returns -1
// without touching the lock when the TTL is zero.
func (l *leaseID) get() client.LeaseID {
	if l.ttl != 0 {
		l.lk.RLock()
		defer l.lk.RUnlock()
		return l.ID
	}
	return -1
}

// set asks DefalutClient for a lease of ttl+2 and stores the resulting ID
// under the write lock. When the request fails, -1 is stored instead and the
// error is returned.
func (l *leaseID) set() error {
	resp, err := DefalutClient.Grant(l.ttl + 2)

	granted := client.LeaseID(-1)
	if err == nil {
		granted = resp.ID
	}

	l.lk.Lock()
	defer l.lk.Unlock()
	l.ID = granted
	return err
}

// keepAlive refreshes the lease once every ttl seconds until the done channel
// is closed or the TTL becomes zero.
//
// On each tick it sends a single keep-alive for the current lease ID. If there
// is no valid ID, or the keep-alive fails, it requests a brand-new lease via
// set. Failures are only logged; the next attempt happens after another ttl
// seconds.
func (l *leaseID) keepAlive() {
	// Capture the stop channel once. Reload replaces l.done with a fresh
	// channel; a loop that re-read the field could miss the close of its own
	// channel while busy in the timer branch and then run forever alongside
	// its replacement.
	done := l.done

	duration := time.Duration(l.ttl) * time.Second
	timer := time.NewTimer(duration)
	// Release the timer when the loop exits.
	defer timer.Stop()

	for {
		select {
		case <-done:
			return
		case <-timer.C:
			if l.ttl == 0 {
				return
			}

			id := l.get()
			if id > 0 {
				// Use the value read under the lock rather than reading
				// l.ID again without it.
				_, err := DefalutClient.KeepAliveOnce(id)
				if err == nil {
					timer.Reset(duration)
					continue
				}

				log.Warnf("proc lease id[%x] keepAlive err: %s, try to reset...", id, err.Error())
			}

			if err := l.set(); err != nil {
				log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
			} else {
				log.Infof("proc set lease id[%x] success", l.get())
			}
			timer.Reset(duration)
		}
	}
}

// Process holds information about a job execution that is currently running.
// key: /cronsun/proc/node/group/jobId/pid
// value: the time the execution started (see ProcessVal)
// The key expires automatically so that it does not linger when the process
// exits unexpectedly without removing it; the expiry time is configurable.
type Process struct {
	// parse from key path
	ID     string `json:"id"` // pid
	JobID  string `json:"jobId"`
	Group  string `json:"group"`
	NodeID string `json:"nodeId"`
	// parse from value
	ProcessVal

	// running is switched 0 -> 1 by Start and 1 -> 0 by Stop using atomic
	// compare-and-swap.
	running int32
	// hasPut is set to 1 by put before it writes the key; del only deletes
	// when it is 1.
	hasPut  int32
	// wg tracks the delayed-put goroutine launched by Start.
	wg      sync.WaitGroup
	// done is closed by Stop to cancel a pending delayed put.
	done    chan struct{}
}

// ProcessVal is the part of a Process that Val serialises to JSON.
type ProcessVal struct {
	Time   time.Time `json:"time"`   // time the execution started
	Killed bool      `json:"killed"` // whether the process was forcibly killed
}

// GetProcFromKey builds a Process from a "/"-separated key. The last four
// segments are taken as node ID, group, job ID and process ID, in that order.
// A key that splits into fewer than five segments is rejected with an error
// and a nil Process.
func GetProcFromKey(key string) (proc *Process, err error) {
	parts := strings.Split(key, "/")
	n := len(parts)
	if n < 5 {
		return nil, fmt.Errorf("invalid proc key [%s]", key)
	}

	pid, jobID, group, nodeID := parts[n-1], parts[n-2], parts[n-3], parts[n-4]
	return &Process{
		NodeID: nodeID,
		Group:  group,
		JobID:  jobID,
		ID:     pid,
	}, nil
}

// Key returns the storage key of the process: conf.Config.Proc followed by
// nodeID/group/jobID/pid.
func (p *Process) Key() string {
	segments := []string{p.NodeID, p.Group, p.JobID, p.ID}
	return conf.Config.Proc + strings.Join(segments, "/")
}

// Val returns the JSON encoding of the embedded ProcessVal, or an empty string
// together with the marshalling error.
func (p *Process) Val() (string, error) {
	encoded, err := json.Marshal(&p.ProcessVal)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}

// CountRunning returns how many executions of this job are currently recorded
// as running on the node the job runs on, by counting the process keys stored
// under the job's prefix. It returns 0 and the error when the query fails.
func (j *Job) CountRunning() (int64, error) {
	// Process keys have the form <Proc><node>/<group>/<jobID>/<pid>. The
	// trailing "/" limits the prefix match to this exact job ID; without it a
	// job whose ID is a string prefix of another job's ID would also count
	// that other job's processes.
	prefix := conf.Config.Proc + j.runOn + "/" + j.Group + "/" + j.ID + "/"

	resp, err := DefalutClient.Get(prefix, client.WithPrefix(), client.WithCountOnly())
	if err != nil {
		return 0, err
	}

	return resp.Count, nil
}

// put writes the process key, attaching the shared lease when one is
// available. It does nothing unless the process is running, and it writes at
// most once per Process.
//
// hasPut is set before the write is attempted, so a later del still runs even
// if put returns an error: for some reason the put command may already have
// reached the etcd server. A deadline is currently known to cause this.
func (p *Process) put() error {
	if atomic.LoadInt32(&p.running) != 1 {
		return nil
	}

	if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) {
		return nil
	}

	val, err := p.Val()
	if err != nil {
		return err
	}

	id := lID.get()
	if id < 0 {
		// A negative ID means leases are disabled (ttl == 0) or no lease
		// could be obtained. Store the key without a lease and stop here; it
		// must not be written again with WithLease(-1).
		_, err = DefalutClient.Put(p.Key(), val)
		return err
	}

	_, err = DefalutClient.Put(p.Key(), val, client.WithLease(id))
	return err
}

// del removes the process key, but only if put has previously marked the key
// as written; otherwise it returns nil without contacting the store.
func (p *Process) del() error {
	if atomic.LoadInt32(&p.hasPut) == 1 {
		_, err := DefalutClient.Delete(p.Key())
		return err
	}
	return nil
}

// Start marks the process as running and records it in the store.
//
// It is a no-op on a nil receiver or when the process is already running.
// When conf.Config.ProcReq is zero the key is written immediately. Otherwise
// the write is deferred by ProcReq seconds in a background goroutine, and is
// skipped entirely if Stop is called before the delay elapses. Write errors
// are only logged.
func (p *Process) Start() {
	if p == nil {
		return
	}

	if !atomic.CompareAndSwapInt32(&p.running, 0, 1) {
		return
	}

	delay := conf.Config.ProcReq
	if delay == 0 {
		if err := p.put(); err != nil {
			log.Warnf("proc put[%s] err: %s", p.Key(), err.Error())
		}
		return
	}

	done := make(chan struct{})
	p.done = done
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()

		// An explicit timer, stopped on exit, is released as soon as Stop
		// cancels the wait; time.After would keep it pending for the whole
		// delay.
		timer := time.NewTimer(time.Duration(delay) * time.Second)
		defer timer.Stop()

		select {
		case <-done:
		case <-timer.C:
			if err := p.put(); err != nil {
				log.Warnf("proc put[%s] err: %s", p.Key(), err.Error())
			}
		}
	}()
}

// Stop marks the process as no longer running and removes its key.
//
// It is a no-op on a nil receiver or when the process is not running. It
// closes the done channel (if Start created one), waits for the delayed-put
// goroutine to finish, and then deletes the key; a delete error is only
// logged.
func (p *Process) Stop() {
	if p == nil || !atomic.CompareAndSwapInt32(&p.running, 1, 0) {
		return
	}

	if stop := p.done; stop != nil {
		close(stop)
	}
	p.wg.Wait()

	err := p.del()
	if err == nil {
		return
	}
	log.Warnf("proc del[%s] err: %s", p.Key(), err.Error())
}

// WatchProcs opens a watch on every key whose name starts with
// conf.Config.Proc followed by nid and returns the resulting watch channel.
func WatchProcs(nid string) client.WatchChan {
	prefix := conf.Config.Proc + nid
	return DefalutClient.Watch(prefix, client.WithPrefix())
}