diff --git a/apps/ops/celery/signal_handler.py b/apps/ops/celery/signal_handler.py index b2a6ddf37..af8e025fd 100644 --- a/apps/ops/celery/signal_handler.py +++ b/apps/ops/celery/signal_handler.py @@ -7,6 +7,7 @@ from celery import subtask from celery.signals import ( worker_ready, worker_shutdown, after_setup_logger ) +from kombu.utils.encoding import safe_str from django_celery_beat.models import PeriodicTask from common.utils import get_logger @@ -14,6 +15,7 @@ from .decorator import get_after_app_ready_tasks, get_after_app_shutdown_clean_t from .logger import CeleryTaskFileHandler logger = get_logger(__file__) +safe_str = lambda x: x @worker_ready.connect @@ -48,58 +50,3 @@ def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=No formatter = logging.Formatter(format) handler.setFormatter(formatter) logger.addHandler(handler) - - -# @after_task_publish.connect -# def after_task_publish_signal_handler(sender, headers=None, **kwargs): -# CeleryTask.objects.create( -# id=headers["id"], status=CeleryTask.WAITING, name=headers["task"] -# ) -# cache.set(headers["id"], True, 3600) -# -# -# @task_prerun.connect -# def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): -# time.sleep(0.1) -# for i in range(5): -# if cache.get(task_id, False): -# break -# else: -# time.sleep(0.1) -# continue -# -# t = get_object_or_none(CeleryTask, id=task_id) -# if t is None: -# logger.warn("Not get the task: {}".format(task_id)) -# return -# now = datetime.datetime.now().strftime("%Y-%m-%d") -# log_path = os.path.join(now, task_id + '.log') -# full_path = os.path.join(CeleryTask.LOG_DIR, log_path) -# -# if not os.path.exists(os.path.dirname(full_path)): -# os.makedirs(os.path.dirname(full_path)) -# with transaction.atomic(): -# t.date_start = timezone.now() -# t.status = CeleryTask.RUNNING -# t.log_path = log_path -# t.save() -# f = open(full_path, 'w', encoding="utf-8") -# tee = TeeObj(f) -# sys.stdout = tee -# task.log_f = tee -# -# -# @task_postrun.connect -# def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): -# t = get_object_or_none(CeleryTask, id=task_id) -# if t is None: -# logger.warn("Not get the task: {}".format(task_id)) -# return -# with transaction.atomic(): -# t.status = CeleryTask.FINISHED -# t.date_finished = timezone.now() -# t.save() -# task.log_f.flush() -# sys.stdout = task.log_f.origin_stdout -# task.log_f.close() -