mirror of https://github.com/jumpserver/jumpserver
perf: remove ansible receptcel
parent
657f7f822b
commit
828582333d
|
@ -1,7 +1,7 @@
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.utils.functional import LazyObject
|
from django.utils.functional import LazyObject
|
||||||
|
|
||||||
from ops.ansible import AnsibleReceptorRunner, AnsibleNativeRunner
|
from ops.ansible import AnsibleNativeRunner
|
||||||
from ops.ansible.runners.base import BaseRunner
|
from ops.ansible.runners.base import BaseRunner
|
||||||
|
|
||||||
__all__ = ['interface']
|
__all__ = ['interface']
|
||||||
|
@ -14,8 +14,7 @@ class _LazyRunnerInterface(LazyObject):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_interface():
|
def make_interface():
|
||||||
runner_type = AnsibleReceptorRunner \
|
runner_type = AnsibleNativeRunner
|
||||||
if settings.RECEPTOR_ENABLED else AnsibleNativeRunner
|
|
||||||
gateway_host = settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST \
|
gateway_host = settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST \
|
||||||
if settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST else '127.0.0.1'
|
if settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST else '127.0.0.1'
|
||||||
return RunnerInterface(runner_type=runner_type, gateway_proxy_host=gateway_host)
|
return RunnerInterface(runner_type=runner_type, gateway_proxy_host=gateway_host)
|
||||||
|
|
|
@ -1,3 +1,2 @@
|
||||||
from .base import *
|
from .base import *
|
||||||
from .native import *
|
from .native import *
|
||||||
from .receptor import *
|
|
||||||
|
|
|
@ -1,100 +0,0 @@
|
||||||
import concurrent.futures
|
|
||||||
import os
|
|
||||||
import queue
|
|
||||||
import socket
|
|
||||||
|
|
||||||
import ansible_runner
|
|
||||||
|
|
||||||
from ops.ansible.cleaner import cleanup_post_run
|
|
||||||
from ops.ansible.runners.receptorctl.receptorctl import ReceptorCtl
|
|
||||||
from ops.ansible.runners.base import BaseRunner
|
|
||||||
|
|
||||||
__all__ = ['AnsibleReceptorRunner']
|
|
||||||
|
|
||||||
receptor_ctl = ReceptorCtl()
|
|
||||||
|
|
||||||
|
|
||||||
class AnsibleReceptorRunner(BaseRunner):
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super().__init__(**kwargs)
|
|
||||||
self.unit_id = None
|
|
||||||
self.stdout_queue = None
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def kill_precess(cls, pid):
|
|
||||||
return receptor_ctl.kill_process(pid)
|
|
||||||
|
|
||||||
def write_unit_id(self):
|
|
||||||
if not self.unit_id:
|
|
||||||
return
|
|
||||||
private_dir = self.runner_params.get("private_data_dir", "")
|
|
||||||
with open(os.path.join(private_dir, "local.unitid"), "w") as f:
|
|
||||||
f.write(self.unit_id)
|
|
||||||
f.flush()
|
|
||||||
|
|
||||||
@cleanup_post_run
|
|
||||||
def run(self):
|
|
||||||
input, output = socket.socketpair()
|
|
||||||
|
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
|
||||||
transmitter_future = executor.submit(self.transmit, input)
|
|
||||||
result = receptor_ctl.submit_work(payload=output.makefile('rb'),
|
|
||||||
node='primary', worktype='ansible-runner')
|
|
||||||
|
|
||||||
input.close()
|
|
||||||
output.close()
|
|
||||||
|
|
||||||
self.unit_id = result['unitid']
|
|
||||||
self.write_unit_id()
|
|
||||||
|
|
||||||
transmitter_future.result()
|
|
||||||
|
|
||||||
result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
|
|
||||||
|
|
||||||
self.stdout_queue = queue.Queue()
|
|
||||||
|
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
|
||||||
processor_future = executor.submit(self.processor, result_file)
|
|
||||||
|
|
||||||
while not processor_future.done() or \
|
|
||||||
not self.stdout_queue.empty():
|
|
||||||
msg = self.stdout_queue.get()
|
|
||||||
if msg is None:
|
|
||||||
break
|
|
||||||
print(msg)
|
|
||||||
|
|
||||||
return processor_future.result()
|
|
||||||
|
|
||||||
def transmit(self, _socket):
|
|
||||||
try:
|
|
||||||
ansible_runner.run(
|
|
||||||
streamer='transmit',
|
|
||||||
_output=_socket.makefile('wb'),
|
|
||||||
**self.runner_params
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
_socket.shutdown(socket.SHUT_WR)
|
|
||||||
|
|
||||||
def get_event_handler(self):
|
|
||||||
_event_handler = super().get_event_handler()
|
|
||||||
|
|
||||||
def _handler(data, **kwargs):
|
|
||||||
stdout = data.get('stdout', '')
|
|
||||||
if stdout:
|
|
||||||
self.stdout_queue.put(stdout)
|
|
||||||
_event_handler(data, **kwargs)
|
|
||||||
|
|
||||||
return _handler
|
|
||||||
|
|
||||||
def processor(self, _result_file):
|
|
||||||
try:
|
|
||||||
return ansible_runner.interface.run(
|
|
||||||
quite=True,
|
|
||||||
streamer='process',
|
|
||||||
_input=_result_file,
|
|
||||||
event_handler=self.get_event_handler(),
|
|
||||||
status_handler=self.get_status_handler(),
|
|
||||||
**self.runner_params,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
self.stdout_queue.put(None)
|
|
|
@ -1,38 +0,0 @@
|
||||||
from django.conf import settings
|
|
||||||
from receptorctl import ReceptorControl
|
|
||||||
|
|
||||||
|
|
||||||
class ReceptorCtl:
|
|
||||||
@property
|
|
||||||
def ctl(self):
|
|
||||||
return ReceptorControl("tcp://{}".format(settings.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS))
|
|
||||||
|
|
||||||
def cancel(self, unit_id):
|
|
||||||
return self.ctl.simple_command("work cancel {}".format(unit_id))
|
|
||||||
|
|
||||||
def nodes(self):
|
|
||||||
return self.ctl.simple_command("status").get("Advertisements", None)
|
|
||||||
|
|
||||||
def submit_work(self,
|
|
||||||
worktype,
|
|
||||||
payload,
|
|
||||||
node=None,
|
|
||||||
tlsclient=None,
|
|
||||||
ttl=None,
|
|
||||||
signwork=False,
|
|
||||||
params=None, ):
|
|
||||||
return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params)
|
|
||||||
|
|
||||||
def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True):
|
|
||||||
return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile)
|
|
||||||
|
|
||||||
def kill_process(self, pid):
|
|
||||||
submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid))
|
|
||||||
unit_id = submit_result["unitid"]
|
|
||||||
result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True,
|
|
||||||
return_socket=True)
|
|
||||||
while not result_socket.close():
|
|
||||||
buf = result_file.read()
|
|
||||||
if not buf:
|
|
||||||
break
|
|
||||||
print(buf.decode('utf8'))
|
|
|
@ -52,28 +52,27 @@ p {
|
||||||
<div class="group">
|
<div class="group">
|
||||||
<h2>JumpServer {% trans 'Offline video player' %} v0.1.9</h2>
|
<h2>JumpServer {% trans 'Offline video player' %} v0.1.9</h2>
|
||||||
<ul>
|
<ul>
|
||||||
<li><a href="/download/public/JumpServer-Video-Player-{{ VIDEO_PLAYER_VERSION }}.dmg">jumpserver-video-player.dmg</a></li>
|
<li><a href="/download/public/JumpServer.Video.Player-{{ VIDEO_PLAYER_VERSION }}.dmg">jumpserver-video-player.dmg</a></li>
|
||||||
<li><a href="/download/public/JumpServer-Video-Player.Setup.{{ VIDEO_PLAYER_VERSION }}.exe">jumpserver-video-player.exe</a></li>
|
<li><a href="/download/public/JumpServer.Video.Player.Setup.{{ VIDEO_PLAYER_VERSION }}.exe">jumpserver-video-player.exe</a></li>
|
||||||
</ul>
|
</ul>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
</div>
|
</div>
|
||||||
<style>
|
<style>
|
||||||
ul {
|
ul {
|
||||||
list-style-type: disc;
|
list-style-type: disc;
|
||||||
}
|
}
|
||||||
|
|
||||||
li a {
|
li a {
|
||||||
color: blue;
|
color: blue;
|
||||||
}
|
}
|
||||||
|
|
||||||
.group {
|
.group {
|
||||||
padding-top: 40px;
|
padding-top: 40px;
|
||||||
}
|
}
|
||||||
|
|
||||||
.group ul {
|
.group ul {
|
||||||
padding-top: 10px;
|
padding-top: 10px;
|
||||||
}
|
}
|
||||||
</style>
|
</style>
|
||||||
|
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
|
|
Loading…
Reference in New Issue