diff --git a/apps/ops/celery/__init__.py b/apps/ops/celery/__init__.py index 0ded6bc52..cb7bdcb88 100644 --- a/apps/ops/celery/__init__.py +++ b/apps/ops/celery/__init__.py @@ -4,7 +4,6 @@ import os from kombu import Exchange, Queue from celery import Celery -from celery.schedules import crontab # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings') @@ -20,11 +19,7 @@ configs = {k: v for k, v in settings.__dict__.items() if k.startswith('CELERY')} configs["CELERY_QUEUES"] = [ Queue("celery", Exchange("celery"), routing_key="celery"), Queue("ansible", Exchange("ansible"), routing_key="ansible"), - Queue("celery_node_tree", Exchange("celery_node_tree"), routing_key="celery_node_tree") ] -configs["CELERY_ROUTES"] = { - "ops.tasks.run_ansible_task": {'exchange': 'ansible', 'routing_key': 'ansible'}, -} app.namespace = 'CELERY' app.conf.update(configs) diff --git a/apps/ops/celery/logger.py b/apps/ops/celery/logger.py index 1dd517200..46b7e626c 100644 --- a/apps/ops/celery/logger.py +++ b/apps/ops/celery/logger.py @@ -1,4 +1,5 @@ from logging import StreamHandler +from threading import get_ident from django.conf import settings from celery import current_task @@ -123,6 +124,32 @@ class CeleryTaskLoggerHandler(StreamHandler): pass +class CeleryThreadingLoggerHandler(CeleryTaskLoggerHandler): + @staticmethod + def get_current_thread_id(): + return str(get_ident()) + + def emit(self, record): + thread_id = self.get_current_thread_id() + try: + self.write_thread_task_log(thread_id, record) + self.flush() + except ValueError: + self.handleError(record) + + def write_thread_task_log(self, thread_id, msg): + pass + + def handle_task_start(self, task_id): + pass + + def handle_task_end(self, task_id): + pass + + def handleError(self, record) -> None: + pass + + class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler): def __init__(self): self.producer = CeleryLoggerProducer() @@ -137,9 +164,9 @@ class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler): class CeleryTaskFileHandler(CeleryTaskLoggerHandler): - def __init__(self): + def __init__(self, *args, **kwargs): self.f = None - super().__init__(stream=None) + super().__init__(*args, **kwargs) def emit(self, record): msg = self.format(record) @@ -158,3 +185,37 @@ class CeleryTaskFileHandler(CeleryTaskLoggerHandler): def handle_task_end(self, task_id): self.f and self.f.close() + + +class CeleryThreadTaskFileHandler(CeleryThreadingLoggerHandler): + def __init__(self, *args, **kwargs): + self.thread_id_fd_mapper = {} + self.task_id_thread_id_mapper = {} + super().__init__(*args, **kwargs) + + def write_thread_task_log(self, thread_id, record): + f = self.thread_id_fd_mapper.get(thread_id, None) + if not f: + raise ValueError('Not found thread task file') + msg = self.format(record) + f.write(msg) + f.write(self.terminator) + f.flush() + + def flush(self): + for f in self.thread_id_fd_mapper.values(): + f.flush() + + def handle_task_start(self, task_id): + log_path = get_celery_task_log_path(task_id) + thread_id = self.get_current_thread_id() + self.task_id_thread_id_mapper[task_id] = thread_id + f = open(log_path, 'a') + self.thread_id_fd_mapper[thread_id] = f + + def handle_task_end(self, task_id): + ident_id = self.task_id_thread_id_mapper.get(task_id, '') + f = self.thread_id_fd_mapper.pop(ident_id, None) + if f and not f.closed: + f.close() + self.task_id_thread_id_mapper.pop(task_id, None) diff --git a/apps/ops/celery/signal_handler.py b/apps/ops/celery/signal_handler.py index 5d5fc4227..5ca29f7ba 100644 --- a/apps/ops/celery/signal_handler.py +++ b/apps/ops/celery/signal_handler.py @@ -1,20 +1,17 @@ # -*- coding: utf-8 -*- # import logging -from django.dispatch import receiver from django.core.cache import cache from celery import subtask from celery.signals import ( worker_ready, worker_shutdown, after_setup_logger ) -from kombu.utils.encoding import safe_str from django_celery_beat.models import PeriodicTask from common.utils import get_logger -from common.signals import django_ready from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks -from .logger import CeleryTaskFileHandler +from .logger import CeleryThreadTaskFileHandler logger = get_logger(__file__) safe_str = lambda x: x @@ -47,7 +44,7 @@ def after_app_shutdown_periodic_tasks(sender=None, **kwargs): def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs): if not logger: return - task_handler = CeleryTaskFileHandler() + task_handler = CeleryThreadTaskFileHandler() task_handler.setLevel(loglevel) formatter = logging.Formatter(format) task_handler.setFormatter(formatter) diff --git a/jms b/jms index da671373c..8ba32460f 100755 --- a/jms +++ b/jms @@ -169,8 +169,7 @@ def is_running(s, unlink=True): def parse_service(s): web_services = ['gunicorn', 'flower', 'daphne'] celery_services = [ - "celery_ansible", "celery_default", "celery_node_tree", - "celery_check_asset_perm_expired", "celery_heavy_tasks" + "celery_ansible", "celery_default" ] task_services = celery_services + ['beat'] all_services = web_services + task_services @@ -227,27 +226,12 @@ def get_start_daphne_kwargs(): def get_start_celery_ansible_kwargs(): print("\n- Start Celery as Distributed Task Queue: Ansible") - return get_start_worker_kwargs('ansible', 4) + return get_start_worker_kwargs('ansible', 10) def get_start_celery_default_kwargs(): print("\n- Start Celery as Distributed Task Queue: Celery") - return get_start_worker_kwargs('celery', 4) - - -def get_start_celery_node_tree_kwargs(): - print("\n- Start Celery as Distributed Task Queue: NodeTree") - return get_start_worker_kwargs('node_tree', 2) - - -def get_start_celery_heavy_tasks_kwargs(): - print("\n- Start Celery as Distributed Task Queue: HeavyTasks") - return get_start_worker_kwargs('celery_heavy_tasks', 1) - - -def get_start_celery_check_asset_perm_expired_kwargs(): - print("\n- Start Celery as Distributed Task Queue: CheckAseetPermissionExpired") - return get_start_worker_kwargs('celery_check_asset_perm_expired', 1) + return get_start_worker_kwargs('celery', 10) def get_start_worker_kwargs(queue, num): @@ -263,6 +247,7 @@ def get_start_worker_kwargs(queue, num): cmd = [ 'celery', 'worker', + '-P', 'threads', '-A', 'ops', '-l', 'INFO', '-c', str(num), @@ -385,9 +370,6 @@ def start_service(s): "gunicorn": get_start_gunicorn_kwargs, "celery_ansible": get_start_celery_ansible_kwargs, "celery_default": get_start_celery_default_kwargs, - "celery_node_tree": get_start_celery_node_tree_kwargs, - "celery_heavy_tasks": get_start_celery_heavy_tasks_kwargs, - "celery_check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs, "beat": get_start_beat_kwargs, "flower": get_start_flower_kwargs, "daphne": get_start_daphne_kwargs,