pull/418/head
vapao 2021-12-08 15:21:24 +08:00
parent 69a8940b5e
commit 71776a4228
4 changed files with 31 additions and 47 deletions

View File

@ -1,7 +1,7 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com> # Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from django.db import close_old_connections from django.db import connections
from apps.account.models import History from apps.account.models import History
from apps.alarm.models import Alarm from apps.alarm.models import Alarm
from apps.schedule.models import Task from apps.schedule.models import Task
@ -14,7 +14,7 @@ from threading import Thread
def auto_run_by_day(): def auto_run_by_day():
close_old_connections() try:
date = datetime.now() - timedelta(days=30) date = datetime.now() - timedelta(days=30)
History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete()
Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete()
@ -29,10 +29,12 @@ def auto_run_by_day():
History.objects.filter(task_id=task.id, id__lt=record.id).delete() History.objects.filter(task_id=task.id, id__lt=record.id).delete()
except IndexError: except IndexError:
pass pass
finally:
connections.close_all()
def auto_run_by_minute(): def auto_run_by_minute():
close_old_connections() try:
now = datetime.now() now = datetime.now()
for req in DeployRequest.objects.filter(status='2'): for req in DeployRequest.objects.filter(status='2'):
if (now - parse_time(req.do_at)).seconds > 3600: if (now - parse_time(req.do_at)).seconds > 3600:
@ -44,3 +46,5 @@ def auto_run_by_minute():
req.do_by = req.created_by req.do_by = req.created_by
req.save() req.save()
Thread(target=dispatch, args=(req,)).start() Thread(target=dispatch, args=(req,)).start()
finally:
connections.close_all()

View File

@ -42,12 +42,12 @@ def schedule_worker_handler(job):
if host_id == 'local': if host_id == 'local':
code, duration, out = local_executor(command) code, duration, out = local_executor(command)
else: else:
close_old_connections()
host = Host.objects.filter(pk=host_id).first() host = Host.objects.filter(pk=host_id).first()
if not host: if not host:
code, duration, out = 1, 0, f'unknown host id for {host_id!r}' code, duration, out = 1, 0, f'unknown host id for {host_id!r}'
else: else:
code, duration, out = host_executor(host, command) code, duration, out = host_executor(host, command)
close_old_connections() close_old_connections()
with transaction.atomic(): with transaction.atomic():
history = History.objects.select_for_update().get(pk=history_id) history = History.objects.select_for_update().get(pk=history_id)

View File

@ -6,13 +6,9 @@ from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject
from django.db import connections from django.db import connections
from apps.schedule.models import Task, History from apps.schedule.models import Task, History
from apps.schedule.utils import send_fail_notify
from apps.notify.models import Notify
from apps.schedule.builtin import auto_run_by_day, auto_run_by_minute from apps.schedule.builtin import auto_run_by_day, auto_run_by_minute
from django.conf import settings from django.conf import settings
from libs import AttrDict, human_datetime from libs import AttrDict, human_datetime
@ -40,10 +36,6 @@ class Scheduler:
def __init__(self): def __init__(self):
self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
self.scheduler.add_listener(
self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED
)
@classmethod @classmethod
def covert_week(cls, week_str): def covert_week(cls, week_str):
@ -64,19 +56,6 @@ class Scheduler:
else: else:
raise TypeError(f'unknown schedule policy: {trigger!r}') raise TypeError(f'unknown schedule policy: {trigger!r}')
def _handle_event(self, event):
obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first())
if event.code == EVENT_SCHEDULER_SHUTDOWN:
logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
Notify.make_notify('schedule', '1', '调度器已关闭', '调度器意外关闭你可以在github上提交issue')
elif event.code == EVENT_JOB_MAX_INSTANCES:
logging.warning(f'EVENT_JOB_MAX_INSTANCES: {event}')
send_fail_notify(obj, '达到调度实例上限,一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
elif event.code == EVENT_JOB_ERROR:
logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
send_fail_notify(obj, f'执行异常:{event.exception}')
connections.close_all()
def _init_builtin_jobs(self): def _init_builtin_jobs(self):
self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20) self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20)
self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1) self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1)
@ -93,6 +72,7 @@ class Scheduler:
rds_cli = get_redis_connection() rds_cli = get_redis_connection()
for t in targets: for t in targets:
rds_cli.rpush(SCHEDULE_WORKER_KEY, json.dumps([history.id, t, command])) rds_cli.rpush(SCHEDULE_WORKER_KEY, json.dumps([history.id, t, command]))
connections.close_all()
def _init(self): def _init(self):
self.scheduler.start() self.scheduler.start()
@ -105,6 +85,7 @@ class Scheduler:
id=str(task.id), id=str(task.id),
args=(task.id, task.command, json.loads(task.targets)), args=(task.id, task.command, json.loads(task.targets)),
) )
connections.close_all()
def run(self): def run(self):
rds_cli = get_redis_connection() rds_cli = get_redis_connection()

View File

@ -3,7 +3,6 @@
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from libs.utils import human_datetime from libs.utils import human_datetime
from libs.spug import Notification from libs.spug import Notification
from threading import Thread
import json import json
@ -12,7 +11,7 @@ def send_fail_notify(task, msg=None):
mode = rst_notify.get('mode') mode = rst_notify.get('mode')
url = rst_notify.get('value') url = rst_notify.get('value')
if mode != '0' and url: if mode != '0' and url:
Thread(target=_do_notify, args=(task, mode, url, msg)).start() _do_notify(task, mode, url, msg)
def _do_notify(task, mode, url, msg): def _do_notify(task, mode, url, msg):