diff --git a/spug_api/apps/schedule/builtin.py b/spug_api/apps/schedule/builtin.py index 4f5bfc9..64d4828 100644 --- a/spug_api/apps/schedule/builtin.py +++ b/spug_api/apps/schedule/builtin.py @@ -1,7 +1,7 @@ # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. -from django.db import close_old_connections +from django.db import connections from apps.account.models import History from apps.alarm.models import Alarm from apps.schedule.models import Task @@ -14,33 +14,37 @@ from threading import Thread def auto_run_by_day(): - close_old_connections() - date = datetime.now() - timedelta(days=30) - History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() - Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() try: - record = ExecHistory.objects.all()[50] - ExecHistory.objects.filter(id__lt=record.id).delete() - except IndexError: - pass - for task in Task.objects.all(): + date = datetime.now() - timedelta(days=30) + History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() + Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() try: - record = History.objects.filter(task_id=task.id)[50] - History.objects.filter(task_id=task.id, id__lt=record.id).delete() + record = ExecHistory.objects.all()[50] + ExecHistory.objects.filter(id__lt=record.id).delete() except IndexError: pass + for task in Task.objects.all(): + try: + record = History.objects.filter(task_id=task.id)[50] + History.objects.filter(task_id=task.id, id__lt=record.id).delete() + except IndexError: + pass + finally: + connections.close_all() def auto_run_by_minute(): - close_old_connections() - now = datetime.now() - for req in DeployRequest.objects.filter(status='2'): - if (now - parse_time(req.do_at)).seconds > 3600: - req.status = '-3' + try: + now = datetime.now() + for req in DeployRequest.objects.filter(status='2'): + if (now - parse_time(req.do_at)).seconds > 3600: + req.status = '-3' + req.save() + for req in DeployRequest.objects.filter(status='1', plan__lte=now): + req.status = '2' + req.do_at = human_datetime() + req.do_by = req.created_by req.save() - for req in DeployRequest.objects.filter(status='1', plan__lte=now): - req.status = '2' - req.do_at = human_datetime() - req.do_by = req.created_by - req.save() - Thread(target=dispatch, args=(req,)).start() + Thread(target=dispatch, args=(req,)).start() + finally: + connections.close_all() diff --git a/spug_api/apps/schedule/executors.py b/spug_api/apps/schedule/executors.py index 1575a1a..ff8b7a7 100644 --- a/spug_api/apps/schedule/executors.py +++ b/spug_api/apps/schedule/executors.py @@ -42,12 +42,12 @@ def schedule_worker_handler(job): if host_id == 'local': code, duration, out = local_executor(command) else: - close_old_connections() host = Host.objects.filter(pk=host_id).first() if not host: code, duration, out = 1, 0, f'unknown host id for {host_id!r}' else: code, duration, out = host_executor(host, command) + close_old_connections() with transaction.atomic(): history = History.objects.select_for_update().get(pk=history_id) diff --git a/spug_api/apps/schedule/scheduler.py b/spug_api/apps/schedule/scheduler.py index f948cfc..94e2d63 100644 --- a/spug_api/apps/schedule/scheduler.py +++ b/spug_api/apps/schedule/scheduler.py @@ -6,13 +6,9 @@ from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.cron import CronTrigger -from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from django_redis import get_redis_connection -from django.utils.functional import SimpleLazyObject from django.db import connections from apps.schedule.models import Task, History -from apps.schedule.utils import send_fail_notify -from apps.notify.models import Notify from apps.schedule.builtin import auto_run_by_day, auto_run_by_minute from django.conf import settings from libs import AttrDict, human_datetime @@ -40,10 +36,6 @@ class Scheduler: def __init__(self): self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) - self.scheduler.add_listener( - self._handle_event, - EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED - ) @classmethod def covert_week(cls, week_str): @@ -64,19 +56,6 @@ class Scheduler: else: raise TypeError(f'unknown schedule policy: {trigger!r}') - def _handle_event(self, event): - obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first()) - if event.code == EVENT_SCHEDULER_SHUTDOWN: - logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}') - Notify.make_notify('schedule', '1', '调度器已关闭', '调度器意外关闭,你可以在github上提交issue') - elif event.code == EVENT_JOB_MAX_INSTANCES: - logging.warning(f'EVENT_JOB_MAX_INSTANCES: {event}') - send_fail_notify(obj, '达到调度实例上限,一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时') - elif event.code == EVENT_JOB_ERROR: - logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') - send_fail_notify(obj, f'执行异常:{event.exception}') - connections.close_all() - def _init_builtin_jobs(self): self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20) self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1) @@ -93,6 +72,7 @@ class Scheduler: rds_cli = get_redis_connection() for t in targets: rds_cli.rpush(SCHEDULE_WORKER_KEY, json.dumps([history.id, t, command])) + connections.close_all() def _init(self): self.scheduler.start() @@ -105,6 +85,7 @@ class Scheduler: id=str(task.id), args=(task.id, task.command, json.loads(task.targets)), ) + connections.close_all() def run(self): rds_cli = get_redis_connection() diff --git a/spug_api/apps/schedule/utils.py b/spug_api/apps/schedule/utils.py index 2752700..5186d86 100644 --- a/spug_api/apps/schedule/utils.py +++ b/spug_api/apps/schedule/utils.py @@ -3,7 +3,6 @@ # Released under the AGPL-3.0 License. from libs.utils import human_datetime from libs.spug import Notification -from threading import Thread import json @@ -12,7 +11,7 @@ def send_fail_notify(task, msg=None): mode = rst_notify.get('mode') url = rst_notify.get('value') if mode != '0' and url: - Thread(target=_do_notify, args=(task, mode, url, msg)).start() + _do_notify(task, mode, url, msg) def _do_notify(task, mode, url, msg):