diff --git a/spug_api/apps/exec/management/commands/runworker.py b/spug_api/apps/exec/management/commands/runworker.py index 70418ee..3356a69 100644 --- a/spug_api/apps/exec/management/commands/runworker.py +++ b/spug_api/apps/exec/management/commands/runworker.py @@ -3,6 +3,7 @@ # Released under the AGPL-3.0 License. from django.core.management.base import BaseCommand from django.conf import settings +from django.db import connections from django_redis import get_redis_connection from concurrent.futures import ThreadPoolExecutor from apps.schedule.executors import schedule_worker_handler @@ -23,6 +24,9 @@ class Worker: self.rds = get_redis_connection() self._executor = ThreadPoolExecutor(max_workers=max(50, os.cpu_count() * 20)) + def job_done(self, future): + connections.close_all() + def run(self): logging.warning('Running worker') self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY) @@ -30,11 +34,12 @@ class Worker: key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) key = key.decode() if key == SCHEDULE_WORKER_KEY: - self._executor.submit(schedule_worker_handler, job) + future = self._executor.submit(schedule_worker_handler, job) elif key == MONITOR_WORKER_KEY: - self._executor.submit(monitor_worker_handler, job) + future = self._executor.submit(monitor_worker_handler, job) else: - self._executor.submit(exec_worker_handler, job) + future = self._executor.submit(exec_worker_handler, job) + future.add_done_callback(self.job_done) class Command(BaseCommand): diff --git a/spug_api/apps/monitor/scheduler.py b/spug_api/apps/monitor/scheduler.py index f258eaf..a4c21d9 100644 --- a/spug_api/apps/monitor/scheduler.py +++ b/spug_api/apps/monitor/scheduler.py @@ -4,10 +4,10 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR +from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from django_redis import get_redis_connection from django.utils.functional import SimpleLazyObject -from django.db import close_old_connections +from django.db import connections from apps.monitor.models import Detection from apps.notify.models import Notify from django.conf import settings @@ -27,11 +27,10 @@ class Scheduler: self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler.add_listener( self._handle_event, - EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES + EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED ) def _handle_event(self, event): - close_old_connections() obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first()) if event.code == EVENT_SCHEDULER_SHUTDOWN: logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}') @@ -42,9 +41,9 @@ class Scheduler: elif event.code == EVENT_JOB_ERROR: logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}') + connections.close_all() def _dispatch(self, task_id, tp, targets, extra, threshold, quiet): - close_old_connections() Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime()) rds_cli = get_redis_connection() for t in json.loads(targets): diff --git a/spug_api/apps/schedule/scheduler.py b/spug_api/apps/schedule/scheduler.py index bac7410..b9b1948 100644 --- a/spug_api/apps/schedule/scheduler.py +++ b/spug_api/apps/schedule/scheduler.py @@ -6,10 +6,10 @@ from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.cron import CronTrigger -from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR +from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from django_redis import get_redis_connection from django.utils.functional import SimpleLazyObject -from django.db import close_old_connections +from django.db import connections from apps.schedule.models import Task, History from apps.schedule.utils import send_fail_notify from apps.notify.models import Notify @@ -41,7 +41,7 @@ class Scheduler: self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler.add_listener( self._handle_event, - EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES + EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED ) @classmethod @@ -64,7 +64,6 @@ class Scheduler: raise TypeError(f'unknown schedule policy: {trigger!r}') def _handle_event(self, event): - close_old_connections() obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first()) if event.code == EVENT_SCHEDULER_SHUTDOWN: logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}') @@ -75,13 +74,13 @@ class Scheduler: elif event.code == EVENT_JOB_ERROR: logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') send_fail_notify(obj, f'执行异常:{event.exception}') + connections.close_all() def _init_builtin_jobs(self): self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20) self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1) def _dispatch(self, task_id, command, targets): - close_old_connections() output = {x: None for x in targets} history = History.objects.create( task_id=task_id, diff --git a/spug_api/apps/schedule/views.py b/spug_api/apps/schedule/views.py index c0bb032..73faed8 100644 --- a/spug_api/apps/schedule/views.py +++ b/spug_api/apps/schedule/views.py @@ -67,16 +67,18 @@ class Schedule(View): Argument('is_active', type=bool, required=False) ).parse(request.body, True) if error is None: - Task.objects.filter(pk=form.id).update(**form) + task = Task.objects.get(pk=form.id) if form.get('is_active') is not None: + task.is_active = form.is_active + task.latest_id = None if form.is_active: - task = Task.objects.filter(pk=form.id).first() message = {'id': form.id, 'action': 'add'} message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets'))) else: message = {'id': form.id, 'action': 'remove'} rds_cli = get_redis_connection() rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(message)) + task.save() return json_response(error=error) def delete(self, request):