新增RPC任务执行, 删除SSH任务

pull/21/merge
ouqiang 2017-05-26 18:09:07 +08:00
parent 604384a5f4
commit ccad05e50f
27 changed files with 223 additions and 441 deletions

View File

@ -71,5 +71,5 @@
* UI框架 [Semantic UI](https://semantic-ui.com/)
* 依赖管理(所有依赖包放入vendor目录) [govendor](https://github.com/kardianos/govendor)
## 贡献
欢迎提交PR
## 反馈
提交[issue](https://github.com/ouqiang/gocron/issues/new)

View File

@ -52,11 +52,11 @@ if [[ $ARCH != '386' && $ARCH != 'amd64' ]];then
exit 1
fi
echo '开始编译'
echo '开始编译调度中心'
if [[ $OS = 'windows' ]];then
GOOS=$OS GOARCH=$ARCH go build -ldflags '-w -H windowsgui'
GOOS=$OS GOARCH=$ARCH go build -tags gocron -ldflags '-w -H windowsgui'
else
GOOS=$OS GOARCH=$ARCH go build -ldflags '-w'
GOOS=$OS GOARCH=$ARCH go build -tags gocron -ldflags '-w'
fi
if [[ $? != 0 ]];then

83
build_node.sh Normal file
View File

@ -0,0 +1,83 @@
#!/usr/bin/env bash
# set -x -u
# 任务节点打包, 生成压缩包 gocron-node.zip或gocron-node.tar.gz
# ./build-node.sh -p windows -a amd64
# 参数含义
# -p 指定平台(windows|linux|darwin)
# -a 指定体系架构(amd64|386), 默认amd64
# 目标平台 windows,linux,darwin
OS=''
# 目标平台架构
ARCH=''
# 应用名称
APP_NAME='gocron-node'
# 可执行文件名
EXEC_NAME=''
# 压缩包名称
COMPRESS_FILE=''
# -p 平台 -a 架构
while getopts "p:a:" OPT;
do
case $OPT in
p) OS=$OPTARG
;;
a) ARCH=$OPTARG
;;
esac
done
if [[ -z $OS ]];then
echo "平台不能为空"
exit 1
fi
if [[ $OS != 'windows' && $OS != 'linux' && $OS != 'darwin' ]];then
echo '平台错误,支持的平台 windows linux darmin(osx)'
exit 1
fi
if [[ -z $ARCH ]];then
ARCH='amd64'
fi
if [[ $ARCH != '386' && $ARCH != 'amd64' ]];then
echo 'arch错误仅支持 386 amd64'
exit 1
fi
if [[ $OS = 'windows' ]];then
EXEC_NAME=${APP_NAME}.exe
COMPRESS_FILE=${APP_NAME}-${OS}-${ARCH}.zip
else
EXEC_NAME=${APP_NAME}
COMPRESS_FILE=${APP_NAME}-${OS}-${ARCH}.tar.gz
fi
echo '开始编译任务节点'
if [[ $OS = 'windows' ]];then
GOOS=$OS GOARCH=$ARCH go build -tags node -ldflags '-w -H windowsgui' -o $EXEC_NAME
else
GOOS=$OS GOARCH=$ARCH go build -tags node -ldflags '-w' -o $EXEC_NAME
fi
if [[ $? != 0 ]];then
exit 1
fi
echo '编译完成'
if [[ $OS = 'windows' ]];then
zip -rq $COMPRESS_FILE $EXEC_NAME
else
tar czf $COMPRESS_FILE $EXEC_NAME
fi
rm $EXEC_NAME
echo '打包完成'
echo '生成压缩文件--' $COMPRESS_FILE

20
gocron-node.go Normal file
View File

@ -0,0 +1,20 @@
// +build node
// 任务节点
package main
import (
"github.com/ouqiang/gocron/modules/rpc/server"
"os"
"fmt"
)
func main() {
var addr string
if (len(os.Args) < 2) {
fmt.Println("usage ./gocron-node addr:port")
os.Exit(1)
}
addr = os.Args[1]
server.Start(addr)
}

View File

@ -1,10 +1,7 @@
package main
// +build gocron
// 调度中心
/*--------------------------------------------------------
Linux crontab
HTTPSSH
--------------------------------------------------------*/
package main
import (
"github.com/urfave/cli"

View File

@ -1,14 +1,7 @@
package models
import (
"github.com/ouqiang/gocron/modules/ssh"
"github.com/go-xorm/xorm"
"github.com/ouqiang/gocron/modules/app"
"github.com/ouqiang/gocron/modules/utils"
"errors"
"io/ioutil"
"strings"
"github.com/ouqiang/gocron/modules/logger"
)
// 主机
@ -16,10 +9,8 @@ type Host struct {
Id int16 `xorm:"smallint pk autoincr"`
Name string `xorm:"varchar(64) notnull"` // 主机名称
Alias string `xorm:"varchar(32) notnull default '' "` // 主机别名
Username string `xorm:"varchar(32) notnull default '' "` // ssh 用户名
Port int `xorm:"notnull default 22"` // 主机端口
Remark string `xorm:"varchar(100) notnull default '' "` // 备注
AuthType ssh.HostAuthType `xorm:"tinyint notnull default 1"` // 认证方式 1: 密码 2: 公钥
BaseModel `xorm:"-"`
}
@ -34,7 +25,7 @@ func (host *Host) Create() (insertId int16, err error) {
}
func (host *Host) UpdateBean(id int16) (int64, error) {
return Db.ID(id).Cols("name,alias,username,port,remark,auth_type").Update(host)
return Db.ID(id).Cols("name,alias,port,remark").Update(host)
}
@ -76,7 +67,7 @@ func (host *Host) List(params CommonMap) ([]Host, error) {
func (host *Host) AllList() ([]Host, error) {
list := make([]Host, 0)
err := Db.Desc("id").Find(&list)
err := Db.Cols("name,port").Desc("id").Find(&list)
return list, err
}
@ -87,38 +78,6 @@ func (host *Host) Total(params CommonMap) (int64, error) {
return session.Count(host)
}
func (h *Host) GetPasswordByHost(host string) (string, error) {
path := app.DataDir + "/ssh/password/" + host
return h.readFile(path)
}
func (h *Host) GetPrivateKeyByHost(host string) (string,error) {
path := app.DataDir + "/ssh/private_key/" + host
return h.readFile(path)
}
func (host *Host) readFile(file string) (string, error) {
logger.Debug("认证文件路径: ", file)
if !utils.FileExist(file) {
return "", errors.New(file + "-认证文件不存在或无权限访问")
}
contentByte, err := ioutil.ReadFile(file)
if err != nil {
return "", err
}
content := string(contentByte)
content = strings.TrimSpace(content)
if content == "" {
return "", errors.New("密码为空")
}
return content, nil
}
// 解析where
func (host *Host) parseWhere(session *xorm.Session, params CommonMap) {
if len(params) == 0 {

View File

@ -2,17 +2,14 @@ package models
import (
"time"
"github.com/ouqiang/gocron/modules/ssh"
"github.com/go-xorm/xorm"
"github.com/ouqiang/gocron/modules/utils"
)
type TaskProtocol int8
const (
TaskHTTP TaskProtocol = iota + 1 // HTTP协议
TaskSSH // SSH命令
TaskLocalCommand // 本地命令
TaskRPC // RPC方式执行命令
)
// 任务
@ -20,12 +17,12 @@ type Task struct {
Id int `xorm:"int pk autoincr"`
Name string `xorm:"varchar(32) notnull"` // 任务名称
Spec string `xorm:"varchar(64) notnull"` // crontab
Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh-command 3: 系统命令
Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:系统命令
Command string `xorm:"varchar(256) notnull"` // URL地址或shell命令
Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制
Multi int8 `xorm:"tinyint notnull default 1"` // 是否允许多实例运行
RetryTimes int8 `xorm:"tinyint notnull default 0"` // 重试次数
HostId int16 `xorm:"smallint notnull default 0"` // SSH host id
HostId int16 `xorm:"smallint notnull default 0"` // RPC host id
NotifyStatus int8 `xorm:"smallint notnull default 1"` // 任务执行结束是否通知 0: 不通知 1: 失败通知 2: 执行结束通知
NotifyType int8 `xorm:"smallint notnull default 0"` // 通知类型 1: 邮件 2: slack
NotifyReceiverId string `xorm:"varchar(256) notnull default '' "` // 通知接受者ID, setting表主键ID多个ID逗号分隔
@ -40,9 +37,7 @@ type TaskHost struct {
Task `xorm:"extends"`
Name string
Port int
Username string
Alias string
AuthType ssh.HostAuthType
}
func (TaskHost) TableName() string {
@ -69,19 +64,6 @@ func (task *Task) CreateTestTask() {
task.Command = "http://ip.taobao.com/service/getIpInfo.php?ip=117.27.140.253"
task.Status = Enabled
task.Create()
// 系统命令
task.Id = 0
task.Name = "测试系统命令任务"
task.Protocol = TaskLocalCommand
task.Spec = "@every 1m"
task.Status = Enabled
if utils.IsWindows() {
task.Command = "dir"
} else {
task.Command = "ls"
}
task.Create()
}
func (task *Task) UpdateBean(id int) (int64, error) {
@ -111,7 +93,7 @@ func (task *Task) Enable(id int) (int64, error) {
// 获取所有激活任务
func (task *Task) ActiveList() ([]TaskHost, error) {
list := make([]TaskHost, 0)
fields := "t.*, host.alias,host.name,host.username,host.port,host.auth_type"
fields := "t.*, host.alias,host.name,host.port"
err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.status = ?", Enabled).Cols(fields).Find(&list)
return list, err
@ -120,7 +102,7 @@ func (task *Task) ActiveList() ([]TaskHost, error) {
// 获取某个主机下的所有激活任务
func (task *Task) ActiveListByHostId(hostId int16) ([]TaskHost, error) {
list := make([]TaskHost, 0)
fields := "t.*, host.alias,host.name,host.username,host.port,host.auth_type"
fields := "t.*, host.alias,host.name,host.port"
err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.status = ? AND t.host_id = ?", Enabled, hostId).Cols(fields).Find(&list)
return list, err
@ -146,7 +128,7 @@ func (task *Task) NameExist(name string, id int) (bool, error) {
func(task *Task) Detail(id int) (TaskHost, error) {
taskHost := TaskHost{}
fields := "t.*, host.alias,host.name,host.username,host.port,host.auth_type"
fields := "t.*, host.alias,host.name,host.port"
_, err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.id=?", id).Cols(fields).Get(&taskHost)
return taskHost, err

View File

@ -14,15 +14,14 @@ type TaskLog struct {
TaskId int `xorm:"int notnull index default 0"` // 任务id
Name string `xorm:"varchar(32) notnull"` // 任务名称
Spec string `xorm:"varchar(64) notnull"` // crontab
Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh-command 3:系统命令
Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:RPC
Command string `xorm:"varchar(256) notnull"` // URL地址或shell命令
Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制
RetryTimes int8 `xorm:"tinyint notnull default 0"` // 任务重试次数
Hostname string `xorm:"varchar(128) notnull defalut '' "` // SSH主机名,逗号分隔
Hostname string `xorm:"varchar(128) notnull defalut '' "` // RPC主机名,逗号分隔
StartTime time.Time `xorm:"datetime created"` // 开始执行时间
EndTime time.Time `xorm:"datetime updated"` // 执行完成(失败)时间
Status Status `xorm:"tinyint notnull default 1"` // 状态 0:执行失败 1:执行中 2:执行完毕 3:任务取消(上次任务未执行完成) 4:异步执行
NotifyId string `xorm:"varchar(32) notnull default '' "` // 回调通知ID
Result string `xorm:"mediumtext notnull defalut '' "` // 执行结果
TotalTime int `xorm:"-"` // 执行总时长
BaseModel `xorm:"-"`
@ -42,16 +41,6 @@ func (taskLog *TaskLog) Update(id int64, data CommonMap) (int64, error) {
return Db.Table(taskLog).ID(id).Update(data)
}
func (taskLog *TaskLog) UpdateStatus(notifyId string, status Status, result string) (int64, error) {
taskLog.Status = status
taskLog.Result = result
return Db.Cols("status,result").Where("notify_id = ?", notifyId).Update(taskLog)
}
func (taskLog *TaskLog) setStatus(id int64, status Status) (int64, error) {
return taskLog.Update(id, CommonMap{"status": status})
}
func (taskLog *TaskLog) List(params CommonMap) ([]TaskLog, error) {
taskLog.parsePageAndPageSize(params)
list := make([]TaskLog, 0)
@ -72,17 +61,6 @@ func (taskLog *TaskLog) List(params CommonMap) ([]TaskLog, error) {
return list, err
}
// 根据通知ID获取任务ID
func (taskLog *TaskLog) GetTaskIdByNotifyId(notifyId string) (taskId int, err error) {
exist, err := Db.Where("notify_id = ?", notifyId).Get(taskLog)
if !exist || err != nil {
return
}
taskId = taskLog.TaskId
return
}
// 清空表
func (taskLog *TaskLog) Clear() (int64, error) {
return Db.Where("1=1").Delete(taskLog);

View File

@ -2,22 +2,24 @@ package client
import (
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
pb "github.com/ouqiang/gocron/modules/rpc/proto"
"golang.org/x/net/context"
"fmt"
)
func Start() {
conn, err := grpc.Dial("127.0.0.1:50000", grpc.WithInsecure())
func Exec(ip string, port int, taskReq *pb.TaskRequest) (output string, err error) {
addr := fmt.Sprintf("%s:%d", ip, port);
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
grpclog.Fatal(err)
return
}
defer conn.Close()
c := pb.NewTaskClient(conn)
req := new(pb.TaskRequest)
resp, err := c.Run(context.Background(), req)
resp, err := c.Run(context.Background(), taskReq)
if err != nil {
grpclog.Fatal(err)
return
}
grpclog.Println(resp.Name)
output = resp.Output
return
}

View File

@ -1,6 +1,5 @@
// Code generated by protoc-gen-go.
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: task.proto
// DO NOT EDIT!
/*
Package rpc is a generated protocol buffer package.
@ -35,7 +34,8 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type TaskRequest struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Command string `protobuf:"bytes,2,opt,name=command" json:"command,omitempty"`
Timeout int32 `protobuf:"varint,3,opt,name=timeout" json:"timeout,omitempty"`
}
func (m *TaskRequest) Reset() { *m = TaskRequest{} }
@ -43,15 +43,22 @@ func (m *TaskRequest) String() string { return proto.CompactTextStrin
func (*TaskRequest) ProtoMessage() {}
func (*TaskRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *TaskRequest) GetName() string {
func (m *TaskRequest) GetCommand() string {
if m != nil {
return m.Name
return m.Command
}
return ""
}
func (m *TaskRequest) GetTimeout() int32 {
if m != nil {
return m.Timeout
}
return 0
}
type TaskResponse struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Output string `protobuf:"bytes,1,opt,name=output" json:"output,omitempty"`
}
func (m *TaskResponse) Reset() { *m = TaskResponse{} }
@ -59,9 +66,9 @@ func (m *TaskResponse) String() string { return proto.CompactTextStri
func (*TaskResponse) ProtoMessage() {}
func (*TaskResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *TaskResponse) GetName() string {
func (m *TaskResponse) GetOutput() string {
if m != nil {
return m.Name
return m.Output
}
return ""
}
@ -146,13 +153,15 @@ var _Task_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("task.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 123 bytes of a gzipped FileDescriptorProto
// 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2c, 0xce,
0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0x52, 0xe4, 0xe2, 0x0e,
0x49, 0x2c, 0xce, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe2, 0x62, 0xc9, 0x4b,
0xcc, 0x4d, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x02, 0xb3, 0x95, 0x94, 0xb8, 0x78, 0x20,
0x4a, 0x8a, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0xb1, 0xa9, 0x31, 0x32, 0xe1, 0x62, 0x01, 0xa9, 0x11,
0xd2, 0xe1, 0x62, 0x0e, 0x2a, 0xcd, 0x13, 0x12, 0xd0, 0x2b, 0x2a, 0x48, 0xd6, 0x43, 0x32, 0x58,
0x4a, 0x10, 0x49, 0x04, 0x62, 0x8e, 0x12, 0x43, 0x12, 0x1b, 0xd8, 0x21, 0xc6, 0x80, 0x00, 0x00,
0x00, 0xff, 0xff, 0xf0, 0x04, 0x2a, 0x14, 0x96, 0x00, 0x00, 0x00,
0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0x72, 0xe4, 0xe2, 0x0e,
0x49, 0x2c, 0xce, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x92, 0xe0, 0x62, 0x4f, 0xce,
0xcf, 0xcd, 0x4d, 0xcc, 0x4b, 0x91, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x71, 0x41, 0x32,
0x25, 0x99, 0xb9, 0xa9, 0xf9, 0xa5, 0x25, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x30, 0xae,
0x92, 0x1a, 0x17, 0x0f, 0xc4, 0x88, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x21, 0x31, 0x2e, 0xb6,
0xfc, 0xd2, 0x92, 0x82, 0xd2, 0x12, 0x09, 0x46, 0xb0, 0x11, 0x50, 0x9e, 0x91, 0x09, 0x17, 0x0b,
0x48, 0x9d, 0x90, 0x0e, 0x17, 0x73, 0x50, 0x69, 0x9e, 0x90, 0x80, 0x5e, 0x51, 0x41, 0xb2, 0x1e,
0x92, 0xe5, 0x52, 0x82, 0x48, 0x22, 0x10, 0xb3, 0x94, 0x18, 0x92, 0xd8, 0xc0, 0x8e, 0x35, 0x06,
0x04, 0x00, 0x00, 0xff, 0xff, 0x82, 0x08, 0x5d, 0x10, 0xba, 0x00, 0x00, 0x00,
}

View File

@ -7,9 +7,10 @@ service Task {
}
message TaskRequest {
string name = 1;
string command = 2; //
int32 timeout = 3; //
}
message TaskResponse {
string name = 1;
string output = 1; //
}

View File

@ -6,25 +6,27 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc"
pb "github.com/ouqiang/gocron/modules/rpc/proto"
"github.com/ouqiang/gocron/modules/utils"
)
type Server struct {}
func (s Server) Run(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse, error) {
output, err := utils.ExecShellWithTimeout(int(req.Timeout), req.Command)
resp := new(pb.TaskResponse)
resp.Name = "gRPC"
resp.Output = output
return resp, nil
return resp, err
}
func Start() {
l, err := net.Listen("tcp", "127.0.0.1:50000")
func Start(addr string) {
l, err := net.Listen("tcp", addr)
if err != nil {
grpclog.Fatal(err)
}
s := grpc.NewServer()
pb.RegisterTaskServer(s, Server{})
grpclog.Println("listen address ", "127.0.0.1:50000")
grpclog.Println("listen address ", addr)
s.Serve(l)
}

View File

@ -6,21 +6,15 @@ import (
"os/exec"
"syscall"
"time"
"fmt"
)
// 执行shell命令可设置执行超时时间
func ExecShellWithTimeout(timeout int, command string, args... string) (string, error) {
cmd := exec.Command(command, args...)
func ExecShellWithTimeout(timeout int, command string) (string, error) {
cmd := exec.Command("/bin/bash", "-c", command)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
// 后台运行
if timeout == -1 {
go cmd.CombinedOutput()
return "", nil
}
// 不限制超时
if timeout == 0 {
output ,err := cmd.CombinedOutput()
@ -36,9 +30,4 @@ func ExecShellWithTimeout(timeout int, command string, args... string) (string,
timer.Stop()
return string(output), err
}
// 格式化环境变量
func FormatEnv(key, value string) string {
return fmt.Sprintf("export %s=%s;", key, value)
}

View File

@ -7,23 +7,18 @@ import (
"time"
"os/exec"
"strconv"
"fmt"
)
// 执行shell命令可设置执行超时时间
func ExecShellWithTimeout(timeout int, command string, args... string) (string, error) {
cmd := exec.Command(command, args...)
func ExecShellWithTimeout(timeout int, command string) (string, error) {
cmd := exec.Command("cmd", "/C", command)
// 隐藏cmd窗口
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
// 后台运行
if timeout == -1 {
go cmd.CombinedOutput()
return "", nil
}
// 不限制超时
if timeout <= 0 {
output ,err := cmd.CombinedOutput()
return string(output), err
return ConvertEncoding(string(output), err)
}
d := time.Duration(timeout) * time.Second
@ -35,10 +30,15 @@ func ExecShellWithTimeout(timeout int, command string, args... string) (string,
output ,err := cmd.CombinedOutput()
timer.Stop()
return string(output), err
return ConvertEncoding(string(output), err)
}
// 格式化环境变量
func FormatEnv(key, value string) string {
return fmt.Sprintf("set %s=%s & ", key, value)
func ConvertEncoding(outputGBK string, err error) (string, error) {
// windows平台编码为gbk需转换为utf8才能入库
outputUTF8, ok := GBK2UTF8(outputGBK)
if ok {
return outputUTF8, err
}
return "命令输出转换编码失败(gbk to utf8)", err
}

View File

@ -6,7 +6,6 @@ import (
"github.com/ouqiang/gocron/modules/utils"
"github.com/ouqiang/gocron/modules/logger"
"strconv"
"github.com/ouqiang/gocron/modules/ssh"
"github.com/ouqiang/gocron/service"
"github.com/Unknwon/paginater"
"fmt"
@ -56,52 +55,11 @@ func Edit(ctx *macaron.Context) {
ctx.HTML(200, "host/host_form")
}
func Ping(ctx *macaron.Context) string {
id := ctx.ParamsInt(":id")
hostModel := new(models.Host)
err := hostModel.Find(id)
json := utils.JsonResponse{}
if err != nil || hostModel.Id <= 0{
return json.CommonFailure("主机不存在", err)
}
sshConfig := ssh.SSHConfig{}
sshConfig.User = hostModel.Username
sshConfig.Host = hostModel.Name
sshConfig.Port = hostModel.Port
sshConfig.ExecTimeout = 5
sshConfig.AuthType = hostModel.AuthType
var password string
var privateKey string
if hostModel.AuthType == ssh.HostPassword {
password, err = hostModel.GetPasswordByHost(hostModel.Name)
if err != nil {
return json.CommonFailure(err.Error(), err)
}
sshConfig.Password = password
} else {
privateKey, err = hostModel.GetPrivateKeyByHost(hostModel.Name)
if err != nil {
return json.CommonFailure(err.Error(), err)
}
sshConfig.PrivateKey = privateKey
}
_, err = ssh.Exec(sshConfig, "pwd")
if err != nil {
return json.CommonFailure("连接失败-" + err.Error(), err)
}
return json.Success("连接成功", nil)
}
type HostForm struct {
Id int16
Name string `binding:"Required;MaxSize(64)"`
Alias string `binding:"Required;MaxSize(32)"`
Username string `binding:"Required;MaxSize(32)"`
Port int `binding:"Required;Range(1-65535)"`
AuthType ssh.HostAuthType `binding:"Required:Range(1,2)"`
Remark string
}
@ -129,10 +87,8 @@ func Store(ctx *macaron.Context, form HostForm) string {
hostModel.Name = form.Name
hostModel.Alias = form.Alias
hostModel.Username = form.Username
hostModel.Port = form.Port
hostModel.Remark = form.Remark
hostModel.AuthType = form.AuthType
isCreate := false
if id > 0 {
_, err = hostModel.UpdateBean(id)

View File

@ -69,7 +69,6 @@ func Register(m *macaron.Macaron) {
m.Group("/host", func() {
m.Get("/create", host.Create)
m.Get("/edit/:id", host.Edit)
m.Get("/ping/:id", host.Ping)
m.Post("/store", binding.Bind(host.HostForm{}), host.Store)
m.Get("", host.Index)
m.Post("/remove/:id", host.Remove)
@ -97,7 +96,6 @@ func Register(m *macaron.Macaron) {
// API
m.Group("/api/v1", func() {
m.Route("/tasklog/update-status", "GET,POST", tasklog.UpdateStatus)
m.Post("/tasklog/remove/:id", tasklog.Remove)
m.Post("/delaytask/push", delaytask.Create)
m.Post("/delaytask/log/remove/:id", delaytask.Remove)

View File

@ -20,9 +20,9 @@ type TaskForm struct {
Id int
Name string `binding:"Required;MaxSize(32)"`
Spec string `binding:"Required;MaxSize(64)"`
Protocol models.TaskProtocol `binding:"In(1,2,3)"`
Protocol models.TaskProtocol `binding:"In(1,2)"`
Command string `binding:"Required;MaxSize(256)"`
Timeout int `binding:"Range(-1,86400)"`
Timeout int `binding:"Range(0,86400)"`
Multi int8 `binding:"In(1,2)"`
RetryTimes int8
HostId int16
@ -111,11 +111,11 @@ func Store(ctx *macaron.Context, form TaskForm) string {
return json.CommonFailure("任务名称已存在")
}
if form.Protocol == models.TaskSSH && form.HostId <= 0 {
if form.Protocol == models.TaskRPC && form.HostId <= 0 {
return json.CommonFailure("请选择主机名")
}
if form.Protocol != models.TaskHTTP {
if form.Protocol == models.TaskRPC {
taskModel.HostId = form.HostId
} else {
taskModel.HostId = 0
@ -142,9 +142,6 @@ func Store(ctx *macaron.Context, form TaskForm) string {
if !strings.HasPrefix(command, "http://") && !strings.HasPrefix(command, "https://") {
return json.CommonFailure("请输入正确的URL地址")
}
if taskModel.Timeout == -1 {
return json.CommonFailure("HTTP任务不支持后台运行")
}
if taskModel.Timeout > 300 {
return json.CommonFailure("HTTP任务超时时间不能超过300秒")
}
@ -155,10 +152,6 @@ func Store(ctx *macaron.Context, form TaskForm) string {
}
if taskModel.Protocol != models.TaskSSH {
taskModel.HostId = 0
}
if id == 0 {
id, err = taskModel.Create()
} else {

View File

@ -11,8 +11,6 @@ import (
"fmt"
"html/template"
"github.com/ouqiang/gocron/routers/base"
"github.com/ouqiang/gocron/service"
"errors"
)
func Index(ctx *macaron.Context) {
@ -66,51 +64,6 @@ func Remove(ctx *macaron.Context) string {
return json.Success("删除成功", nil)
}
// 更新任务状态
func UpdateStatus(ctx *macaron.Context) string {
id := ctx.QueryTrim("id")
status := ctx.QueryInt("status")
result := ctx.QueryTrim("result")
json := utils.JsonResponse{}
if id == "" {
return json.CommonFailure("任务ID不能为空")
}
if status != 1 && status != 2 {
return json.CommonFailure("status值错误")
}
if status == 1 {
status -= 1
}
taskLogModel := new(models.TaskLog)
affectRows, err := taskLogModel.UpdateStatus(id, models.Status(status), result)
if err != nil || affectRows == 0 {
return json.CommonFailure("更新任务状态失败")
}
// 发送通知
taskId, err := taskLogModel.GetTaskIdByNotifyId(id)
if err != nil || taskId <= 0 {
logger.Error("异步任务回调#根据notify-id获取taskId失败", err)
return json.Success("success", nil)
}
taskModel := new(models.Task)
task, err := taskModel.Detail(taskId)
if err != nil || task.Id <= 0 {
logger.Error("异步任务回调#根据获取任务详情失败", err)
return json.Success("success", nil)
}
taskResult := service.TaskResult{}
taskResult.Result = result
if status == 0 {
taskResult.Err = errors.New("error")
}
service.SendNotification(task, taskResult)
return json.Success("success", nil)
}
// 解析查询参数
func parseQueryParams(ctx *macaron.Context) (models.CommonMap) {
var params models.CommonMap = models.CommonMap{}

View File

@ -5,14 +5,14 @@ import (
"strconv"
"time"
"github.com/ouqiang/gocron/modules/logger"
"github.com/ouqiang/gocron/modules/ssh"
"github.com/jakecoffman/cron"
"github.com/ouqiang/gocron/modules/utils"
"errors"
"fmt"
"github.com/ouqiang/gocron/modules/httpclient"
"github.com/ouqiang/gocron/modules/notify"
"sync"
rpcClient "github.com/ouqiang/gocron/modules/rpc/client"
pb "github.com/ouqiang/gocron/modules/rpc/proto"
)
// 定时任务调度管理器
@ -76,7 +76,6 @@ type TaskResult struct {
Result string
Err error
RetryTimes int8
IsAsync bool
}
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
@ -137,38 +136,6 @@ type Handler interface {
Run(taskModel models.TaskHost) (string, error)
}
// 本地命令
type LocalCommandHandler struct {}
// 运行本地命令
func (h *LocalCommandHandler) Run(taskModel models.TaskHost) (string, error) {
if taskModel.Command == "" {
return "", errors.New("invalid command")
}
if utils.IsWindows() {
return h.runOnWindows(taskModel)
}
return h.runOnUnix(taskModel)
}
// 执行Windows命令
func (h *LocalCommandHandler) runOnWindows(taskModel models.TaskHost) (string, error) {
outputGBK, err := utils.ExecShellWithTimeout(taskModel.Timeout, "cmd", "/C", taskModel.Command)
// windows平台编码为gbk需转换为utf8才能入库
outputUTF8, ok := utils.GBK2UTF8(outputGBK)
if ok {
return outputUTF8, err
}
return "命令输出转换编码失败(gbk to utf8)", err
}
// 执行Unix命令
func (h *LocalCommandHandler) runOnUnix(taskModel models.TaskHost) (string, error) {
return utils.ExecShellWithTimeout(taskModel.Timeout, "/bin/bash", "-c", taskModel.Command)
}
// HTTP任务
type HTTPHandler struct{}
@ -189,42 +156,20 @@ func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error)
return resp.Body, err
}
// SSH-command任务
type SSHCommandHandler struct{}
// RPC调用执行任务
type RPCHandler struct {}
func (h *SSHCommandHandler) Run(taskModel models.TaskHost) (string, error) {
hostModel := new(models.Host)
err := hostModel.Find(int(taskModel.HostId))
if err != nil {
return "", err
}
sshConfig := ssh.SSHConfig{}
sshConfig.User = hostModel.Username
sshConfig.Host = hostModel.Name
sshConfig.Port = hostModel.Port
sshConfig.ExecTimeout = taskModel.Timeout
sshConfig.AuthType = hostModel.AuthType
var password string
var privateKey string
if hostModel.AuthType == ssh.HostPassword {
password, err = hostModel.GetPasswordByHost(hostModel.Name)
if err != nil {
return "", err
}
sshConfig.Password = password
} else {
privateKey, err = hostModel.GetPrivateKeyByHost(hostModel.Name)
if err != nil {
return "", err
}
sshConfig.PrivateKey = privateKey
}
func (h *RPCHandler) Run(taskModel models.TaskHost) (result string, err error) {
taskRequest := new(pb.TaskRequest)
taskRequest.Timeout = int32(taskModel.Timeout)
taskRequest.Command = taskModel.Command
return ssh.Exec(sshConfig, taskModel.Command)
return rpcClient.Exec(taskModel.Name, taskModel.Port, taskRequest)
}
// 创建任务日志
func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, string, error) {
func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, error) {
taskLogModel := new(models.TaskLog)
taskLogModel.TaskId = taskModel.Id
taskLogModel.Name = taskModel.Task.Name
@ -232,20 +177,14 @@ func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, stri
taskLogModel.Protocol = taskModel.Protocol
taskLogModel.Command = taskModel.Command
taskLogModel.Timeout = taskModel.Timeout
if taskModel.Protocol == models.TaskSSH {
if taskModel.Protocol == models.TaskRPC {
taskLogModel.Hostname = taskModel.Alias + "-" + taskModel.Name
}
taskLogModel.StartTime = time.Now()
taskLogModel.Status = status
// SSH执行远程命令后台运行
var notifyId string = ""
if taskModel.Timeout == -1 {
notifyId = utils.RandString(32);
taskLogModel.NotifyId = notifyId;
}
insertId, err := taskLogModel.Create()
return insertId, notifyId, err
return insertId, err
}
// 更新任务日志
@ -255,9 +194,7 @@ func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) {
var result string = taskResult.Result
if taskResult.Err != nil {
status = models.Failure
} else if taskResult.IsAsync {
status = models.Background
} else {
} else {
status = models.Finish
}
return taskLogModel.Update(taskLogId, models.CommonMap{
@ -276,7 +213,7 @@ func createJob(taskModel models.TaskHost) cron.FuncJob {
taskFunc := func() {
TaskNum.Add()
defer TaskNum.Done()
taskLogId := beforeExecJob(&taskModel)
taskLogId := beforeExecJob(taskModel)
if taskLogId <= 0 {
return
}
@ -294,37 +231,26 @@ func createHandler(taskModel models.TaskHost) Handler {
switch taskModel.Protocol {
case models.TaskHTTP:
handler = new(HTTPHandler)
case models.TaskSSH:
handler = new(SSHCommandHandler)
case models.TaskLocalCommand:
handler = new(LocalCommandHandler)
case models.TaskRPC:
handler = new(RPCHandler)
}
return handler;
}
func beforeExecJob(taskModel *models.TaskHost) (taskLogId int64) {
func beforeExecJob(taskModel models.TaskHost) (taskLogId int64) {
if taskModel.Multi == 0 && runInstance.has(taskModel.Id) {
createTaskLog(*taskModel, models.Cancel)
createTaskLog(taskModel, models.Cancel)
return
}
if taskModel.Multi == 0 {
runInstance.add(taskModel.Id)
}
taskLogId, notifyId, err := createTaskLog(*taskModel, models.Running)
taskLogId, err := createTaskLog(taskModel, models.Running)
if err != nil {
logger.Error("任务开始执行#写入任务日志失败-", err)
return
}
// 设置notifyId到环境变量中
if notifyId != "" {
envName := "GOCRON_TASK_ID"
if taskModel.Protocol == models.TaskSSH {
taskModel.Command = fmt.Sprintf("%s%s", utils.FormatUnixEnv(envName, notifyId), taskModel.Command)
} else {
taskModel.Command = fmt.Sprintf("%s%s", utils.FormatEnv(envName, notifyId), taskModel.Command)
}
}
logger.Debugf("任务命令-%s", taskModel.Command)
@ -335,16 +261,10 @@ func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId in
if taskResult.Err != nil {
taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result
}
if taskModel.Timeout == -1 {
taskResult.IsAsync = true
}
_, err := updateTaskLog(taskLogId, taskResult)
if err != nil {
logger.Error("任务结束#更新任务日志失败-", err)
}
if taskResult.IsAsync {
return
}
SendNotification(taskModel, taskResult)
}

View File

@ -17,9 +17,15 @@
<input type="hidden" name="id" value="{{{.Host.Id}}}">
<div class="four fields">
<div class="field">
<label> (IP)</label>
<label></label>
<div class="ui small input">
<input type="text" name="name" value="{{{.Host.Name}}}">
<input type="text" name="name" value="{{{.Host.Name}}}" placeholder="192.168.50.154">
</div>
</div>
<div class="field">
<label></label>
<div class="ui small input">
<input type="text" name="port" value="{{{.Host.Port}}}" placeholder="5921">
</div>
</div>
<div class="field">
@ -29,35 +35,6 @@
</div>
</div>
</div>
<div class="four fields">
<div class="field">
<label>SSH</label>
<div class="ui small input">
<input type="text" name="username" value="{{{.Host.Username}}}">
</div>
</div>
<div class="field">
<label>SSH</label>
<div class="ui small input">
<input type="text" name="port" value="{{{.Host.Port}}}">
</div>
</div>
</div>
<div class="fields">
<div class="field">
<div class="ui blue message">
: gocron/data/ssh/password/<br>
: echo '12345678' > data/ssh/password/127.0.0.1<br><br>
: gocron/data/ssh/private_key/<br>
: cp ~/.ssh/id_rsa data/ssh/private_key/127.0.0.1
</div>
<label></label>
<select name="auth_type" id="authType">
<option value="2" {{{if .Host}}} {{{if eq .Host.AuthType 2}}}selected {{{end}}} {{{end}}} data-validate-type="selectPrivateKey" data-match="private_key"></option>
<option value="1" {{{if .Host}}} {{{if eq .Host.AuthType 1}}}selected {{{end}}} {{{end}}} data-validate-type="selectPassword" data-match="password"></option>
</select>
</div>
</div>
<div class="two fields">
<div class="field">
<label></label>
@ -112,19 +89,6 @@
}
]
},
username: {
identifier : 'username',
rules: [
{
type : 'empty',
prompt : 'SSH'
},
{
type : 'maxLength[32]',
prompt : '32'
}
]
},
port: {
identifier : 'port',
rules: [

View File

@ -35,7 +35,6 @@
<th>ID</th>
<th></th>
<th></th>
<th></th>
<th></th>
<th></th>
<th></th>
@ -47,7 +46,6 @@
<td>{{{.Id}}}</td>
<td>{{{.Name}}}</td>
<td>{{{.Alias}}}</td>
<td>{{{.Username}}}</td>
<td>{{{.Port}}}</td>
<td>{{{.Remark}}}</td>
<td class="operation">
@ -55,7 +53,6 @@
<button class="ui positive button" onclick="util.removeConfirm('/host/remove/{{{.Id}}}')"></button><br>
<div style="margin-top: 5px;">
<a class="ui twitter button" href="/task?host_id={{{.Id}}}"></a>
<button class="ui blue button" @click="ping({{{.Id}}})"></button>
</div>
</td>
</tr>
@ -66,23 +63,4 @@
</div>
</div>
<script type="text/javascript">
var Vue = new Vue({
el: '.ui.striped.table',
methods: {
ping: function(id) {
swal({
title: '',
text: "连接中.......",
type: 'info',
showConfirmButton: false
});
util.get("/host/ping/" + id, function(code, message) {
swal('', '', 'success');
})
}
}
});
</script>
{{{ template "common/footer" . }}}

View File

@ -33,8 +33,7 @@
<div class="field">
<select name="protocol" id="protocol">
<option value="0"></option>
<option value="3" {{{if eq .Params.Protocol 3}}}selected{{{end}}}></option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">SSH</option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">RPC</option>
<option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option>
</select>
</div>
@ -71,7 +70,7 @@
<td>{{{.Id}}}</td>
<td>{{{.Task.Name}}}</td>
<td>{{{.Spec}}}</td>
<td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SSH {{{else if eq .Protocol 3}}}{{{end}}}</td>
<td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} RPC {{{end}}}</td>
<td>{{{if eq .Timeout -1}}}{{{else if gt .Timeout 0}}}{{{.Timeout}}}{{{else}}}{{{end}}}</td>
<td>{{{.RetryTimes}}}</td>
<td>{{{if gt .Multi 0}}}{{{else}}}{{{end}}}</td>

View File

@ -30,8 +30,7 @@
<div class="field">
<select name="protocol" id="protocol">
<option value="0"></option>
<option value="3" {{{if eq .Params.Protocol 3}}}selected{{{end}}}></option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">SSH</option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">RPC</option>
<option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option>
</select>
</div>
@ -71,7 +70,7 @@
<td><a href="/task?id={{{.TaskId}}}">{{{.TaskId}}}</a></td>
<td>{{{.Name}}}</td>
<td>{{{.Spec}}}</td>
<td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SSH {{{else}}} {{{end}}}</td>
<td>{{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} RPC {{{end}}}</td>
<td>{{{if eq .Timeout -1}}}{{{else if gt .Timeout 0}}}{{{.Timeout}}}{{{else}}}{{{end}}}</td>
<td>{{{.RetryTimes}}}</td>
<td>{{{.Hostname}}}</td>

View File

@ -39,8 +39,7 @@
<div class="field">
<label></label>
<select name="protocol" id="protocol">
<option value="3" {{{if .Task}}} {{{if eq .Task.Protocol 3}}}selected{{{end}}} {{{end}}}></option>
<option value="2" {{{if .Task}}} {{{if eq .Task.Protocol 2}}}selected{{{end}}} {{{end}}} data-match="host_id" data-validate-type="selectProtocol">SSH</option>
<option value="2" {{{if .Task}}} {{{if eq .Task.Protocol 2}}}selected{{{end}}} {{{end}}} data-match="host_id" data-validate-type="selectProtocol">RPC</option>
<option value="1" {{{if .Task}}} {{{if eq .Task.Protocol 1}}}selected{{{end}}} {{{end}}}>HTTP</option>
</select>
</div>
@ -49,7 +48,7 @@
<div class="field">
<label></label>
<div class="ui blue message">
<pre>SSH</pre>
<pre>RPC</pre>
</div>
<select name="host_id" id="hostId">
<option value=""></option>
@ -183,9 +182,6 @@
case '2':
$('#command').attr('placeholder', 'shell');
break;
case '3':
$('#command').attr('placeholder', '');
break;
}
}

View File

@ -14,6 +14,10 @@ do
if [[ $? != 0 ]];then
break
fi
./build_node.sh -p $i
if [[ $? != 0 ]];then
break
fi
done
# 身份认证