fix issues

pull/345/head
vapao 2021-07-12 15:35:12 +08:00
parent 8c297dd205
commit 438976a067
4 changed files with 20 additions and 15 deletions

View File

@ -3,6 +3,7 @@
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.conf import settings from django.conf import settings
from django.db import connections
from django_redis import get_redis_connection from django_redis import get_redis_connection
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from apps.schedule.executors import schedule_worker_handler from apps.schedule.executors import schedule_worker_handler
@ -23,6 +24,9 @@ class Worker:
self.rds = get_redis_connection() self.rds = get_redis_connection()
self._executor = ThreadPoolExecutor(max_workers=max(50, os.cpu_count() * 20)) self._executor = ThreadPoolExecutor(max_workers=max(50, os.cpu_count() * 20))
def job_done(self, future):
connections.close_all()
def run(self): def run(self):
logging.warning('Running worker') logging.warning('Running worker')
self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY) self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY)
@ -30,11 +34,12 @@ class Worker:
key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
key = key.decode() key = key.decode()
if key == SCHEDULE_WORKER_KEY: if key == SCHEDULE_WORKER_KEY:
self._executor.submit(schedule_worker_handler, job) future = self._executor.submit(schedule_worker_handler, job)
elif key == MONITOR_WORKER_KEY: elif key == MONITOR_WORKER_KEY:
self._executor.submit(monitor_worker_handler, job) future = self._executor.submit(monitor_worker_handler, job)
else: else:
self._executor.submit(exec_worker_handler, job) future = self._executor.submit(exec_worker_handler, job)
future.add_done_callback(self.job_done)
class Command(BaseCommand): class Command(BaseCommand):

View File

@ -4,10 +4,10 @@
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject from django.utils.functional import SimpleLazyObject
from django.db import close_old_connections from django.db import connections
from apps.monitor.models import Detection from apps.monitor.models import Detection
from apps.notify.models import Notify from apps.notify.models import Notify
from django.conf import settings from django.conf import settings
@ -27,11 +27,10 @@ class Scheduler:
self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
self.scheduler.add_listener( self.scheduler.add_listener(
self._handle_event, self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED
) )
def _handle_event(self, event): def _handle_event(self, event):
close_old_connections()
obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first()) obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first())
if event.code == EVENT_SCHEDULER_SHUTDOWN: if event.code == EVENT_SCHEDULER_SHUTDOWN:
logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}') logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
@ -42,9 +41,9 @@ class Scheduler:
elif event.code == EVENT_JOB_ERROR: elif event.code == EVENT_JOB_ERROR:
logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}') Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}')
connections.close_all()
def _dispatch(self, task_id, tp, targets, extra, threshold, quiet): def _dispatch(self, task_id, tp, targets, extra, threshold, quiet):
close_old_connections()
Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime()) Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime())
rds_cli = get_redis_connection() rds_cli = get_redis_connection()
for t in json.loads(targets): for t in json.loads(targets):

View File

@ -6,10 +6,10 @@ from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject from django.utils.functional import SimpleLazyObject
from django.db import close_old_connections from django.db import connections
from apps.schedule.models import Task, History from apps.schedule.models import Task, History
from apps.schedule.utils import send_fail_notify from apps.schedule.utils import send_fail_notify
from apps.notify.models import Notify from apps.notify.models import Notify
@ -41,7 +41,7 @@ class Scheduler:
self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
self.scheduler.add_listener( self.scheduler.add_listener(
self._handle_event, self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED
) )
@classmethod @classmethod
@ -64,7 +64,6 @@ class Scheduler:
raise TypeError(f'unknown schedule policy: {trigger!r}') raise TypeError(f'unknown schedule policy: {trigger!r}')
def _handle_event(self, event): def _handle_event(self, event):
close_old_connections()
obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first()) obj = SimpleLazyObject(lambda: Task.objects.filter(pk=event.job_id).first())
if event.code == EVENT_SCHEDULER_SHUTDOWN: if event.code == EVENT_SCHEDULER_SHUTDOWN:
logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}') logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
@ -75,13 +74,13 @@ class Scheduler:
elif event.code == EVENT_JOB_ERROR: elif event.code == EVENT_JOB_ERROR:
logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
send_fail_notify(obj, f'执行异常:{event.exception}') send_fail_notify(obj, f'执行异常:{event.exception}')
connections.close_all()
def _init_builtin_jobs(self): def _init_builtin_jobs(self):
self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20) self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20)
self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1) self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=1)
def _dispatch(self, task_id, command, targets): def _dispatch(self, task_id, command, targets):
close_old_connections()
output = {x: None for x in targets} output = {x: None for x in targets}
history = History.objects.create( history = History.objects.create(
task_id=task_id, task_id=task_id,

View File

@ -67,16 +67,18 @@ class Schedule(View):
Argument('is_active', type=bool, required=False) Argument('is_active', type=bool, required=False)
).parse(request.body, True) ).parse(request.body, True)
if error is None: if error is None:
Task.objects.filter(pk=form.id).update(**form) task = Task.objects.get(pk=form.id)
if form.get('is_active') is not None: if form.get('is_active') is not None:
task.is_active = form.is_active
task.latest_id = None
if form.is_active: if form.is_active:
task = Task.objects.filter(pk=form.id).first()
message = {'id': form.id, 'action': 'add'} message = {'id': form.id, 'action': 'add'}
message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets'))) message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets')))
else: else:
message = {'id': form.id, 'action': 'remove'} message = {'id': form.id, 'action': 'remove'}
rds_cli = get_redis_connection() rds_cli = get_redis_connection()
rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(message)) rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(message))
task.save()
return json_response(error=error) return json_response(error=error)
def delete(self, request): def delete(self, request):