增加gRPC连接池

pull/21/merge
ouqiang 2017-05-28 23:13:22 +08:00
parent bbed4f4c20
commit 5a8c3391f5
14 changed files with 425 additions and 33 deletions

View File

@ -10,9 +10,11 @@
* 任务执行失败重试设置 * 任务执行失败重试设置
* 任务超时设置 * 任务超时设置
* 延时任务 * 延时任务
* 任务执行方式 * 任务类型
* RPC调用执行远程shell命令 * shell任务
* HTTP-GET请求 > 在远程服务器上执行shell命令, 调度器与任务执行器保持长连接
* HTTP任务
> 访问指定的URL地址
* 查看任务执行日志 * 查看任务执行日志
* 任务执行结果通知, 支持邮件、Slack * 任务执行结果通知, 支持邮件、Slack
@ -32,7 +34,7 @@
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-linux-amd64.tar.gz) * [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-darwin-amd64.tar.gz) * [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-windows-amd64.zip) * [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-windows-amd64.zip)
* 任务执行器(安装在远程主机上) * 任务执行器(安装在远程主机上, 执行shell命令需安装)
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-linux-amd64.tar.gz) * [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-darwin-amd64.tar.gz) * [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-windows-amd64.zip) * [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-windows-amd64.zip)
@ -76,6 +78,7 @@
* ORM [Xorm](https://github.com/go-xorm/xorm) * ORM [Xorm](https://github.com/go-xorm/xorm)
* UI框架 [Semantic UI](https://semantic-ui.com/) * UI框架 [Semantic UI](https://semantic-ui.com/)
* 依赖管理(所有依赖包放入vendor目录) [Govendor](https://github.com/kardianos/govendor) * 依赖管理(所有依赖包放入vendor目录) [Govendor](https://github.com/kardianos/govendor)
* RPC框架 [gRPC](https://github.com/grpc/grpc)
## 反馈 ## 反馈
提交[issue](https://github.com/ouqiang/gocron/issues/new) 提交[issue](https://github.com/ouqiang/gocron/issues/new)

View File

@ -36,7 +36,7 @@ func (host *Host) Update(id int, data CommonMap) (int64, error) {
// 删除 // 删除
func (host *Host) Delete(id int) (int64, error) { func (host *Host) Delete(id int) (int64, error) {
return Db.Id(id).Delete(host) return Db.Id(id).Delete(new(Host))
} }
func (host *Host) Find(id int) error { func (host *Host) Find(id int) error {

View File

@ -7,7 +7,6 @@ import (
"github.com/go-xorm/xorm" "github.com/go-xorm/xorm"
"gopkg.in/macaron.v1" "gopkg.in/macaron.v1"
"strings" "strings"
"time"
"github.com/ouqiang/gocron/modules/logger" "github.com/ouqiang/gocron/modules/logger"
"github.com/ouqiang/gocron/modules/app" "github.com/ouqiang/gocron/modules/app"
) )
@ -82,8 +81,6 @@ func CreateDb() *xorm.Engine {
engine.Logger().SetLevel(core.LOG_DEBUG) engine.Logger().SetLevel(core.LOG_DEBUG)
} }
go keepDbAlived(engine)
return engine return engine
} }
@ -112,14 +109,6 @@ func getDbEngineDSN(engine string, config map[string]string) string {
return dsn return dsn
} }
// 定时ping, 防止因数据库超时设置连接被断开
func keepDbAlived(engine *xorm.Engine) {
t := time.Tick(180 * time.Second)
for {
<- t
engine.Ping()
}
}
// 获取数据库配置 // 获取数据库配置
func getDbConfig() map[string]string { func getDbConfig() map[string]string {

View File

@ -1,21 +1,23 @@
package client package client
import ( import (
"google.golang.org/grpc"
pb "github.com/ouqiang/gocron/modules/rpc/proto" pb "github.com/ouqiang/gocron/modules/rpc/proto"
"golang.org/x/net/context" "golang.org/x/net/context"
"fmt" "fmt"
"time" "time"
"errors" "errors"
"github.com/ouqiang/gocron/modules/rpc/grpcpool"
) )
func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) { func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
addr := fmt.Sprintf("%s:%d", ip, port); addr := fmt.Sprintf("%s:%d", ip, port);
conn, err := grpc.Dial(addr, grpc.WithInsecure()) conn, err := grpcpool.Pool.Get(addr)
if err != nil { if err != nil {
return "", err return "", err
} }
defer conn.Close() defer func() {
grpcpool.Pool.Put(addr, conn)
}()
c := pb.NewTaskClient(conn) c := pb.NewTaskClient(conn)
if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 { if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 {
taskReq.Timeout = 86400 taskReq.Timeout = 86400

View File

@ -0,0 +1,109 @@
package grpcpool
import (
"github.com/silenceper/pool"
"sync"
"time"
"google.golang.org/grpc"
"errors"
)
var (
Pool GRPCPool
)
var (
ErrInvalidConn = errors.New("invalid connection")
)
func init() {
Pool = GRPCPool{
make(map[string]pool.Pool),
sync.RWMutex{},
}
}
type GRPCPool struct {
// map key格式 ip:port
conns map[string]pool.Pool
sync.RWMutex
}
func (p *GRPCPool) Get(addr string) (*grpc.ClientConn, error) {
p.RLock()
p.RUnlock()
pool, ok := p.conns[addr]
if !ok {
err := p.newCommonPool(addr)
if err != nil {
return nil, err
}
}
pool = p.conns[addr]
conn, err := pool.Get()
if err != nil {
return nil, err
}
return conn.(*grpc.ClientConn), nil
}
func (p *GRPCPool) Put(addr string, conn *grpc.ClientConn) error {
p.RLock()
defer p.RUnlock()
pool, ok := p.conns[addr]
if ok {
return pool.Put(conn)
}
return ErrInvalidConn
}
// 释放连接池
func (p *GRPCPool) Release(addr string) {
p.Lock()
defer p.Unlock()
pool, ok := p.conns[addr]
if !ok {
return
}
pool.Release()
delete(p.conns, addr)
}
// 初始化底层连接池
func (p *GRPCPool) newCommonPool(addr string) (error) {
p.Lock()
defer p.Unlock()
commonPool, ok := p.conns[addr]
if ok {
return nil
}
poolConfig := &pool.PoolConfig{
InitialCap: 1,
MaxCap: 30,
Factory: func() (interface{}, error) {
return grpc.Dial(addr, grpc.WithInsecure())
},
Close: func(v interface{}) error {
conn, ok := v.(*grpc.ClientConn)
if ok && conn != nil {
return conn.Close()
}
return ErrInvalidConn
},
IdleTimeout: 5 * time.Minute,
}
commonPool, err := pool.NewChannelPool(poolConfig)
if err != nil {
return err
}
p.conns[addr] = commonPool
return nil
}

View File

@ -12,6 +12,8 @@ import (
"html/template" "html/template"
"github.com/ouqiang/gocron/routers/base" "github.com/ouqiang/gocron/routers/base"
"github.com/go-macaron/binding" "github.com/go-macaron/binding"
"github.com/ouqiang/gocron/modules/rpc/grpcpool"
"strings"
) )
func Index(ctx *macaron.Context) { func Index(ctx *macaron.Context) {
@ -85,11 +87,17 @@ func Store(ctx *macaron.Context, form HostForm) string {
return json.CommonFailure("主机名已存在") return json.CommonFailure("主机名已存在")
} }
hostModel.Name = form.Name hostModel.Name = strings.TrimSpace(form.Name)
hostModel.Alias = form.Alias hostModel.Alias = strings.TrimSpace(form.Alias)
hostModel.Port = form.Port hostModel.Port = form.Port
hostModel.Remark = form.Remark hostModel.Remark = strings.TrimSpace(form.Remark)
isCreate := false isCreate := false
oldHostModel := new(models.Host)
err = oldHostModel.Find(int(id))
if err != nil {
return json.CommonFailure("主机不存在")
}
if id > 0 { if id > 0 {
_, err = hostModel.UpdateBean(id) _, err = hostModel.UpdateBean(id)
} else { } else {
@ -100,12 +108,18 @@ func Store(ctx *macaron.Context, form HostForm) string {
return json.CommonFailure("保存失败", err) return json.CommonFailure("保存失败", err)
} }
if !isCreate {
oldAddr := fmt.Sprintf("%s:%d", oldHostModel.Name, oldHostModel.Port)
newAddr := fmt.Sprintf("%s:%d", hostModel.Name, hostModel.Port)
if oldAddr != newAddr {
grpcpool.Pool.Release(oldAddr)
}
taskModel := new(models.TaskHost) taskModel := new(models.TaskHost)
tasks, err := taskModel.ActiveListByHostId(id) tasks, err := taskModel.ActiveListByHostId(id)
if err != nil { if err != nil {
return json.CommonFailure("刷新任务主机信息失败", err) return json.CommonFailure("刷新任务主机信息失败", err)
} }
if !isCreate && len(tasks) > 0 {
serviceTask := new(service.Task) serviceTask := new(service.Task)
serviceTask.BatchAdd(tasks) serviceTask.BatchAdd(tasks)
} }
@ -129,11 +143,20 @@ func Remove(ctx *macaron.Context) string {
} }
hostModel := new(models.Host) hostModel := new(models.Host)
err = hostModel.Find(int(id))
if err != nil {
return json.CommonFailure("主机不存在")
}
_, err =hostModel.Delete(id) _, err =hostModel.Delete(id)
if err != nil { if err != nil {
return json.CommonFailure("操作失败", err) return json.CommonFailure("操作失败", err)
} }
addr := fmt.Sprintf("%s:%d", hostModel.Name, hostModel.Port)
grpcpool.Pool.Release(addr)
return json.Success("操作成功", nil) return json.Success("操作成功", nil)
} }

View File

@ -33,7 +33,7 @@
<div class="field"> <div class="field">
<select name="protocol" id="protocol"> <select name="protocol" id="protocol">
<option value="0"></option> <option value="0"></option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">RPC</option> <option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">SHELL</option>
<option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option> <option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option>
</select> </select>
</div> </div>
@ -70,7 +70,7 @@
<td>{{{.Id}}}</td> <td>{{{.Id}}}</td>
<td>{{{.Task.Name}}}</td> <td>{{{.Task.Name}}}</td>
<td>{{{.Spec}}}</td> <td>{{{.Spec}}}</td>
<td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} RPC {{{end}}}</td> <td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}}</td>
<td>{{{if eq .Timeout -1}}}{{{else if gt .Timeout 0}}}{{{.Timeout}}}{{{else}}}{{{end}}}</td> <td>{{{if eq .Timeout -1}}}{{{else if gt .Timeout 0}}}{{{.Timeout}}}{{{else}}}{{{end}}}</td>
<td>{{{.RetryTimes}}}</td> <td>{{{.RetryTimes}}}</td>
<td>{{{if gt .Multi 0}}}{{{else}}}{{{end}}}</td> <td>{{{if gt .Multi 0}}}{{{else}}}{{{end}}}</td>

View File

@ -30,7 +30,7 @@
<div class="field"> <div class="field">
<select name="protocol" id="protocol"> <select name="protocol" id="protocol">
<option value="0"></option> <option value="0"></option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">RPC</option> <option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">SHELL</option>
<option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option> <option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option>
</select> </select>
</div> </div>
@ -68,7 +68,7 @@
<td><a href="/task?id={{{.TaskId}}}">{{{.TaskId}}}</a></td> <td><a href="/task?id={{{.TaskId}}}">{{{.TaskId}}}</a></td>
<td>{{{.Name}}}</td> <td>{{{.Name}}}</td>
<td>{{{.Spec}}}</td> <td>{{{.Spec}}}</td>
<td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} RPC {{{end}}}</td> <td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}}</td>
<td>{{{.RetryTimes}}}</td> <td>{{{.RetryTimes}}}</td>
<td>{{{.Hostname}}}</td> <td>{{{.Hostname}}}</td>
<td> <td>

View File

@ -39,7 +39,7 @@
<div class="field"> <div class="field">
<label></label> <label></label>
<select name="protocol" id="protocol"> <select name="protocol" id="protocol">
<option value="2" {{{if .Task}}} {{{if eq .Task.Protocol 2}}}selected{{{end}}} {{{end}}} data-match="host_id" data-validate-type="selectProtocol">RPC</option> <option value="2" {{{if .Task}}} {{{if eq .Task.Protocol 2}}}selected{{{end}}} {{{end}}} data-match="host_id" data-validate-type="selectProtocol">SHELL</option>
<option value="1" {{{if .Task}}} {{{if eq .Task.Protocol 1}}}selected{{{end}}} {{{end}}}>HTTP</option> <option value="1" {{{if .Task}}} {{{if eq .Task.Protocol 1}}}selected{{{end}}} {{{end}}}>HTTP</option>
</select> </select>
</div> </div>

21
vendor/github.com/silenceper/pool/LICENSE generated vendored Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 nestgo
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

62
vendor/github.com/silenceper/pool/README.md generated vendored Normal file
View File

@ -0,0 +1,62 @@
# pool
[![GoDoc](http://godoc.org/github.com/silenceper/pool?status.svg)](http://godoc.org/github.com/silenceper/pool)
Golang 实现的连接池
##功能:
- 连接池中连接类型为`interface{}`,使得更加通用
- 链接的最大空闲时间,超时的链接将关闭丢弃,可避免空闲时链接自动失效问题
- 使用channel处理池中的链接高效
## 基本用法
```go
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") }
//close 关闭链接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }
//创建一个连接池: 初始化5最大链接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭可避免空闲时链接EOF自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}
//从连接池中取得一个链接
v, err := p.Get()
//do something
//conn=v.(net.Conn)
//将链接放回连接池中
p.Put(v)
//释放连接池中的所有链接
p.Release()
//查看当前链接中的数量
current := p.Len()
```
####注:
该连接池参考 [https://github.com/fatih/pool](https://github.com/fatih/pool) 实现,改变以及增加原有的一些功能。
## License
The MIT License (MIT) - see LICENSE for more details

156
vendor/github.com/silenceper/pool/channel.go generated vendored Normal file
View File

@ -0,0 +1,156 @@
package pool
import (
"errors"
"fmt"
"sync"
"time"
)
//PoolConfig 连接池相关配置
type PoolConfig struct {
//连接池中拥有的最小连接数
InitialCap int
//连接池中拥有的最大的连接数
MaxCap int
//生成连接的方法
Factory func() (interface{}, error)
//关闭链接的方法
Close func(interface{}) error
//链接最大空闲时间,超过该事件则将失效
IdleTimeout time.Duration
}
//channelPool 存放链接信息
type channelPool struct {
mu sync.Mutex
conns chan *idleConn
factory func() (interface{}, error)
close func(interface{}) error
idleTimeout time.Duration
}
type idleConn struct {
conn interface{}
t time.Time
}
//NewChannelPool 初始化链接
func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {
if poolConfig.InitialCap < 0 || poolConfig.MaxCap <= 0 || poolConfig.InitialCap > poolConfig.MaxCap {
return nil, errors.New("invalid capacity settings")
}
c := &channelPool{
conns: make(chan *idleConn, poolConfig.MaxCap),
factory: poolConfig.Factory,
close: poolConfig.Close,
idleTimeout: poolConfig.IdleTimeout,
}
for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
c.Release()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- &idleConn{conn: conn, t: time.Now()}
}
return c, nil
}
//getConns 获取所有连接
func (c *channelPool) getConns() chan *idleConn {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}
//Get 从pool中取一个连接
func (c *channelPool) Get() (interface{}, error) {
conns := c.getConns()
if conns == nil {
return nil, ErrClosed
}
for {
select {
case wrapConn := <-conns:
if wrapConn == nil {
return nil, ErrClosed
}
//判断是否超时,超时则丢弃
if timeout := c.idleTimeout; timeout > 0 {
if wrapConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该链接
c.Close(wrapConn.conn)
continue
}
}
return wrapConn.conn, nil
default:
conn, err := c.factory()
if err != nil {
return nil, err
}
return conn, nil
}
}
}
//Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.conns == nil {
return c.Close(conn)
}
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
return nil
default:
//连接池已满,直接关闭该链接
return c.Close(conn)
}
}
//Close 关闭单条连接
func (c *channelPool) Close(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
return c.close(conn)
}
//Release 释放连接池中所有链接
func (c *channelPool) Release() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
closeFun := c.close
c.close = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for wrapConn := range conns {
closeFun(wrapConn.conn)
}
}
//Len 连接池中已有的连接
func (c *channelPool) Len() int {
return len(c.getConns())
}

21
vendor/github.com/silenceper/pool/pool.go generated vendored Normal file
View File

@ -0,0 +1,21 @@
package pool
import "errors"
var (
//ErrClosed 连接池已经关闭Error
ErrClosed = errors.New("pool is closed")
)
//Pool 基本方法
type Pool interface {
Get() (interface{}, error)
Put(interface{}) error
Close(interface{}) error
Release()
Len() int
}

6
vendor/vendor.json vendored
View File

@ -148,6 +148,12 @@
"revision": "c28ec761087c32fd75ad7514db2a4988d5c872d9", "revision": "c28ec761087c32fd75ad7514db2a4988d5c872d9",
"revisionTime": "2017-05-14T12:16:09Z" "revisionTime": "2017-05-14T12:16:09Z"
}, },
{
"checksumSHA1": "cVGA2CJTJsCAVa5VKTM8k/ma/BU=",
"path": "github.com/silenceper/pool",
"revision": "e4c4536403d9765c8ea74cda2ee49dcb0848bc7a",
"revisionTime": "2016-11-14T14:21:55Z"
},
{ {
"checksumSHA1": "1keN4Q9F8uHk/Gt5YXG0oCEecKM=", "checksumSHA1": "1keN4Q9F8uHk/Gt5YXG0oCEecKM=",
"path": "github.com/urfave/cli", "path": "github.com/urfave/cli",