|
|
|
@ -1,14 +1,16 @@
|
|
|
|
|
import ast
|
|
|
|
|
from celery import signals
|
|
|
|
|
|
|
|
|
|
from django.db import transaction
|
|
|
|
|
from django.core.cache import cache
|
|
|
|
|
from django.dispatch import receiver
|
|
|
|
|
from django.db.utils import ProgrammingError
|
|
|
|
|
from django.utils import translation, timezone
|
|
|
|
|
from django.utils.translation import gettext as _
|
|
|
|
|
from django.core.cache import cache
|
|
|
|
|
from celery import signals, current_app
|
|
|
|
|
|
|
|
|
|
from common.db.utils import close_old_connections, get_logger
|
|
|
|
|
from common.signals import django_ready
|
|
|
|
|
from common.db.utils import close_old_connections, get_logger
|
|
|
|
|
|
|
|
|
|
from .celery import app
|
|
|
|
|
from .models import CeleryTaskExecution, CeleryTask
|
|
|
|
|
|
|
|
|
@ -23,15 +25,15 @@ def sync_registered_tasks(*args, **kwargs):
|
|
|
|
|
with transaction.atomic():
|
|
|
|
|
try:
|
|
|
|
|
db_tasks = CeleryTask.objects.all()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return
|
|
|
|
|
celery_task_names = [key for key in app.tasks]
|
|
|
|
|
db_task_names = db_tasks.values_list('name', flat=True)
|
|
|
|
|
celery_task_names = [key for key in app.tasks]
|
|
|
|
|
db_task_names = db_tasks.values_list('name', flat=True)
|
|
|
|
|
|
|
|
|
|
db_tasks.exclude(name__in=celery_task_names).delete()
|
|
|
|
|
not_in_db_tasks = set(celery_task_names) - set(db_task_names)
|
|
|
|
|
tasks_to_create = [CeleryTask(name=name) for name in not_in_db_tasks]
|
|
|
|
|
CeleryTask.objects.bulk_create(tasks_to_create)
|
|
|
|
|
db_tasks.exclude(name__in=celery_task_names).delete()
|
|
|
|
|
not_in_db_tasks = set(celery_task_names) - set(db_task_names)
|
|
|
|
|
tasks_to_create = [CeleryTask(name=name) for name in not_in_db_tasks]
|
|
|
|
|
CeleryTask.objects.bulk_create(tasks_to_create)
|
|
|
|
|
except ProgrammingError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@signals.before_task_publish.connect
|
|
|
|
@ -45,7 +47,7 @@ def before_task_publish(headers=None, **kwargs):
|
|
|
|
|
@signals.task_prerun.connect
|
|
|
|
|
def on_celery_task_pre_run(task_id='', **kwargs):
|
|
|
|
|
# 更新状态
|
|
|
|
|
CeleryTaskExecution.objects.filter(id=task_id)\
|
|
|
|
|
CeleryTaskExecution.objects.filter(id=task_id) \
|
|
|
|
|
.update(state='RUNNING', date_start=timezone.now())
|
|
|
|
|
# 关闭之前的数据库连接
|
|
|
|
|
close_old_connections()
|
|
|
|
|