RPC调用使用context控制超时

pull/21/merge
ouqiang 2017-05-26 21:59:01 +08:00
parent ccad05e50f
commit 6e0afb91c0
10 changed files with 112 additions and 75 deletions

View File

@ -1,5 +1,5 @@
[![Build Status](https://travis-ci.org/ouqiang/gocron.png)](https://travis-ci.org/ouqiang/gocron)
# gocron - 定时任务web管理系统
# gocron - 定时任务管理系统
# 项目简介
使用Go语言开发的定时任务集中调度和管理系统, 用于替代Linux-crontab [查看文档](https://github.com/ouqiang/gocron/wiki)
@ -11,9 +11,8 @@
* 任务超时设置
* 延时任务
* 任务执行方式
* 调用本机系统命令
* 通过SSH执行远程命令
* 执行HTTP-GET请求
* RPC调用
* HTTP-GET请求
* 查看任务执行日志
* 任务执行结果通知, 支持邮件、Slack
@ -29,23 +28,35 @@
## 下载
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-windows-amd64.zip)
* 调度器(管理后台)
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-windows-amd64.zip)
* 任务执行器(安装在远程主机上, RPC调用需安装)
* [Linux-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-linux-amd64.tar.gz)
* [Mac OS-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-darwin-amd64.tar.gz)
* [Windows-64位](http://opns468ov.bkt.clouddn.com/gocron/gocron-node-windows-amd64.zip)
## 安装
### 二进制安装
1. 解压压缩包
2. `cd 解压目录`
3. 启动
* Windows: `gocron.exe web`
* Linux、Mac OS: `./gocron web`
3. 启动
* 调度器启动
* Windows: `gocron.exe web`
* Linux、Mac OS: `./gocron web`
* 任务执行器启动
* Windows: `gocron-node.exe ip:port`
* Linux、Mac OS: `./gocron-node ip:port`
4. 浏览器访问 http://localhost:5920
### 源码安装
1. `go`语言版本1.7+
2. `go get -d github.com/ouqiang/gocron`
3. 编译 `go build`
3. 编译
* 调度器 `go build -tags gocron`
* 任务执行器 `go build -tags node`
4. 启动、访问方式同上
### 命令
@ -57,19 +68,14 @@
* -h 查看帮助
* gocron serv
* -s stop|status stop:停止gocron status:查看运行状态
## 安全
* 使用`https`访问保证数据传输安全, 可在web服务器如nginx中配置https通过反向代理访问内部的gocron
* 网站访问设置IP白名单
* SSH登录设置IP白名单
* gocron-node ip:port, 默认0.0.0.0:5921
## 程序使用的组件
* web框架 [Macaron](http://go-macaron.com/)
* 定时任务调度 [cron](https://github.com/robfig/cron)
* 定时任务调度 [Cron](https://github.com/robfig/cron)
* ORM [Xorm](https://github.com/go-xorm/xorm)
* UI框架 [Semantic UI](https://semantic-ui.com/)
* 依赖管理(所有依赖包放入vendor目录) [govendor](https://github.com/kardianos/govendor)
* 依赖管理(所有依赖包放入vendor目录) [Govendor](https://github.com/kardianos/govendor)
## 反馈
提交[issue](https://github.com/ouqiang/gocron/issues/new)

View File

@ -52,7 +52,7 @@ if [[ $ARCH != '386' && $ARCH != 'amd64' ]];then
exit 1
fi
echo '开始编译调度中心'
echo '开始编译调度'
if [[ $OS = 'windows' ]];then
GOOS=$OS GOARCH=$ARCH go build -tags gocron -ldflags '-w -H windowsgui'
else

View File

@ -6,15 +6,14 @@ package main
import (
"github.com/ouqiang/gocron/modules/rpc/server"
"os"
"fmt"
)
func main() {
var addr string
if (len(os.Args) < 2) {
fmt.Println("usage ./gocron-node addr:port")
os.Exit(1)
}
addr = os.Args[1]
addr = "0.0.0.0:5921"
} else {
addr = os.Args[1]
}
server.Start(addr)
}

View File

@ -25,7 +25,6 @@ const (
Running Status = 1 // 运行中
Finish Status = 2 // 完成
Cancel Status = 3 // 取消
Background Status = 4 // 后台运行
Waiting Status = 5 // 等待中
)

View File

@ -5,6 +5,7 @@ import (
pb "github.com/ouqiang/gocron/modules/rpc/proto"
"golang.org/x/net/context"
"fmt"
"time"
)
func Exec(ip string, port int, taskReq *pb.TaskRequest) (output string, err error) {
@ -15,7 +16,12 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (output string, err erro
}
defer conn.Close()
c := pb.NewTaskClient(conn)
resp, err := c.Run(context.Background(), taskReq)
if taskReq.Timeout <= 0 || taskReq.Timeout > 86400 {
taskReq.Timeout = 86400
}
timeout := time.Duration(taskReq.Timeout) * time.Second
ctx, _ := context.WithTimeout(context.Background(), timeout)
resp, err := c.Run(ctx, taskReq)
if err != nil {
return
}

View File

@ -12,7 +12,7 @@ import (
type Server struct {}
func (s Server) Run(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse, error) {
output, err := utils.ExecShellWithTimeout(int(req.Timeout), req.Command)
output, err := utils.ExecShell(ctx, req.Command)
resp := new(pb.TaskResponse)
resp.Output = output
@ -27,6 +27,9 @@ func Start(addr string) {
s := grpc.NewServer()
pb.RegisterTaskServer(s, Server{})
grpclog.Println("listen address ", addr)
s.Serve(l)
err = s.Serve(l)
if err != nil {
grpclog.Fatal(err)
}
}

View File

@ -5,29 +5,34 @@ package utils
import (
"os/exec"
"syscall"
"time"
"golang.org/x/net/context"
"errors"
)
type Result struct {
output string
err error
}
// 执行shell命令可设置执行超时时间
func ExecShellWithTimeout(timeout int, command string) (string, error) {
func ExecShell(ctx context.Context, command string) (string, error) {
cmd := exec.Command("/bin/bash", "-c", command)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
// 不限制超时
if timeout == 0 {
var resultChan chan Result = make(chan Result)
go func() {
output ,err := cmd.CombinedOutput()
return string(output), err
resultChan <- Result{string(output), err}
}()
select {
case <- ctx.Done():
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
return "", errors.New("timeout killed")
case result := <- resultChan:
return result.output, result.err
}
d := time.Duration(timeout) * time.Second
timer := time.AfterFunc(d, func() {
// 超时kill进程
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
})
output ,err := cmd.CombinedOutput()
timer.Stop()
return string(output), err
return "", nil
}

View File

@ -4,41 +4,48 @@ package utils
import (
"syscall"
"time"
"os/exec"
"strconv"
"golang.org/x/net/context"
"errors"
)
// 执行shell命令可设置执行超时时间
func ExecShellWithTimeout(timeout int, command string) (string, error) {
cmd := exec.Command("cmd", "/C", command)
// 隐藏cmd窗口
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
// 不限制超时
if timeout <= 0 {
output ,err := cmd.CombinedOutput()
return ConvertEncoding(string(output), err)
}
d := time.Duration(timeout) * time.Second
timer := time.AfterFunc(d, func() {
// 超时kill进程
exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(cmd.Process.Pid)).Run()
cmd.Process.Kill()
})
output ,err := cmd.CombinedOutput()
timer.Stop()
return ConvertEncoding(string(output), err)
type Result struct {
output string
err error
}
func ConvertEncoding(outputGBK string, err error) (string, error) {
// 执行shell命令可设置执行超时时间
func ExecShell(ctx context.Context, command string) (string, error) {
cmd := exec.Command("cmd", "/C", command)
// 隐藏cmd窗口
cmd.SysProcAttr = &syscall.SysProcAttr{
HideWindow: true,
}
var resultChan chan Result = make(chan Result)
go func() {
output ,err := cmd.CombinedOutput()
resultChan <- Result{string(output), err}
}()
select {
case <- ctx.Done():
exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(cmd.Process.Pid)).Run()
cmd.Process.Kill()
return "", errors.New("timeout killed")
case result := <- resultChan:
return ConvertEncoding(result.output), result.err
}
return "", nil
}
func ConvertEncoding(outputGBK string) (string) {
// windows平台编码为gbk需转换为utf8才能入库
outputUTF8, ok := GBK2UTF8(outputGBK)
if ok {
return outputUTF8, err
return outputUTF8
}
return "命令输出转换编码失败(gbk to utf8)", err
return "命令输出转换编码失败(gbk to utf8)"
}

View File

@ -41,7 +41,6 @@
<option value="2" {{{if eq .Params.Status 1}}}selected{{{end}}}></option>
<option value="3" {{{if eq .Params.Status 2}}}selected{{{end}}}></option>
<option value="4" {{{if eq .Params.Status 3}}}selected{{{end}}}></option>
<option value="5" {{{if eq .Params.Status 4}}}selected{{{end}}}></option>
</select>
</div>
<div class="field">
@ -92,8 +91,6 @@
<span style="color:red"></span>
{{{else if eq .Status 3}}}
<span style="color:#4499EE"></span>
{{{else if eq .Status 4}}}
<span style="color:#43A102"></span>
{{{end}}}
</td>
<td>

View File

@ -47,9 +47,6 @@
<div class="three fields" id="hostField">
<div class="field">
<label></label>
<div class="ui blue message">
<pre>RPC</pre>
</div>
<select name="host_id" id="hostId">
<option value=""></option>
{{{range $i, $v := .Hosts}}}
@ -337,6 +334,24 @@
}
]
},
timeout: {
identifier : 'timeout',
rules: [
{
type : 'integer[0..86400]',
prompt : '0-86400'
}
]
},
retryTimes: {
identifier : 'retry_times',
rules: [
{
type : 'integer[0..10]',
prompt : '0-10'
}
]
},
remark: {
identifier : 'remark',
rules: [