feat(ops): 项目启动时,清除指定的celery定时任务;添加获取celery定时任务的函数 (#4378)

* feat(ops): 添加获取celery定时任务的函数

* feat(ops): 项目启动时,清除指定的celery定时任务

* feat(ops): 项目启动时,清除指定的celery定时任务 2

Co-authored-by: Bai <bugatti_it@163.com>
pull/4370/head
老广 2020-07-21 15:33:33 +08:00 committed by GitHub
parent 31ba0564e4
commit 5d08438dad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 1 deletions

View File

@ -93,6 +93,12 @@ def delete_celery_periodic_task(task_name):
PeriodicTasks.update_changed()
def get_celery_periodic_task(task_name):
from django_celery_beat.models import PeriodicTask
task = PeriodicTask.objects.filter(name=task_name).first()
return task
def get_celery_task_log_path(task_id):
task_id = str(task_id)
rel_path = os.path.join(task_id[0], task_id[1], task_id + '.log')

View File

@ -15,7 +15,10 @@ from .celery.decorator import (
register_as_period_task, after_app_shutdown_clean_periodic,
after_app_ready_start
)
from .celery.utils import create_or_update_celery_periodic_tasks
from .celery.utils import (
create_or_update_celery_periodic_tasks, get_celery_periodic_task,
disable_celery_periodic_task, delete_celery_periodic_task
)
from .models import Task, CommandExecution, CeleryTask
from .utils import send_server_performance_mail
@ -95,6 +98,29 @@ def clean_celery_tasks_period():
subprocess.call(command, shell=True)
@shared_task
@after_app_ready_start
def clean_celery_periodic_tasks():
"""清除celery定时任务"""
need_cleaned_tasks = [
'handle_be_interrupted_change_auth_task_periodic',
]
logger.info('Start clean celery periodic tasks: {}'.format(need_cleaned_tasks))
for task_name in need_cleaned_tasks:
logger.info('Start clean task: {}'.format(task_name))
task = get_celery_periodic_task(task_name)
if task is None:
logger.info('Task does not exist: {}'.format(task_name))
continue
disable_celery_periodic_task(task_name)
delete_celery_periodic_task(task_name)
task = get_celery_periodic_task(task_name)
if task is None:
logger.info('Clean task success: {}'.format(task_name))
else:
logger.info('Clean task failure: {}'.format(task))
@shared_task
@after_app_ready_start
def create_or_update_registered_periodic_tasks():