refactor(celery): 重构celery,使用 threads 模型,避免 占用太多内存 (#5525)

* refactor(celery): 重构celery,使用 threads 模型,避免 占用太多内存

* fix: 修复无法关闭fd的bug

Co-authored-by: ibuler <ibuler@qq.com>
pull/5515/head^2
fit2bot 2021-01-25 19:34:41 +08:00 committed by GitHub
parent efb9f48c6f
commit 351d4d8123
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 34 deletions

View File

@ -4,7 +4,6 @@ import os
from kombu import Exchange, Queue
from celery import Celery
from celery.schedules import crontab
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')
@ -20,11 +19,7 @@ configs = {k: v for k, v in settings.__dict__.items() if k.startswith('CELERY')}
configs["CELERY_QUEUES"] = [
Queue("celery", Exchange("celery"), routing_key="celery"),
Queue("ansible", Exchange("ansible"), routing_key="ansible"),
Queue("celery_node_tree", Exchange("celery_node_tree"), routing_key="celery_node_tree")
]
configs["CELERY_ROUTES"] = {
"ops.tasks.run_ansible_task": {'exchange': 'ansible', 'routing_key': 'ansible'},
}
app.namespace = 'CELERY'
app.conf.update(configs)

View File

@ -1,4 +1,5 @@
from logging import StreamHandler
from threading import get_ident
from django.conf import settings
from celery import current_task
@ -123,6 +124,32 @@ class CeleryTaskLoggerHandler(StreamHandler):
pass
class CeleryThreadingLoggerHandler(CeleryTaskLoggerHandler):
@staticmethod
def get_current_thread_id():
return str(get_ident())
def emit(self, record):
thread_id = self.get_current_thread_id()
try:
self.write_thread_task_log(thread_id, record)
self.flush()
except ValueError:
self.handleError(record)
def write_thread_task_log(self, thread_id, msg):
pass
def handle_task_start(self, task_id):
pass
def handle_task_end(self, task_id):
pass
def handleError(self, record) -> None:
pass
class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler):
def __init__(self):
self.producer = CeleryLoggerProducer()
@ -137,9 +164,9 @@ class CeleryTaskMQLoggerHandler(CeleryTaskLoggerHandler):
class CeleryTaskFileHandler(CeleryTaskLoggerHandler):
def __init__(self):
def __init__(self, *args, **kwargs):
self.f = None
super().__init__(stream=None)
super().__init__(*args, **kwargs)
def emit(self, record):
msg = self.format(record)
@ -158,3 +185,37 @@ class CeleryTaskFileHandler(CeleryTaskLoggerHandler):
def handle_task_end(self, task_id):
self.f and self.f.close()
class CeleryThreadTaskFileHandler(CeleryThreadingLoggerHandler):
def __init__(self, *args, **kwargs):
self.thread_id_fd_mapper = {}
self.task_id_thread_id_mapper = {}
super().__init__(*args, **kwargs)
def write_thread_task_log(self, thread_id, record):
f = self.thread_id_fd_mapper.get(thread_id, None)
if not f:
raise ValueError('Not found thread task file')
msg = self.format(record)
f.write(msg)
f.write(self.terminator)
f.flush()
def flush(self):
for f in self.thread_id_fd_mapper.values():
f.flush()
def handle_task_start(self, task_id):
log_path = get_celery_task_log_path(task_id)
thread_id = self.get_current_thread_id()
self.task_id_thread_id_mapper[task_id] = thread_id
f = open(log_path, 'a')
self.thread_id_fd_mapper[thread_id] = f
def handle_task_end(self, task_id):
ident_id = self.task_id_thread_id_mapper.get(task_id, '')
f = self.thread_id_fd_mapper.pop(ident_id, None)
if f and not f.closed:
f.close()
self.task_id_thread_id_mapper.pop(task_id, None)

View File

@ -1,20 +1,17 @@
# -*- coding: utf-8 -*-
#
import logging
from django.dispatch import receiver
from django.core.cache import cache
from celery import subtask
from celery.signals import (
worker_ready, worker_shutdown, after_setup_logger
)
from kombu.utils.encoding import safe_str
from django_celery_beat.models import PeriodicTask
from common.utils import get_logger
from common.signals import django_ready
from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks
from .logger import CeleryTaskFileHandler
from .logger import CeleryThreadTaskFileHandler
logger = get_logger(__file__)
safe_str = lambda x: x
@ -47,7 +44,7 @@ def after_app_shutdown_periodic_tasks(sender=None, **kwargs):
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
task_handler = CeleryTaskFileHandler()
task_handler = CeleryThreadTaskFileHandler()
task_handler.setLevel(loglevel)
formatter = logging.Formatter(format)
task_handler.setFormatter(formatter)

26
jms
View File

@ -169,8 +169,7 @@ def is_running(s, unlink=True):
def parse_service(s):
web_services = ['gunicorn', 'flower', 'daphne']
celery_services = [
"celery_ansible", "celery_default", "celery_node_tree",
"celery_check_asset_perm_expired", "celery_heavy_tasks"
"celery_ansible", "celery_default"
]
task_services = celery_services + ['beat']
all_services = web_services + task_services
@ -227,27 +226,12 @@ def get_start_daphne_kwargs():
def get_start_celery_ansible_kwargs():
print("\n- Start Celery as Distributed Task Queue: Ansible")
return get_start_worker_kwargs('ansible', 4)
return get_start_worker_kwargs('ansible', 10)
def get_start_celery_default_kwargs():
print("\n- Start Celery as Distributed Task Queue: Celery")
return get_start_worker_kwargs('celery', 4)
def get_start_celery_node_tree_kwargs():
print("\n- Start Celery as Distributed Task Queue: NodeTree")
return get_start_worker_kwargs('node_tree', 2)
def get_start_celery_heavy_tasks_kwargs():
print("\n- Start Celery as Distributed Task Queue: HeavyTasks")
return get_start_worker_kwargs('celery_heavy_tasks', 1)
def get_start_celery_check_asset_perm_expired_kwargs():
print("\n- Start Celery as Distributed Task Queue: CheckAseetPermissionExpired")
return get_start_worker_kwargs('celery_check_asset_perm_expired', 1)
return get_start_worker_kwargs('celery', 10)
def get_start_worker_kwargs(queue, num):
@ -263,6 +247,7 @@ def get_start_worker_kwargs(queue, num):
cmd = [
'celery', 'worker',
'-P', 'threads',
'-A', 'ops',
'-l', 'INFO',
'-c', str(num),
@ -385,9 +370,6 @@ def start_service(s):
"gunicorn": get_start_gunicorn_kwargs,
"celery_ansible": get_start_celery_ansible_kwargs,
"celery_default": get_start_celery_default_kwargs,
"celery_node_tree": get_start_celery_node_tree_kwargs,
"celery_heavy_tasks": get_start_celery_heavy_tasks_kwargs,
"celery_check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"beat": get_start_beat_kwargs,
"flower": get_start_flower_kwargs,
"daphne": get_start_daphne_kwargs,