jumpserver/apps/ops/celery/utils.py

98 lines
3.1 KiB
Python
Raw Normal View History

2018-04-02 05:19:31 +00:00
# -*- coding: utf-8 -*-
#
2017-12-24 10:53:07 +00:00
import json
import os
2016-11-13 14:34:38 +00:00
from django.conf import settings
2019-02-24 13:34:45 +00:00
from django.utils.timezone import get_current_timezone
2017-12-25 17:54:10 +00:00
from django.db.utils import ProgrammingError, OperationalError
2018-04-02 05:19:31 +00:00
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
2017-12-24 10:53:07 +00:00
def _first_or_create(model, **lookup):
    """Return the first ``model`` row matching ``lookup``, creating it if absent."""
    # filter().first() instead of get_or_create(): schedule rows that are
    # already duplicated must be reused, not raise MultipleObjectsReturned.
    existing = model.objects.filter(**lookup).first()
    if existing is not None:
        return existing
    return model.objects.create(**lookup)


def _parse_crontab(expression):
    """Turn a five-field crontab string into CrontabSchedule keyword arguments."""
    try:
        minute, hour, day, month, week = expression.split()
    except ValueError as err:
        # SyntaxError is kept for callers that already catch it.
        raise SyntaxError("crontab is not valid") from err
    return dict(
        minute=minute, hour=hour, day_of_week=week,
        day_of_month=day, month_of_year=month,
        timezone=get_current_timezone(),
    )


def _resolve_schedule(detail):
    """Return the ``(interval, crontab)`` pair for one task definition."""
    # An integer "interval" takes precedence over "crontab" when both are set.
    if isinstance(detail.get("interval"), int):
        interval = _first_or_create(
            IntervalSchedule,
            every=detail["interval"],
            period=IntervalSchedule.SECONDS,
        )
        return interval, None
    if isinstance(detail.get("crontab"), str):
        crontab = _first_or_create(
            CrontabSchedule, **_parse_crontab(detail["crontab"])
        )
        return None, crontab
    raise SyntaxError("Schedule is not valid")


def create_or_update_celery_periodic_tasks(tasks):
    """
    Create or update one ``PeriodicTask`` row per entry of ``tasks``.

    :param tasks: {
        'add-every-monday-morning': {
            'task': 'tasks.add',  # A registered celery task
            'interval': 30,  # seconds; used when it is an int
            'crontab': "30 7 * * *",  # used when 'interval' is not an int
            'args': (16, 16),
            'kwargs': {},
            'enabled': False,
        },
    }
    :return: the ``update_or_create`` result for the last entry processed,
        or ``None`` when ``tasks`` is empty or the schedule tables cannot be
        queried (``ProgrammingError`` / ``OperationalError``).
    :raises SyntaxError: when an entry has neither an int ``interval`` nor a
        str ``crontab``, or the crontab string does not have five fields.
    """
    # Todo: check task valid, task and callback must be a celery task
    if not tasks:
        # Nothing to register; avoid touching the database at all.
        return None

    # Probe once whether the schedule tables can be queried; if the query
    # fails, give up quietly instead of raising.
    try:
        IntervalSchedule.objects.all().count()
    except (ProgrammingError, OperationalError):
        return None

    result = None
    for name, detail in tasks.items():
        interval, crontab = _resolve_schedule(detail)
        defaults = dict(
            interval=interval,
            crontab=crontab,
            name=name,
            task=detail['task'],
            args=json.dumps(detail.get('args', [])),
            kwargs=json.dumps(detail.get('kwargs', {})),
            enabled=detail.get('enabled', True),
        )
        # Every entry is processed; the result of the last one is returned.
        result = PeriodicTask.objects.update_or_create(
            defaults=defaults, name=name,
        )
    return result
def disable_celery_periodic_task(task_name):
    """Set ``enabled=False`` on every periodic task named ``task_name``."""
    from django_celery_beat.models import PeriodicTask

    matching = PeriodicTask.objects.filter(name=task_name)
    matching.update(enabled=False)
def delete_celery_periodic_task(task_name):
    """Delete every periodic task row named ``task_name``."""
    from django_celery_beat.models import PeriodicTask

    matching = PeriodicTask.objects.filter(name=task_name)
    matching.delete()
def get_celery_task_log_path(task_id):
    """
    Build the log file path for a task and make sure its directory exists.

    The file lives under ``settings.CELERY_LOG_DIR`` in two nested
    directories named after the first and second characters of the id:
    ``<CELERY_LOG_DIR>/<id[0]>/<id[1]>/<id>.log``.

    :param task_id: task identifier; converted with ``str()``
    :return: path of the log file (the file itself is not created)
    """
    ident = str(task_id)
    first_char, second_char = ident[0], ident[1]
    file_name = ident + '.log'
    relative = os.path.join(first_char, second_char, file_name)
    full_path = os.path.join(settings.CELERY_LOG_DIR, relative)
    parent_dir = os.path.dirname(full_path)
    os.makedirs(parent_dir, exist_ok=True)
    return full_path
2017-12-24 10:53:07 +00:00