mirror of https://github.com/jumpserver/jumpserver
Browse Source
* feat: 支持 ansible 沙盒运行 * feat: 修改 receptor sock 默认路径 * feat: 增加 adhoc 执行命令的 local connection 权限 --------- Co-authored-by: Aaron3S <chenyang@fit2cloud.com> Co-authored-by: Bai <baijiangjie@gmail.com>pull/12949/head^2
fit2bot
8 months ago
committed by
GitHub
12 changed files with 200 additions and 10 deletions
@ -0,0 +1,32 @@
|
||||
from .base import BaseService |
||||
from ..hands import * |
||||
|
||||
__all__ = ['ReceptorService'] |
||||
|
||||
ANSIBLE_RUNNER_COMMAND = "ansible-runner" |
||||
|
||||
|
||||
class ReceptorService(BaseService): |
||||
@property |
||||
def cmd(self): |
||||
print("\n- Start Receptor as Ansible Runner Sandbox") |
||||
|
||||
cmd = [ |
||||
'receptor', |
||||
'--local-only', |
||||
'--node', 'id=primary', |
||||
'--control-service', |
||||
'service=control', |
||||
'filename=/opt/jumpserver/data/share/control.sock', |
||||
'--work-command', |
||||
'worktype={}'.format(ANSIBLE_RUNNER_COMMAND), |
||||
'command={}'.format(ANSIBLE_RUNNER_COMMAND), |
||||
'params=worker', |
||||
'allowruntimeparams=true' |
||||
] |
||||
|
||||
return cmd |
||||
|
||||
@property |
||||
def cwd(self): |
||||
return APPS_DIR |
@ -0,0 +1,89 @@
|
||||
import concurrent.futures |
||||
import queue |
||||
import socket |
||||
|
||||
import ansible_runner |
||||
from receptorctl import ReceptorControl |
||||
|
||||
receptor_ctl = ReceptorControl('control.sock') |
||||
|
||||
|
||||
def init_receptor_ctl(sock_path): |
||||
global receptor_ctl |
||||
receptor_ctl = ReceptorControl(sock_path) |
||||
|
||||
|
||||
def nodes(): |
||||
return receptor_ctl.simple_command("status").get("Advertisements", None) |
||||
|
||||
|
||||
def run(**kwargs): |
||||
receptor_runner = AnsibleReceptorRunner(**kwargs) |
||||
return receptor_runner.run() |
||||
|
||||
|
||||
class AnsibleReceptorRunner: |
||||
def __init__(self, **kwargs): |
||||
self.runner_params = kwargs |
||||
self.unit_id = None |
||||
|
||||
def run(self): |
||||
input, output = socket.socketpair() |
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: |
||||
transmitter_future = executor.submit(self.transmit, input) |
||||
result = receptor_ctl.submit_work(payload=output.makefile('rb'), |
||||
node='primary', worktype='ansible-runner') |
||||
input.close() |
||||
output.close() |
||||
|
||||
self.unit_id = result['unitid'] |
||||
|
||||
transmitter_future.result() |
||||
|
||||
result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) |
||||
|
||||
stdout_queue = queue.Queue() |
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: |
||||
processor_future = executor.submit(self.processor, result_file, stdout_queue) |
||||
|
||||
while not processor_future.done() or \ |
||||
not stdout_queue.empty(): |
||||
msg = stdout_queue.get() |
||||
if msg is None: |
||||
break |
||||
print(msg) |
||||
|
||||
return processor_future.result() |
||||
|
||||
def transmit(self, _socket): |
||||
try: |
||||
ansible_runner.run( |
||||
streamer='transmit', |
||||
_output=_socket.makefile('wb'), |
||||
**self.runner_params |
||||
) |
||||
finally: |
||||
_socket.shutdown(socket.SHUT_WR) |
||||
|
||||
def processor(self, _result_file, stdout_queue): |
||||
try: |
||||
original_handler = self.runner_params.pop("event_handler", None) |
||||
|
||||
def event_handler(data, **kwargs): |
||||
stdout = data.get('stdout', '') |
||||
if stdout: |
||||
stdout_queue.put(stdout) |
||||
if original_handler: |
||||
original_handler(data, **kwargs) |
||||
|
||||
return ansible_runner.interface.run( |
||||
quite=True, |
||||
streamer='process', |
||||
_input=_result_file, |
||||
event_handler=event_handler, |
||||
**self.runner_params, |
||||
) |
||||
finally: |
||||
stdout_queue.put(None) |
Loading…
Reference in new issue