F 修复服务数据库连接可能意外丢失的问题

pull/22/head
vapao 2020-01-21 23:57:09 +08:00
parent a9d1e281bb
commit 4e5fe4046f
3 changed files with 21 additions and 13 deletions

View File

@ -3,9 +3,10 @@
# Released under the MIT License.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler import events
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject
from django.db import close_old_connections
from apps.monitor.models import Detection
from apps.alarm.models import Alarm
from apps.monitor.executors import dispatch
@ -25,7 +26,9 @@ class Scheduler:
def __init__(self):
self.scheduler = BackgroundScheduler(timezone=self.timezone)
self.scheduler.add_listener(self._handle_event, )
self.scheduler.add_listener(
self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED)
def _record_alarm(self, obj, status):
duration = seconds_to_human(time.time() - obj.latest_fault_time)
@ -63,17 +66,18 @@ class Scheduler:
self._do_notify('1', obj)
def _handle_event(self, event):
close_old_connections()
obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first())
if event.code == events.EVENT_SCHEDULER_SHUTDOWN:
if event.code == EVENT_SCHEDULER_SHUTDOWN:
logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
Notify.make_notify('monitor', '1', '调度器已关闭', '调度器意外关闭你可以在github上提交issue', False)
elif event.code == events.EVENT_JOB_MAX_INSTANCES:
elif event.code == EVENT_JOB_MAX_INSTANCES:
logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}')
Notify.make_notify('monitor', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
elif event.code == events.EVENT_JOB_ERROR:
elif event.code == EVENT_JOB_ERROR:
logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}')
elif event.code == events.EVENT_JOB_EXECUTED:
elif event.code == EVENT_JOB_EXECUTED:
obj = Detection.objects.filter(pk=event.job_id).first()
old_status = obj.latest_status
obj.latest_status = 0 if event.retval else 1

View File

@ -26,7 +26,7 @@ def host_executor(q, host, pkey, command):
cli = SSH(host.hostname, host.port, host.username, pkey=pkey)
exit_code, out = cli.exec_command(command)
finally:
q.put((host.id, exit_code, round(time.time() - now, 3), out.decode()))
q.put((host.id, exit_code, round(time.time() - now, 3), out.decode() if out else None))
def dispatch(command, targets):

View File

@ -4,9 +4,10 @@
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler import events
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject
from django.db import close_old_connections
from apps.schedule.models import Task
from apps.notify.models import Notify
from apps.schedule.executors import dispatch
@ -26,7 +27,9 @@ class Scheduler:
def __init__(self):
self.scheduler = BackgroundScheduler(timezone=self.timezone)
self.scheduler.add_listener(self._handle_event, )
self.scheduler.add_listener(
self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED)
@classmethod
def parse_trigger(cls, trigger, trigger_args):
@ -38,17 +41,18 @@ class Scheduler:
raise TypeError(f'unknown schedule policy: {trigger!r}')
def _handle_event(self, event):
close_old_connections()
obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first())
if event.code == events.EVENT_SCHEDULER_SHUTDOWN:
if event.code == EVENT_SCHEDULER_SHUTDOWN:
logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
Notify.make_notify('schedule', '1', '调度器已关闭', '调度器意外关闭你可以在github上提交issue')
elif event.code == events.EVENT_JOB_MAX_INSTANCES:
elif event.code == EVENT_JOB_MAX_INSTANCES:
logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}')
Notify.make_notify('schedule', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
elif event.code == events.EVENT_JOB_ERROR:
elif event.code == EVENT_JOB_ERROR:
logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
Notify.make_notify('schedule', '1', f'{obj.name} - 执行异常', f'{event.exception}')
elif event.code == events.EVENT_JOB_EXECUTED:
elif event.code == EVENT_JOB_EXECUTED:
if event.retval:
score = 0
for item in event.retval: