diff --git a/Dockerfile-ce b/Dockerfile-ce index 850c56218..18ac4c7b1 100644 --- a/Dockerfile-ce +++ b/Dockerfile-ce @@ -109,7 +109,15 @@ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked,id=core-apt \ && echo "no" | dpkg-reconfigure dash \ && echo "zh_CN.UTF-8" | dpkg-reconfigure locales \ && sed -i "s@# export @export @g" ~/.bashrc \ - && sed -i "s@# alias @alias @g" ~/.bashrc + && sed -i "s@# alias @alias @g" ~/.bashrc \ + +ARG RECEPTOR_VERSION=v1.4.5 +RUN set -ex \ + && wget -O /opt/receptor.tar.gz https://github.com/ansible/receptor/releases/download/${RECEPTOR_VERSION}/receptor_${RECEPTOR_VERSION/v/}_linux_${TARGETARCH}.tar.gz \ + && tar -xf /opt/receptor.tar.gz -C /usr/local/bin/ \ + && chown root:root /usr/local/bin/receptor \ + && chmod 755 /usr/local/bin/receptor \ + && rm -f /opt/receptor.tar.gz COPY --from=stage-2 /opt/py3 /opt/py3 COPY --from=stage-1 /opt/jumpserver/release/jumpserver /opt/jumpserver diff --git a/apps/common/management/commands/services/command.py b/apps/common/management/commands/services/command.py index 487d9ce5f..35658d7ec 100644 --- a/apps/common/management/commands/services/command.py +++ b/apps/common/management/commands/services/command.py @@ -17,6 +17,7 @@ class Services(TextChoices): web = 'web', 'web' celery = 'celery', 'celery' task = 'task', 'task' + receptor = 'receptor', 'receptor' all = 'all', 'all' @classmethod @@ -27,7 +28,8 @@ class Services(TextChoices): cls.flower: services.FlowerService, cls.celery_default: services.CeleryDefaultService, cls.celery_ansible: services.CeleryAnsibleService, - cls.beat: services.BeatService + cls.beat: services.BeatService, + cls.receptor: services.ReceptorService } return services_map.get(name) @@ -43,9 +45,13 @@ class Services(TextChoices): def task_services(cls): return cls.celery_services() + [cls.beat] + @classmethod + def receptor_services(cls): + return [cls.receptor] + @classmethod def all_services(cls): - return cls.web_services() + cls.task_services() + return cls.web_services() + cls.task_services() + cls.receptor_services() @classmethod def export_services_values(cls): diff --git a/apps/common/management/commands/services/services/__init__.py b/apps/common/management/commands/services/services/__init__.py index 35329a7d4..85a4ead63 100644 --- a/apps/common/management/commands/services/services/__init__.py +++ b/apps/common/management/commands/services/services/__init__.py @@ -3,3 +3,4 @@ from .celery_ansible import * from .celery_default import * from .flower import * from .gunicorn import * +from .receptor import * diff --git a/apps/common/management/commands/services/services/receptor.py b/apps/common/management/commands/services/services/receptor.py new file mode 100644 index 000000000..38976343f --- /dev/null +++ b/apps/common/management/commands/services/services/receptor.py @@ -0,0 +1,32 @@ +from .base import BaseService +from ..hands import * + +__all__ = ['ReceptorService'] + +ANSIBLE_RUNNER_COMMAND = "ansible-runner" + + +class ReceptorService(BaseService): + @property + def cmd(self): + print("\n- Start Receptor as Ansible Runner Sandbox") + + cmd = [ + 'receptor', + '--local-only', + '--node', 'id=primary', + '--control-service', + 'service=control', + 'filename=/opt/jumpserver/data/share/control.sock', + '--work-command', + 'worktype={}'.format(ANSIBLE_RUNNER_COMMAND), + 'command={}'.format(ANSIBLE_RUNNER_COMMAND), + 'params=worker', + 'allowruntimeparams=true' + ] + + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/jumpserver/conf.py b/apps/jumpserver/conf.py index 26d813b90..8af9c9a6d 100644 --- a/apps/jumpserver/conf.py +++ b/apps/jumpserver/conf.py @@ -613,7 +613,11 @@ class Config(dict): 'FILE_UPLOAD_SIZE_LIMIT_MB': 200, - 'TICKET_APPLY_ASSET_SCOPE': 'all' + 'TICKET_APPLY_ASSET_SCOPE': 'all', + + # Ansible Receptor + 'ANSIBLE_RECEPTOR_ENABLE': True, + 'ANSIBLE_RECEPTOR_SOCK_PATH': '/opt/jumpserver/data/share/control.sock' } old_config_map = { diff --git a/apps/jumpserver/settings/custom.py b/apps/jumpserver/settings/custom.py index b564cba25..453648240 100644 --- a/apps/jumpserver/settings/custom.py +++ b/apps/jumpserver/settings/custom.py @@ -230,3 +230,7 @@ VIRTUAL_APP_ENABLED = CONFIG.VIRTUAL_APP_ENABLED FILE_UPLOAD_SIZE_LIMIT_MB = CONFIG.FILE_UPLOAD_SIZE_LIMIT_MB TICKET_APPLY_ASSET_SCOPE = CONFIG.TICKET_APPLY_ASSET_SCOPE + +# Ansible Receptor +ANSIBLE_RECEPTOR_ENABLE = CONFIG.ANSIBLE_RECEPTOR_ENABLE +ANSIBLE_RECEPTOR_SOCK_PATH = CONFIG.ANSIBLE_RECEPTOR_SOCK_PATH diff --git a/apps/ops/ansible/receptor/__init__.py b/apps/ops/ansible/receptor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/ops/ansible/receptor/receptor_runner.py b/apps/ops/ansible/receptor/receptor_runner.py new file mode 100644 index 000000000..3f4893a97 --- /dev/null +++ b/apps/ops/ansible/receptor/receptor_runner.py @@ -0,0 +1,89 @@ +import concurrent.futures +import queue +import socket + +import ansible_runner +from receptorctl import ReceptorControl + +receptor_ctl = ReceptorControl('control.sock') + + +def init_receptor_ctl(sock_path): + global receptor_ctl + receptor_ctl = ReceptorControl(sock_path) + + +def nodes(): + return receptor_ctl.simple_command("status").get("Advertisements", None) + + +def run(**kwargs): + receptor_runner = AnsibleReceptorRunner(**kwargs) + return receptor_runner.run() + + +class AnsibleReceptorRunner: + def __init__(self, **kwargs): + self.runner_params = kwargs + self.unit_id = None + + def run(self): + input, output = socket.socketpair() + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + transmitter_future = executor.submit(self.transmit, input) + result = receptor_ctl.submit_work(payload=output.makefile('rb'), + node='primary', worktype='ansible-runner') + input.close() + output.close() + + self.unit_id = result['unitid'] + + transmitter_future.result() + + result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) + + stdout_queue = queue.Queue() + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + processor_future = executor.submit(self.processor, result_file, stdout_queue) + + while not processor_future.done() or \ + not stdout_queue.empty(): + msg = stdout_queue.get() + if msg is None: + break + print(msg) + + return processor_future.result() + + def transmit(self, _socket): + try: + ansible_runner.run( + streamer='transmit', + _output=_socket.makefile('wb'), + **self.runner_params + ) + finally: + _socket.shutdown(socket.SHUT_WR) + + def processor(self, _result_file, stdout_queue): + try: + original_handler = self.runner_params.pop("event_handler", None) + + def event_handler(data, **kwargs): + stdout = data.get('stdout', '') + if stdout: + stdout_queue.put(stdout) + if original_handler: + original_handler(data, **kwargs) + + return ansible_runner.interface.run( + quite=True, + streamer='process', + _input=_result_file, + event_handler=event_handler, + **self.runner_params, + ) + finally: + stdout_queue.put(None) diff --git a/apps/ops/ansible/runner.py b/apps/ops/ansible/runner.py index 7dd40b390..d7800aecf 100644 --- a/apps/ops/ansible/runner.py +++ b/apps/ops/ansible/runner.py @@ -1,3 +1,4 @@ +import logging import os import shutil import uuid @@ -5,15 +6,35 @@ import uuid import ansible_runner from django.conf import settings from django.utils._os import safe_join +from django.utils.functional import LazyObject from .callback import DefaultCallback +from .receptor import receptor_runner from ..utils import get_ansible_log_verbosity +logger = logging.getLogger(__file__) + class CommandInBlackListException(Exception): pass +class AnsibleWrappedRunner(LazyObject): + def _setup(self): + self._wrapped = self.get_runner() + + @staticmethod + def get_runner(): + if settings.ANSIBLE_RECEPTOR_ENABLE and settings.ANSIBLE_RECEPTOR_SOCK_PATH: + logger.info("Ansible receptor enabled, run ansible task via receptor") + receptor_runner.init_receptor_ctl(settings.ANSIBLE_RECEPTOR_SOCK_PATH) + return receptor_runner + return ansible_runner + + +runner = AnsibleWrappedRunner() + + class AdHocRunner: cmd_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell') @@ -30,6 +51,8 @@ class AdHocRunner: self.extra_vars = extra_vars self.dry_run = dry_run self.timeout = timeout + # enable local connection + self.extra_vars.update({"LOCAL_CONNECTION_ENABLED": "1"}) def check_module(self): if self.module not in self.cmd_modules_choices: @@ -48,7 +71,7 @@ class AdHocRunner: if os.path.exists(private_env): shutil.rmtree(private_env) - ansible_runner.run( + runner.run( timeout=self.timeout if self.timeout > 0 else None, extravars=self.extra_vars, host_pattern=self.pattern, @@ -81,7 +104,7 @@ class PlaybookRunner: if os.path.exists(private_env): shutil.rmtree(private_env) - ansible_runner.run( + runner.run( private_data_dir=self.project_dir, inventory=self.inventory, playbook=self.playbook, @@ -112,7 +135,7 @@ class UploadFileRunner: def run(self, verbosity=0, **kwargs): verbosity = get_ansible_log_verbosity(verbosity) - ansible_runner.run( + runner.run( host_pattern="*", inventory=self.inventory, module='copy', diff --git a/jms b/jms index 0b2cf94d0..ff96723b7 100755 --- a/jms +++ b/jms @@ -188,7 +188,7 @@ if __name__ == '__main__': ) parser.add_argument( "services", type=str, default='all', nargs="*", - choices=("all", "web", "task"), + choices=("all", "web", "task", "receptor"), help="The service to start", ) parser.add_argument('-d', '--daemon', nargs="?", const=True) diff --git a/poetry.lock b/poetry.lock index d81b5bee4..858c0795e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "adal" @@ -4166,6 +4166,7 @@ files = [ {file = "msgpack-1.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fbb160554e319f7b22ecf530a80a3ff496d38e8e07ae763b9e82fadfe96f273"}, {file = "msgpack-1.0.8-cp39-cp39-win32.whl", hash = "sha256:f9af38a89b6a5c04b7d18c492c8ccf2aee7048aff1ce8437c4683bb5a1df893d"}, {file = "msgpack-1.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:ed59dd52075f8fc91da6053b12e8c89e37aa043f8986efd89e61fae69dc1b011"}, + {file = "msgpack-1.0.8-py3-none-any.whl", hash = "sha256:24f727df1e20b9876fa6e95f840a2a2651e34c0ad147676356f4bf5fbb0206ca"}, {file = "msgpack-1.0.8.tar.gz", hash = "sha256:95c02b0e27e706e48d0e5426d1710ca78e0f0628d6e89d5b5a5b91a5f12274f3"}, ] @@ -6399,6 +6400,27 @@ type = "legacy" url = "https://pypi.tuna.tsinghua.edu.cn/simple" reference = "tsinghua" +[[package]] +name = "receptorctl" +version = "1.4.5" +description = "\"Receptorctl is a front-end CLI and importable Python library that interacts with Receptor over its control socket interface.\"" +optional = false +python-versions = "*" +files = [ + {file = "receptorctl-1.4.5-py3-none-any.whl", hash = "sha256:e12a6b6f703c1bc7ec13bbf46adf1c3c0e5785af4136fc776fbc68b349a6dc8c"}, + {file = "receptorctl-1.4.5.tar.gz", hash = "sha256:d1765a1d68e82d101d500385be8830c647c14dba783c5c01a915015dc8484a30"}, +] + +[package.dependencies] +click = "*" +python-dateutil = "*" +pyyaml = "*" + +[package.source] +type = "legacy" +url = "https://pypi.tuna.tsinghua.edu.cn/simple" +reference = "tsinghua" + [[package]] name = "redis" version = "5.0.3" @@ -7887,4 +7909,4 @@ reference = "tsinghua" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "07285278374c49f35ed9f53742b82c07a9f515ccf2d4f0db8738ea67c0adca85" +content-hash = "1a8e1ea4acc0bfded274acb3b0faa65693a067bf280affaa195fe5cfb970777a" diff --git a/pyproject.toml b/pyproject.toml index 1df83239a..040b7a3c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -149,6 +149,7 @@ xlsxwriter = "^3.1.9" exchangelib = "^5.1.0" xmlsec = "^1.3.13" lxml = "4.9.3" +receptorctl = "^1.4.5" [tool.poetry.group.xpack.dependencies]