mirror of https://github.com/openspug/spug
Merge branch '2.0'
commit
ed99d65c99
|
@ -3,9 +3,10 @@
|
|||
# Released under the MIT License.
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler import events
|
||||
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
|
||||
from django_redis import get_redis_connection
|
||||
from django.utils.functional import SimpleLazyObject
|
||||
from django.db import close_old_connections
|
||||
from apps.monitor.models import Detection
|
||||
from apps.alarm.models import Alarm
|
||||
from apps.monitor.executors import dispatch
|
||||
|
@ -25,7 +26,9 @@ class Scheduler:
|
|||
|
||||
def __init__(self):
|
||||
self.scheduler = BackgroundScheduler(timezone=self.timezone)
|
||||
self.scheduler.add_listener(self._handle_event, )
|
||||
self.scheduler.add_listener(
|
||||
self._handle_event,
|
||||
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED)
|
||||
|
||||
def _record_alarm(self, obj, status):
|
||||
duration = seconds_to_human(time.time() - obj.latest_fault_time)
|
||||
|
@ -63,17 +66,18 @@ class Scheduler:
|
|||
self._do_notify('1', obj)
|
||||
|
||||
def _handle_event(self, event):
|
||||
close_old_connections()
|
||||
obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first())
|
||||
if event.code == events.EVENT_SCHEDULER_SHUTDOWN:
|
||||
if event.code == EVENT_SCHEDULER_SHUTDOWN:
|
||||
logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
|
||||
Notify.make_notify('monitor', '1', '调度器已关闭', '调度器意外关闭,你可以在github上提交issue', False)
|
||||
elif event.code == events.EVENT_JOB_MAX_INSTANCES:
|
||||
elif event.code == EVENT_JOB_MAX_INSTANCES:
|
||||
logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}')
|
||||
Notify.make_notify('monitor', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
|
||||
elif event.code == events.EVENT_JOB_ERROR:
|
||||
elif event.code == EVENT_JOB_ERROR:
|
||||
logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
|
||||
Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}')
|
||||
elif event.code == events.EVENT_JOB_EXECUTED:
|
||||
elif event.code == EVENT_JOB_EXECUTED:
|
||||
obj = Detection.objects.filter(pk=event.job_id).first()
|
||||
old_status = obj.latest_status
|
||||
obj.latest_status = 0 if event.retval else 1
|
||||
|
|
|
@ -6,6 +6,7 @@ from threading import Thread
|
|||
from libs.ssh import SSH
|
||||
from apps.host.models import Host
|
||||
from apps.setting.utils import AppSetting
|
||||
from django.db import close_old_connections
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
|
@ -26,10 +27,11 @@ def host_executor(q, host, pkey, command):
|
|||
cli = SSH(host.hostname, host.port, host.username, pkey=pkey)
|
||||
exit_code, out = cli.exec_command(command)
|
||||
finally:
|
||||
q.put((host.id, exit_code, round(time.time() - now, 3), out.decode()))
|
||||
q.put((host.id, exit_code, round(time.time() - now, 3), out.decode() if out else None))
|
||||
|
||||
|
||||
def dispatch(command, targets):
|
||||
close_old_connections()
|
||||
threads, pkey, q = [], AppSetting.get('private_key'), Queue()
|
||||
for t in targets:
|
||||
if t == 'local':
|
||||
|
|
|
@ -4,9 +4,10 @@
|
|||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler.triggers.date import DateTrigger
|
||||
from apscheduler import events
|
||||
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
|
||||
from django_redis import get_redis_connection
|
||||
from django.utils.functional import SimpleLazyObject
|
||||
from django.db import close_old_connections
|
||||
from apps.schedule.models import Task
|
||||
from apps.notify.models import Notify
|
||||
from apps.schedule.executors import dispatch
|
||||
|
@ -26,7 +27,9 @@ class Scheduler:
|
|||
|
||||
def __init__(self):
|
||||
self.scheduler = BackgroundScheduler(timezone=self.timezone)
|
||||
self.scheduler.add_listener(self._handle_event, )
|
||||
self.scheduler.add_listener(
|
||||
self._handle_event,
|
||||
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED)
|
||||
|
||||
@classmethod
|
||||
def parse_trigger(cls, trigger, trigger_args):
|
||||
|
@ -38,17 +41,18 @@ class Scheduler:
|
|||
raise TypeError(f'unknown schedule policy: {trigger!r}')
|
||||
|
||||
def _handle_event(self, event):
|
||||
close_old_connections()
|
||||
obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first())
|
||||
if event.code == events.EVENT_SCHEDULER_SHUTDOWN:
|
||||
if event.code == EVENT_SCHEDULER_SHUTDOWN:
|
||||
logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
|
||||
Notify.make_notify('schedule', '1', '调度器已关闭', '调度器意外关闭,你可以在github上提交issue')
|
||||
elif event.code == events.EVENT_JOB_MAX_INSTANCES:
|
||||
elif event.code == EVENT_JOB_MAX_INSTANCES:
|
||||
logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}')
|
||||
Notify.make_notify('schedule', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
|
||||
elif event.code == events.EVENT_JOB_ERROR:
|
||||
elif event.code == EVENT_JOB_ERROR:
|
||||
logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
|
||||
Notify.make_notify('schedule', '1', f'{obj.name} - 执行异常', f'{event.exception}')
|
||||
elif event.code == events.EVENT_JOB_EXECUTED:
|
||||
elif event.code == EVENT_JOB_EXECUTED:
|
||||
if event.retval:
|
||||
score = 0
|
||||
for item in event.retval:
|
||||
|
|
|
@ -85,7 +85,6 @@ class ComTable extends React.Component {
|
|||
}, {
|
||||
title: '更新于',
|
||||
dataIndex: 'latest_run_time',
|
||||
render: value => value ? moment(value).fromNow() : null
|
||||
}, {
|
||||
title: '操作',
|
||||
render: info => (
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
import { observable } from "mobx";
|
||||
import http from 'libs/http';
|
||||
import moment from "moment";
|
||||
|
||||
class Store {
|
||||
@observable records = [];
|
||||
|
@ -18,7 +19,13 @@ class Store {
|
|||
fetchRecords = () => {
|
||||
this.isFetching = true;
|
||||
http.get('/api/monitor/')
|
||||
.then(res => this.records = res)
|
||||
.then(res => {
|
||||
res.map(item => {
|
||||
const value = item['latest_run_time'];
|
||||
item['latest_run_time'] = value ? moment(value).fromNow() : null
|
||||
});
|
||||
this.records = res
|
||||
})
|
||||
.finally(() => this.isFetching = false)
|
||||
};
|
||||
|
||||
|
|
|
@ -58,9 +58,8 @@ class ComTable extends React.Component {
|
|||
}
|
||||
},
|
||||
}, {
|
||||
title: '最近时间',
|
||||
title: '更新于',
|
||||
dataIndex: 'latest_run_time',
|
||||
render: value => value ? moment(value).fromNow() : 'N/A'
|
||||
}, {
|
||||
title: '描述信息',
|
||||
dataIndex: 'desc',
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
import { observable } from "mobx";
|
||||
import http from 'libs/http';
|
||||
import moment from "moment";
|
||||
|
||||
class Store {
|
||||
@observable records = [];
|
||||
|
@ -23,6 +24,10 @@ class Store {
|
|||
this.isFetching = true;
|
||||
http.get('/api/schedule/')
|
||||
.then(({types, tasks}) => {
|
||||
tasks.map(item => {
|
||||
const value = item['latest_run_time'];
|
||||
item['latest_run_time'] = value ? moment(value).fromNow() : null
|
||||
});
|
||||
this.records = tasks;
|
||||
this.types = types
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue