From 52922088a92f97d2bc3f117f81ab83be1eb31e98 Mon Sep 17 00:00:00 2001 From: fit2bot <68588906+fit2bot@users.noreply.github.com> Date: Mon, 22 Apr 2024 13:51:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E7=BB=93=E6=9E=84=EF=BC=8Creceptor=E5=BC=80=E5=85=B3=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=BA=20tcp=20=E9=80=9A=E4=BF=A1=20(#1307?= =?UTF-8?q?8)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 优化代码结构,receptor开关,修改为 tcp 通信 * fix: 修改导包路径 * fix: 修复错别字 * fix: 修改导包路径 * perf: 优化代码 * fix: 修复任务不执行的问题 * perf: 优化配置项名称 * perf: 优化代码结构 * perf: 优化代码 --------- Co-authored-by: Aaron3S --- apps/assets/automations/base/manager.py | 7 +- apps/jumpserver/conf.py | 6 +- apps/jumpserver/settings/custom.py | 5 +- .../receptor => libs/process}/__init__.py | 0 apps/libs/process/ssh.py | 26 ++++ apps/ops/ansible/__init__.py | 4 +- apps/ops/ansible/callback.py | 2 +- apps/ops/ansible/cleaner.py | 26 ++-- apps/ops/ansible/exception.py | 5 + apps/ops/ansible/interface.py | 46 ++++++ apps/ops/ansible/receptor/receptor_runner.py | 147 ------------------ apps/ops/ansible/runner.py | 45 ++---- apps/ops/ansible/runners/__init__.py | 3 + apps/ops/ansible/runners/base.py | 42 +++++ apps/ops/ansible/runners/native.py | 24 +++ apps/ops/ansible/runners/receptor.py | 100 ++++++++++++ .../ansible/runners/receptorctl/__init__.py | 0 .../runners/receptorctl/receptorctl.py | 38 +++++ apps/ops/models/job.py | 6 +- apps/ops/signal_handlers.py | 5 +- receptor | 30 +--- 21 files changed, 337 insertions(+), 230 deletions(-) rename apps/{ops/ansible/receptor => libs/process}/__init__.py (100%) create mode 100644 apps/libs/process/ssh.py create mode 100644 apps/ops/ansible/exception.py create mode 100644 apps/ops/ansible/interface.py delete mode 100644 apps/ops/ansible/receptor/receptor_runner.py create mode 100644 apps/ops/ansible/runners/__init__.py create mode 100644 apps/ops/ansible/runners/base.py create mode 100644 apps/ops/ansible/runners/native.py create mode 100644 apps/ops/ansible/runners/receptor.py create mode 100644 apps/ops/ansible/runners/receptorctl/__init__.py create mode 100644 apps/ops/ansible/runners/receptorctl/receptorctl.py diff --git a/apps/assets/automations/base/manager.py b/apps/assets/automations/base/manager.py index d94e5583c..570f6f195 100644 --- a/apps/assets/automations/base/manager.py +++ b/apps/assets/automations/base/manager.py @@ -12,7 +12,8 @@ from sshtunnel import SSHTunnelForwarder from assets.automations.methods import platform_automation_methods from common.utils import get_logger, lazyproperty, is_openssh_format_key, ssh_pubkey_gen -from ops.ansible import JMSInventory, SuperPlaybookRunner, DefaultCallback +from ops.ansible import JMSInventory, DefaultCallback, SuperPlaybookRunner +from ops.ansible.interface import interface logger = get_logger(__name__) @@ -54,7 +55,9 @@ class SSHTunnelManager: not_valid.append(k) else: local_bind_port = server.local_bind_port - host['ansible_host'] = jms_asset['address'] = host['login_host'] = 'jms_celery' + + host['ansible_host'] = jms_asset['address'] = host[ + 'login_host'] = interface.get_gateway_proxy_host() host['ansible_port'] = jms_asset['port'] = host['login_port'] = local_bind_port servers.append(server) diff --git a/apps/jumpserver/conf.py b/apps/jumpserver/conf.py index 7534683b2..1ee650b39 100644 --- a/apps/jumpserver/conf.py +++ b/apps/jumpserver/conf.py @@ -617,8 +617,10 @@ class Config(dict): 'TICKET_APPLY_ASSET_SCOPE': 'all', # Ansible Receptor - 'ANSIBLE_RECEPTOR_ENABLE': True, - 'ANSIBLE_RECEPTOR_SOCK_PATH': '{}/data/share/control.sock'.format(PROJECT_DIR) + 'ANSIBLE_RECEPTOR_ENABLED': True, + 'ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST': 'jms_celery', + 'ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS': 'jms_receptor:7521' + } old_config_map = { diff --git a/apps/jumpserver/settings/custom.py b/apps/jumpserver/settings/custom.py index 453648240..5a4418270 100644 --- a/apps/jumpserver/settings/custom.py +++ b/apps/jumpserver/settings/custom.py @@ -232,5 +232,6 @@ FILE_UPLOAD_SIZE_LIMIT_MB = CONFIG.FILE_UPLOAD_SIZE_LIMIT_MB TICKET_APPLY_ASSET_SCOPE = CONFIG.TICKET_APPLY_ASSET_SCOPE # Ansible Receptor -ANSIBLE_RECEPTOR_ENABLE = CONFIG.ANSIBLE_RECEPTOR_ENABLE -ANSIBLE_RECEPTOR_SOCK_PATH = CONFIG.ANSIBLE_RECEPTOR_SOCK_PATH +ANSIBLE_RECEPTOR_ENABLED = CONFIG.ANSIBLE_RECEPTOR_ENABLED +ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST = CONFIG.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST +ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS = CONFIG.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS diff --git a/apps/ops/ansible/receptor/__init__.py b/apps/libs/process/__init__.py similarity index 100% rename from apps/ops/ansible/receptor/__init__.py rename to apps/libs/process/__init__.py diff --git a/apps/libs/process/ssh.py b/apps/libs/process/ssh.py new file mode 100644 index 000000000..baf0f776f --- /dev/null +++ b/apps/libs/process/ssh.py @@ -0,0 +1,26 @@ +import logging + +import psutil +from psutil import NoSuchProcess + +logger = logging.getLogger(__name__) + + +def _should_kill(process): + return process.pid != 1 and process.name() == 'ssh' + + +def kill_ansible_ssh_process(pid): + try: + process = psutil.Process(pid) + except NoSuchProcess as e: + logger.error(f"No such process: {e}") + return + + for child in process.children(recursive=True): + if not _should_kill(child): + return + try: + child.kill() + except Exception as e: + logger.error(f"Failed to kill process {child.pid}: {e}") diff --git a/apps/ops/ansible/__init__.py b/apps/ops/ansible/__init__.py index a175387eb..df052a0a0 100644 --- a/apps/ops/ansible/__init__.py +++ b/apps/ops/ansible/__init__.py @@ -2,5 +2,7 @@ from .callback import * from .inventory import * -from .runner import * +from .runners import * from .exceptions import * +from .runner import * +from .interface import * \ No newline at end of file diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py index 80094332e..c11941be4 100644 --- a/apps/ops/ansible/callback.py +++ b/apps/ops/ansible/callback.py @@ -165,4 +165,4 @@ class DefaultCallback: def write_pid(self, pid): pid_filepath = os.path.join(self.private_data_dir, 'local.pid') with open(pid_filepath, 'w') as f: - f.write(str(pid)) \ No newline at end of file + f.write(str(pid)) diff --git a/apps/ops/ansible/cleaner.py b/apps/ops/ansible/cleaner.py index 5a2f29d2f..309ea5df4 100644 --- a/apps/ops/ansible/cleaner.py +++ b/apps/ops/ansible/cleaner.py @@ -4,6 +4,20 @@ from functools import wraps from settings.api import settings +__all__ = ["WorkPostRunCleaner", "cleanup_post_run"] + + +class WorkPostRunCleaner: + @property + def clean_dir(self): + raise NotImplemented + + def clean_post_run(self): + if settings.DEBUG_DEV: + return + if self.clean_dir and os.path.exists(self.clean_dir): + shutil.rmtree(self.clean_dir) + def cleanup_post_run(func): def get_instance(*args): @@ -22,15 +36,3 @@ def cleanup_post_run(func): instance.clean_post_run() return wrapper - - -class WorkPostRunCleaner: - @property - def clean_dir(self): - raise NotImplemented - - def clean_post_run(self): - if settings.DEBUG_DEV: - return - if self.clean_dir and os.path.exists(self.clean_dir): - shutil.rmtree(self.clean_dir) diff --git a/apps/ops/ansible/exception.py b/apps/ops/ansible/exception.py new file mode 100644 index 000000000..dc5926f05 --- /dev/null +++ b/apps/ops/ansible/exception.py @@ -0,0 +1,5 @@ +__all__ = ['CommandInBlackListException'] + + +class CommandInBlackListException(Exception): + pass diff --git a/apps/ops/ansible/interface.py b/apps/ops/ansible/interface.py new file mode 100644 index 000000000..065e80abc --- /dev/null +++ b/apps/ops/ansible/interface.py @@ -0,0 +1,46 @@ +from django.conf import settings +from django.utils.functional import LazyObject + +from ops.ansible import AnsibleReceptorRunner, AnsibleNativeRunner +from ops.ansible.runners.base import BaseRunner + +__all__ = ['interface'] + + +class _LazyRunnerInterface(LazyObject): + + def _setup(self): + self._wrapped = self.make_interface() + + @staticmethod + def make_interface(): + runner_type = AnsibleReceptorRunner \ + if settings.ANSIBLE_RECEPTOR_ENABLED else AnsibleNativeRunner + gateway_host = settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST \ + if settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST else '127.0.0.1' + return RunnerInterface(runner_type=runner_type, gateway_proxy_host=gateway_host) + + +interface = _LazyRunnerInterface() + + +class RunnerInterface: + def __init__(self, runner_type, gateway_proxy_host='127.0.0.1'): + if not issubclass(runner_type, BaseRunner): + raise TypeError(f'{runner_type} can not cast to {BaseRunner}') + self._runner_type = runner_type + self._gateway_proxy_host = gateway_proxy_host + + def get_gateway_proxy_host(self): + return self._gateway_proxy_host + + def get_runner_type(self): + return self._runner_type + + def kill_process(self, pid): + return self._runner_type.kill_precess(pid) + + def run(self, **kwargs): + runner_type = self.get_runner_type() + runner = runner_type(**kwargs) + return runner.run() diff --git a/apps/ops/ansible/receptor/receptor_runner.py b/apps/ops/ansible/receptor/receptor_runner.py deleted file mode 100644 index de07859ae..000000000 --- a/apps/ops/ansible/receptor/receptor_runner.py +++ /dev/null @@ -1,147 +0,0 @@ -import concurrent.futures -import os -import queue -import socket - -from django.conf import settings -import ansible_runner -from receptorctl import ReceptorControl - -from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run - - -class ReceptorCtl: - @property - def ctl(self): - return ReceptorControl(settings.ANSIBLE_RECEPTOR_SOCK_PATH) - - def cancel(self, unit_id): - return self.ctl.simple_command("work cancel {}".format(unit_id)) - - def nodes(self): - return self.ctl.simple_command("status").get("Advertisements", None) - - def submit_work(self, - worktype, - payload, - node=None, - tlsclient=None, - ttl=None, - signwork=False, - params=None, ): - return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params) - - def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True): - return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile) - - def kill_process(self, pid): - submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid)) - unit_id = submit_result["unitid"] - result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True, - return_socket=True) - while not result_socket.close(): - buf = result_file.read() - if not buf: - break - print(buf.decode('utf8')) - - -receptor_ctl = ReceptorCtl() - - -def run(**kwargs): - receptor_runner = AnsibleReceptorRunner(**kwargs) - return receptor_runner.run() - - -class AnsibleReceptorRunner(WorkPostRunCleaner): - def __init__(self, **kwargs): - self.runner_params = kwargs - self.unit_id = None - self.clean_workspace = kwargs.pop("clean_workspace", True) - - def write_unit_id(self): - if not self.unit_id: - return - private_dir = self.runner_params.get("private_data_dir", "") - with open(os.path.join(private_dir, "local.unitid"), "w") as f: - f.write(self.unit_id) - f.flush() - - @property - def clean_dir(self): - if not self.clean_workspace: - return None - return self.runner_params.get("private_data_dir", None) - - @cleanup_post_run - def run(self): - input, output = socket.socketpair() - - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - transmitter_future = executor.submit(self.transmit, input) - result = receptor_ctl.submit_work(payload=output.makefile('rb'), - node='primary', worktype='ansible-runner') - input.close() - output.close() - - self.unit_id = result['unitid'] - self.write_unit_id() - - transmitter_future.result() - - result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) - - stdout_queue = queue.Queue() - - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - processor_future = executor.submit(self.processor, result_file, stdout_queue) - - while not processor_future.done() or \ - not stdout_queue.empty(): - msg = stdout_queue.get() - if msg is None: - break - print(msg) - - return processor_future.result() - - def transmit(self, _socket): - try: - ansible_runner.run( - streamer='transmit', - _output=_socket.makefile('wb'), - **self.runner_params - ) - finally: - _socket.shutdown(socket.SHUT_WR) - - def processor(self, _result_file, stdout_queue): - try: - original_event_handler = self.runner_params.pop("event_handler", None) - original_status_handler = self.runner_params.pop("status_handler", None) - - def event_handler(data, **kwargs): - stdout = data.get('stdout', '') - if stdout: - stdout_queue.put(stdout) - if original_event_handler: - original_event_handler(data, **kwargs) - - def status_handler(data, **kwargs): - private_data_dir = self.runner_params.get("private_data_dir", None) - if private_data_dir: - data["private_data_dir"] = private_data_dir - if original_status_handler: - original_status_handler(data, **kwargs) - - return ansible_runner.interface.run( - quite=True, - streamer='process', - _input=_result_file, - event_handler=event_handler, - status_handler=status_handler, - **self.runner_params, - ) - finally: - stdout_queue.put(None) diff --git a/apps/ops/ansible/runner.py b/apps/ops/ansible/runner.py index 1d808f196..6af2fffb4 100644 --- a/apps/ops/ansible/runner.py +++ b/apps/ops/ansible/runner.py @@ -1,43 +1,24 @@ -import logging import os import shutil import uuid -import ansible_runner from django.conf import settings from django.utils._os import safe_join -from django.utils.functional import LazyObject - +from .interface import interface from .callback import DefaultCallback -from .receptor import receptor_runner +from .exception import CommandInBlackListException from ..utils import get_ansible_log_verbosity -logger = logging.getLogger(__file__) - - -class CommandInBlackListException(Exception): - pass - - -class AnsibleWrappedRunner(LazyObject): - def _setup(self): - self._wrapped = self.get_runner() - - @staticmethod - def get_runner(): - if settings.ANSIBLE_RECEPTOR_ENABLE and settings.ANSIBLE_RECEPTOR_SOCK_PATH: - return receptor_runner - return ansible_runner - - -runner = AnsibleWrappedRunner() +__all__ = ['AdHocRunner', 'PlaybookRunner', 'SuperPlaybookRunner', 'UploadFileRunner'] class AdHocRunner: cmd_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell') - def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/', extra_vars={}, + def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/', extra_vars=None, dry_run=False, timeout=-1): + if extra_vars is None: + extra_vars = {} self.id = uuid.uuid4() self.inventory = inventory self.pattern = pattern @@ -69,7 +50,7 @@ class AdHocRunner: if os.path.exists(private_env): shutil.rmtree(private_env) - runner.run( + interface.run( timeout=self.timeout if self.timeout > 0 else None, extravars=self.extra_vars, host_pattern=self.pattern, @@ -112,7 +93,7 @@ class PlaybookRunner: if os.path.exists(private_env): shutil.rmtree(private_env) - runner.run( + interface.run( private_data_dir=self.project_dir, inventory=self.inventory, playbook=self.playbook, @@ -144,7 +125,7 @@ class UploadFileRunner: def run(self, verbosity=0, **kwargs): verbosity = get_ansible_log_verbosity(verbosity) - runner.run( + interface.run( private_data_dir=self.project_dir, host_pattern="*", inventory=self.inventory, @@ -160,11 +141,3 @@ class UploadFileRunner: except OSError as e: print(f"del upload tmp dir {self.src_paths} failed! {e}") return self.cb - - -class CommandRunner(AdHocRunner): - def __init__(self, inventory, command, pattern='*', project_dir='/tmp/'): - super().__init__(inventory, 'shell', command, pattern, project_dir) - - def run(self, verbosity=0, **kwargs): - return super().run(verbosity, **kwargs) diff --git a/apps/ops/ansible/runners/__init__.py b/apps/ops/ansible/runners/__init__.py new file mode 100644 index 000000000..155e1b8e1 --- /dev/null +++ b/apps/ops/ansible/runners/__init__.py @@ -0,0 +1,3 @@ +from .base import * +from .native import * +from .receptor import * diff --git a/apps/ops/ansible/runners/base.py b/apps/ops/ansible/runners/base.py new file mode 100644 index 000000000..2cadbb368 --- /dev/null +++ b/apps/ops/ansible/runners/base.py @@ -0,0 +1,42 @@ +from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run + + +class BaseRunner(WorkPostRunCleaner): + + def __init__(self, **kwargs): + self.runner_params = kwargs + self.clean_workspace = kwargs.pop("clean_workspace", True) + + @classmethod + def kill_precess(cls, pid): + return NotImplementedError + + @property + def clean_dir(self): + if not self.clean_workspace: + return None + return self.private_data_dir + + @property + def private_data_dir(self): + return self.runner_params.get('private_data_dir', None) + + def get_event_handler(self): + _event_handler = self.runner_params.pop("event_handler", None) + return _event_handler + + def get_status_handler(self): + _status_handler = self.runner_params.pop("status_handler", None) + + if not _status_handler: + return + + def _handler(data, **kwargs): + if self.private_data_dir: + data["private_data_dir"] = self.private_data_dir + _status_handler(data, **kwargs) + + return _handler + + def run(self): + raise NotImplementedError() diff --git a/apps/ops/ansible/runners/native.py b/apps/ops/ansible/runners/native.py new file mode 100644 index 000000000..00f541ae8 --- /dev/null +++ b/apps/ops/ansible/runners/native.py @@ -0,0 +1,24 @@ +import ansible_runner + +from libs.process.ssh import kill_ansible_ssh_process +from ops.ansible.cleaner import cleanup_post_run +from ops.ansible.runners.base import BaseRunner + +__all__ = ['AnsibleNativeRunner'] + + +class AnsibleNativeRunner(BaseRunner): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @classmethod + def kill_precess(cls, pid): + return kill_ansible_ssh_process(pid) + + @cleanup_post_run + def run(self): + ansible_runner.run( + event_handler=self.get_event_handler(), + status_handler=self.get_status_handler(), + **self.runner_params, + ) diff --git a/apps/ops/ansible/runners/receptor.py b/apps/ops/ansible/runners/receptor.py new file mode 100644 index 000000000..1cc1de12c --- /dev/null +++ b/apps/ops/ansible/runners/receptor.py @@ -0,0 +1,100 @@ +import concurrent.futures +import os +import queue +import socket + +import ansible_runner + +from ops.ansible.cleaner import cleanup_post_run +from ops.ansible.runners.receptorctl.receptorctl import ReceptorCtl +from ops.ansible.runners.base import BaseRunner + +__all__ = ['AnsibleReceptorRunner'] + +receptor_ctl = ReceptorCtl() + + +class AnsibleReceptorRunner(BaseRunner): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.unit_id = None + self.stdout_queue = None + + @classmethod + def kill_precess(cls, pid): + return receptor_ctl.kill_process(pid) + + def write_unit_id(self): + if not self.unit_id: + return + private_dir = self.runner_params.get("private_data_dir", "") + with open(os.path.join(private_dir, "local.unitid"), "w") as f: + f.write(self.unit_id) + f.flush() + + @cleanup_post_run + def run(self): + input, output = socket.socketpair() + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + transmitter_future = executor.submit(self.transmit, input) + result = receptor_ctl.submit_work(payload=output.makefile('rb'), + node='primary', worktype='ansible-runner') + + input.close() + output.close() + + self.unit_id = result['unitid'] + self.write_unit_id() + + transmitter_future.result() + + result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) + + self.stdout_queue = queue.Queue() + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + processor_future = executor.submit(self.processor, result_file) + + while not processor_future.done() or \ + not self.stdout_queue.empty(): + msg = self.stdout_queue.get() + if msg is None: + break + print(msg) + + return processor_future.result() + + def transmit(self, _socket): + try: + ansible_runner.run( + streamer='transmit', + _output=_socket.makefile('wb'), + **self.runner_params + ) + finally: + _socket.shutdown(socket.SHUT_WR) + + def get_event_handler(self): + _event_handler = super().get_event_handler() + + def _handler(data, **kwargs): + stdout = data.get('stdout', '') + if stdout: + self.stdout_queue.put(stdout) + _event_handler(data, **kwargs) + + return _handler + + def processor(self, _result_file): + try: + return ansible_runner.interface.run( + quite=True, + streamer='process', + _input=_result_file, + event_handler=self.get_event_handler(), + status_handler=self.get_status_handler(), + **self.runner_params, + ) + finally: + self.stdout_queue.put(None) diff --git a/apps/ops/ansible/runners/receptorctl/__init__.py b/apps/ops/ansible/runners/receptorctl/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/ops/ansible/runners/receptorctl/receptorctl.py b/apps/ops/ansible/runners/receptorctl/receptorctl.py new file mode 100644 index 000000000..0c3d44c10 --- /dev/null +++ b/apps/ops/ansible/runners/receptorctl/receptorctl.py @@ -0,0 +1,38 @@ +from django.conf import settings +from receptorctl import ReceptorControl + + +class ReceptorCtl: + @property + def ctl(self): + return ReceptorControl("tcp://{}".format(settings.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS)) + + def cancel(self, unit_id): + return self.ctl.simple_command("work cancel {}".format(unit_id)) + + def nodes(self): + return self.ctl.simple_command("status").get("Advertisements", None) + + def submit_work(self, + worktype, + payload, + node=None, + tlsclient=None, + ttl=None, + signwork=False, + params=None, ): + return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params) + + def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True): + return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile) + + def kill_process(self, pid): + submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid)) + unit_id = submit_result["unitid"] + result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True, + return_socket=True) + while not result_socket.close(): + buf = result_file.read() + if not buf: + break + print(buf.decode('utf8')) diff --git a/apps/ops/models/job.py b/apps/ops/models/job.py index e7245568b..f1fd8adb9 100644 --- a/apps/ops/models/job.py +++ b/apps/ops/models/job.py @@ -22,8 +22,10 @@ from acls.models import CommandFilterACL from assets.models import Asset from assets.automations.base.manager import SSHTunnelManager from common.db.encoder import ModelJSONFieldEncoder -from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException, UploadFileRunner -from ops.ansible.receptor import receptor_runner +from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, UploadFileRunner + +"""stop all ssh child processes of the given ansible process pid.""" +from ops.ansible.exception import CommandInBlackListException from ops.mixin import PeriodTaskModelMixin from ops.variables import * from ops.const import Types, RunasPolicies, JobStatus, JobModules diff --git a/apps/ops/signal_handlers.py b/apps/ops/signal_handlers.py index ad3428100..c070fd094 100644 --- a/apps/ops/signal_handlers.py +++ b/apps/ops/signal_handlers.py @@ -1,4 +1,3 @@ -import ast import json import time @@ -17,9 +16,9 @@ from common.signals import django_ready from common.utils.connection import RedisPubSub from jumpserver.utils import get_current_request from orgs.utils import get_current_org_id, set_current_org -from .ansible.receptor.receptor_runner import receptor_ctl from .celery import app from .models import CeleryTaskExecution, CeleryTask, Job +from .ansible.runner import interface logger = get_logger(__name__) @@ -167,7 +166,7 @@ def subscribe_stop_job_execution(sender, **kwargs): def on_stop(pid): logger.info(f"Stop job execution {pid} start") - receptor_ctl.kill_process(pid) + interface.kill_process(pid) job_execution_stop_pub_sub.subscribe(on_stop) diff --git a/receptor b/receptor index f9a17b711..d32f40889 100755 --- a/receptor +++ b/receptor @@ -9,8 +9,7 @@ import os import signal import tempfile -import psutil -from psutil import NoSuchProcess +from apps.libs.process.ssh import kill_ansible_ssh_process ANSIBLE_RUNNER_COMMAND = "ansible-runner" @@ -22,6 +21,8 @@ DEFAULT_SHARE_DIR = os.path.join(PROJECT_DIR, "data", "share") DEFAULT_ANSIBLE_MODULES_DIR = os.path.join(APPS_DIR, "libs", "ansible", "modules") DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock") +DEFAULT_TCP_LISTEN_ADDRESS = "0.0.0.0:7521" + logger = logging.getLogger(__name__) os.chdir(APPS_DIR) @@ -34,9 +35,10 @@ class ReceptorService: 'receptor', '--local-only', '--node', 'id=primary', + '--log-level', 'level=Error', '--control-service', 'service=control', - 'filename={}'.format(DEFAULT_CONTROL_SOCK_PATH), + 'tcplisten={}'.format(DEFAULT_TCP_LISTEN_ADDRESS), '--work-command', 'worktype={}'.format(ANSIBLE_RUNNER_COMMAND), 'command={}'.format(ANSIBLE_RUNNER_COMMAND), @@ -49,6 +51,7 @@ class ReceptorService: 'allowruntimeparams=true' ] + @staticmethod def before_start(): os.makedirs(os.path.join(DEFAULT_SHARE_DIR), exist_ok=True) @@ -141,29 +144,12 @@ def kill_progress_tree(pid=None): try: pid_input = input() pid = int(pid_input) + logger.info("progress {} will be kill".format(pid)) + kill_ansible_ssh_process(pid) except Exception as e: logger.error(e) return - logger.info("progress {} will be kill".format(pid)) - - try: - current_process = psutil.Process(pid) - except NoSuchProcess as e: - logger.error(e) - return - - children = current_process.children(recursive=True) - for child in children: - if child.pid == 1: - continue - if child.name() != 'ssh': - continue - try: - child.kill() - except Exception as e: - logger.error(e) - if __name__ == '__main__': parser = argparse.ArgumentParser(