RPC调用超时处理

pull/21/merge
ouqiang 2017-05-27 13:10:57 +08:00
parent ef996f8536
commit 527780105f
7 changed files with 42 additions and 23 deletions

View File

@ -11,7 +11,7 @@
* 任务超时设置
* 延时任务
* 任务执行方式
* RPC调用
* RPC调用执行远程shell命令
* HTTP-GET请求
* 查看任务执行日志
* 任务执行结果通知, 支持邮件、Slack
@ -32,7 +32,7 @@
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-windows-amd64.zip)
* 任务执行器(安装在远程主机上, RPC调用需安装)
* 任务执行器(安装在远程主机上)
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-windows-amd64.zip)

View File

@ -10,7 +10,7 @@ import (
"github.com/ouqiang/gocron/cmd"
)
const AppVersion = "0.3"
const AppVersion = "0.4"
func main() {
app := cli.NewApp()

View File

@ -6,13 +6,14 @@ import (
"golang.org/x/net/context"
"fmt"
"time"
"errors"
)
func Exec(ip string, port int, taskReq *pb.TaskRequest) (output string, err error) {
func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
addr := fmt.Sprintf("%s:%d", ip, port);
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return
return "", err
}
defer conn.Close()
c := pb.NewTaskClient(conn)
@ -23,9 +24,12 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (output string, err erro
ctx, _ := context.WithTimeout(context.Background(), timeout)
resp, err := c.Run(ctx, taskReq)
if err != nil {
return
return "", err
}
output = resp.Output
return
if resp.Error == "" {
return resp.Output, nil
}
return resp.Output, errors.New(resp.Error)
}

View File

@ -59,6 +59,7 @@ func (m *TaskRequest) GetTimeout() int32 {
type TaskResponse struct {
Output string `protobuf:"bytes,1,opt,name=output" json:"output,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
}
func (m *TaskResponse) Reset() { *m = TaskResponse{} }
@ -73,6 +74,13 @@ func (m *TaskResponse) GetOutput() string {
return ""
}
func (m *TaskResponse) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func init() {
proto.RegisterType((*TaskRequest)(nil), "rpc.TaskRequest")
proto.RegisterType((*TaskResponse)(nil), "rpc.TaskResponse")
@ -153,15 +161,16 @@ var _Task_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("task.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2c, 0xce,
0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0x72, 0xe4, 0xe2, 0x0e,
0x49, 0x2c, 0xce, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x92, 0xe0, 0x62, 0x4f, 0xce,
0xcf, 0xcd, 0x4d, 0xcc, 0x4b, 0x91, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x71, 0x41, 0x32,
0x25, 0x99, 0xb9, 0xa9, 0xf9, 0xa5, 0x25, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x30, 0xae,
0x92, 0x1a, 0x17, 0x0f, 0xc4, 0x88, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x21, 0x31, 0x2e, 0xb6,
0xfc, 0xd2, 0x92, 0x82, 0xd2, 0x12, 0x09, 0x46, 0xb0, 0x11, 0x50, 0x9e, 0x91, 0x09, 0x17, 0x0b,
0x48, 0x9d, 0x90, 0x0e, 0x17, 0x73, 0x50, 0x69, 0x9e, 0x90, 0x80, 0x5e, 0x51, 0x41, 0xb2, 0x1e,
0x92, 0xe5, 0x52, 0x82, 0x48, 0x22, 0x10, 0xb3, 0x94, 0x18, 0x92, 0xd8, 0xc0, 0x8e, 0x35, 0x06,
0x04, 0x00, 0x00, 0xff, 0xff, 0x82, 0x08, 0x5d, 0x10, 0xba, 0x00, 0x00, 0x00,
// 170 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x8f, 0xb1, 0x0e, 0x82, 0x40,
0x0c, 0x86, 0x45, 0x04, 0x63, 0x75, 0xd0, 0xc6, 0x98, 0x8b, 0x13, 0x61, 0x62, 0x30, 0x0c, 0xea,
0xe8, 0xe2, 0x2b, 0x5c, 0x7c, 0x01, 0xc4, 0x1b, 0x0c, 0x81, 0x9e, 0xbd, 0xde, 0xfb, 0x1b, 0x38,
0x48, 0x18, 0xbf, 0x36, 0xfd, 0xfa, 0xff, 0x00, 0x52, 0xb9, 0xa6, 0xb4, 0x4c, 0x42, 0x18, 0xb3,
0xad, 0xf3, 0x27, 0x6c, 0x5f, 0x95, 0x6b, 0xb4, 0xf9, 0x79, 0xe3, 0x04, 0x15, 0xac, 0x6b, 0x6a,
0xdb, 0xaa, 0xfb, 0xa8, 0x65, 0x16, 0x15, 0x1b, 0x3d, 0x61, 0xbf, 0x91, 0x6f, 0x6b, 0xc8, 0x8b,
0x8a, 0xb3, 0xa8, 0x48, 0xf4, 0x84, 0xf9, 0x03, 0x76, 0x41, 0xe1, 0x2c, 0x75, 0xce, 0xe0, 0x09,
0x52, 0xf2, 0x62, 0xbd, 0xa8, 0x68, 0x50, 0x8c, 0x84, 0x47, 0x48, 0x0c, 0x33, 0xf1, 0x68, 0x0e,
0x70, 0xbd, 0xc3, 0xaa, 0xbf, 0xc6, 0x0b, 0xc4, 0xda, 0x77, 0xb8, 0x2f, 0xd9, 0xd6, 0xe5, 0x2c,
0xd2, 0xf9, 0x30, 0x9b, 0x84, 0x0f, 0xf9, 0xe2, 0x9d, 0x0e, 0x15, 0x6e, 0xff, 0x00, 0x00, 0x00,
0xff, 0xff, 0xef, 0x3c, 0x71, 0x6b, 0xd0, 0x00, 0x00, 0x00,
}

View File

@ -12,5 +12,6 @@ message TaskRequest {
}
message TaskResponse {
string output = 1; //
string output = 1; //
string error = 2; //
}

View File

@ -15,8 +15,13 @@ func (s Server) Run(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse,
output, err := utils.ExecShell(ctx, req.Command)
resp := new(pb.TaskResponse)
resp.Output = output
if err != nil {
resp.Error = err.Error()
} else {
resp.Error = ""
}
return resp, err
return resp, nil
}
func Start(addr string) {

View File

@ -65,13 +65,13 @@
<div class="six fields">
<div class="field">
<label>()</label>
<input type="text" name="timeout" placeholder="默认0, 不限制" value="{{{.Task.Timeout}}}">
<input type="text" name="timeout" placeholder="默认0, 不限制" value="{{{if .Task}}} {{{.Task.Timeout}}} {{{else}}} 0 {{{end}}}">
</div>
</div>
<div class="six fields">
<div class="field">
<label></label>
<input type="text" name="retry_times" placeholder="默认0, 不重试" value="{{{.Task.RetryTimes}}}">
<input type="text" name="retry_times" placeholder="默认0, 不重试" value="{{{if .Task}}} {{{.Task.RetryTimes}}} {{{else}}} 0 {{{end}}}">
</div>
</div>
<div class="three fields">