perf: 支持终断批量快捷命令执行的任务

pull/12815/head
wangruidong 2024-03-13 18:18:24 +08:00 committed by Bryan
parent 80035e7cb6
commit dcd7f9f7e6
3 changed files with 49 additions and 3 deletions

View File

@ -1,5 +1,6 @@
import json import json
import os import os
from psutil import NoSuchProcess
from celery.result import AsyncResult from celery.result import AsyncResult
from django.conf import settings from django.conf import settings
@ -198,7 +199,7 @@ class JobExecutionViewSet(OrgBulkModelViewSet):
return Response({'error': serializer.errors}, status=400) return Response({'error': serializer.errors}, status=400)
task_id = serializer.validated_data['task_id'] task_id = serializer.validated_data['task_id']
try: try:
instance = get_object_or_404(JobExecution, task_id=task_id, creator=request.user) instance = get_object_or_404(JobExecution, pk=task_id, creator=request.user)
except Http404: except Http404:
return Response( return Response(
{'error': _('The task is being created and cannot be interrupted. Please try again later.')}, {'error': _('The task is being created and cannot be interrupted. Please try again later.')},
@ -207,7 +208,10 @@ class JobExecutionViewSet(OrgBulkModelViewSet):
task = AsyncResult(task_id, app=app) task = AsyncResult(task_id, app=app)
inspect = app.control.inspect() inspect = app.control.inspect()
for worker in inspect.registered().keys(): for worker in inspect.registered().keys():
if not worker.startswith('ansible'):
continue
if task_id not in [at['id'] for at in inspect.active().get(worker, [])]: if task_id not in [at['id'] for at in inspect.active().get(worker, [])]:
# 在队列中未执行使用revoke执行 # 在队列中未执行使用revoke执行
task.revoke(terminate=True) task.revoke(terminate=True)
@ -239,7 +243,7 @@ class JobExecutionTaskDetail(APIView):
task_id = str(kwargs.get('task_id')) task_id = str(kwargs.get('task_id'))
with tmp_to_org(org): with tmp_to_org(org):
execution = get_object_or_404(JobExecution, task_id=task_id) execution = get_object_or_404(JobExecution, pk=task_id)
return Response(data={ return Response(data={
'status': execution.status, 'status': execution.status,

View File

@ -555,10 +555,12 @@ class JobExecution(JMSOrgBaseModel):
ssh_tunnel.local_gateway_clean(runner) ssh_tunnel.local_gateway_clean(runner)
def stop(self): def stop(self):
from ops.signal_handlers import job_execution_stop_pub_sub
with open(os.path.join(self.private_dir, 'local.pid')) as f: with open(os.path.join(self.private_dir, 'local.pid')) as f:
try: try:
pid = f.read() pid = f.read()
os.kill(int(pid), 9) job_execution_stop_pub_sub.publish(int(pid))
except Exception as e: except Exception as e:
print(e) print(e)
self.set_error('Job stop by "kill -9 {}"'.format(pid)) self.set_error('Job stop by "kill -9 {}"'.format(pid))

View File

@ -1,4 +1,6 @@
import ast import ast
import psutil
from psutil import NoSuchProcess
import time import time
from celery import signals from celery import signals
@ -8,9 +10,11 @@ from django.db.models.signals import pre_save
from django.db.utils import ProgrammingError from django.db.utils import ProgrammingError
from django.dispatch import receiver from django.dispatch import receiver
from django.utils import translation, timezone from django.utils import translation, timezone
from django.utils.functional import LazyObject
from common.db.utils import close_old_connections, get_logger from common.db.utils import close_old_connections, get_logger
from common.signals import django_ready from common.signals import django_ready
from common.utils.connection import RedisPubSub
from orgs.utils import get_current_org_id, set_current_org from orgs.utils import get_current_org_id, set_current_org
from .celery import app from .celery import app
from .models import CeleryTaskExecution, CeleryTask, Job from .models import CeleryTaskExecution, CeleryTask, Job
@ -144,3 +148,39 @@ def task_sent_handler(headers=None, body=None, **kwargs):
} }
CeleryTaskExecution.objects.create(**data) CeleryTaskExecution.objects.create(**data)
CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now()) CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now())
@receiver(django_ready)
def subscribe_stop_job_execution(sender, **kwargs):
    """Subscribe to the job-execution stop channel when Django is ready.

    Registers ``on_stop`` as the callback on ``job_execution_stop_pub_sub``.
    Each received message is treated as the pid of a job-execution process
    whose ``ssh`` descendants should be killed.

    :param sender: signal sender (unused).
    :param kwargs: extra signal arguments (unused).
    """
    logger.info("Start subscribe for stop job execution")

    def on_stop(pid):
        """Kill every ``ssh`` descendant of the process identified by ``pid``."""
        logger.info(f"Stop job execution {pid} start")
        try:
            current_process = psutil.Process(pid)
            # The process may exit right after the lookup; listing its
            # children must be covered by the same handler so the callback
            # never raises back into the subscriber.
            children = current_process.children(recursive=True)
        except NoSuchProcess as e:
            logger.error(e)
            return
        logger.debug(f"Job execution process children: {children}")
        for child in children:
            # Never touch PID 1.
            if child.pid == 1:
                continue
            try:
                # name() can fail if the child vanished in the meantime, so
                # it lives inside the try: one dead child must not prevent
                # the remaining ones from being killed.
                if child.name() != 'ssh':
                    continue
                child.kill()
                logger.debug(f"Kill job execution process {pid} children process {child.pid} success")
            except Exception as e:
                logger.error(e)

    job_execution_stop_pub_sub.subscribe(on_stop)
class JobExecutionPubSub(LazyObject):
    """Lazy wrapper around the Redis pub/sub used for job-execution stop messages."""

    def _setup(self):
        """Create the wrapped ``RedisPubSub`` bound to the stop channel."""
        channel = 'fm.job_execution_stop'
        self._wrapped = RedisPubSub(channel)
# Shared module-level pub/sub handle: subscribe_stop_job_execution registers
# its on_stop callback on it, and callers publish a pid through it to request
# that a running job execution be stopped.
job_execution_stop_pub_sub = JobExecutionPubSub()