diff --git a/bin/web/server.go b/bin/web/server.go index d785738..9f69747 100644 --- a/bin/web/server.go +++ b/bin/web/server.go @@ -2,6 +2,7 @@ package main import ( "net" + "time" "github.com/cockroachdb/cmux" @@ -33,6 +34,13 @@ func main() { return } + noticer, err := models.NewMail(10 * time.Second) + if err != nil { + log.Error(err.Error()) + return + } + go models.StartNoticer(noticer) + go func() { err := httpServer.Serve(httpL) if err != nil { diff --git a/conf/conf.go b/conf/conf.go index fef45cd..bfda79b 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -43,12 +43,13 @@ func Init() error { } type Conf struct { - Node string // node 进程路径 - Proc string // 当前执行任务路径 - Cmd string // cmd 路径 - Once string // 马上执行任务路径 - Lock string // job lock 路径 - Group string // 节点分组 + Node string // node 进程路径 + Proc string // 当前执行任务路径 + Cmd string // cmd 路径 + Once string // 马上执行任务路径 + Lock string // job lock 路径 + Group string // 节点分组 + Noticer string // 通知 Ttl int64 // 节点超时时间,单位秒 ReqTimeout int // 请求超时时间,单位秒 @@ -128,10 +129,13 @@ func (c *Conf) parse() error { } log.InitConf(c.Log) - c.Cmd = cleanKeyPrefix(c.Cmd) c.Node = cleanKeyPrefix(c.Node) c.Proc = cleanKeyPrefix(c.Proc) + c.Cmd = cleanKeyPrefix(c.Cmd) + c.Once = cleanKeyPrefix(c.Once) + c.Lock = cleanKeyPrefix(c.Lock) c.Group = cleanKeyPrefix(c.Group) + c.Noticer = cleanKeyPrefix(c.Noticer) return nil } @@ -185,7 +189,7 @@ func (c *Conf) reload() { } // etcd key 选项需要重启 - cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Lock, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Lock, c.Group + cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Lock, cf.Group, cf.Noticer = c.Node, c.Proc, c.Cmd, c.Once, c.Lock, c.Group, c.Noticer *c = *cf log.Noticef("config file[%s] reload success", *confFile) diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample index a678c72..f63f801 100644 --- a/conf/files/base.json.sample +++ b/conf/files/base.json.sample @@ -6,6 +6,7 @@ "Once": "/cronsun/once/", "Lock": "/cronsun/lock/", "Group": "/cronsun/group/", + "Noticer": "/cronsun/noticer/", "#Ttl": "节点超时时间,单位秒", "Ttl": 10, "#ReqTimeout": "etcd 请求超时时间,单位秒", diff --git a/models/noticer.go b/models/noticer.go index fa2d78b..2ec50c6 100644 --- a/models/noticer.go +++ b/models/noticer.go @@ -1,8 +1,11 @@ package models import ( + "encoding/json" + "fmt" "time" + client "github.com/coreos/etcd/clientv3" "github.com/go-gomail/gomail" "sunteng/commons/log" @@ -28,9 +31,25 @@ type Mail struct { msgChan chan *Message } -func NewMail() (m *Mail, err error) { - cf := conf.Config.Mail - sc, err := cf.Dialer.Dial() +func NewMail(timeout time.Duration) (m *Mail, err error) { + var ( + sc gomail.SendCloser + done = make(chan struct{}) + cf = conf.Config.Mail + ) + + // qq 邮箱的 Auth 出错后, 501 命令超时 2min 才能退出 + go func() { + sc, err = cf.Dialer.Dial() + close(done) + }() + + select { + case <-done: + case <-time.After(timeout): + err = fmt.Errorf("connect to smtp timeout") + } + if err != nil { return } @@ -47,6 +66,7 @@ func NewMail() (m *Mail, err error) { func (m *Mail) Serve() { var err error + sm := gomail.NewMessage() for { select { case msg := <-m.msgChan: @@ -59,7 +79,7 @@ func (m *Mail) Serve() { m.open = true } - sm := gomail.NewMessage() + sm.Reset() sm.SetHeader("From", m.cf.Username) sm.SetHeader("To", msg.To...) sm.SetHeader("Subject", msg.Subject) @@ -79,3 +99,26 @@ func (m *Mail) Serve() { } } } + +func (m *Mail) Send(msg *Message) { + m.msgChan <- msg +} + +func StartNoticer(n Noticer) { + go n.Serve() + rch := DefalutClient.Watch(conf.Config.Noticer, client.WithPrefix()) + var err error + for wresp := range rch { + for _, ev := range wresp.Events { + switch { + case ev.IsCreate(), ev.IsModify(): + msg := new(Message) + if err = json.Unmarshal(ev.Kv.Value, msg); err != nil { + log.Warnf("msg[%s] umarshal err: %s", string(ev.Kv.Value), err.Error()) + continue + } + n.Send(msg) + } + } + } +}