diff --git a/Dockerfile-ce b/Dockerfile-ce
index e2360cd49..cbdfbfb63 100644
--- a/Dockerfile-ce
+++ b/Dockerfile-ce
@@ -121,7 +121,7 @@ RUN set -ex \
 
 COPY --from=stage-2 /opt/py3 /opt/py3
 COPY --from=stage-1 /opt/jumpserver/release/jumpserver /opt/jumpserver
-COPY --from=stage-1 /opt/jumpserver/release/jumpserver/apps/ops/ansible/ansible.cfg /etc/ansible
+COPY --from=stage-1 /opt/jumpserver/release/jumpserver/apps/ops/ansible/ansible.cfg /etc/ansible/
 
 WORKDIR /opt/jumpserver
 
diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py
index 7ccd0888f..80094332e 100644
--- a/apps/ops/ansible/callback.py
+++ b/apps/ops/ansible/callback.py
@@ -48,6 +48,11 @@ class DefaultCallback:
         event = data.get('event', None)
         if not event:
             return
+
+        pid = data.get('pid', None)
+        if pid:
+            self.write_pid(pid)
+
         event_data = data.get('event_data', {})
         host = event_data.get('remote_addr', '')
         task = event_data.get('task', '')
@@ -156,3 +161,16 @@ class DefaultCallback:
         status = data.get('status', '')
         self.status = self.STATUS_MAPPER.get(status, 'unknown')
         self.private_data_dir = data.get("private_data_dir", None)
+
+    def write_pid(self, pid):
+        """Persist the PID reported by a runner event to ``local.pid``.
+
+        The file is written under ``self.private_data_dir``; nothing is
+        written while that attribute is still unset, since joining a path
+        onto ``None`` raises ``TypeError``.
+        """
+        if not self.private_data_dir:
+            return
+        pid_filepath = os.path.join(self.private_data_dir, 'local.pid')
+        with open(pid_filepath, 'w') as f:
+            f.write(str(pid))
diff --git a/apps/ops/ansible/receptor/receptor_runner.py b/apps/ops/ansible/receptor/receptor_runner.py
index 4e1b1c410..091c86c36 100644
--- a/apps/ops/ansible/receptor/receptor_runner.py
+++ b/apps/ops/ansible/receptor/receptor_runner.py
@@ -31,6 +31,32 @@ def nodes():
     return receptor_ctl.simple_command("status").get("Advertisements", None)
 
 
+def kill_process(pid):
+    """Submit a receptor ``kill`` work unit for *pid* and print its output.
+
+    The PID is sent as the payload to the ``primary`` node and the unit's
+    result stream is echoed to stdout until it is exhausted.
+    """
+    submit_result = receptor_ctl.submit_work(worktype="kill", node="primary", payload=str(pid))
+    unit_id = submit_result["unitid"]
+
+    result_socket, result_file = receptor_ctl.get_work_results(
+        unit_id=unit_id, return_sockfile=True, return_socket=True
+    )
+    # The former ``while not result_socket.close()`` closed the socket in its
+    # loop condition before anything was read (assuming a standard socket,
+    # whose close() returns None). Read to EOF first, then release both.
+    try:
+        while True:
+            buf = result_file.read()
+            if not buf:
+                break
+            print(buf.decode('utf8'))
+    finally:
+        result_file.close()
+        result_socket.close()
+
+
 def run(**kwargs):
     receptor_runner = AnsibleReceptorRunner(**kwargs)
     return receptor_runner.run()
diff --git a/apps/ops/models/job.py b/apps/ops/models/job.py
index 759596d54..cb81a565d 100644
--- a/apps/ops/models/job.py
+++ b/apps/ops/models/job.py
@@ -524,15 +524,16 @@ class JobExecution(JMSOrgBaseModel):
             ssh_tunnel.local_gateway_clean(runner)
 
     def stop(self):
-        unit_id_path = os.path.join(self.private_dir, "local.unitid")
-        if os.path.exists(unit_id_path):
-            with open(unit_id_path) as f:
+        from ops.signal_handlers import job_execution_stop_pub_sub
+        pid_path = os.path.join(self.private_dir, "local.pid")
+        if os.path.exists(pid_path):
+            with open(pid_path) as f:
                 try:
-                    unit_id = f.read()
-                    receptor_runner.cancel(unit_id)
+                    pid = f.read()
+                    job_execution_stop_pub_sub.publish(int(pid))
                 except Exception as e:
                     print(e)
-        self.set_error('Job stop by "receptor worker cancel, unit id is {}"'.format(unit_id))
+        self.set_error('Job stop by "user cancel"')
 
     class Meta:
         verbose_name = _("Job Execution")
diff --git a/apps/ops/signal_handlers.py b/apps/ops/signal_handlers.py
index 5df6f6cf3..0c9e560f1 100644
--- a/apps/ops/signal_handlers.py
+++ b/apps/ops/signal_handlers.py
@@ -1,7 +1,6 @@
 import ast
 import time
 
-import psutil
 from celery import signals
 from django.core.cache import cache
 from django.db import transaction
@@ -10,13 +9,13 @@ from django.db.utils import ProgrammingError
 from django.dispatch import receiver
 from django.utils import translation, timezone
 from django.utils.functional import LazyObject
-from psutil import NoSuchProcess
 
 from common.db.utils import close_old_connections, get_logger
 from common.signals import django_ready
 from common.utils.connection import RedisPubSub
 from jumpserver.utils import get_current_request
 from orgs.utils import get_current_org_id, set_current_org
+from .ansible.receptor.receptor_runner import kill_process
 from .celery import app
 from .models import CeleryTaskExecution, CeleryTask, Job
 
@@ -160,24 +159,11 @@ def subscribe_stop_job_execution(sender, **kwargs):
 
     def on_stop(pid):
         logger.info(f"Stop job execution {pid} start")
-        try:
-            current_process = psutil.Process(pid)
-        except NoSuchProcess as e:
-            logger.error(e)
-            return
-
-        children = current_process.children(recursive=True)
-        logger.debug(f"Job execution process children: {children}")
-        for child in children:
-            if child.pid == 1:
-                continue
-            if child.name() != 'ssh':
-                continue
-            try:
-                child.kill()
-                logger.debug(f"Kill job execution process {pid} children process {child.pid} success")
-            except Exception as e:
-                logger.error(e)
+        try:
+            kill_process(pid)
+        except Exception as e:
+            # Log instead of raising, as the replaced psutil-based code did.
+            logger.error(e)
 
     job_execution_stop_pub_sub.subscribe(on_stop)
 
diff --git a/receptor b/receptor
index e6a1493cb..baf037101 100755
--- a/receptor
+++ b/receptor
@@ -2,16 +2,22 @@
 # coding: utf-8
 
 import argparse
+import logging
 import shutil
 import subprocess
 import os
 import signal
 import tempfile
 
+import psutil
+from psutil import NoSuchProcess
+
 ANSIBLE_RUNNER_COMMAND = "ansible-runner"
 DEFAULT_SHARE_DIR = "data/share"
 DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock")
 
+logger = logging.getLogger(__name__)
+
 
 class ReceptorService:
     def __init__(self):
@@ -19,6 +25,8 @@ class ReceptorService:
         self.receptor_command = [
             'receptor',
             '--local-only',
+            '--log-level',
+            'level=Debug',
             '--node', 'id=primary',
             '--control-service',
             'service=control',
@@ -27,6 +35,11 @@ class ReceptorService:
             'worktype={}'.format(ANSIBLE_RUNNER_COMMAND),
             'command={}'.format(ANSIBLE_RUNNER_COMMAND),
             'params=worker',
+            'allowruntimeparams=true',
+            '--work-command',
+            'worktype={}'.format("kill"),
+            'command={}'.format("python"),
+            "params=receptor kill",
             'allowruntimeparams=true'
         ]
 
@@ -52,7 +65,6 @@ class ReceptorService:
             except ValueError:
                 print("\n- PID file is corrupted, starting Receptor...")
                 os.remove(self.pid_file)
-
         process = subprocess.Popen(self.receptor_command)
         with open(self.pid_file, 'w') as f:
             f.write(str(process.pid))
@@ -99,7 +111,8 @@ class ReceptorService:
             print("\n- Receptor service is not running.")
 
 
-def handle_receptor_action(action):
+def handle_receptor_action(args):
+    action = args.action
     srv = ReceptorService()
     if action == "start":
         srv.start()
@@ -109,6 +122,46 @@ def handle_receptor_action(action):
         srv.restart()
     elif action == "status":
         srv.status()
+    elif action == "kill":
+        kill_progress_tree()
+
+
+def kill_progress_tree(pid=None):
+    """Kill the ``ssh`` descendants of process *pid*.
+
+    When *pid* is falsy it is read as one line from stdin. Only descendants
+    named ``ssh`` are killed; the process *pid* itself is not signalled.
+    A vanished process or a failed kill is logged instead of raised.
+    """
+    if not pid:
+        try:
+            pid_input = input()
+            pid = int(pid_input)
+        except Exception as e:
+            logger.error(e)
+            return
+
+    logger.info("progress {} will be kill".format(pid))
+
+    try:
+        current_process = psutil.Process(pid)
+        # children() can raise NoSuchProcess too if the parent exits meanwhile.
+        children = current_process.children(recursive=True)
+    except NoSuchProcess as e:
+        logger.error(e)
+        return
+
+    for child in children:
+        if child.pid == 1:
+            continue
+        try:
+            # name() raises NoSuchProcess when the child has already exited,
+            # so it is guarded together with kill().
+            if child.name() != 'ssh':
+                continue
+            child.kill()
+        except Exception as e:
+            logger.error(e)
 
 
 if __name__ == '__main__':
@@ -119,9 +172,9 @@ if __name__ == '__main__':
     )
     parser.add_argument(
         'action', type=str,
-        choices=("start", "stop", "restart", "status"),
+        choices=("start", "stop", "restart", "status", "kill"),
         help="Action to run"
     )
 
     args = parser.parse_args()
-    handle_receptor_action(args.action)
+    handle_receptor_action(args)