diff --git a/README.md b/README.md index f2e6faa..a0ea8ff 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,11 @@ * 任务执行失败重试设置 * 任务超时设置 * 延时任务 -* 任务执行方式 - * RPC调用执行远程shell命令 - * HTTP-GET请求 +* 任务类型 + * shell任务 + > 在远程服务器上执行shell命令, 调度器与任务执行器保持长连接 + * HTTP任务 + > 访问指定的URL地址 * 查看任务执行日志 * 任务执行结果通知, 支持邮件、Slack @@ -32,7 +34,7 @@ * [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-linux-amd64.tar.gz) * [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-darwin-amd64.tar.gz) * [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-windows-amd64.zip) -* 任务执行器(安装在远程主机上) +* 任务执行器(安装在远程主机上, 执行shell命令需安装) * [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-linux-amd64.tar.gz) * [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-darwin-amd64.tar.gz) * [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-windows-amd64.zip) @@ -76,6 +78,7 @@ * ORM [Xorm](https://github.com/go-xorm/xorm) * UI框架 [Semantic UI](https://semantic-ui.com/) * 依赖管理(所有依赖包放入vendor目录) [Govendor](https://github.com/kardianos/govendor) +* RPC框架 [gRPC](https://github.com/grpc/grpc) ## 反馈 提交[issue](https://github.com/ouqiang/gocron/issues/new) diff --git a/models/host.go b/models/host.go index 989490f..d9ea5a6 100644 --- a/models/host.go +++ b/models/host.go @@ -36,7 +36,7 @@ func (host *Host) Update(id int, data CommonMap) (int64, error) { // 删除 func (host *Host) Delete(id int) (int64, error) { - return Db.Id(id).Delete(host) + return Db.Id(id).Delete(new(Host)) } func (host *Host) Find(id int) error { diff --git a/models/model.go b/models/model.go index b7a79ad..00fc092 100644 --- a/models/model.go +++ b/models/model.go @@ -7,7 +7,6 @@ import ( "github.com/go-xorm/xorm" "gopkg.in/macaron.v1" "strings" - "time" "github.com/ouqiang/gocron/modules/logger" "github.com/ouqiang/gocron/modules/app" ) @@ -82,8 +81,6 @@ func CreateDb() *xorm.Engine { engine.Logger().SetLevel(core.LOG_DEBUG) } - go keepDbAlived(engine) - return engine } @@ -112,14 +109,6 @@ func getDbEngineDSN(engine string, config map[string]string) string { return dsn } -// 定时ping, 防止因数据库超时设置连接被断开 -func keepDbAlived(engine *xorm.Engine) { - t := time.Tick(180 * time.Second) - for { - <- t - engine.Ping() - } -} // 获取数据库配置 func getDbConfig() map[string]string { diff --git a/modules/rpc/client/client.go b/modules/rpc/client/client.go index 2d7e691..a9c2b6e 100644 --- a/modules/rpc/client/client.go +++ b/modules/rpc/client/client.go @@ -1,21 +1,23 @@ package client import ( - "google.golang.org/grpc" pb "github.com/ouqiang/gocron/modules/rpc/proto" "golang.org/x/net/context" "fmt" "time" "errors" + "github.com/ouqiang/gocron/modules/rpc/grpcpool" ) func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) { addr := fmt.Sprintf("%s:%d", ip, port); - conn, err := grpc.Dial(addr, grpc.WithInsecure()) + conn, err := grpcpool.Pool.Get(addr) if err != nil { return "", err } - defer conn.Close() + defer func() { + grpcpool.Pool.Put(addr, conn) + }() c := pb.NewTaskClient(conn) if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 { taskReq.Timeout = 86400 diff --git a/modules/rpc/grpcpool/grpc_pool.go b/modules/rpc/grpcpool/grpc_pool.go new file mode 100644 index 0000000..9084e86 --- /dev/null +++ b/modules/rpc/grpcpool/grpc_pool.go @@ -0,0 +1,109 @@ +package grpcpool + +import ( + "github.com/silenceper/pool" + "sync" + "time" + "google.golang.org/grpc" + "errors" +) + + +var ( + Pool GRPCPool +) + +var ( + ErrInvalidConn = errors.New("invalid connection") +) + +func init() { + Pool = GRPCPool{ + make(map[string]pool.Pool), + sync.RWMutex{}, + } +} + +type GRPCPool struct { + // map key格式 ip:port + conns map[string]pool.Pool + sync.RWMutex +} + +func (p *GRPCPool) Get(addr string) (*grpc.ClientConn, error) { + p.RLock() + p.RUnlock() + pool, ok := p.conns[addr] + if !ok { + err := p.newCommonPool(addr) + if err != nil { + return nil, err + } + } + + pool = p.conns[addr] + conn, err := pool.Get() + if err != nil { + return nil, err + } + + return conn.(*grpc.ClientConn), nil +} + +func (p *GRPCPool) Put(addr string, conn *grpc.ClientConn) error { + p.RLock() + defer p.RUnlock() + pool, ok := p.conns[addr] + if ok { + return pool.Put(conn) + } + + return ErrInvalidConn +} + + +// 释放连接池 +func (p *GRPCPool) Release(addr string) { + p.Lock() + defer p.Unlock() + pool, ok := p.conns[addr] + if !ok { + return + } + pool.Release() + delete(p.conns, addr) +} + +// 初始化底层连接池 +func (p *GRPCPool) newCommonPool(addr string) (error) { + p.Lock() + defer p.Unlock() + commonPool, ok := p.conns[addr] + if ok { + return nil + } + poolConfig := &pool.PoolConfig{ + InitialCap: 1, + MaxCap: 30, + Factory: func() (interface{}, error) { + return grpc.Dial(addr, grpc.WithInsecure()) + }, + Close: func(v interface{}) error { + conn, ok := v.(*grpc.ClientConn) + if ok && conn != nil { + return conn.Close() + } + return ErrInvalidConn + }, + IdleTimeout: 5 * time.Minute, + } + + commonPool, err := pool.NewChannelPool(poolConfig) + if err != nil { + return err + } + + p.conns[addr] = commonPool + + return nil +} \ No newline at end of file diff --git a/routers/host/host.go b/routers/host/host.go index 0207515..f5bb912 100644 --- a/routers/host/host.go +++ b/routers/host/host.go @@ -12,6 +12,8 @@ import ( "html/template" "github.com/ouqiang/gocron/routers/base" "github.com/go-macaron/binding" + "github.com/ouqiang/gocron/modules/rpc/grpcpool" + "strings" ) func Index(ctx *macaron.Context) { @@ -85,11 +87,17 @@ func Store(ctx *macaron.Context, form HostForm) string { return json.CommonFailure("主机名已存在") } - hostModel.Name = form.Name - hostModel.Alias = form.Alias + hostModel.Name = strings.TrimSpace(form.Name) + hostModel.Alias = strings.TrimSpace(form.Alias) hostModel.Port = form.Port - hostModel.Remark = form.Remark + hostModel.Remark = strings.TrimSpace(form.Remark) isCreate := false + oldHostModel := new(models.Host) + err = oldHostModel.Find(int(id)) + if err != nil { + return json.CommonFailure("主机不存在") + } + if id > 0 { _, err = hostModel.UpdateBean(id) } else { @@ -100,12 +108,18 @@ func Store(ctx *macaron.Context, form HostForm) string { return json.CommonFailure("保存失败", err) } - taskModel := new(models.TaskHost) - tasks, err := taskModel.ActiveListByHostId(id) - if err != nil { - return json.CommonFailure("刷新任务主机信息失败", err) - } - if !isCreate && len(tasks) > 0 { + if !isCreate { + oldAddr := fmt.Sprintf("%s:%d", oldHostModel.Name, oldHostModel.Port) + newAddr := fmt.Sprintf("%s:%d", hostModel.Name, hostModel.Port) + if oldAddr != newAddr { + grpcpool.Pool.Release(oldAddr) + } + + taskModel := new(models.TaskHost) + tasks, err := taskModel.ActiveListByHostId(id) + if err != nil { + return json.CommonFailure("刷新任务主机信息失败", err) + } serviceTask := new(service.Task) serviceTask.BatchAdd(tasks) } @@ -129,11 +143,20 @@ func Remove(ctx *macaron.Context) string { } hostModel := new(models.Host) + err = hostModel.Find(int(id)) + if err != nil { + return json.CommonFailure("主机不存在") + } + _, err =hostModel.Delete(id) if err != nil { return json.CommonFailure("操作失败", err) } + addr := fmt.Sprintf("%s:%d", hostModel.Name, hostModel.Port) + grpcpool.Pool.Release(addr) + + return json.Success("操作成功", nil) } diff --git a/templates/task/index.html b/templates/task/index.html index 3a4daca..158763d 100644 --- a/templates/task/index.html +++ b/templates/task/index.html @@ -33,7 +33,7 @@
@@ -70,7 +70,7 @@ {{{.Id}}} {{{.Task.Name}}} {{{.Spec}}} - {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} RPC {{{end}}} + {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}} {{{if eq .Timeout -1}}}后台运行{{{else if gt .Timeout 0}}}{{{.Timeout}}}秒{{{else}}}不限制{{{end}}} {{{.RetryTimes}}} {{{if gt .Multi 0}}}否{{{else}}}是{{{end}}} diff --git a/templates/task/log.html b/templates/task/log.html index 2b827bb..84527fc 100644 --- a/templates/task/log.html +++ b/templates/task/log.html @@ -30,7 +30,7 @@
@@ -68,7 +68,7 @@ {{{.TaskId}}} {{{.Name}}} {{{.Spec}}} - {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} RPC {{{end}}} + {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}} {{{.RetryTimes}}} {{{.Hostname}}} diff --git a/templates/task/task_form.html b/templates/task/task_form.html index 0dfc5ad..86e4b0e 100644 --- a/templates/task/task_form.html +++ b/templates/task/task_form.html @@ -39,7 +39,7 @@
diff --git a/vendor/github.com/silenceper/pool/LICENSE b/vendor/github.com/silenceper/pool/LICENSE new file mode 100644 index 0000000..2d77ed5 --- /dev/null +++ b/vendor/github.com/silenceper/pool/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 nestgo + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/silenceper/pool/README.md b/vendor/github.com/silenceper/pool/README.md new file mode 100644 index 0000000..9bcfe3a --- /dev/null +++ b/vendor/github.com/silenceper/pool/README.md @@ -0,0 +1,62 @@ +# pool +[![GoDoc](http://godoc.org/github.com/silenceper/pool?status.svg)](http://godoc.org/github.com/silenceper/pool) + +Golang 实现的连接池 + + +##功能: + +- 连接池中连接类型为`interface{}`,使得更加通用 +- 链接的最大空闲时间,超时的链接将关闭丢弃,可避免空闲时链接自动失效问题 +- 使用channel处理池中的链接,高效 + +## 基本用法 + +```go + +//factory 创建连接的方法 +factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") } + +//close 关闭链接的方法 +close := func(v interface{}) error { return v.(net.Conn).Close() } + +//创建一个连接池: 初始化5,最大链接30 +poolConfig := &pool.PoolConfig{ + InitialCap: 5, + MaxCap: 30, + Factory: factory, + Close: close, + //链接最大空闲时间,超过该时间的链接 将会关闭,可避免空闲时链接EOF,自动失效的问题 + IdleTimeout: 15 * time.Second, +} +p, err := pool.NewChannelPool(poolConfig) +if err != nil { + fmt.Println("err=", err) +} + +//从连接池中取得一个链接 +v, err := p.Get() + +//do something +//conn=v.(net.Conn) + +//将链接放回连接池中 +p.Put(v) + +//释放连接池中的所有链接 +p.Release() + +//查看当前链接中的数量 +current := p.Len() + + +``` + + +####注: +该连接池参考 [https://github.com/fatih/pool](https://github.com/fatih/pool) 实现,改变以及增加原有的一些功能。 + + +## License + +The MIT License (MIT) - see LICENSE for more details diff --git a/vendor/github.com/silenceper/pool/channel.go b/vendor/github.com/silenceper/pool/channel.go new file mode 100644 index 0000000..7e36eaf --- /dev/null +++ b/vendor/github.com/silenceper/pool/channel.go @@ -0,0 +1,156 @@ +package pool + +import ( + "errors" + "fmt" + "sync" + "time" +) + +//PoolConfig 连接池相关配置 +type PoolConfig struct { + //连接池中拥有的最小连接数 + InitialCap int + //连接池中拥有的最大的连接数 + MaxCap int + //生成连接的方法 + Factory func() (interface{}, error) + //关闭链接的方法 + Close func(interface{}) error + //链接最大空闲时间,超过该事件则将失效 + IdleTimeout time.Duration +} + +//channelPool 存放链接信息 +type channelPool struct { + mu sync.Mutex + conns chan *idleConn + factory func() (interface{}, error) + close func(interface{}) error + idleTimeout time.Duration +} + +type idleConn struct { + conn interface{} + t time.Time +} + +//NewChannelPool 初始化链接 +func NewChannelPool(poolConfig *PoolConfig) (Pool, error) { + if poolConfig.InitialCap < 0 || poolConfig.MaxCap <= 0 || poolConfig.InitialCap > poolConfig.MaxCap { + return nil, errors.New("invalid capacity settings") + } + + c := &channelPool{ + conns: make(chan *idleConn, poolConfig.MaxCap), + factory: poolConfig.Factory, + close: poolConfig.Close, + idleTimeout: poolConfig.IdleTimeout, + } + + for i := 0; i < poolConfig.InitialCap; i++ { + conn, err := c.factory() + if err != nil { + c.Release() + return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) + } + c.conns <- &idleConn{conn: conn, t: time.Now()} + } + + return c, nil +} + +//getConns 获取所有连接 +func (c *channelPool) getConns() chan *idleConn { + c.mu.Lock() + conns := c.conns + c.mu.Unlock() + return conns +} + +//Get 从pool中取一个连接 +func (c *channelPool) Get() (interface{}, error) { + conns := c.getConns() + if conns == nil { + return nil, ErrClosed + } + for { + select { + case wrapConn := <-conns: + if wrapConn == nil { + return nil, ErrClosed + } + //判断是否超时,超时则丢弃 + if timeout := c.idleTimeout; timeout > 0 { + if wrapConn.t.Add(timeout).Before(time.Now()) { + //丢弃并关闭该链接 + c.Close(wrapConn.conn) + continue + } + } + return wrapConn.conn, nil + default: + conn, err := c.factory() + if err != nil { + return nil, err + } + + return conn, nil + } + } +} + +//Put 将连接放回pool中 +func (c *channelPool) Put(conn interface{}) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conns == nil { + return c.Close(conn) + } + + select { + case c.conns <- &idleConn{conn: conn, t: time.Now()}: + return nil + default: + //连接池已满,直接关闭该链接 + return c.Close(conn) + } +} + +//Close 关闭单条连接 +func (c *channelPool) Close(conn interface{}) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + return c.close(conn) +} + +//Release 释放连接池中所有链接 +func (c *channelPool) Release() { + c.mu.Lock() + conns := c.conns + c.conns = nil + c.factory = nil + closeFun := c.close + c.close = nil + c.mu.Unlock() + + if conns == nil { + return + } + + close(conns) + for wrapConn := range conns { + closeFun(wrapConn.conn) + } +} + +//Len 连接池中已有的连接 +func (c *channelPool) Len() int { + return len(c.getConns()) +} diff --git a/vendor/github.com/silenceper/pool/pool.go b/vendor/github.com/silenceper/pool/pool.go new file mode 100644 index 0000000..7ec40ff --- /dev/null +++ b/vendor/github.com/silenceper/pool/pool.go @@ -0,0 +1,21 @@ +package pool + +import "errors" + +var ( + //ErrClosed 连接池已经关闭Error + ErrClosed = errors.New("pool is closed") +) + +//Pool 基本方法 +type Pool interface { + Get() (interface{}, error) + + Put(interface{}) error + + Close(interface{}) error + + Release() + + Len() int +} diff --git a/vendor/vendor.json b/vendor/vendor.json index cdbe511..0cffdc1 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -148,6 +148,12 @@ "revision": "c28ec761087c32fd75ad7514db2a4988d5c872d9", "revisionTime": "2017-05-14T12:16:09Z" }, + { + "checksumSHA1": "cVGA2CJTJsCAVa5VKTM8k/ma/BU=", + "path": "github.com/silenceper/pool", + "revision": "e4c4536403d9765c8ea74cda2ee49dcb0848bc7a", + "revisionTime": "2016-11-14T14:21:55Z" + }, { "checksumSHA1": "1keN4Q9F8uHk/Gt5YXG0oCEecKM=", "path": "github.com/urfave/cli",