完善HTTP任务、SSH任务

pull/21/merge
ouqiang 8 years ago
parent c556e40912
commit 1533dc3ca1

@ -7,8 +7,6 @@ import (
"github.com/go-macaron/session"
"github.com/go-macaron/csrf"
"github.com/ouqiang/cron-scheduler/modules/app"
"fmt"
"os"
"github.com/ouqiang/cron-scheduler/routers"
)
@ -31,10 +29,7 @@ var CmdWeb = cli.Command{
}
func run(ctx *cli.Context) {
// 检测环境
app.CheckEnv()
// 启动定时任务
runScheduler()
app.InitEnv()
m := macaron.Classic()
// 注册路由
routers.Register(m)
@ -44,12 +39,6 @@ func run(ctx *cli.Context) {
m.Run(port)
}
// 定时任务调度
func runScheduler() {
fmt.Println("hello world")
os.Exit(1)
}
// 中间件注册
func registerMiddleware(m *macaron.Macaron) {
m.Use(macaron.Logger())

@ -3,7 +3,7 @@ package main
/*--------------------------------------------------------
Linux crontab
HTTPSSH
HTTPSSH
--------------------------------------------------------*/
import (

@ -23,8 +23,13 @@ const (
)
// 新增
func(host *Host) Create() (int64, error) {
return Db.Insert(host)
func(host *Host) Create() (insertId int16, err error) {
_, err = Db.Insert(host)
if err == nil {
insertId = host.Id
}
return
}
// 更新

@ -26,7 +26,8 @@ type Task struct {
Protocol Protocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh
Type TaskType `xorm:"tinyint notnull default 1"` // 任务类型 1: 定时任务 2: 延时任务
Command string `xorm:"varchar(512) notnull"` // URL地址或shell命令
Timeout int `xorm:"mediumint notnull default 0"` // 定时任务:执行超时时间(单位秒)0不限制 延时任务: 延时timeout秒后执行
Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制
Delay int `xorm:"int notnull default 0"` // 延时任务,延时时间(单位秒)
SshHosts string `xorm:"varchar(512) notnull defalut '' "` // SSH主机名, host id逗号分隔
Remark string `xorm:"varchar(512) notnull default ''"` // 备注
Created time.Time `xorm:"datetime notnull created"` // 创建时间
@ -37,10 +38,15 @@ type Task struct {
}
// 新增
func(task *Task) Create() (int64, error) {
func(task *Task) Create() (insertId int, err error) {
task.Status = Enabled
return Db.Insert(task)
_, err = Db.Insert(task)
if err == nil {
insertId = task.Id
}
return
}
// 更新
@ -63,6 +69,14 @@ func(task *Task) Enable(id int) (int64, error) {
return task.Update(id, CommonMap{"status": Enabled})
}
func(task *Task) ActiveList() ([]Task, error) {
task.parsePageAndPageSize()
list := make([]Task, 0)
err := Db.Where("status = ?", Enabled).Desc("id").Find(&list)
return list, err
}
func(task *Task) List() ([]Task, error) {
task.parsePageAndPageSize()
list := make([]Task, 0)

@ -16,10 +16,15 @@ type TaskLog struct{
PageSize int `xorm:"-"`
}
func(taskLog *TaskLog) Create() (int64, error) {
func(taskLog *TaskLog) Create() (insertId int, err error) {
taskLog.Status = Running
return Db.Insert(taskLog)
_, err = Db.Insert(taskLog)
if err == nil {
insertId = taskLog.Id
}
return
}
// 更新

@ -24,12 +24,17 @@ type User struct {
}
// 新增
func(user *User) Create() (int64, error) {
func(user *User) Create() (insertId int, err error) {
user.Status = Enabled
user.Salt = user.generateSalt()
user.Password = user.encryptPassword(user.Password, user.Salt)
return Db.Insert(user)
_, err = Db.Insert(user)
if err == nil {
insertId = user.Id
}
return
}
// 更新

@ -1,97 +1,29 @@
package ansible
// ansible ad-hoc playbook命令封装
// ansible ad-hoc 命令封装
import (
"os"
"errors"
"gopkg.in/yaml.v2"
"io/ioutil"
"github.com/ouqiang/cron-scheduler/modules/utils"
)
type Handler map[string]interface{}
type Playbook struct {
Name string
Hosts string
Tasks []Handler
Handlers []Handler
}
func(playbook *Playbook) SetHosts(hosts string) {
playbook.Hosts = hosts
}
func(playbook *Playbook) SetName(name string) {
playbook.Name = name
}
func(playbook *Playbook) AddTask(handler Handler) {
playbook.Tasks = append(playbook.Tasks, handler)
}
func(playbook *Playbook) AddHandler(handler Handler) {
playbook.Handlers = append(playbook.Handlers, handler)
}
/**
* ad-hoc
* hosts
* hosts
* hostFile
* module
* args
*/
func ExecCommand(hosts string, module string, args... string) (output string, err error) {
if hosts== "" || module == "" {
func ExecCommand(hosts string, hostFile string, args... string) (output string, err error) {
if hosts== "" || hostFile == "" || len(args) == 0 {
err = errors.New("参数不完整")
return
}
hostFile := DefaultHosts.GetFilename()
commandArgs := []string{hosts, "-i", hostFile, "-m", module}
if len(args) != 0 {
commandArgs = append(commandArgs, "-a")
commandArgs = append(commandArgs, args...)
}
commandArgs := []string{hosts, "-i", hostFile}
commandArgs = append(commandArgs, args...)
output, err = utils.ExecShell("ansible", commandArgs...)
return
}
// 执行playbook
func ExecPlaybook(playbook Playbook) (result string, err error) {
data, err := yaml.Marshal([]Playbook{playbook})
if err != nil {
return
}
playbookFile, err := ioutil.TempFile(GetTmpDir(), "playbook")
if err != nil {
return
}
hostFile := DefaultHosts.GetFilename()
defer func() {
playbookFile.Close()
os.Remove(playbookFile.Name())
}()
_, err = playbookFile.Write(data)
if err != nil {
return
}
commandArgs := []string{"-i", hostFile, playbookFile.Name()}
result, err = utils.ExecShell("ansible-playbook", commandArgs...)
return
}
// 判断 获取临时目录,默认/dev/shm
func GetTmpDir() string {
dir := "/dev/shm"
_, err := os.Stat(dir)
if os.IsPermission(err) {
return ""
}
return dir
}
}

@ -19,6 +19,7 @@ type Hosts struct {
func NewHosts(hostFilename string) *Hosts {
h := &Hosts{sync.RWMutex{}, hostFilename}
h.Write()
return h
}

@ -8,6 +8,7 @@ import (
"github.com/ouqiang/cron-scheduler/modules/crontask"
"github.com/ouqiang/cron-scheduler/models"
"github.com/ouqiang/cron-scheduler/modules/ansible"
"github.com/ouqiang/cron-scheduler/service"
)
var (
@ -20,7 +21,8 @@ var (
Installed bool // 应用是否安装过
)
func init() {
func InitEnv() {
CheckEnv()
wd, err := os.Getwd()
if err != nil {
panic(err)
@ -79,14 +81,14 @@ func CreateInstallLock() error {
// 初始化资源
func InitResource() {
// 初始化定时任务
crontask.DefaultCronTask = crontask.NewCronTask()
// 初始化DB
models.Db = models.CreateDb(AppConfig)
// 初始化ansible Hosts
ansible.DefaultHosts = ansible.NewHosts(AnsibleHosts)
ansible.DefaultHosts.Write()
os.Exit(1)
// 初始化定时任务
crontask.DefaultCronTask = crontask.NewCronTask()
serviceTask := new(service.Task)
serviceTask.Initialize()
}
// 检测目录是否存在

@ -34,10 +34,11 @@ func(cronTask *CronTask) Add(name string, spec string, cmd cron.FuncJob ) (err e
return errors.New("任务已存在")
}
spec = strings.TrimSpace(spec)
cronTask.Lock()
defer cronTask.Unlock()
cronTask.tasks[name] = cron.New()
specs := strings.Split(spec, "\n")
specs := strings.Split(spec, "|||")
for _, item := range(specs) {
_, err = cron.Parse(item)
if err != nil {
@ -93,7 +94,7 @@ func(cronTask *CronTask) Delete(name string) {
}
// 运行所有任务
func(cronTask *CronTask) run() {
func(cronTask *CronTask) Run() {
for _, cron := range cronTask.tasks {
// cron内部有开启goroutine,此处不用新建goroutine
cron.Start()

@ -1,13 +1,15 @@
package routers
import "gopkg.in/macaron.v1"
import (
"gopkg.in/macaron.v1"
)
// 路由注册
func Register(m *macaron.Macaron) {
// 所有GET方法自动注册HEAD方法
m.SetAutoHead(true)
// 首页
m.Get("/", func(ctx *macaron.Context) (string) {
m.Any("/", func(ctx *macaron.Context) (string) {
return "go home"
})
}

@ -9,15 +9,16 @@ import (
"time"
"github.com/ouqiang/cron-scheduler/modules/crontask"
"github.com/robfig/cron"
"errors"
"github.com/ouqiang/cron-scheduler/modules/ansible"
"fmt"
)
type Task struct {}
// 初始化任务,从数据库取出所有任务添加到定时任务
func(task *Task) Initialize() {
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
func(task *Task) Initialize() {
taskModel := new(models.Task)
taskList, err := taskModel.List()
taskList, err := taskModel.ActiveList()
if err != nil {
utils.RecordLog("获取任务列表错误-", err.Error())
return
@ -29,8 +30,11 @@ func(task *Task) Initialize() {
for _, item := range(taskList) {
task.Add(item)
}
crontask.DefaultCronTask.Run()
}
// 添加任务
func(task *Task) Add(taskModel models.Task) {
taskFunc := createHandlerJob(taskModel)
@ -40,21 +44,24 @@ func(task *Task) Add(taskModel models.Task) {
}
// 定时任务
if taskModel.Type == models.Timing {
crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc)
err := crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc)
if err != nil {
utils.RecordLog(err)
}
} else if taskModel.Type == models.Delay {
// 延时任务
time.AfterFunc(time.Duration(taskModel.Timeout), taskFunc)
time.AfterFunc(time.Duration(taskModel.Delay) * time.Second, taskFunc)
}
}
type Handler interface {
Run(taskModel models.Task)
Run(taskModel models.Task) (string, error)
}
// HTTP任务
type HTTPHandler struct {}
func(h *HTTPHandler) Run(taskModel models.Task) {
func(h *HTTPHandler) Run(taskModel models.Task) (result string, err error) {
client := &http.Client{}
if (taskModel.Timeout > 0) {
client.Timeout = time.Duration(taskModel.Timeout) * time.Second
@ -68,7 +75,11 @@ func(h *HTTPHandler) Run(taskModel models.Task) {
req.Header.Set("User-Agent", "golang-cron/scheduler")
resp, err := client.Do(req)
defer resp.Body.Close()
defer func() {
if resp != nil {
resp.Body.Close()
}
}()
if err != nil {
utils.RecordLog("HTTP请求错误-", err.Error())
return
@ -78,18 +89,30 @@ func(h *HTTPHandler) Run(taskModel models.Task) {
utils.RecordLog("读取HTTP请求返回值失败-", err.Error())
}
return string(body),err
}
// SSH任务
type SSHHandler struct {}
func(ssh *SSHHandler) Run(taskModel models.Task) {
func(ssh *SSHHandler) Run(taskModel models.Task) (string, error) {
var args []string = []string{
"-m", "shell",
"-a", taskModel.Command,
}
if (taskModel.Timeout > 0) {
// -B 异步执行超时时间, -P 轮询时间
args = append(args, "-B", strconv.Itoa(taskModel.Timeout), "-P", "10")
}
result, err := ansible.ExecCommand(taskModel.SshHosts, ansible.DefaultHosts.GetFilename(), args...)
return result, err
}
func createTaskLog(taskModel models.Task) (int64, error) {
func createTaskLog(taskId int) (int, error) {
taskLogModel := new(models.TaskLog)
taskLogModel.TaskId = taskModel.Id
taskLogModel.TaskId = taskId
taskLogModel.StartTime = time.Now()
taskLogModel.Status = models.Running
insertId, err := taskLogModel.Create()
@ -97,30 +120,44 @@ func createTaskLog(taskModel models.Task) (int64, error) {
return insertId, err
}
func updateTaskLog(taskModel models.Task, result string) {
func updateTaskLog(taskLogId int, result string, err error) (int64, error) {
fmt.Println(taskLogId)
taskLogModel := new(models.TaskLog)
taskLogModel.TaskId= taskModel.Id
taskLogModel.StartTime = time.Now()
var status models.Status
if err != nil {
result = err.Error() + " " + result
status = models.Failure
} else {
status = models.Finish
}
return taskLogModel.Update(taskLogId, models.CommonMap{
"status": status,
"result": result,
});
}
func createHandlerJob(taskModel models.Task) cron.FuncJob {
var taskFunc cron.FuncJob = nil;
var handler Handler = nil
switch taskModel.Protocol {
case models.HTTP:
taskFunc = func() {
var handler Handler = new(HTTPHandler)
createTaskLog(taskModel)
handler.Run(taskModel)
}
handler = new(HTTPHandler)
case models.SSH:
taskFunc = func() {
var handler Handler = new(SSHHandler)
createTaskLog(taskModel)
handler.Run(taskModel)
}
handler = new(SSHHandler)
}
taskFunc := func() {
taskLogId, err := createTaskLog(taskModel.Id)
if err != nil {
utils.RecordLog("写入任务日志失败-", err)
return
}
// err != nil 执行失败
result, err := handler.Run(taskModel)
_, err = updateTaskLog(int(taskLogId), result, err)
if err != nil {
utils.RecordLog("更新任务日志失败-", err)
}
}
return taskFunc
}
}
Loading…
Cancel
Save