Merge branch 'master' of github.com:shunfei/cronsun into fix/panic-on-group-update

pull/64/head
Doflatango 2018-02-26 17:49:52 +08:00
commit 640af0fcb3
8 changed files with 57 additions and 31 deletions

View File

@ -3,9 +3,7 @@ go:
- 1.9
install:
- go get -u github.com/coreos/etcd/clientv3
- go get github.com/coreos/etcd/mvcc/mvccpb
- go get github.com/rogpeppe/fastuuid
- go get golang.org/x/net/context
- go get gopkg.in/mgo.v2
- go get github.com/fsnotify/fsnotify
- go get github.com/go-gomail/gomail

View File

@ -1,11 +1,11 @@
package cronsun
import (
"context"
"fmt"
"strings"
"time"
"golang.org/x/net/context"
client "github.com/coreos/etcd/clientv3"
"github.com/shunfei/cronsun/conf"
@ -36,7 +36,7 @@ func NewClient(cfg *conf.Conf) (c *Client, err error) {
}
func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
ctx, cancel := NewEtcdTimeoutContext(c)
defer cancel()
return c.Client.Put(ctx, key, val, opts...)
}
@ -46,7 +46,7 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse,
return c.Put(key, val)
}
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
ctx, cancel := NewEtcdTimeoutContext(c)
tresp, err := DefalutClient.Txn(ctx).
If(client.Compare(client.ModRevision(key), "=", rev)).
Then(client.OpPut(key, val)).
@ -65,13 +65,13 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse,
}
func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
ctx, cancel := NewEtcdTimeoutContext(c)
defer cancel()
return c.Client.Get(ctx, key, opts...)
}
func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
ctx, cancel := NewEtcdTimeoutContext(c)
defer cancel()
return c.Client.Delete(ctx, key, opts...)
}
@ -81,20 +81,26 @@ func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan {
}
func (c *Client) Grant(ttl int64) (*client.LeaseGrantResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
ctx, cancel := NewEtcdTimeoutContext(c)
defer cancel()
return c.Client.Grant(ctx, ttl)
}
func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error) {
func (c *Client) Revoke(id client.LeaseID) (*client.LeaseRevokeResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Revoke(ctx, id)
}
func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error) {
ctx, cancel := NewEtcdTimeoutContext(c)
defer cancel()
return c.Client.KeepAliveOnce(ctx, id)
}
func (c *Client) GetLock(key string, id client.LeaseID) (bool, error) {
key = conf.Config.Lock + key
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
ctx, cancel := NewEtcdTimeoutContext(c)
resp, err := DefalutClient.Txn(ctx).
If(client.Compare(client.CreateRevision(key), "=", 0)).
Then(client.OpPut(key, "", client.WithLease(id))).
@ -116,3 +122,28 @@ func (c *Client) DelLock(key string) error {
func IsValidAsKeyPath(s string) bool {
return strings.IndexByte(s, '/') == -1
}
// etcdTimeoutContext return better error info
type etcdTimeoutContext struct {
context.Context
etcdEndpoints []string
}
func (c *etcdTimeoutContext) Err() error {
err := c.Context.Err()
if err == context.DeadlineExceeded {
err = fmt.Errorf("%s: etcd(%v) maybe lost",
err, c.etcdEndpoints)
}
return err
}
// NewEtcdTimeoutContext return a new etcdTimeoutContext
func NewEtcdTimeoutContext(c *Client) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
etcdCtx := &etcdTimeoutContext{}
etcdCtx.Context = ctx
etcdCtx.etcdEndpoints = c.Endpoints()
return etcdCtx, cancel
}

View File

@ -24,7 +24,7 @@ func On(name string, fs ...func(interface{})) error {
}
for _, f := range fs {
if fs == nil {
if f == nil {
continue
}

View File

@ -6,7 +6,6 @@ import (
"strings"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
@ -65,10 +64,10 @@ func WatchGroups() client.WatchChan {
return DefalutClient.Watch(conf.Config.Group, client.WithPrefix(), client.WithPrevKV())
}
func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) {
func GetGroupFromKv(key, value []byte) (g *Group, err error) {
g = new(Group)
if err = json.Unmarshal(kv.Value, g); err != nil {
err = fmt.Errorf("group[%s] umarshal err: %s", string(kv.Key), err.Error())
if err = json.Unmarshal(value, g); err != nil {
err = fmt.Errorf("group[%s] umarshal err: %s", string(key), err.Error())
}
return
}

14
job.go
View File

@ -2,6 +2,7 @@ package cronsun
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os/exec"
@ -13,10 +14,7 @@ import (
"syscall"
"time"
"golang.org/x/net/context"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
@ -120,8 +118,8 @@ func (l *locker) unlock() {
close(l.done)
l.timer.Stop()
if _, err := DefalutClient.KeepAliveOnce(l.lID); err != nil {
log.Warnf("unlock keep alive err: %s", err.Error())
if _, err := DefalutClient.Revoke(l.lID); err != nil {
log.Warnf("unlock revoke err: %s", err.Error())
}
}
@ -372,10 +370,10 @@ func WatchJobs() client.WatchChan {
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
}
func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
func GetJobFromKv(key, value []byte) (job *Job, err error) {
job = new(Job)
if err = json.Unmarshal(kv.Value, job); err != nil {
err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error())
if err = json.Unmarshal(value, job); err != nil {
err = fmt.Errorf("job[%s] umarshal err: %s", string(key), err.Error())
return
}

View File

@ -371,7 +371,7 @@ func (n *Node) watchJobs() {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
job, err := cronsun.GetJobFromKv(ev.Kv)
job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value)
if err != nil {
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
@ -380,7 +380,7 @@ func (n *Node) watchJobs() {
job.Init(n.ID)
n.addJob(job, true)
case ev.IsModify():
job, err := cronsun.GetJobFromKv(ev.Kv)
job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value)
if err != nil {
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
@ -403,7 +403,7 @@ func (n *Node) watchGroups() {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
g, err := cronsun.GetGroupFromKv(ev.Kv)
g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value)
if err != nil {
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
@ -411,7 +411,7 @@ func (n *Node) watchGroups() {
n.addGroup(g)
case ev.IsModify():
g, err := cronsun.GetGroupFromKv(ev.Kv)
g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value)
if err != nil {
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue

View File

@ -94,9 +94,8 @@ func (m *Mail) Serve() {
if m.open {
if err = m.sc.Close(); err != nil {
log.Warnf("close smtp server err: %s", err.Error())
} else {
m.open = false
}
m.open = false
}
m.timer.Reset(time.Duration(m.cf.Keepalive) * time.Second)
}

View File

@ -3,6 +3,7 @@ package session
import (
"bytes"
"encoding/gob"
"errors"
"net/http"
client "github.com/coreos/etcd/clientv3"
@ -120,7 +121,7 @@ func (this *EtcdStore) Store(sess *Session) (err error) {
if sess.leaseID == 0 {
lresp, err := this.client.Grant(int64(this.conf.Expiration))
if err != nil {
return err
return errors.New("etcd create new lease faild: " + err.Error()) // err
}
sess.leaseID = lresp.ID
}