mirror of https://github.com/jumpserver/jumpserver
fix: 修复 receptor_ctl 的并发安全问题
parent
cade2cfa13
commit
138a3a2f46
|
@ -5,44 +5,48 @@ import socket
|
|||
|
||||
from django.conf import settings
|
||||
import ansible_runner
|
||||
from django.utils.functional import LazyObject
|
||||
from receptorctl import ReceptorControl
|
||||
|
||||
from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run
|
||||
|
||||
|
||||
class WarpedReceptorctl(LazyObject):
|
||||
def _setup(self):
|
||||
self._wrapped = self.get_receptorctl()
|
||||
|
||||
@staticmethod
|
||||
def get_receptorctl():
|
||||
class ReceptorCtl:
|
||||
@property
|
||||
def ctl(self):
|
||||
return ReceptorControl(settings.ANSIBLE_RECEPTOR_SOCK_PATH)
|
||||
|
||||
def cancel(self, unit_id):
|
||||
return self.ctl.simple_command("work cancel {}".format(unit_id))
|
||||
|
||||
receptor_ctl = WarpedReceptorctl()
|
||||
def nodes(self):
|
||||
return self.ctl.simple_command("status").get("Advertisements", None)
|
||||
|
||||
def submit_work(self,
|
||||
worktype,
|
||||
payload,
|
||||
node=None,
|
||||
tlsclient=None,
|
||||
ttl=None,
|
||||
signwork=False,
|
||||
params=None, ):
|
||||
return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params)
|
||||
|
||||
def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True):
|
||||
return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile)
|
||||
|
||||
def kill_process(self, pid):
|
||||
submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid))
|
||||
unit_id = submit_result["unitid"]
|
||||
result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True,
|
||||
return_socket=True)
|
||||
while not result_socket.close():
|
||||
buf = result_file.read()
|
||||
if not buf:
|
||||
break
|
||||
print(buf.decode('utf8'))
|
||||
|
||||
|
||||
def cancel(unit_id):
|
||||
return receptor_ctl.simple_command("work cancel {}".format(unit_id))
|
||||
|
||||
|
||||
def nodes():
|
||||
return receptor_ctl.simple_command("status").get("Advertisements", None)
|
||||
|
||||
|
||||
def kill_process(pid):
|
||||
submit_result = receptor_ctl.submit_work(worktype="kill", node="primary", payload=str(pid))
|
||||
|
||||
unit_id = submit_result["unitid"]
|
||||
|
||||
result_socket, result_file = receptor_ctl.get_work_results(unit_id=unit_id, return_sockfile=True,
|
||||
return_socket=True)
|
||||
while not result_socket.close():
|
||||
buf = result_file.read()
|
||||
if not buf:
|
||||
break
|
||||
print(buf.decode('utf8'))
|
||||
receptor_ctl = ReceptorCtl()
|
||||
|
||||
|
||||
def run(**kwargs):
|
||||
|
@ -74,7 +78,7 @@ class AnsibleReceptorRunner(WorkPostRunCleaner):
|
|||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
transmitter_future = executor.submit(self.transmit, input)
|
||||
result = receptor_ctl.submit_work(payload=output.makefile('rb'),
|
||||
node='primary', worktype='ansible-runner')
|
||||
node='primary', worktype='ansible-runner')
|
||||
input.close()
|
||||
output.close()
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ from common.signals import django_ready
|
|||
from common.utils.connection import RedisPubSub
|
||||
from jumpserver.utils import get_current_request
|
||||
from orgs.utils import get_current_org_id, set_current_org
|
||||
from .ansible.receptor.receptor_runner import kill_process
|
||||
from .ansible.receptor.receptor_runner import receptor_ctl
|
||||
from .celery import app
|
||||
from .models import CeleryTaskExecution, CeleryTask, Job
|
||||
|
||||
|
@ -159,7 +159,7 @@ def subscribe_stop_job_execution(sender, **kwargs):
|
|||
|
||||
def on_stop(pid):
|
||||
logger.info(f"Stop job execution {pid} start")
|
||||
kill_process(pid)
|
||||
receptor_ctl.kill_process(pid)
|
||||
|
||||
job_execution_stop_pub_sub.subscribe(on_stop)
|
||||
|
||||
|
|
Loading…
Reference in New Issue