gocron/internal/modules/rpc/grpcpool/grpc_pool.go

141 lines
2.4 KiB
Go
Raw Normal View History

2017-05-28 15:13:22 +00:00
package grpcpool
import (
2017-09-16 09:58:33 +00:00
"errors"
2018-01-30 11:26:04 +00:00
"strings"
"sync"
"time"
2018-03-25 05:12:12 +00:00
"github.com/ouqiang/gocron/internal/modules/app"
"github.com/ouqiang/gocron/internal/modules/rpc/auth"
2017-09-16 09:58:33 +00:00
"github.com/silenceper/pool"
"google.golang.org/grpc"
2017-05-28 15:13:22 +00:00
)
var (
2017-09-16 09:58:33 +00:00
Pool GRPCPool
2017-05-28 15:13:22 +00:00
)
var (
2017-09-16 09:58:33 +00:00
ErrInvalidConn = errors.New("invalid connection")
2017-05-28 15:13:22 +00:00
)
2017-09-16 09:58:33 +00:00
func init() {
Pool = GRPCPool{
make(map[string]pool.Pool),
sync.RWMutex{},
}
2017-05-28 15:13:22 +00:00
}
type GRPCPool struct {
2017-09-16 09:58:33 +00:00
// map key格式 ip:port
conns map[string]pool.Pool
sync.RWMutex
2017-05-28 15:13:22 +00:00
}
2017-09-16 09:58:33 +00:00
func (p *GRPCPool) Get(addr string) (*grpc.ClientConn, error) {
p.RLock()
pool, ok := p.conns[addr]
p.RUnlock()
if !ok {
err := p.newCommonPool(addr)
if err != nil {
return nil, err
}
}
p.RLock()
pool = p.conns[addr]
p.RUnlock()
conn, err := pool.Get()
if err != nil {
return nil, err
}
return conn.(*grpc.ClientConn), nil
2017-05-28 15:13:22 +00:00
}
func (p *GRPCPool) Put(addr string, conn *grpc.ClientConn) error {
2017-09-16 09:58:33 +00:00
p.RLock()
defer p.RUnlock()
pool, ok := p.conns[addr]
if ok {
return pool.Put(conn)
}
return ErrInvalidConn
2017-05-28 15:13:22 +00:00
}
// 释放连接池
func (p *GRPCPool) Release(addr string) {
2017-09-16 09:58:33 +00:00
p.Lock()
defer p.Unlock()
pool, ok := p.conns[addr]
if !ok {
return
}
pool.Release()
delete(p.conns, addr)
2017-05-28 15:13:22 +00:00
}
2017-05-29 09:05:21 +00:00
// 释放所有连接池
2017-09-16 09:58:33 +00:00
func (p *GRPCPool) ReleaseAll() {
p.Lock()
defer p.Unlock()
for _, pool := range p.conns {
pool.Release()
}
2017-05-29 09:05:21 +00:00
}
2017-05-28 15:13:22 +00:00
// 初始化底层连接池
2017-09-16 09:58:33 +00:00
func (p *GRPCPool) newCommonPool(addr string) error {
p.Lock()
defer p.Unlock()
commonPool, ok := p.conns[addr]
if ok {
return nil
}
poolConfig := &pool.PoolConfig{
InitialCap: 1,
MaxCap: 30,
Factory: func() (interface{}, error) {
if !app.Setting.EnableTLS {
return grpc.Dial(addr, grpc.WithInsecure())
}
server := strings.Split(addr, ":")
certificate := auth.Certificate{
CAFile: app.Setting.CAFile,
CertFile: app.Setting.CertFile,
KeyFile: app.Setting.KeyFile,
ServerName: server[0],
}
transportCreds, err := certificate.GetTransportCredsForClient()
if err != nil {
return nil, err
}
return grpc.Dial(addr, grpc.WithTransportCredentials(transportCreds))
},
Close: func(v interface{}) error {
conn, ok := v.(*grpc.ClientConn)
if ok && conn != nil {
return conn.Close()
}
return ErrInvalidConn
},
IdleTimeout: 3 * time.Minute,
}
commonPool, err := pool.NewChannelPool(poolConfig)
if err != nil {
return err
}
p.conns[addr] = commonPool
return nil
}