ansilbe host模块重构

pull/21/merge
ouqiang 2017-03-24 13:06:53 +08:00
parent 7a443c3ac3
commit e11e8cc1cf
12 changed files with 132 additions and 99 deletions

1
.gitignore vendored
View File

@ -28,5 +28,4 @@ data/*
log/*
conf/install.lock
conf/app.ini
conf/ansible_hosts.ini
public/js/vue.js

View File

@ -9,6 +9,7 @@ import (
"github.com/ouqiang/cron-scheduler/modules/app"
"fmt"
"os"
"github.com/ouqiang/cron-scheduler/routers"
)
// web服务器默认端口
@ -36,7 +37,7 @@ func run(ctx *cli.Context) {
runScheduler()
m := macaron.Classic()
// 注册路由
registerRouter(m)
routers.Register(m)
// 注册中间件
registerMiddleware(m)
port := parsePort(ctx)
@ -49,15 +50,6 @@ func runScheduler() {
os.Exit(1)
}
// 路由注册
func registerRouter(m *macaron.Macaron) {
// 所有GET方法自动注册HEAD方法
m.SetAutoHead(true)
m.Get("/", func(ctx *macaron.Context) (string) {
return "go home"
})
}
// 中间件注册
func registerMiddleware(m *macaron.Macaron) {
m.Use(macaron.Logger())

View File

View File

@ -18,7 +18,7 @@ const AppVersion = "0.0.1"
func main() {
app := cli.NewApp()
app.Name = "cron-scheduler"
app.Usage = "crons-scheduler service"
app.Usage = "cron-scheduler service"
app.Version = AppVersion
app.Commands = []cli.Command{
cmd.CmdWeb,

View File

@ -25,7 +25,7 @@ const (
const (
Page = 1 // 当前页数
PageSize = 20 // 每页多少条数据
MaxPageSize = 1000 // 每次最多取多少条
MaxPageSize = 100000 // 每次最多取多少条
)
// 创建Db

View File

@ -6,18 +6,25 @@ import (
type Protocol int8
type TaskType int8
const (
HTTP Protocol = 1
SSH Protocol = 2
)
const (
Timing TaskType = 1
Delay TaskType = 2
)
// 任务
type Task struct {
Id int `xorm:"int pk autoincr"`
Name string `xorm:"varchar(64) notnull"` // 任务名称
Spec string `xorm:"varchar(64) notnull"` // crontab 时间格式
Protocol Protocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh
Type int8 `xorm:"tinyint notnull default 1"` // 任务类型 1: 定时任务 2: 延时任务
Type TaskType `xorm:"tinyint notnull default 1"` // 任务类型 1: 定时任务 2: 延时任务
Command string `xorm:"varchar(512) notnull"` // URL地址或shell命令
Timeout int `xorm:"mediumint notnull default 0"` // 定时任务:执行超时时间(单位秒)0不限制 延时任务: 延时timeout秒后执行
SshHosts string `xorm:"varchar(512) notnull defalut '' "` // SSH主机名, host id逗号分隔

View File

@ -48,13 +48,7 @@ func ExecCommand(hosts string, module string, args... string) (output string, er
err = errors.New("参数不完整")
return
}
hostFile, err := DefaultHosts.GetHostFile()
if err != nil {
return
}
defer func() {
os.Remove(hostFile)
}()
hostFile := DefaultHosts.GetFilename()
commandArgs := []string{hosts, "-i", hostFile, "-m", module}
if len(args) != 0 {
commandArgs = append(commandArgs, "-a")
@ -76,14 +70,10 @@ func ExecPlaybook(playbook Playbook) (result string, err error) {
if err != nil {
return
}
hostFile, err := DefaultHosts.GetHostFile()
if err != nil {
return
}
hostFile := DefaultHosts.GetFilename()
defer func() {
playbookFile.Close()
os.Remove(playbookFile.Name())
os.Remove(hostFile)
}()
_, err = playbookFile.Write(data)
if err != nil {

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"bytes"
"strconv"
"github.com/ouqiang/cron-scheduler/modules/utils"
)
// 主机名
@ -13,27 +14,38 @@ var DefaultHosts *Hosts
type Hosts struct {
sync.RWMutex
hosts []models.Host
filename string
}
func(h *Hosts) Get() []models.Host {
h.RLock()
defer h.RUnlock()
func NewHosts(hostFilename string) *Hosts {
h := &Hosts{sync.RWMutex{}, hostFilename}
return h.hosts
}
func(h *Hosts) Set(hostsModel []models.Host) {
h.Lock()
defer h.Unlock()
h.hosts = hostsModel
return h
}
// 获取hosts文件名
func(h *Hosts) GetHostFile() (filename string ,err error) {
func(h *Hosts) GetFilename() string {
h.RLock()
defer h.RUnlock()
return h.filename
}
// 写入hosts
func(h *Hosts) Write() {
host := new(models.Host)
hostModels, err := host.List()
if err != nil {
utils.RecordLog(err)
return
}
if len(hostModels) == 0 {
utils.RecordLog("hosts内容为空")
return
}
buffer := bytes.Buffer{}
for _, hostModel := range(h.hosts) {
for _, hostModel := range(hostModels) {
buffer.WriteString(strconv.Itoa(int(hostModel.Id)))
buffer.WriteString(" ansible_ssh_host=")
buffer.WriteString(hostModel.Name)
@ -47,19 +59,9 @@ func(h *Hosts) GetHostFile() (filename string ,err error) {
}
buffer.WriteString("\n")
}
tmpFile, err := ioutil.TempFile(GetTmpDir(), "host")
if err != nil {
return
}
defer func() {
tmpFile.Close()
}()
_, err = tmpFile.WriteString(buffer.String())
if err == nil {
filename = tmpFile.Name()
}
h.Lock()
defer h.Unlock()
err = ioutil.WriteFile(h.filename, buffer.Bytes(), 0644)
return
}

View File

@ -5,9 +5,9 @@ import (
"runtime"
"github.com/ouqiang/cron-scheduler/modules/utils"
"github.com/ouqiang/cron-scheduler/modules/ansible"
"github.com/ouqiang/cron-scheduler/modules/crontask"
"github.com/ouqiang/cron-scheduler/models"
"github.com/ouqiang/cron-scheduler/modules/ansible"
)
var (
@ -36,7 +36,7 @@ func init() {
os.Setenv("ANSIBLE_CONFIG", ConfDir)
Installed = IsInstalled()
if Installed {
initResource()
InitResource()
}
}
@ -78,17 +78,15 @@ func CreateInstallLock() error {
// 初始化资源
func initResource() {
func InitResource() {
// 初始化定时任务
crontask.DefaultCronTask = crontask.NewCronTask()
// 初始化DB
models.Db = models.CreateDb(AppConfig)
ansible.DefaultHosts = &ansible.Hosts{}
hostModel := new(models.Host)
hosts, err := hostModel.List()
if err != nil {
utils.RecordLog(err)
} else {
ansible.DefaultHosts.Set(hosts)
}
// 初始化ansible Hosts
ansible.DefaultHosts = ansible.NewHosts(AnsibleHosts)
ansible.DefaultHosts.Write()
os.Exit(1)
}
// 检测目录是否存在

View File

@ -4,24 +4,29 @@ import (
"github.com/robfig/cron"
"errors"
"sync"
"strings"
)
var DefaultCronTask *CronTask
type CronMap map[string]*cron.Cron
type CronTask struct {
sync.RWMutex
tasks map[string]*cron.Cron
tasks CronMap
}
func NewCronTask() *CronTask {
return &CronTask {
sync.RWMutex{},
make(map[string]*cron.Cron),
make(CronMap),
}
}
// 新增定时任务,如果name存在则添加失败
func(cronTask *CronTask) Add(name string, spec string, cmd func() ) error {
// name 任务名称
// spec crontab时间格式定义 可定义多个时间\n分隔
func(cronTask *CronTask) Add(name string, spec string, cmd cron.FuncJob ) (err error) {
if name == "" || spec == "" || cmd == nil {
return errors.New("参数不完整")
}
@ -32,13 +37,22 @@ func(cronTask *CronTask) Add(name string, spec string, cmd func() ) error {
cronTask.Lock()
defer cronTask.Unlock()
cronTask.tasks[name] = cron.New()
err := cronTask.tasks[name].AddFunc(spec, cmd)
specs := strings.Split(spec, "\n")
for _, item := range(specs) {
_, err = cron.Parse(item)
if err != nil {
return err
}
}
for _, item := range(specs) {
err = cronTask.tasks[name].AddFunc(item, cmd)
}
return err
}
// 任务不存在则新增,任务已存在则替换任务
func(cronTask *CronTask) addOrReplace(name string, spec string, cmd func() ) error {
// 任务不存在则新增,任务已存在则删除后新增
func(cronTask *CronTask) AddOrReplace(name string, spec string, cmd cron.FuncJob) error {
if cronTask.IsExist(name) {
cronTask.Delete(name)
}

13
routers/routers.go Normal file
View File

@ -0,0 +1,13 @@
package routers
import "gopkg.in/macaron.v1"
// 路由注册
func Register(m *macaron.Macaron) {
// 所有GET方法自动注册HEAD方法
m.SetAutoHead(true)
// 首页
m.Get("/", func(ctx *macaron.Context) (string) {
return "go home"
})
}

View File

@ -8,6 +8,8 @@ import (
"strconv"
"time"
"github.com/ouqiang/cron-scheduler/modules/crontask"
"github.com/robfig/cron"
"errors"
)
type Task struct {}
@ -18,9 +20,11 @@ func(task *Task) Initialize() {
taskList, err := taskModel.List()
if err != nil {
utils.RecordLog("获取任务列表错误-", err.Error())
return
}
if len(taskList) == 0 {
utils.RecordLog("任务列表为空")
return
}
for _, item := range(taskList) {
task.Add(item)
@ -28,24 +32,18 @@ func(task *Task) Initialize() {
}
// 添加任务
func(task *Task) Add(taskModel models.Task) {
var taskFunc func() = nil;
switch taskModel.Protocol {
case models.HTTP:
taskFunc = func() {
var handler Handler = new(HTTPHandler)
handler.Run(taskModel)
}
case models.SSH:
taskFunc = func() {
var handler Handler = new(SSHHandler)
handler.Run(taskModel)
}
default:
utils.RecordLog("任务协议不存在-协议编号: ", taskModel.Protocol)
func(task *Task) Add(taskModel models.Task) {
taskFunc := createHandlerJob(taskModel)
if taskFunc == nil {
utils.RecordLog("添加任务#不存在的任务协议编号", taskModel.Protocol)
return
}
if (taskFunc != nil) {
crontask.DefaultCronTask.Add(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc)
// 定时任务
if taskModel.Type == models.Timing {
crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc)
} else if taskModel.Type == models.Delay {
// 延时任务
time.AfterFunc(time.Duration(taskModel.Timeout), taskFunc)
}
}
@ -80,15 +78,6 @@ func(h *HTTPHandler) Run(taskModel models.Task) {
utils.RecordLog("读取HTTP请求返回值失败-", err.Error())
}
_, err = taskModel.Update(
taskModel.Id,
models.CommonMap{
"status": 0,
"result" : string(body),
});
if err != nil {
utils.RecordLog("更新任务日志失败-", err.Error())
}
}
// SSH任务
@ -98,11 +87,40 @@ func(ssh *SSHHandler) Run(taskModel models.Task) {
}
// 延时任务
type DelayHandler struct {}
func createTaskLog(taskModel models.Task) (int64, error) {
taskLogModel := new(models.TaskLog)
taskLogModel.TaskId = taskModel.Id
taskLogModel.StartTime = time.Now()
taskLogModel.Status = models.Running
insertId, err := taskLogModel.Create()
func (handler *DelayHandler) Run(taskModel models.Task) {
return insertId, err
}
func updateTaskLog(taskModel models.Task, result string) {
taskLogModel := new(models.TaskLog)
taskLogModel.TaskId= taskModel.Id
taskLogModel.StartTime = time.Now()
}
func createHandlerJob(taskModel models.Task) cron.FuncJob {
var taskFunc cron.FuncJob = nil;
switch taskModel.Protocol {
case models.HTTP:
taskFunc = func() {
var handler Handler = new(HTTPHandler)
createTaskLog(taskModel)
handler.Run(taskModel)
}
case models.SSH:
taskFunc = func() {
var handler Handler = new(SSHHandler)
createTaskLog(taskModel)
handler.Run(taskModel)
}
}
return taskFunc
}