gRPC客户端不可用连接, 不放回连接池, 直接关闭

pull/21/merge
ouqiang 2017-05-29 10:19:58 +08:00
parent 5a8c3391f5
commit 8e6b212b1d
1 changed files with 21 additions and 4 deletions

View File

@ -7,16 +7,21 @@ import (
"time" "time"
"errors" "errors"
"github.com/ouqiang/gocron/modules/rpc/grpcpool" "github.com/ouqiang/gocron/modules/rpc/grpcpool"
"google.golang.org/grpc/codes"
"google.golang.org/grpc"
) )
func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) { func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
addr := fmt.Sprintf("%s:%d", ip, port); addr := fmt.Sprintf("%s:%d", ip, port)
conn, err := grpcpool.Pool.Get(addr) conn, err := grpcpool.Pool.Get(addr)
if err != nil { if err != nil {
return "", err return "", err
} }
isConnClosed := false
defer func() { defer func() {
grpcpool.Pool.Put(addr, conn) if !isConnClosed {
grpcpool.Pool.Put(addr, conn)
}
}() }()
c := pb.NewTaskClient(conn) c := pb.NewTaskClient(conn)
if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 { if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 {
@ -27,7 +32,7 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
defer cancel() defer cancel()
resp, err := c.Run(ctx, taskReq) resp, err := c.Run(ctx, taskReq)
if err != nil { if err != nil {
return "", err return parseGRPCError(err, conn, &isConnClosed)
} }
if resp.Error == "" { if resp.Error == "" {
@ -36,3 +41,15 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
return resp.Output, errors.New(resp.Error) return resp.Output, errors.New(resp.Error)
} }
func parseGRPCError(err error, conn *grpc.ClientConn, connClosed *bool) (string, error) {
switch grpc.Code(err) {
case codes.Unavailable:
conn.Close()
*connClosed = true
return "", errors.New("无法连接远程服务器")
case codes.DeadlineExceeded:
return "", errors.New("执行超时, 强制结束")
}
return "", err
}