mirror of https://github.com/jumpserver/jumpserver
202 lines
6.3 KiB
Python
202 lines
6.3 KiB
Python
# ~*~ coding: utf-8 ~*~
|
|
|
|
import os
|
|
import json
|
|
from functools import wraps
|
|
|
|
from celery import Celery, subtask
|
|
from celery.signals import worker_ready, worker_shutdown
|
|
from django.db.utils import ProgrammingError, OperationalError
|
|
|
|
from .utils import get_logger
|
|
|
|
logger = get_logger(__file__)
|
|
|
|
# set the default Django settings module for the 'celery' program.
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')
|
|
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
|
|
app = Celery('jumpserver')
|
|
|
|
# Using a string here means the worker will not have to
|
|
# pickle the object when using Windows.
|
|
app.config_from_object('django.conf:settings', namespace='CELERY')
|
|
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
|
|
|
|
|
|
def create_or_update_celery_periodic_tasks(tasks):
|
|
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
|
|
"""
|
|
:param tasks: {
|
|
'add-every-monday-morning': {
|
|
'task': 'tasks.add' # A registered celery task,
|
|
'interval': 30,
|
|
'crontab': "30 7 * * *",
|
|
'args': (16, 16),
|
|
'kwargs': {},
|
|
'enabled': False,
|
|
},
|
|
}
|
|
:return:
|
|
"""
|
|
# Todo: check task valid, task and callback must be a celery task
|
|
for name, detail in tasks.items():
|
|
interval = None
|
|
crontab = None
|
|
try:
|
|
IntervalSchedule.objects.all().count()
|
|
except (ProgrammingError, OperationalError):
|
|
return None
|
|
|
|
if isinstance(detail.get("interval"), int):
|
|
intervals = IntervalSchedule.objects.filter(
|
|
every=detail["interval"], period=IntervalSchedule.SECONDS
|
|
)
|
|
if intervals:
|
|
interval = intervals[0]
|
|
else:
|
|
interval = IntervalSchedule.objects.create(
|
|
every=detail['interval'],
|
|
period=IntervalSchedule.SECONDS,
|
|
)
|
|
elif isinstance(detail.get("crontab"), str):
|
|
try:
|
|
minute, hour, day, month, week = detail["crontab"].split()
|
|
except ValueError:
|
|
raise SyntaxError("crontab is not valid")
|
|
kwargs = dict(
|
|
minute=minute, hour=hour, day_of_week=week,
|
|
day_of_month=day, month_of_year=month,
|
|
)
|
|
contabs = CrontabSchedule.objects.filter(
|
|
**kwargs
|
|
)
|
|
if contabs:
|
|
crontab = contabs[0]
|
|
else:
|
|
crontab = CrontabSchedule.objects.create(**kwargs)
|
|
else:
|
|
raise SyntaxError("Schedule is not valid")
|
|
|
|
defaults = dict(
|
|
interval=interval,
|
|
crontab=crontab,
|
|
name=name,
|
|
task=detail['task'],
|
|
args=json.dumps(detail.get('args', [])),
|
|
kwargs=json.dumps(detail.get('kwargs', {})),
|
|
enabled=detail.get('enabled', True),
|
|
)
|
|
|
|
task = PeriodicTask.objects.update_or_create(
|
|
defaults=defaults, name=name,
|
|
)
|
|
return task
|
|
|
|
|
|
def disable_celery_periodic_task(task_name):
|
|
from django_celery_beat.models import PeriodicTask
|
|
PeriodicTask.objects.filter(name=task_name).update(enabled=False)
|
|
|
|
|
|
def delete_celery_periodic_task(task_name):
|
|
from django_celery_beat.models import PeriodicTask
|
|
PeriodicTask.objects.filter(name=task_name).delete()
|
|
|
|
|
|
__REGISTER_PERIODIC_TASKS = []
|
|
__AFTER_APP_SHUTDOWN_CLEAN_TASKS = []
|
|
__AFTER_APP_READY_RUN_TASKS = []
|
|
|
|
|
|
def register_as_period_task(crontab=None, interval=None):
|
|
"""
|
|
Warning: Task must be have not any args and kwargs
|
|
:param crontab: "* * * * *"
|
|
:param interval: 60*60*60
|
|
:return:
|
|
"""
|
|
if crontab is None and interval is None:
|
|
raise SyntaxError("Must set crontab or interval one")
|
|
|
|
def decorate(func):
|
|
if crontab is None and interval is None:
|
|
raise SyntaxError("Interval and crontab must set one")
|
|
|
|
# Because when this decorator run, the task was not created,
|
|
# So we can't use func.name
|
|
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
|
if name not in __REGISTER_PERIODIC_TASKS:
|
|
create_or_update_celery_periodic_tasks({
|
|
name: {
|
|
'task': name,
|
|
'interval': interval,
|
|
'crontab': crontab,
|
|
'args': (),
|
|
'enabled': True,
|
|
}
|
|
})
|
|
__REGISTER_PERIODIC_TASKS.append(name)
|
|
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorate
|
|
|
|
|
|
def after_app_ready_start(func):
|
|
# Because when this decorator run, the task was not created,
|
|
# So we can't use func.name
|
|
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
|
if name not in __AFTER_APP_READY_RUN_TASKS:
|
|
__AFTER_APP_READY_RUN_TASKS.append(name)
|
|
|
|
@wraps(func)
|
|
def decorate(*args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
return decorate
|
|
|
|
|
|
def after_app_shutdown_clean(func):
|
|
# Because when this decorator run, the task was not created,
|
|
# So we can't use func.name
|
|
name = '{func.__module__}.{func.__name__}'.format(func=func)
|
|
if name not in __AFTER_APP_READY_RUN_TASKS:
|
|
__AFTER_APP_SHUTDOWN_CLEAN_TASKS.append(name)
|
|
|
|
@wraps(func)
|
|
def decorate(*args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
return decorate
|
|
|
|
|
|
@worker_ready.connect
|
|
def on_app_ready(sender=None, headers=None, body=None, **kwargs):
|
|
if cache.get("CELERY_APP_READY", 0) == 1:
|
|
return
|
|
cache.set("CELERY_APP_READY", 1, 10)
|
|
logger.debug("App ready signal recv")
|
|
logger.debug("Start need start task: [{}]".format(
|
|
", ".join(__AFTER_APP_READY_RUN_TASKS))
|
|
)
|
|
for task in __AFTER_APP_READY_RUN_TASKS:
|
|
subtask(task).delay()
|
|
|
|
|
|
@worker_shutdown.connect
|
|
def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
|
|
if cache.get("CELERY_APP_SHUTDOWN", 0) == 1:
|
|
return
|
|
cache.set("CELERY_APP_SHUTDOWN", 1, 10)
|
|
from django_celery_beat.models import PeriodicTask
|
|
logger.debug("App shutdown signal recv")
|
|
logger.debug("Clean need cleaned period tasks: [{}]".format(
|
|
', '.join(__AFTER_APP_SHUTDOWN_CLEAN_TASKS))
|
|
)
|
|
PeriodicTask.objects.filter(name__in=__AFTER_APP_SHUTDOWN_CLEAN_TASKS).delete()
|