gRPC去掉连接池, 设置keepAlive

pull/164/head
ouqiang 2019-06-02 11:37:59 +08:00
parent 09985998ca
commit c997ce7cab
14 changed files with 155 additions and 429 deletions

View File

@ -9,7 +9,7 @@ type Host struct {
Id int16 `json:"id" xorm:"smallint pk autoincr"`
Name string `json:"name" xorm:"varchar(64) notnull"` // 主机名称
Alias string `json:"alias" xorm:"varchar(32) notnull default '' "` // 主机别名
Port int `json:"port" xorm:"notnull default 22"` // 主机端口
Port int `json:"port" xorm:"notnull default 5921"` // 主机端口
Remark string `json:"remark" xorm:"varchar(100) notnull default '' "` // 备注
BaseModel `json:"-" xorm:"-"`
Selected bool `json:"-" xorm:"-"`

View File

@ -22,7 +22,7 @@ type TaskLog struct {
StartTime time.Time `json:"start_time" xorm:"datetime created"` // 开始执行时间
EndTime time.Time `json:"end_time" xorm:"datetime updated"` // 执行完成(失败)时间
Status Status `json:"status" xorm:"tinyint notnull index default 1"` // 状态 0:执行失败 1:执行中 2:执行完毕 3:任务取消(上次任务未执行完成) 4:异步执行
Result string `json:"result" xorm:"mediumtext notnull default '' "` // 执行结果
Result string `json:"result" xorm:"mediumtext notnull "` // 执行结果
TotalTime int `json:"total_time" xorm:"-"` // 执行总时长
BaseModel `json:"-" xorm:"-"`
}

View File

@ -42,28 +42,24 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
}
}()
addr := fmt.Sprintf("%s:%d", ip, port)
conn, err := grpcpool.Pool.Get(addr)
c, err := grpcpool.Pool.Get(addr)
if err != nil {
return "", err
}
isConnClosed := false
defer func() {
if !isConnClosed {
grpcpool.Pool.Put(addr, conn)
}
}()
c := pb.NewTaskClient(conn)
if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 {
taskReq.Timeout = 86400
}
timeout := time.Duration(taskReq.Timeout) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
taskUniqueKey := generateTaskUniqueKey(ip, port, taskReq.Id)
taskMap.Store(taskUniqueKey, cancel)
defer taskMap.Delete(taskUniqueKey)
resp, err := c.Run(ctx, taskReq)
if err != nil {
return parseGRPCError(err, conn, &isConnClosed)
return parseGRPCError(err)
}
if resp.Error == "" {
@ -73,11 +69,9 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
return resp.Output, errors.New(resp.Error)
}
func parseGRPCError(err error, conn *grpc.ClientConn, connClosed *bool) (string, error) {
func parseGRPCError(err error) (string, error) {
switch grpc.Code(err) {
case codes.Unavailable, codes.Internal:
conn.Close()
*connClosed = true
case codes.Unavailable:
return "", errUnavailable
case codes.DeadlineExceeded:
return "", errors.New("执行超时, 强制结束")

View File

@ -1,140 +1,119 @@
package grpcpool
import (
"errors"
"context"
"strings"
"sync"
"time"
"github.com/ouqiang/gocron/internal/modules/app"
"github.com/ouqiang/gocron/internal/modules/rpc/auth"
"github.com/silenceper/pool"
"github.com/ouqiang/gocron/internal/modules/rpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
const (
backOffMaxDelay = 3 * time.Second
dialTimeout = 2 * time.Second
)
var (
Pool GRPCPool
)
var (
ErrInvalidConn = errors.New("invalid connection")
)
func init() {
Pool = GRPCPool{
make(map[string]pool.Pool),
sync.RWMutex{},
Pool = &GRPCPool{
conns: make(map[string]*Client),
}
keepAliveParams = keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}
)
type Client struct {
conn *grpc.ClientConn
rpcClient rpc.TaskClient
}
type GRPCPool struct {
// map key格式 ip:port
conns map[string]pool.Pool
sync.RWMutex
conns map[string]*Client
mu sync.RWMutex
}
func (p *GRPCPool) Get(addr string) (*grpc.ClientConn, error) {
p.RLock()
pool, ok := p.conns[addr]
p.RUnlock()
func (p *GRPCPool) Get(addr string) (rpc.TaskClient, error) {
p.mu.RLock()
client, ok := p.conns[addr]
p.mu.RUnlock()
if ok {
return client.rpcClient, nil
}
client, err := p.factory(addr)
if err != nil {
return nil, err
}
p.conns[addr] = client
return client.rpcClient, nil
}
// 释放连接
func (p *GRPCPool) Release(addr string) {
p.mu.Lock()
defer p.mu.Unlock()
client, ok := p.conns[addr]
if !ok {
err := p.newCommonPool(addr)
return
}
delete(p.conns, addr)
client.conn.Close()
}
// 创建连接
func (p *GRPCPool) factory(addr string) (*Client, error) {
p.mu.Lock()
defer p.mu.Unlock()
client, ok := p.conns[addr]
if ok {
return client, nil
}
opts := []grpc.DialOption{
grpc.WithKeepaliveParams(keepAliveParams),
grpc.WithBackoffMaxDelay(backOffMaxDelay),
}
if !app.Setting.EnableTLS {
opts = append(opts, grpc.WithInsecure())
} else {
server := strings.Split(addr, ":")
certificate := auth.Certificate{
CAFile: app.Setting.CAFile,
CertFile: app.Setting.CertFile,
KeyFile: app.Setting.KeyFile,
ServerName: server[0],
}
transportCreds, err := certificate.GetTransportCredsForClient()
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithTransportCredentials(transportCreds))
}
p.RLock()
pool = p.conns[addr]
p.RUnlock()
conn, err := pool.Get()
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, err
}
return conn.(*grpc.ClientConn), nil
}
func (p *GRPCPool) Put(addr string, conn *grpc.ClientConn) error {
p.RLock()
defer p.RUnlock()
pool, ok := p.conns[addr]
if ok {
return pool.Put(conn)
}
return ErrInvalidConn
}
// 释放连接池
func (p *GRPCPool) Release(addr string) {
p.Lock()
defer p.Unlock()
pool, ok := p.conns[addr]
if !ok {
return
}
pool.Release()
delete(p.conns, addr)
}
// 释放所有连接池
func (p *GRPCPool) ReleaseAll() {
p.Lock()
defer p.Unlock()
for _, pool := range p.conns {
pool.Release()
}
}
// 初始化底层连接池
func (p *GRPCPool) newCommonPool(addr string) error {
p.Lock()
defer p.Unlock()
commonPool, ok := p.conns[addr]
if ok {
return nil
}
poolConfig := &pool.PoolConfig{
InitialCap: 1,
MaxCap: 30,
Factory: func() (interface{}, error) {
if !app.Setting.EnableTLS {
return grpc.Dial(addr, grpc.WithInsecure())
}
server := strings.Split(addr, ":")
certificate := auth.Certificate{
CAFile: app.Setting.CAFile,
CertFile: app.Setting.CertFile,
KeyFile: app.Setting.KeyFile,
ServerName: server[0],
}
transportCreds, err := certificate.GetTransportCredsForClient()
if err != nil {
return nil, err
}
return grpc.Dial(addr, grpc.WithTransportCredentials(transportCreds))
},
Close: func(v interface{}) error {
conn, ok := v.(*grpc.ClientConn)
if ok && conn != nil {
return conn.Close()
}
return ErrInvalidConn
},
IdleTimeout: 3 * time.Minute,
}
commonPool, err := pool.NewChannelPool(poolConfig)
if err != nil {
return err
}
p.conns[addr] = commonPool
return nil
client = &Client{
conn: conn,
rpcClient: rpc.NewTaskClient(conn),
}
return client, nil
}

View File

@ -2,6 +2,9 @@ package server
import (
"net"
"time"
"google.golang.org/grpc/keepalive"
"github.com/ouqiang/gocron/internal/modules/rpc/auth"
pb "github.com/ouqiang/gocron/internal/modules/rpc/proto"
@ -14,6 +17,19 @@ import (
type Server struct{}
var keepAlivePolicy = keepalive.EnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
}
var keepAliveParams = keepalive.ServerParameters{
MaxConnectionIdle: 1 * time.Minute,
MaxConnectionAge: 2 * time.Hour,
MaxConnectionAgeGrace: 3 * time.Hour,
Time: 30 * time.Second,
Timeout: 3 * time.Second,
}
func (s Server) Run(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse, error) {
defer func() {
if err := recover(); err != nil {
@ -39,21 +55,24 @@ func Start(addr string, enableTLS bool, certificate auth.Certificate) {
}
var s *grpc.Server
opts := []grpc.ServerOption{
grpc.KeepaliveParams(keepAliveParams),
grpc.KeepaliveEnforcementPolicy(keepAlivePolicy),
}
if enableTLS {
tlsConfig, err := certificate.GetTLSConfigForServer()
if err != nil {
grpclog.Fatal(err)
}
opt := grpc.Creds(credentials.NewTLS(tlsConfig))
s = grpc.NewServer(opt)
pb.RegisterTaskServer(s, Server{})
grpclog.Printf("listen %s with TLS", addr)
} else {
s = grpc.NewServer()
pb.RegisterTaskServer(s, Server{})
grpclog.Printf("listen %s", addr)
opts = append(opts, opt)
}
s = grpc.NewServer(opts...)
pb.RegisterTaskServer(s, Server{})
grpclog.Printf("server listen on %s", addr)
err = s.Serve(l)
grpclog.Fatal(err)
if err != nil {
grpclog.Fatal(err)
}
}

View File

@ -4,6 +4,8 @@ import (
"strconv"
"strings"
"github.com/ouqiang/goutil"
"github.com/go-macaron/binding"
"github.com/jakecoffman/cron"
"github.com/ouqiang/gocron/internal/models"
@ -149,7 +151,9 @@ func Store(ctx *macaron.Context, form TaskForm) string {
}
if taskModel.Level == models.TaskLevelParent {
_, err = cron.Parse(form.Spec)
err = goutil.PanicToError(func() {
cron.Parse(form.Spec)
})
if err != nil {
return json.CommonFailure("crontab表达式解析失败", err)
}

View File

@ -9,6 +9,8 @@ import (
"sync"
"time"
"github.com/ouqiang/goutil"
"github.com/jakecoffman/cron"
"github.com/ouqiang/gocron/internal/models"
"github.com/ouqiang/gocron/internal/modules/app"
@ -160,7 +162,9 @@ func (task Task) Add(taskModel models.Task) {
}
cronName := strconv.Itoa(taskModel.Id)
err := serviceCron.AddFunc(taskModel.Spec, taskFunc, cronName)
err := goutil.PanicToError(func() {
serviceCron.AddFunc(taskModel.Spec, taskFunc, cronName)
})
if err != nil {
logger.Error("添加任务到调度器失败#", err)
}

View File

@ -90,18 +90,13 @@ type FuncJob func()
func (f FuncJob) Run() { f() }
// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func(), name string) error {
return c.AddJob(spec, FuncJob(cmd), name)
func (c *Cron) AddFunc(spec string, cmd func(), name string) {
c.AddJob(spec, FuncJob(cmd), name)
}
// AddFunc adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job, name string) error {
schedule, err := Parse(spec)
if err != nil {
return err
}
c.Schedule(schedule, cmd, name)
return nil
func (c *Cron) AddJob(spec string, cmd Job, name string) {
c.Schedule(Parse(spec), cmd, name)
}
// RemoveJob removes a Job from the Cron based on name.
@ -244,7 +239,7 @@ func (c *Cron) entrySnapshot() []*Entry {
Next: e.Next,
Prev: e.Prev,
Job: e.Job,
Name: e.Name,
Name: e.Name,
})
}
return entries

View File

@ -1,7 +1,6 @@
package cron
import (
"fmt"
"log"
"math"
"strconv"
@ -15,10 +14,7 @@ import (
// It accepts
// - Full crontab specs, e.g. "* * * * * ?"
// - Descriptors, e.g. "@midnight", "@every 1h30m"
func Parse(spec string) (Schedule, error) {
if len(spec) == 0 {
return nil, fmt.Errorf("Empty spec string")
}
func Parse(spec string) Schedule {
if spec[0] == '@' {
return parseDescriptor(spec)
}
@ -27,7 +23,7 @@ func Parse(spec string) (Schedule, error) {
// (second) (minute) (hour) (day of month) (month) (day of week, optional)
fields := strings.Fields(spec)
if len(fields) != 5 && len(fields) != 6 {
return nil, fmt.Errorf("Expected 5 or 6 fields, found %d: %s", len(fields), spec)
log.Panicf("Expected 5 or 6 fields, found %d: %s", len(fields), spec)
}
// If a sixth field is not provided (DayOfWeek), then it is equivalent to star.
@ -44,7 +40,7 @@ func Parse(spec string) (Schedule, error) {
Dow: getField(fields[5], dow),
}
return schedule, nil
return schedule
}
// getField returns an Int with the bits set representing all of the times that
@ -160,7 +156,7 @@ func all(r bounds) uint64 {
// parseDescriptor returns a pre-defined schedule for the expression, or panics
// if none matches.
func parseDescriptor(spec string) (Schedule, error) {
func parseDescriptor(spec string) Schedule {
switch spec {
case "@yearly", "@annually":
return &SpecSchedule{
@ -170,7 +166,7 @@ func parseDescriptor(spec string) (Schedule, error) {
Dom: 1 << dom.min,
Month: 1 << months.min,
Dow: all(dow),
}, nil
}
case "@monthly":
return &SpecSchedule{
@ -180,7 +176,7 @@ func parseDescriptor(spec string) (Schedule, error) {
Dom: 1 << dom.min,
Month: all(months),
Dow: all(dow),
}, nil
}
case "@weekly":
return &SpecSchedule{
@ -190,7 +186,7 @@ func parseDescriptor(spec string) (Schedule, error) {
Dom: all(dom),
Month: all(months),
Dow: 1 << dow.min,
}, nil
}
case "@daily", "@midnight":
return &SpecSchedule{
@ -200,7 +196,7 @@ func parseDescriptor(spec string) (Schedule, error) {
Dom: all(dom),
Month: all(months),
Dow: all(dow),
}, nil
}
case "@hourly":
return &SpecSchedule{
@ -210,17 +206,18 @@ func parseDescriptor(spec string) (Schedule, error) {
Dom: all(dom),
Month: all(months),
Dow: all(dow),
}, nil
}
}
const every = "@every "
if strings.HasPrefix(spec, every) {
duration, err := time.ParseDuration(spec[len(every):])
if err != nil {
return nil, fmt.Errorf("Failed to parse duration %s: %s", spec, err)
log.Panicf("Failed to parse duration %s: %s", spec, err)
}
return Every(duration), nil
return Every(duration)
}
return nil, fmt.Errorf("Unrecognized descriptor: %s", spec)
log.Panicf("Unrecognized descriptor: %s", spec)
return nil
}

View File

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2016 nestgo
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,62 +0,0 @@
# pool
[![GoDoc](http://godoc.org/github.com/silenceper/pool?status.svg)](http://godoc.org/github.com/silenceper/pool)
Golang 实现的连接池
##功能:
- 连接池中连接类型为`interface{}`,使得更加通用
- 链接的最大空闲时间,超时的链接将关闭丢弃,可避免空闲时链接自动失效问题
- 使用channel处理池中的链接高效
## 基本用法
```go
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") }
//close 关闭链接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }
//创建一个连接池: 初始化5最大链接30
poolConfig := &pool.PoolConfig{
InitialCap: 5,
MaxCap: 30,
Factory: factory,
Close: close,
//链接最大空闲时间,超过该时间的链接 将会关闭可避免空闲时链接EOF自动失效的问题
IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
fmt.Println("err=", err)
}
//从连接池中取得一个链接
v, err := p.Get()
//do something
//conn=v.(net.Conn)
//将链接放回连接池中
p.Put(v)
//释放连接池中的所有链接
p.Release()
//查看当前链接中的数量
current := p.Len()
```
####注:
该连接池参考 [https://github.com/fatih/pool](https://github.com/fatih/pool) 实现,改变以及增加原有的一些功能。
## License
The MIT License (MIT) - see LICENSE for more details

View File

@ -1,156 +0,0 @@
package pool
import (
"errors"
"fmt"
"sync"
"time"
)
//PoolConfig 连接池相关配置
type PoolConfig struct {
//连接池中拥有的最小连接数
InitialCap int
//连接池中拥有的最大的连接数
MaxCap int
//生成连接的方法
Factory func() (interface{}, error)
//关闭链接的方法
Close func(interface{}) error
//链接最大空闲时间,超过该事件则将失效
IdleTimeout time.Duration
}
//channelPool 存放链接信息
type channelPool struct {
mu sync.Mutex
conns chan *idleConn
factory func() (interface{}, error)
close func(interface{}) error
idleTimeout time.Duration
}
type idleConn struct {
conn interface{}
t time.Time
}
//NewChannelPool 初始化链接
func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {
if poolConfig.InitialCap < 0 || poolConfig.MaxCap <= 0 || poolConfig.InitialCap > poolConfig.MaxCap {
return nil, errors.New("invalid capacity settings")
}
c := &channelPool{
conns: make(chan *idleConn, poolConfig.MaxCap),
factory: poolConfig.Factory,
close: poolConfig.Close,
idleTimeout: poolConfig.IdleTimeout,
}
for i := 0; i < poolConfig.InitialCap; i++ {
conn, err := c.factory()
if err != nil {
c.Release()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- &idleConn{conn: conn, t: time.Now()}
}
return c, nil
}
//getConns 获取所有连接
func (c *channelPool) getConns() chan *idleConn {
c.mu.Lock()
conns := c.conns
c.mu.Unlock()
return conns
}
//Get 从pool中取一个连接
func (c *channelPool) Get() (interface{}, error) {
conns := c.getConns()
if conns == nil {
return nil, ErrClosed
}
for {
select {
case wrapConn := <-conns:
if wrapConn == nil {
return nil, ErrClosed
}
//判断是否超时,超时则丢弃
if timeout := c.idleTimeout; timeout > 0 {
if wrapConn.t.Add(timeout).Before(time.Now()) {
//丢弃并关闭该链接
c.Close(wrapConn.conn)
continue
}
}
return wrapConn.conn, nil
default:
conn, err := c.factory()
if err != nil {
return nil, err
}
return conn, nil
}
}
}
//Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
defer c.mu.Unlock()
if c.conns == nil {
return c.Close(conn)
}
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
return nil
default:
//连接池已满,直接关闭该链接
return c.Close(conn)
}
}
//Close 关闭单条连接
func (c *channelPool) Close(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
return c.close(conn)
}
//Release 释放连接池中所有链接
func (c *channelPool) Release() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
closeFun := c.close
c.close = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for wrapConn := range conns {
closeFun(wrapConn.conn)
}
}
//Len 连接池中已有的连接
func (c *channelPool) Len() int {
return len(c.getConns())
}

View File

@ -1,21 +0,0 @@
package pool
import "errors"
var (
//ErrClosed 连接池已经关闭Error
ErrClosed = errors.New("pool is closed")
)
//Pool 基本方法
type Pool interface {
Get() (interface{}, error)
Put(interface{}) error
Close(interface{}) error
Release()
Len() int
}

12
vendor/vendor.json vendored
View File

@ -113,10 +113,10 @@
"revision": ""
},
{
"checksumSHA1": "j22mTM0X/UI4kbff6RaPeMNH4XY=",
"checksumSHA1": "nzZPoKyx/lv8E/J7zZxuNQ5P1Bk=",
"path": "github.com/jakecoffman/cron",
"revision": "57ac9950da80b6e2c12df9042429278cf8c729eb",
"revisionTime": "2016-09-12T16:42:50Z"
"revision": "7e2009c226a5f6fb032692106d167ab0c3720692",
"revisionTime": "2019-01-06T20:08:28Z"
},
{
"checksumSHA1": "iKPMvbAueGfdyHcWCgzwKzm8WVo=",
@ -154,12 +154,6 @@
"revision": "fcdcb7b85139b8b9ca1d8ef1d98d8593e90d117f",
"revisionTime": "2018-03-30T20:43:14Z"
},
{
"checksumSHA1": "cVGA2CJTJsCAVa5VKTM8k/ma/BU=",
"path": "github.com/silenceper/pool",
"revision": "e4c4536403d9765c8ea74cda2ee49dcb0848bc7a",
"revisionTime": "2016-11-14T14:21:55Z"
},
{
"checksumSHA1": "1keN4Q9F8uHk/Gt5YXG0oCEecKM=",
"path": "github.com/urfave/cli",