mirror of https://github.com/jumpserver/jumpserver
Merge pull request #5362 from jumpserver/pr@dev@fix_adhoc_excution
fix: 修复多个 AdHocExecution 在一个 celery task 执行时日志错误pull/5385/head
commit
7eedc0635e
|
@ -1,7 +1,3 @@
|
|||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from common.db.models import ChoiceSet
|
||||
|
||||
|
||||
ADMIN = 'Admin'
|
||||
USER = 'User'
|
||||
|
|
|
@ -125,3 +125,5 @@ CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO"
|
|||
# CELERY_WORKER_HIJACK_ROOT_LOGGER = True
|
||||
# CELERY_WORKER_MAX_TASKS_PER_CHILD = 40
|
||||
CELERY_TASK_SOFT_TIME_LIMIT = 3600
|
||||
|
||||
ANSIBLE_LOG_DIR = os.path.join(PROJECT_DIR, 'data', 'ansible')
|
||||
|
|
|
@ -60,6 +60,10 @@ class CallbackMixin:
|
|||
self.results_raw[t][host][task_name] = task_result
|
||||
self.clean_result(t, host, task_name, task_result)
|
||||
|
||||
def close(self):
|
||||
if hasattr(self._display, 'close'):
|
||||
self._display.close()
|
||||
|
||||
|
||||
class AdHocResultCallback(CallbackMixin, CallbackModule, CMDCallBackModule):
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
import errno
|
||||
import sys
|
||||
import os
|
||||
|
||||
from ansible.utils.display import Display
|
||||
from ansible.utils.color import stringc
|
||||
from ansible.utils.singleton import Singleton
|
||||
|
||||
from .utils import get_ansible_task_log_path
|
||||
|
||||
|
||||
class UnSingleton(Singleton):
|
||||
def __init__(cls, name, bases, dct):
|
||||
type.__init__(cls, name, bases, dct)
|
||||
|
||||
def __call__(cls, *args, **kwargs):
|
||||
return type.__call__(cls, *args, **kwargs)
|
||||
|
||||
|
||||
class AdHocDisplay(Display, metaclass=UnSingleton):
|
||||
def __init__(self, execution_id, verbosity=0):
|
||||
super().__init__(verbosity=verbosity)
|
||||
if execution_id:
|
||||
log_path = get_ansible_task_log_path(execution_id)
|
||||
else:
|
||||
log_path = os.devnull
|
||||
self.log_file = open(log_path, mode='a')
|
||||
|
||||
def close(self):
|
||||
self.log_file.close()
|
||||
|
||||
def set_cowsay_info(self):
|
||||
# 中断 cowsay 的测试,会频繁开启子进程
|
||||
return
|
||||
|
||||
def _write_to_screen(self, msg, stderr):
|
||||
if not stderr:
|
||||
screen = sys.stdout
|
||||
else:
|
||||
screen = sys.stderr
|
||||
|
||||
screen.write(msg)
|
||||
|
||||
try:
|
||||
screen.flush()
|
||||
except IOError as e:
|
||||
# Ignore EPIPE in case fileobj has been prematurely closed, eg.
|
||||
# when piping to "head -n1"
|
||||
if e.errno != errno.EPIPE:
|
||||
raise
|
||||
|
||||
def _write_to_log_file(self, msg):
|
||||
# 这里先不 flush,log 文件不需要那么及时。
|
||||
self.log_file.write(msg)
|
||||
|
||||
def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False):
|
||||
if color:
|
||||
msg = stringc(msg, color)
|
||||
|
||||
if not msg.endswith(u'\n'):
|
||||
msg2 = msg + u'\n'
|
||||
else:
|
||||
msg2 = msg
|
||||
|
||||
self._write_to_screen(msg2, stderr)
|
||||
self._write_to_log_file(msg2)
|
|
@ -1,6 +1,7 @@
|
|||
# ~*~ coding: utf-8 ~*~
|
||||
|
||||
import os
|
||||
|
||||
import shutil
|
||||
from collections import namedtuple
|
||||
|
||||
|
@ -18,6 +19,7 @@ from .callback import (
|
|||
)
|
||||
from common.utils import get_logger
|
||||
from .exceptions import AnsibleError
|
||||
from .display import AdHocDisplay
|
||||
|
||||
|
||||
__all__ = ["AdHocRunner", "PlayBookRunner", "CommandRunner"]
|
||||
|
@ -130,8 +132,8 @@ class AdHocRunner:
|
|||
loader=self.loader, inventory=self.inventory
|
||||
)
|
||||
|
||||
def get_result_callback(self, file_obj=None):
|
||||
return self.__class__.results_callback_class()
|
||||
def get_result_callback(self, execution_id=None):
|
||||
return self.__class__.results_callback_class(display=AdHocDisplay(execution_id))
|
||||
|
||||
@staticmethod
|
||||
def check_module_args(module_name, module_args=''):
|
||||
|
@ -189,7 +191,7 @@ class AdHocRunner:
|
|||
'ssh_args': '-C -o ControlMaster=no'
|
||||
}
|
||||
|
||||
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no'):
|
||||
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no', execution_id=None):
|
||||
"""
|
||||
:param tasks: [{'action': {'module': 'shell', 'args': 'ls'}, ...}, ]
|
||||
:param pattern: all, *, or others
|
||||
|
@ -198,7 +200,7 @@ class AdHocRunner:
|
|||
:return:
|
||||
"""
|
||||
self.check_pattern(pattern)
|
||||
self.results_callback = self.get_result_callback()
|
||||
self.results_callback = self.get_result_callback(execution_id)
|
||||
cleaned_tasks = self.clean_tasks(tasks)
|
||||
self.set_control_master_if_need(cleaned_tasks)
|
||||
context.CLIARGS = ImmutableDict(self.options)
|
||||
|
@ -233,6 +235,8 @@ class AdHocRunner:
|
|||
tqm.cleanup()
|
||||
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
|
||||
|
||||
self.results_callback.close()
|
||||
|
||||
|
||||
class CommandRunner(AdHocRunner):
|
||||
results_callback_class = CommandResultCallback
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
from django.conf import settings
|
||||
|
||||
|
||||
def get_ansible_task_log_path(task_id):
|
||||
from ops.utils import get_task_log_path
|
||||
return get_task_log_path(settings.ANSIBLE_LOG_DIR, task_id, level=3)
|
|
@ -15,10 +15,14 @@ from common.api import LogTailApi
|
|||
from ..models import CeleryTask
|
||||
from ..serializers import CeleryResultSerializer, CeleryPeriodTaskSerializer
|
||||
from ..celery.utils import get_celery_task_log_path
|
||||
from ..ansible.utils import get_ansible_task_log_path
|
||||
from common.mixins.api import CommonApiMixin
|
||||
|
||||
|
||||
__all__ = ['CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet']
|
||||
__all__ = [
|
||||
'CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet',
|
||||
'AnsibleTaskLogApi',
|
||||
]
|
||||
|
||||
|
||||
class CeleryTaskLogApi(LogTailApi):
|
||||
|
@ -57,6 +61,21 @@ class CeleryTaskLogApi(LogTailApi):
|
|||
return _('Waiting task start')
|
||||
|
||||
|
||||
class AnsibleTaskLogApi(LogTailApi):
|
||||
permission_classes = (IsValidUser,)
|
||||
|
||||
def get_log_path(self):
|
||||
new_path = get_ansible_task_log_path(self.kwargs.get('pk'))
|
||||
if new_path and os.path.isfile(new_path):
|
||||
return new_path
|
||||
|
||||
def get_no_file_message(self, request):
|
||||
if self.mark == 'undefined':
|
||||
return '.'
|
||||
else:
|
||||
return _('Waiting task start')
|
||||
|
||||
|
||||
class CeleryResultApi(generics.RetrieveAPIView):
|
||||
permission_classes = (IsValidUser,)
|
||||
serializer_class = CeleryResultSerializer
|
||||
|
|
|
@ -102,11 +102,8 @@ def get_celery_periodic_task(task_name):
|
|||
|
||||
|
||||
def get_celery_task_log_path(task_id):
|
||||
task_id = str(task_id)
|
||||
rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log')
|
||||
path = os.path.join(settings.CELERY_LOG_DIR, rel_path)
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
return path
|
||||
from ops.utils import get_task_log_path
|
||||
return get_task_log_path(settings.CELERY_LOG_DIR, task_id)
|
||||
|
||||
|
||||
def get_celery_status():
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
# Generated by Django 3.1 on 2020-12-30 12:04
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('ops', '0018_auto_20200509_1434'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='adhocexecution',
|
||||
name='celery_task_id',
|
||||
field=models.UUIDField(default=None, null=True),
|
||||
),
|
||||
]
|
|
@ -179,13 +179,13 @@ class AdHoc(OrgModelMixin):
|
|||
|
||||
def run(self):
|
||||
try:
|
||||
hid = current_task.request.id
|
||||
if AdHocExecution.objects.filter(id=hid).exists():
|
||||
hid = uuid.uuid4()
|
||||
celery_task_id = current_task.request.id
|
||||
except AttributeError:
|
||||
hid = uuid.uuid4()
|
||||
celery_task_id = None
|
||||
|
||||
execution = AdHocExecution(
|
||||
id=hid, adhoc=self, task=self.task,
|
||||
celery_task_id=celery_task_id,
|
||||
adhoc=self, task=self.task,
|
||||
task_display=str(self.task)[:128],
|
||||
date_start=timezone.now(),
|
||||
hosts_amount=self.hosts.count(),
|
||||
|
@ -237,6 +237,7 @@ class AdHocExecution(OrgModelMixin):
|
|||
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
|
||||
task = models.ForeignKey(Task, related_name='execution', on_delete=models.SET_NULL, null=True)
|
||||
task_display = models.CharField(max_length=128, blank=True, default='', verbose_name=_("Task display"))
|
||||
celery_task_id = models.UUIDField(default=None, null=True)
|
||||
hosts_amount = models.IntegerField(default=0, verbose_name=_("Host amount"))
|
||||
adhoc = models.ForeignKey(AdHoc, related_name='execution', on_delete=models.SET_NULL, null=True)
|
||||
date_start = models.DateTimeField(auto_now_add=True, verbose_name=_('Start time'))
|
||||
|
@ -270,6 +271,7 @@ class AdHocExecution(OrgModelMixin):
|
|||
self.adhoc.tasks,
|
||||
self.adhoc.pattern,
|
||||
self.task.name,
|
||||
execution_id=self.id
|
||||
)
|
||||
return result.results_raw, result.results_summary
|
||||
except AnsibleError as e:
|
||||
|
|
|
@ -22,6 +22,8 @@ urlpatterns = [
|
|||
path('tasks/<uuid:pk>/run/', api.TaskRun.as_view(), name='task-run'),
|
||||
path('celery/task/<uuid:pk>/log/', api.CeleryTaskLogApi.as_view(), name='celery-task-log'),
|
||||
path('celery/task/<uuid:pk>/result/', api.CeleryResultApi.as_view(), name='celery-result'),
|
||||
|
||||
path('ansible/task/<uuid:pk>/log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'),
|
||||
]
|
||||
|
||||
urlpatterns += router.urls
|
||||
|
|
|
@ -5,5 +5,5 @@ from .. import ws
|
|||
app_name = 'ops'
|
||||
|
||||
urlpatterns = [
|
||||
path('ws/ops/tasks/log/', ws.CeleryLogWebsocket, name='task-log-ws'),
|
||||
path('ws/ops/tasks/log/', ws.TaskLogWebsocket, name='task-log-ws'),
|
||||
]
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
# ~*~ coding: utf-8 ~*~
|
||||
import os
|
||||
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from common.utils import get_logger, get_object_or_none
|
||||
|
@ -75,3 +77,10 @@ def send_server_performance_mail(path, usage, usages):
|
|||
send_mail_async(subject, message, recipient_list, html_message=message)
|
||||
|
||||
|
||||
def get_task_log_path(base_path, task_id, level=2):
|
||||
task_id = str(task_id)
|
||||
rel_path = os.path.join(*task_id[:level], task_id + '.log')
|
||||
path = os.path.join(base_path, rel_path)
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
return path
|
||||
|
||||
|
|
|
@ -6,25 +6,37 @@ import json
|
|||
from common.utils import get_logger
|
||||
|
||||
from .celery.utils import get_celery_task_log_path
|
||||
from .ansible.utils import get_ansible_task_log_path
|
||||
from channels.generic.websocket import JsonWebsocketConsumer
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class CeleryLogWebsocket(JsonWebsocketConsumer):
|
||||
class TaskLogWebsocket(JsonWebsocketConsumer):
|
||||
disconnected = False
|
||||
|
||||
log_types = {
|
||||
'celery': get_celery_task_log_path,
|
||||
'ansible': get_ansible_task_log_path
|
||||
}
|
||||
|
||||
def connect(self):
|
||||
self.accept()
|
||||
|
||||
def get_log_path(self, task_id):
|
||||
func = self.log_types.get(self.log_type)
|
||||
if func:
|
||||
return func(task_id)
|
||||
|
||||
def receive(self, text_data=None, bytes_data=None, **kwargs):
|
||||
data = json.loads(text_data)
|
||||
task_id = data.get("task")
|
||||
task_id = data.get('task')
|
||||
self.log_type = data.get('type', 'celery')
|
||||
if task_id:
|
||||
self.handle_task(task_id)
|
||||
|
||||
def wait_util_log_path_exist(self, task_id):
|
||||
log_path = get_celery_task_log_path(task_id)
|
||||
log_path = self.get_log_path(task_id)
|
||||
while not self.disconnected:
|
||||
if not os.path.exists(log_path):
|
||||
self.send_json({'message': '.', 'task': task_id})
|
||||
|
@ -70,5 +82,3 @@ class CeleryLogWebsocket(JsonWebsocketConsumer):
|
|||
def disconnect(self, close_code):
|
||||
self.disconnected = True
|
||||
self.close()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue