Merge pull request #13094 from jumpserver/dev

v3.10.9 (dev to master)
pull/13097/head
Bryan 2024-04-22 19:39:53 +08:00 committed by GitHub
commit 82e7f020ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 403 additions and 254 deletions

View File

@ -12,7 +12,8 @@ from sshtunnel import SSHTunnelForwarder
from assets.automations.methods import platform_automation_methods
from common.utils import get_logger, lazyproperty, is_openssh_format_key, ssh_pubkey_gen
from ops.ansible import JMSInventory, SuperPlaybookRunner, DefaultCallback
from ops.ansible import JMSInventory, DefaultCallback, SuperPlaybookRunner
from ops.ansible.interface import interface
logger = get_logger(__name__)
@ -54,7 +55,9 @@ class SSHTunnelManager:
not_valid.append(k)
else:
local_bind_port = server.local_bind_port
host['ansible_host'] = jms_asset['address'] = host['login_host'] = 'jms_celery'
host['ansible_host'] = jms_asset['address'] = host[
'login_host'] = interface.get_gateway_proxy_host()
host['ansible_port'] = jms_asset['port'] = host['login_port'] = local_bind_port
servers.append(server)

View File

@ -1,8 +1,7 @@
from django.db import models
from django.utils.translation import gettext_lazy as _
from assets.const import AllTypes
from assets.const import Protocol
from assets.const import AllTypes, Category, Protocol
from common.db.fields import JsonDictTextField
from common.db.models import JMSBaseModel
@ -119,6 +118,15 @@ class Platform(LabeledMixin, JMSBaseModel):
)
return linux.id
def is_huawei(self):
    """Whether this platform represents a Huawei network device.

    Only platforms in the DEVICE category qualify; matching is done on the
    English brand name (case-insensitive) or the Chinese brand name.
    """
    if self.category != Category.DEVICE:
        return False
    # Match either the latin spelling or the Chinese brand string.
    lowered = self.name.lower()
    return 'huawei' in lowered or '华为' in self.name
def __str__(self):
return self.name

View File

@ -88,8 +88,7 @@ class KubernetesClient:
try:
data = getattr(self, func_name)(*args)
except Exception as e:
logger.error(e)
raise e
logger.error(f'K8S tree get {tp} error: {e}')
if self.server:
self.server.stop()

View File

@ -9,7 +9,7 @@ from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.serializers import ValidationError
from common.exceptions import UnexpectError
from common.exceptions import JMSException, UnexpectError
from common.utils import get_logger
from users.models.user import User
from .. import errors
@ -61,6 +61,8 @@ class MFASendCodeApi(AuthMixin, CreateAPIView):
try:
mfa_backend.send_challenge()
except JMSException:
raise
except Exception as e:
raise UnexpectError(str(e))

View File

@ -204,12 +204,14 @@ class ConnectionToken(JMSOrgBaseModel):
host, account, lock_key = bulk_get(host_account, ('host', 'account', 'lock_key'))
gateway = host.domain.select_gateway() if host.domain else None
platform = host.platform
data = {
'id': lock_key,
'applet': applet,
'host': host,
'gateway': gateway,
'platform': platform,
'account': account,
'remote_app_option': self.get_remote_app_option()
}

View File

@ -161,6 +161,7 @@ class ConnectTokenAppletOptionSerializer(serializers.Serializer):
host = _ConnectionTokenAssetSerializer(read_only=True)
account = _ConnectionTokenAccountSerializer(read_only=True)
gateway = _ConnectionTokenGatewaySerializer(read_only=True)
platform = _ConnectionTokenPlatformSerializer(read_only=True)
remote_app_option = serializers.JSONField(read_only=True)

View File

@ -1,6 +1,6 @@
from contextlib import contextmanager
from django.db import connections
from django.db import connections, transaction
from django.utils.encoding import force_str
from common.utils import get_logger, signer, crypto
@ -58,6 +58,17 @@ def safe_db_connection():
close_old_connections()
@contextmanager
def open_db_connection(alias='default'):
    """Yield a freshly-opened database connection for *alias*.

    Reconnecting up front avoids "MySQL server has gone away" on stale
    connections; the body runs inside a transaction and the connection is
    closed on exit regardless of errors.
    """
    connection = transaction.get_connection(alias)
    try:
        connection.connect()
        # Fix: honor the requested alias — bare atomic() always used the
        # 'default' database even when another alias was passed in.
        with transaction.atomic(using=alias):
            yield connection
    finally:
        connection.close()
class Encryptor:
def __init__(self, value):
self.value = force_str(value)

View File

@ -12,6 +12,7 @@ from functools import wraps
from django.db import transaction
from .utils import logger
from .db.utils import open_db_connection
def on_transaction_commit(func):
@ -146,7 +147,9 @@ ignore_err_exceptions = (
def _run_func_with_org(key, org, func, *args, **kwargs):
from orgs.utils import set_current_org
try:
with transaction.atomic():
with open_db_connection() as conn:
# 保证执行时使用的是新的 connection 数据库连接
# 避免出现 MySQL server has gone away 的情况
set_current_org(org)
func(*args, **kwargs)
except Exception as e:

View File

@ -30,14 +30,14 @@ class SendAndVerifyCodeUtil(object):
self.other_args = kwargs
def gen_and_send_async(self):
    """Queue asynchronous generation+delivery of a verification code.

    Raises CodeSendTooFrequently (carrying the remaining TTL in seconds)
    when a previous code is still within its resend window.
    """
    ttl = self.__ttl()
    if ttl > 0:
        # Still rate-limited — refuse and report how long the caller must wait.
        logger.warning('Send sms too frequently, delay {}'.format(ttl))
        raise CodeSendTooFrequently(ttl)
    # High priority so verification codes jump ahead of routine tasks.
    return send_async.apply_async(kwargs={"sender": self}, priority=100)
def gen_and_send(self):
ttl = self.__ttl()
if ttl > 0:
logger.error('Send sms too frequently, delay {}'.format(ttl))
raise CodeSendTooFrequently(ttl)
try:
if not self.code:
self.code = self.__generate()

View File

@ -617,8 +617,10 @@ class Config(dict):
'TICKET_APPLY_ASSET_SCOPE': 'all',
# Ansible Receptor
'ANSIBLE_RECEPTOR_ENABLE': True,
'ANSIBLE_RECEPTOR_SOCK_PATH': '{}/data/share/control.sock'.format(PROJECT_DIR)
'ANSIBLE_RECEPTOR_ENABLED': True,
'ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST': 'jms_celery',
'ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS': 'jms_receptor:7521'
}
old_config_map = {

View File

@ -232,5 +232,6 @@ FILE_UPLOAD_SIZE_LIMIT_MB = CONFIG.FILE_UPLOAD_SIZE_LIMIT_MB
TICKET_APPLY_ASSET_SCOPE = CONFIG.TICKET_APPLY_ASSET_SCOPE
# Ansible Receptor
ANSIBLE_RECEPTOR_ENABLE = CONFIG.ANSIBLE_RECEPTOR_ENABLE
ANSIBLE_RECEPTOR_SOCK_PATH = CONFIG.ANSIBLE_RECEPTOR_SOCK_PATH
ANSIBLE_RECEPTOR_ENABLED = CONFIG.ANSIBLE_RECEPTOR_ENABLED
ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST = CONFIG.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST
ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS = CONFIG.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS

View File

@ -17,7 +17,7 @@ LOGGING = {
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
'format': '%(levelname)s %(asctime)s %(pathname)s:%(lineno)d %(message)s'
},
'main': {
'datefmt': '%Y-%m-%d %H:%M:%S',

26
apps/libs/process/ssh.py Normal file
View File

@ -0,0 +1,26 @@
import logging
import psutil
from psutil import NoSuchProcess
logger = logging.getLogger(__name__)
def _should_kill(process):
return process.pid != 1 and process.name() == 'ssh'
def kill_ansible_ssh_process(pid):
    """Kill every 'ssh' child process of the ansible process *pid*.

    Missing parent processes and individual kill failures are logged and
    tolerated — this is a best-effort cleanup sweep.
    """
    try:
        process = psutil.Process(pid)
    except NoSuchProcess as e:
        logger.error(f"No such process: {e}")
        return

    for child in process.children(recursive=True):
        # Fix: skip non-ssh children instead of returning.  The original
        # `return` aborted the whole sweep at the first non-ssh child,
        # leaving later ssh processes alive (the pre-refactor code in the
        # receptor service used `continue` here).
        if not _should_kill(child):
            continue
        try:
            child.kill()
        except Exception as e:
            logger.error(f"Failed to kill process {child.pid}: {e}")

View File

@ -2,5 +2,7 @@
from .callback import *
from .inventory import *
from .runner import *
from .runners import *
from .exceptions import *
from .runner import *
from .interface import *

View File

@ -165,4 +165,4 @@ class DefaultCallback:
def write_pid(self, pid):
pid_filepath = os.path.join(self.private_data_dir, 'local.pid')
with open(pid_filepath, 'w') as f:
f.write(str(pid))
f.write(str(pid))

View File

@ -4,6 +4,20 @@ from functools import wraps
from settings.api import settings
__all__ = ["WorkPostRunCleaner", "cleanup_post_run"]
class WorkPostRunCleaner:
    """Mixin that removes a runner's workspace directory after a run."""

    @property
    def clean_dir(self):
        # Subclasses must return the directory to delete, or None to skip.
        # Fix: `raise NotImplemented` raised the NotImplemented *constant*,
        # which produces "TypeError: exceptions must derive from
        # BaseException" at runtime; NotImplementedError is the correct
        # abstract-member exception.
        raise NotImplementedError

    def clean_post_run(self):
        # Keep workspaces around in development for easier debugging.
        if settings.DEBUG_DEV:
            return
        if self.clean_dir and os.path.exists(self.clean_dir):
            shutil.rmtree(self.clean_dir)
def cleanup_post_run(func):
def get_instance(*args):
@ -22,15 +36,3 @@ def cleanup_post_run(func):
instance.clean_post_run()
return wrapper
class WorkPostRunCleaner:
    """Mixin that removes a runner's workspace directory after a run."""

    @property
    def clean_dir(self):
        # Subclasses must return the directory to delete, or None to skip.
        # Fix: raising the NotImplemented constant is a TypeError at
        # runtime; NotImplementedError is the intended exception.
        raise NotImplementedError

    def clean_post_run(self):
        # Keep workspaces around in development for easier debugging.
        if settings.DEBUG_DEV:
            return
        if self.clean_dir and os.path.exists(self.clean_dir):
            shutil.rmtree(self.clean_dir)

View File

@ -0,0 +1,5 @@
__all__ = ['CommandInBlackListException']
class CommandInBlackListException(Exception):
    """Raised when a submitted command matches the configured command blacklist."""

View File

@ -0,0 +1,46 @@
from django.conf import settings
from django.utils.functional import LazyObject
from ops.ansible import AnsibleReceptorRunner, AnsibleNativeRunner
from ops.ansible.runners.base import BaseRunner
__all__ = ['interface']
class _LazyRunnerInterface(LazyObject):
    """Lazily-built RunnerInterface so settings are only read on first use."""

    def _setup(self):
        self._wrapped = self.make_interface()

    @staticmethod
    def make_interface():
        # Prefer the receptor-based runner when enabled, otherwise execute
        # ansible natively in this process.
        if settings.ANSIBLE_RECEPTOR_ENABLED:
            runner_type = AnsibleReceptorRunner
        else:
            runner_type = AnsibleNativeRunner
        gateway_host = settings.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST or '127.0.0.1'
        return RunnerInterface(runner_type=runner_type, gateway_proxy_host=gateway_host)


interface = _LazyRunnerInterface()
class RunnerInterface:
    """Thin facade over a concrete ansible runner class.

    Holds the runner class plus the gateway proxy host used when tunnelling
    through an SSH gateway, and dispatches run/kill requests to the runner.
    """

    def __init__(self, runner_type, gateway_proxy_host='127.0.0.1'):
        if not issubclass(runner_type, BaseRunner):
            raise TypeError(f'{runner_type} can not cast to {BaseRunner}')
        self._runner_type = runner_type
        self._gateway_proxy_host = gateway_proxy_host

    def get_gateway_proxy_host(self):
        """Host that tunnelled connections should bind through."""
        return self._gateway_proxy_host

    def get_runner_type(self):
        """The concrete BaseRunner subclass this interface dispatches to."""
        return self._runner_type

    def kill_process(self, pid):
        # NOTE(review): 'kill_precess' mirrors the (misspelled) classmethod
        # defined on the runner classes — renaming must happen on both sides.
        return self._runner_type.kill_precess(pid)

    def run(self, **kwargs):
        """Instantiate the configured runner with **kwargs and execute it."""
        return self.get_runner_type()(**kwargs).run()

View File

@ -5,6 +5,7 @@ import re
from collections import defaultdict
from django.utils.translation import gettext as _
from assets.const.category import Category
__all__ = ['JMSInventory']
@ -124,7 +125,7 @@ class JMSInventory:
else:
host.update(self.make_account_ansible_vars(account, path_dir))
if platform.name == 'Huawei':
if platform.is_huawei():
host['ansible_connection'] = 'network_cli'
host['ansible_network_os'] = 'asa'

View File

@ -1,147 +0,0 @@
import concurrent.futures
import os
import queue
import socket
from django.conf import settings
import ansible_runner
from receptorctl import ReceptorControl
from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run
class ReceptorCtl:
    """Wrapper around receptorctl talking to the local receptor control socket."""

    @property
    def ctl(self):
        # Fresh control connection per command; socket path comes from settings.
        return ReceptorControl(settings.ANSIBLE_RECEPTOR_SOCK_PATH)

    def cancel(self, unit_id):
        """Cancel the receptor work unit *unit_id*."""
        return self.ctl.simple_command("work cancel {}".format(unit_id))

    def nodes(self):
        """Return advertised receptor nodes, or None when none are known."""
        return self.ctl.simple_command("status").get("Advertisements", None)

    def submit_work(self,
                    worktype,
                    payload,
                    node=None,
                    tlsclient=None,
                    ttl=None,
                    signwork=False,
                    params=None, ):
        """Submit *payload* as a receptor work unit of *worktype*."""
        return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params)

    def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True):
        """Fetch the result stream of a work unit."""
        return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile)

    def kill_process(self, pid):
        """Submit a 'kill' work unit for *pid* and echo its output."""
        submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid))
        unit_id = submit_result["unitid"]
        result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True,
                                                           return_socket=True)
        # Fix: `while not result_socket.close()` invoked close() (which
        # returns None, so the condition was always true) and shut the
        # socket down on the very first iteration.  Read until EOF, then
        # close explicitly.
        while True:
            buf = result_file.read()
            if not buf:
                break
            print(buf.decode('utf8'))
        result_socket.close()
# Module-level singleton shared by the runner helpers below.
receptor_ctl = ReceptorCtl()


def run(**kwargs):
    """Run an ansible job through receptor; returns the processor result."""
    receptor_runner = AnsibleReceptorRunner(**kwargs)
    return receptor_runner.run()
class AnsibleReceptorRunner(WorkPostRunCleaner):
    """Runs ansible-runner jobs remotely via a receptor 'ansible-runner' work unit.

    The job payload is streamed to receptor over a socketpair, and the
    result stream is re-processed locally so event/status handlers fire.
    """

    def __init__(self, **kwargs):
        # kwargs are forwarded verbatim to ansible_runner; pop our own flag
        # first so it never reaches ansible_runner.
        self.runner_params = kwargs
        self.unit_id = None
        self.clean_workspace = kwargs.pop("clean_workspace", True)

    def write_unit_id(self):
        # Persist the receptor unit id next to the job workspace so other
        # processes (e.g. job cancellation) can find it.
        if not self.unit_id:
            return
        private_dir = self.runner_params.get("private_data_dir", "")
        with open(os.path.join(private_dir, "local.unitid"), "w") as f:
            f.write(self.unit_id)
            f.flush()

    @property
    def clean_dir(self):
        # Directory removed by WorkPostRunCleaner after the run, unless
        # workspace cleanup was disabled by the caller.
        if not self.clean_workspace:
            return None
        return self.runner_params.get("private_data_dir", None)

    @cleanup_post_run
    def run(self):
        # Transmit the payload while submitting the work unit, then stream
        # and process the results.
        input, output = socket.socketpair()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            transmitter_future = executor.submit(self.transmit, input)
            result = receptor_ctl.submit_work(payload=output.makefile('rb'),
                                              node='primary', worktype='ansible-runner')
            input.close()
            output.close()
            self.unit_id = result['unitid']
            self.write_unit_id()
            transmitter_future.result()
        result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
        stdout_queue = queue.Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            processor_future = executor.submit(self.processor, result_file, stdout_queue)
            # Drain stdout lines until the processor pushes its None
            # sentinel (see processor()'s finally block).
            while not processor_future.done() or \
                    not stdout_queue.empty():
                msg = stdout_queue.get()
                if msg is None:
                    break
                print(msg)
            return processor_future.result()

    def transmit(self, _socket):
        # Serialize the runner payload into the write end of the socketpair.
        try:
            ansible_runner.run(
                streamer='transmit',
                _output=_socket.makefile('wb'),
                **self.runner_params
            )
        finally:
            _socket.shutdown(socket.SHUT_WR)

    def processor(self, _result_file, stdout_queue):
        # Consume the receptor result stream, forwarding stdout lines to
        # stdout_queue and delegating to the caller-supplied handlers.
        try:
            original_event_handler = self.runner_params.pop("event_handler", None)
            original_status_handler = self.runner_params.pop("status_handler", None)

            def event_handler(data, **kwargs):
                stdout = data.get('stdout', '')
                if stdout:
                    stdout_queue.put(stdout)
                if original_event_handler:
                    original_event_handler(data, **kwargs)

            def status_handler(data, **kwargs):
                # Re-attach the workspace path so status consumers can
                # locate artifacts produced by the remote run.
                private_data_dir = self.runner_params.get("private_data_dir", None)
                if private_data_dir:
                    data["private_data_dir"] = private_data_dir
                if original_status_handler:
                    original_status_handler(data, **kwargs)

            return ansible_runner.interface.run(
                quite=True,  # NOTE(review): likely a typo for `quiet` — confirm
                streamer='process',
                _input=_result_file,
                event_handler=event_handler,
                status_handler=status_handler,
                **self.runner_params,
            )
        finally:
            # Always wake the drain loop in run() so it can exit.
            stdout_queue.put(None)

View File

@ -1,43 +1,24 @@
import logging
import os
import shutil
import uuid
import ansible_runner
from django.conf import settings
from django.utils._os import safe_join
from django.utils.functional import LazyObject
from .interface import interface
from .callback import DefaultCallback
from .receptor import receptor_runner
from .exception import CommandInBlackListException
from ..utils import get_ansible_log_verbosity
logger = logging.getLogger(__file__)
class CommandInBlackListException(Exception):
    """Raised when a submitted command matches the configured command blacklist."""
class AnsibleWrappedRunner(LazyObject):
    """Lazily selects the ansible execution backend at first attribute access."""

    def _setup(self):
        self._wrapped = self.get_runner()

    @staticmethod
    def get_runner():
        # Prefer the receptor-based runner when it is enabled and a control
        # socket path has been configured; otherwise use plain ansible_runner.
        if settings.ANSIBLE_RECEPTOR_ENABLE and settings.ANSIBLE_RECEPTOR_SOCK_PATH:
            return receptor_runner
        return ansible_runner


# Module-level lazy runner used by the Runner classes below.
runner = AnsibleWrappedRunner()
__all__ = ['AdHocRunner', 'PlaybookRunner', 'SuperPlaybookRunner', 'UploadFileRunner']
class AdHocRunner:
cmd_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell')
def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/', extra_vars={},
def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/', extra_vars=None,
dry_run=False, timeout=-1):
if extra_vars is None:
extra_vars = {}
self.id = uuid.uuid4()
self.inventory = inventory
self.pattern = pattern
@ -69,7 +50,7 @@ class AdHocRunner:
if os.path.exists(private_env):
shutil.rmtree(private_env)
runner.run(
interface.run(
timeout=self.timeout if self.timeout > 0 else None,
extravars=self.extra_vars,
host_pattern=self.pattern,
@ -112,7 +93,7 @@ class PlaybookRunner:
if os.path.exists(private_env):
shutil.rmtree(private_env)
runner.run(
interface.run(
private_data_dir=self.project_dir,
inventory=self.inventory,
playbook=self.playbook,
@ -144,7 +125,7 @@ class UploadFileRunner:
def run(self, verbosity=0, **kwargs):
verbosity = get_ansible_log_verbosity(verbosity)
runner.run(
interface.run(
private_data_dir=self.project_dir,
host_pattern="*",
inventory=self.inventory,
@ -160,11 +141,3 @@ class UploadFileRunner:
except OSError as e:
print(f"del upload tmp dir {self.src_paths} failed! {e}")
return self.cb
class CommandRunner(AdHocRunner):
    """AdHocRunner specialization that executes a single shell command."""

    def __init__(self, inventory, command, pattern='*', project_dir='/tmp/'):
        # Force the 'shell' module; *command* becomes the module arguments.
        super().__init__(inventory, 'shell', command, pattern, project_dir)

    def run(self, verbosity=0, **kwargs):
        # Delegates unchanged to AdHocRunner.run; kept for explicitness.
        return super().run(verbosity, **kwargs)

View File

@ -0,0 +1,3 @@
from .base import *
from .native import *
from .receptor import *

View File

@ -0,0 +1,42 @@
from ops.ansible.cleaner import WorkPostRunCleaner, cleanup_post_run
class BaseRunner(WorkPostRunCleaner):
    """Common base for ansible execution backends (native / receptor).

    ``runner_params`` are forwarded verbatim to ansible_runner; the
    ``clean_workspace`` flag is consumed here and controls post-run
    workspace cleanup via WorkPostRunCleaner.
    """

    def __init__(self, **kwargs):
        self.runner_params = kwargs
        # Pop our own flag so it never reaches ansible_runner.
        self.clean_workspace = kwargs.pop("clean_workspace", True)

    @classmethod
    def kill_precess(cls, pid):
        # NOTE(review): the name is a long-standing typo for "kill_process";
        # RunnerInterface.kill_process calls this spelling, so it is kept
        # for compatibility.
        # Fix: the original `return NotImplementedError` handed back the
        # exception *class* instead of raising it, so a backend that forgot
        # to override this hook would silently "succeed".
        raise NotImplementedError

    @property
    def clean_dir(self):
        # Directory removed after the run, unless cleanup is disabled.
        if not self.clean_workspace:
            return None
        return self.private_data_dir

    @property
    def private_data_dir(self):
        return self.runner_params.get('private_data_dir', None)

    def get_event_handler(self):
        # May return None when the caller supplied no event_handler.
        _event_handler = self.runner_params.pop("event_handler", None)
        return _event_handler

    def get_status_handler(self):
        _status_handler = self.runner_params.pop("status_handler", None)
        if not _status_handler:
            return

        def _handler(data, **kwargs):
            # Attach the workspace path so status consumers can locate
            # artifacts produced by the run.
            if self.private_data_dir:
                data["private_data_dir"] = self.private_data_dir
            _status_handler(data, **kwargs)

        return _handler

    def run(self):
        raise NotImplementedError()

View File

@ -0,0 +1,24 @@
import ansible_runner
from libs.process.ssh import kill_ansible_ssh_process
from ops.ansible.cleaner import cleanup_post_run
from ops.ansible.runners.base import BaseRunner
__all__ = ['AnsibleNativeRunner']
class AnsibleNativeRunner(BaseRunner):
    """Executes jobs with ansible_runner directly in this process."""

    @classmethod
    def kill_precess(cls, pid):
        # Name mirrors the (misspelled) BaseRunner hook that
        # RunnerInterface.kill_process dispatches to.
        return kill_ansible_ssh_process(pid)

    @cleanup_post_run
    def run(self):
        # Fix: return the ansible_runner result, for parity with
        # AnsibleReceptorRunner.run(); the original discarded it so
        # RunnerInterface.run() always returned None for native runs.
        return ansible_runner.run(
            event_handler=self.get_event_handler(),
            status_handler=self.get_status_handler(),
            **self.runner_params,
        )

View File

@ -0,0 +1,100 @@
import concurrent.futures
import os
import queue
import socket
import ansible_runner
from ops.ansible.cleaner import cleanup_post_run
from ops.ansible.runners.receptorctl.receptorctl import ReceptorCtl
from ops.ansible.runners.base import BaseRunner
__all__ = ['AnsibleReceptorRunner']
receptor_ctl = ReceptorCtl()


class AnsibleReceptorRunner(BaseRunner):
    """Executes jobs by streaming them to a receptor 'ansible-runner' work unit.

    The payload is transmitted over a socketpair to the 'primary' receptor
    node; the result stream is re-processed locally so event/status
    handlers still fire.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.unit_id = None
        self.stdout_queue = None

    @classmethod
    def kill_precess(cls, pid):
        # Name mirrors the (misspelled) BaseRunner hook that
        # RunnerInterface.kill_process dispatches to.
        return receptor_ctl.kill_process(pid)

    def write_unit_id(self):
        # Persist the receptor unit id beside the workspace so other
        # processes (e.g. job cancellation) can find it.
        if not self.unit_id:
            return
        private_dir = self.runner_params.get("private_data_dir", "")
        with open(os.path.join(private_dir, "local.unitid"), "w") as f:
            f.write(self.unit_id)
            f.flush()

    @cleanup_post_run
    def run(self):
        input, output = socket.socketpair()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            transmitter_future = executor.submit(self.transmit, input)
            result = receptor_ctl.submit_work(payload=output.makefile('rb'),
                                              node='primary', worktype='ansible-runner')
            input.close()
            output.close()
            self.unit_id = result['unitid']
            self.write_unit_id()
            transmitter_future.result()
        result_file = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
        self.stdout_queue = queue.Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            processor_future = executor.submit(self.processor, result_file)
            # Drain stdout until the processor pushes its None sentinel
            # (see processor()'s finally block).
            while not processor_future.done() or \
                    not self.stdout_queue.empty():
                msg = self.stdout_queue.get()
                if msg is None:
                    break
                print(msg)
            return processor_future.result()

    def transmit(self, _socket):
        # Serialize the runner payload into the write end of the socketpair.
        try:
            ansible_runner.run(
                streamer='transmit',
                _output=_socket.makefile('wb'),
                **self.runner_params
            )
        finally:
            _socket.shutdown(socket.SHUT_WR)

    def get_event_handler(self):
        _event_handler = super().get_event_handler()

        def _handler(data, **kwargs):
            stdout = data.get('stdout', '')
            if stdout:
                self.stdout_queue.put(stdout)
            # Fix: BaseRunner.get_event_handler() returns None when the
            # caller supplied no event_handler; the original invoked it
            # unconditionally, raising TypeError on every event.  (The
            # pre-refactor code guarded this call.)
            if _event_handler:
                _event_handler(data, **kwargs)

        return _handler

    def processor(self, _result_file):
        # Re-process the receptor result stream with our wrapped handlers.
        try:
            return ansible_runner.interface.run(
                quite=True,  # NOTE(review): likely a typo for `quiet` — confirm
                streamer='process',
                _input=_result_file,
                event_handler=self.get_event_handler(),
                status_handler=self.get_status_handler(),
                **self.runner_params,
            )
        finally:
            # Always wake the drain loop in run() so it can exit.
            self.stdout_queue.put(None)

View File

@ -0,0 +1,38 @@
from django.conf import settings
from receptorctl import ReceptorControl
class ReceptorCtl:
    """Wrapper around receptorctl talking to receptor over its TCP control port."""

    @property
    def ctl(self):
        # Fresh control connection per command; address comes from settings.
        return ReceptorControl("tcp://{}".format(settings.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS))

    def cancel(self, unit_id):
        """Cancel the receptor work unit *unit_id*."""
        return self.ctl.simple_command("work cancel {}".format(unit_id))

    def nodes(self):
        """Return advertised receptor nodes, or None when none are known."""
        return self.ctl.simple_command("status").get("Advertisements", None)

    def submit_work(self,
                    worktype,
                    payload,
                    node=None,
                    tlsclient=None,
                    ttl=None,
                    signwork=False,
                    params=None, ):
        """Submit *payload* as a receptor work unit of *worktype*."""
        return self.ctl.submit_work(worktype, payload, node, tlsclient, ttl, signwork, params)

    def get_work_results(self, unit_id, startpos=0, return_socket=False, return_sockfile=True):
        """Fetch the result stream of a work unit."""
        return self.ctl.get_work_results(unit_id, startpos, return_socket, return_sockfile)

    def kill_process(self, pid):
        """Submit a 'kill' work unit for *pid* and echo its output."""
        submit_result = self.submit_work(worktype="kill", node="primary", payload=str(pid))
        unit_id = submit_result["unitid"]
        result_socket, result_file = self.get_work_results(unit_id=unit_id, return_sockfile=True,
                                                           return_socket=True)
        # Fix: `while not result_socket.close()` invoked close() (which
        # returns None, so the condition was always true) and shut the
        # socket down on the very first iteration.  Read until EOF, then
        # close explicitly.
        while True:
            buf = result_file.read()
            if not buf:
                break
            print(buf.decode('utf8'))
        result_socket.close()

View File

@ -304,6 +304,6 @@ class UsernameHintsAPI(APIView):
.filter(username__icontains=query) \
.filter(asset__in=assets) \
.values('username') \
.annotate(total=Count('username', distinct=True)) \
.order_by('total', '-username')[:10]
.annotate(total=Count('username')) \
.order_by('-total', '-username')[:10]
return Response(data=top_accounts)

View File

@ -22,8 +22,10 @@ from acls.models import CommandFilterACL
from assets.models import Asset
from assets.automations.base.manager import SSHTunnelManager
from common.db.encoder import ModelJSONFieldEncoder
from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException, UploadFileRunner
from ops.ansible.receptor import receptor_runner
from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, UploadFileRunner
"""stop all ssh child processes of the given ansible process pid."""
from ops.ansible.exception import CommandInBlackListException
from ops.mixin import PeriodTaskModelMixin
from ops.variables import *
from ops.const import Types, RunasPolicies, JobStatus, JobModules

View File

@ -1,4 +1,3 @@
import ast
import json
import time
@ -17,9 +16,9 @@ from common.signals import django_ready
from common.utils.connection import RedisPubSub
from jumpserver.utils import get_current_request
from orgs.utils import get_current_org_id, set_current_org
from .ansible.receptor.receptor_runner import receptor_ctl
from .celery import app
from .models import CeleryTaskExecution, CeleryTask, Job
from .ansible.runner import interface
logger = get_logger(__name__)
@ -134,9 +133,10 @@ def task_sent_handler(headers=None, body=None, **kwargs):
args, kwargs, __ = body
try:
args = list(args)
args = json.loads(json.dumps(list(args), cls=JSONEncoder))
kwargs = json.loads(json.dumps(kwargs, cls=JSONEncoder))
except Exception as e:
logger.error('Parse task args or kwargs error (Need handle): {}'.format(e))
args = []
kwargs = {}
@ -151,11 +151,13 @@ def task_sent_handler(headers=None, body=None, **kwargs):
request = get_current_request()
if request and request.user.is_authenticated:
data['creator'] = request.user
try:
CeleryTaskExecution.objects.create(**data)
except Exception as e:
logger.error(e)
CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now())
with transaction.atomic():
try:
CeleryTaskExecution.objects.create(**data)
except Exception as e:
logger.error('Create celery task execution error: {}'.format(e))
CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now())
@receiver(django_ready)
@ -164,7 +166,7 @@ def subscribe_stop_job_execution(sender, **kwargs):
def on_stop(pid):
logger.info(f"Stop job execution {pid} start")
receptor_ctl.kill_process(pid)
interface.kill_process(pid)
job_execution_stop_pub_sub.subscribe(on_stop)

View File

@ -87,6 +87,8 @@ class OrgResourceStatisticsRefreshUtil:
if not cache_field_name:
return
org = getattr(instance, 'org', None)
if not org:
return
cache_field_name = tuple(cache_field_name)
cls.refresh_org_fields.delay(org_fields=((org, cache_field_name),))

View File

@ -42,7 +42,10 @@
$.fn.select2.defaults.set('language', getUserLang())
const md = window.markdownit();
const markdownContent = document.querySelector('script[type="text/markdown"]').textContent;
document.getElementById('markdown-output').innerHTML = md.render(markdownContent);
const markdownRef = document.getElementById('markdown-output')
if (markdownRef) {
markdownRef.innerHTML = md.render(markdownContent);
}
});
</script>

View File

@ -118,11 +118,18 @@
})
}
function onError (responseText, responseJson, status) {
setTimeout(function () {
toastr.error(responseJson.detail);
});
};
requestApi({
url: url,
method: "POST",
body: JSON.stringify(data),
success: onSuccess,
error: onError,
flash_message: false
})
}

View File

@ -70,7 +70,7 @@ class SessionCommandSerializerMixin(serializers.Serializer):
id = serializers.UUIDField(read_only=True)
# 限制 64 字符,不能直接迁移成 128 字符,命令表数据量会比较大
account = serializers.CharField(label=_("Account "))
output = serializers.CharField(max_length=2048, allow_blank=True, label=_("Output"))
output = serializers.CharField(allow_blank=True, label=_("Output"))
timestamp = serializers.IntegerField(label=_('Timestamp'))
timestamp_display = serializers.DateTimeField(read_only=True, label=_('Datetime'))
remote_addr = serializers.CharField(read_only=True, label=_('Remote Address'))

View File

@ -9,8 +9,7 @@ import os
import signal
import tempfile
import psutil
from psutil import NoSuchProcess
from apps.libs.process.ssh import kill_ansible_ssh_process
ANSIBLE_RUNNER_COMMAND = "ansible-runner"
@ -22,6 +21,8 @@ DEFAULT_SHARE_DIR = os.path.join(PROJECT_DIR, "data", "share")
DEFAULT_ANSIBLE_MODULES_DIR = os.path.join(APPS_DIR, "libs", "ansible", "modules")
DEFAULT_CONTROL_SOCK_PATH = os.path.join(DEFAULT_SHARE_DIR, "control.sock")
DEFAULT_TCP_LISTEN_ADDRESS = "0.0.0.0:7521"
logger = logging.getLogger(__name__)
os.chdir(APPS_DIR)
@ -34,9 +35,10 @@ class ReceptorService:
'receptor',
'--local-only',
'--node', 'id=primary',
'--log-level', 'level=Error',
'--control-service',
'service=control',
'filename={}'.format(DEFAULT_CONTROL_SOCK_PATH),
'tcplisten={}'.format(DEFAULT_TCP_LISTEN_ADDRESS),
'--work-command',
'worktype={}'.format(ANSIBLE_RUNNER_COMMAND),
'command={}'.format(ANSIBLE_RUNNER_COMMAND),
@ -49,6 +51,7 @@ class ReceptorService:
'allowruntimeparams=true'
]
@staticmethod
def before_start():
os.makedirs(os.path.join(DEFAULT_SHARE_DIR), exist_ok=True)
@ -141,29 +144,12 @@ def kill_progress_tree(pid=None):
try:
pid_input = input()
pid = int(pid_input)
logger.info("progress {} will be kill".format(pid))
kill_ansible_ssh_process(pid)
except Exception as e:
logger.error(e)
return
logger.info("progress {} will be kill".format(pid))
try:
current_process = psutil.Process(pid)
except NoSuchProcess as e:
logger.error(e)
return
children = current_process.children(recursive=True)
for child in children:
if child.pid == 1:
continue
if child.name() != 'ssh':
continue
try:
child.kill()
except Exception as e:
logger.error(e)
if __name__ == '__main__':
parser = argparse.ArgumentParser(