feat: ansible receptor kill 进程

pull/13002/head
Aaron3S 2024-04-11 21:44:08 +08:00 committed by Bryan
parent 6a12bc39e9
commit 6052306c04
6 changed files with 84 additions and 31 deletions

View File

@ -121,7 +121,7 @@ RUN set -ex \
COPY --from=stage-2 /opt/py3 /opt/py3 COPY --from=stage-2 /opt/py3 /opt/py3
COPY --from=stage-1 /opt/jumpserver/release/jumpserver /opt/jumpserver COPY --from=stage-1 /opt/jumpserver/release/jumpserver /opt/jumpserver
COPY --from=stage-1 /opt/jumpserver/release/jumpserver/apps/ops/ansible/ansible.cfg /etc/ansible COPY --from=stage-1 /opt/jumpserver/release/jumpserver/apps/ops/ansible/ansible.cfg /etc/ansible/
WORKDIR /opt/jumpserver WORKDIR /opt/jumpserver

View File

@ -48,6 +48,11 @@ class DefaultCallback:
event = data.get('event', None) event = data.get('event', None)
if not event: if not event:
return return
pid = data.get('pid', None)
if pid:
self.write_pid(pid)
event_data = data.get('event_data', {}) event_data = data.get('event_data', {})
host = event_data.get('remote_addr', '') host = event_data.get('remote_addr', '')
task = event_data.get('task', '') task = event_data.get('task', '')
@ -156,3 +161,8 @@ class DefaultCallback:
status = data.get('status', '') status = data.get('status', '')
self.status = self.STATUS_MAPPER.get(status, 'unknown') self.status = self.STATUS_MAPPER.get(status, 'unknown')
self.private_data_dir = data.get("private_data_dir", None) self.private_data_dir = data.get("private_data_dir", None)
def write_pid(self, pid):
pid_filepath = os.path.join(self.private_data_dir, 'local.pid')
with open(pid_filepath, 'w') as f:
f.write(str(pid))

View File

@ -31,6 +31,20 @@ def nodes():
return receptor_ctl.simple_command("status").get("Advertisements", None) return receptor_ctl.simple_command("status").get("Advertisements", None)
def kill_process(pid):
submit_result = receptor_ctl.submit_work(worktype="kill", node="primary", payload=str(pid))
unit_id = submit_result["unitid"]
result_socket, result_file = receptor_ctl.get_work_results(unit_id=unit_id, return_sockfile=True,
return_socket=True)
while not result_socket.close():
buf = result_file.read()
if not buf:
break
print(buf.decode('utf8'))
def run(**kwargs): def run(**kwargs):
receptor_runner = AnsibleReceptorRunner(**kwargs) receptor_runner = AnsibleReceptorRunner(**kwargs)
return receptor_runner.run() return receptor_runner.run()

View File

@ -524,15 +524,16 @@ class JobExecution(JMSOrgBaseModel):
ssh_tunnel.local_gateway_clean(runner) ssh_tunnel.local_gateway_clean(runner)
def stop(self): def stop(self):
unit_id_path = os.path.join(self.private_dir, "local.unitid") from ops.signal_handlers import job_execution_stop_pub_sub
if os.path.exists(unit_id_path): pid_path = os.path.join(self.private_dir, "local.pid")
with open(unit_id_path) as f: if os.path.exists(pid_path):
with open(pid_path) as f:
try: try:
unit_id = f.read() pid = f.read()
receptor_runner.cancel(unit_id) job_execution_stop_pub_sub.publish(int(pid))
except Exception as e: except Exception as e:
print(e) print(e)
self.set_error('Job stop by "receptor worker cancel, unit id is {}"'.format(unit_id)) self.set_error('Job stop by "user cancel"')
class Meta: class Meta:
verbose_name = _("Job Execution") verbose_name = _("Job Execution")

View File

@ -1,7 +1,6 @@
import ast import ast
import time import time
import psutil
from celery import signals from celery import signals
from django.core.cache import cache from django.core.cache import cache
from django.db import transaction from django.db import transaction
@ -10,13 +9,13 @@ from django.db.utils import ProgrammingError
from django.dispatch import receiver from django.dispatch import receiver
from django.utils import translation, timezone from django.utils import translation, timezone
from django.utils.functional import LazyObject from django.utils.functional import LazyObject
from psutil import NoSuchProcess
from common.db.utils import close_old_connections, get_logger from common.db.utils import close_old_connections, get_logger
from common.signals import django_ready from common.signals import django_ready
from common.utils.connection import RedisPubSub from common.utils.connection import RedisPubSub
from jumpserver.utils import get_current_request from jumpserver.utils import get_current_request
from orgs.utils import get_current_org_id, set_current_org from orgs.utils import get_current_org_id, set_current_org
from .ansible.receptor.receptor_runner import kill_process
from .celery import app from .celery import app
from .models import CeleryTaskExecution, CeleryTask, Job from .models import CeleryTaskExecution, CeleryTask, Job
@ -160,24 +159,7 @@ def subscribe_stop_job_execution(sender, **kwargs):
def on_stop(pid): def on_stop(pid):
logger.info(f"Stop job execution {pid} start") logger.info(f"Stop job execution {pid} start")
try: kill_process(pid)
current_process = psutil.Process(pid)
except NoSuchProcess as e:
logger.error(e)
return
children = current_process.children(recursive=True)
logger.debug(f"Job execution process children: {children}")
for child in children:
if child.pid == 1:
continue
if child.name() != 'ssh':
continue
try:
child.kill()
logger.debug(f"Kill job execution process {pid} children process {child.pid} success")
except Exception as e:
logger.error(e)
job_execution_stop_pub_sub.subscribe(on_stop) job_execution_stop_pub_sub.subscribe(on_stop)

View File

@ -2,16 +2,22 @@
# coding: utf-8 # coding: utf-8
import argparse import argparse
import logging
import shutil import shutil
import subprocess import subprocess
import os import os
import signal import signal
import tempfile import tempfile
import psutil
from psutil import NoSuchProcess
ANSIBLE_RUNNER_COMMAND = "ansible-runner" ANSIBLE_RUNNER_COMMAND = "ansible-runner"
DEFAULT_SHARE_DIR = "data/share" DEFAULT_SHARE_DIR = "data/share"
DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock") DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock")
logger = logging.getLogger(__name__)
class ReceptorService: class ReceptorService:
def __init__(self): def __init__(self):
@ -19,6 +25,8 @@ class ReceptorService:
self.receptor_command = [ self.receptor_command = [
'receptor', 'receptor',
'--local-only', '--local-only',
'--log-level',
'level=Debug',
'--node', 'id=primary', '--node', 'id=primary',
'--control-service', '--control-service',
'service=control', 'service=control',
@ -27,6 +35,11 @@ class ReceptorService:
'worktype={}'.format(ANSIBLE_RUNNER_COMMAND), 'worktype={}'.format(ANSIBLE_RUNNER_COMMAND),
'command={}'.format(ANSIBLE_RUNNER_COMMAND), 'command={}'.format(ANSIBLE_RUNNER_COMMAND),
'params=worker', 'params=worker',
'allowruntimeparams=true',
'--work-command',
'worktype={}'.format("kill"),
'command={}'.format("python"),
"params=receptor kill",
'allowruntimeparams=true' 'allowruntimeparams=true'
] ]
@ -52,7 +65,6 @@ class ReceptorService:
except ValueError: except ValueError:
print("\n- PID file is corrupted, starting Receptor...") print("\n- PID file is corrupted, starting Receptor...")
os.remove(self.pid_file) os.remove(self.pid_file)
process = subprocess.Popen(self.receptor_command) process = subprocess.Popen(self.receptor_command)
with open(self.pid_file, 'w') as f: with open(self.pid_file, 'w') as f:
f.write(str(process.pid)) f.write(str(process.pid))
@ -99,7 +111,8 @@ class ReceptorService:
print("\n- Receptor service is not running.") print("\n- Receptor service is not running.")
def handle_receptor_action(action): def handle_receptor_action(args):
action = args.action
srv = ReceptorService() srv = ReceptorService()
if action == "start": if action == "start":
srv.start() srv.start()
@ -109,6 +122,37 @@ def handle_receptor_action(action):
srv.restart() srv.restart()
elif action == "status": elif action == "status":
srv.status() srv.status()
elif action == "kill":
kill_progress_tree()
def kill_progress_tree(pid=None):
if not pid:
try:
pid_input = input()
pid = int(pid_input)
except Exception as e:
logger.error(e)
return
logger.info("progress {} will be kill".format(pid))
try:
current_process = psutil.Process(pid)
except NoSuchProcess as e:
logger.error(e)
return
children = current_process.children(recursive=True)
for child in children:
if child.pid == 1:
continue
if child.name() != 'ssh':
continue
try:
child.kill()
except Exception as e:
logger.error(e)
if __name__ == '__main__': if __name__ == '__main__':
@ -119,9 +163,11 @@ if __name__ == '__main__':
) )
parser.add_argument( parser.add_argument(
'action', type=str, 'action', type=str,
choices=("start", "stop", "restart", "status"), choices=("start", "stop", "restart", "status", "kill"),
help="Action to run" help="Action to run"
) )
# parser.add_argument('--pid', type=int, default=42, help='what PID you want to kill')
args = parser.parse_args() args = parser.parse_args()
handle_receptor_action(args.action) handle_receptor_action(args)