feat: 支持开启、关闭定时任务执行

pull/12855/head
wangruidong 2024-03-20 18:15:00 +08:00 committed by Bryan
parent 02c2ee8c54
commit f264bf03ff
6 changed files with 30 additions and 25 deletions

View File

@ -14,9 +14,7 @@ from django.utils.translation import gettext_lazy as _
from common.const.crontab import CRONTAB_AT_AM_TWO from common.const.crontab import CRONTAB_AT_AM_TWO
from common.storage.ftp_file import FTPFileStorageHandler from common.storage.ftp_file import FTPFileStorageHandler
from common.utils import get_log_keep_day, get_logger from common.utils import get_log_keep_day, get_logger
from ops.celery.decorator import ( from ops.celery.decorator import register_as_period_task
register_as_period_task, after_app_shutdown_clean_periodic
)
from ops.models import CeleryTaskExecution from ops.models import CeleryTaskExecution
from terminal.backends import server_replay_storage from terminal.backends import server_replay_storage
from terminal.models import Session, Command from terminal.models import Session, Command
@ -118,7 +116,6 @@ def clean_expired_session_period():
@shared_task(verbose_name=_('Clean audits session task log')) @shared_task(verbose_name=_('Clean audits session task log'))
@register_as_period_task(crontab=CRONTAB_AT_AM_TWO) @register_as_period_task(crontab=CRONTAB_AT_AM_TWO)
@after_app_shutdown_clean_periodic
def clean_audits_log_period(): def clean_audits_log_period():
print("Start clean audit session task log") print("Start clean audit session task log")
clean_login_log_period() clean_login_log_period()

View File

@ -94,11 +94,13 @@ class CeleryPeriodTaskViewSet(CommonApiMixin, viewsets.ModelViewSet):
queryset = PeriodicTask.objects.all() queryset = PeriodicTask.objects.all()
serializer_class = CeleryPeriodTaskSerializer serializer_class = CeleryPeriodTaskSerializer
http_method_names = ('get', 'head', 'options', 'patch') http_method_names = ('get', 'head', 'options', 'patch')
lookup_field = 'name'
lookup_value_regex = '[\w.@]+'
def get_queryset(self): def get_object(self):
queryset = super().get_queryset() name = self.kwargs.get('name')
queryset = queryset.exclude(description='') obj = get_object_or_404(PeriodicTask, name=name)
return queryset return obj
class CelerySummaryAPIView(generics.RetrieveAPIView): class CelerySummaryAPIView(generics.RetrieveAPIView):
@ -150,21 +152,18 @@ class CeleryTaskViewSet(
return input_string return input_string
def generate_execute_time(self, queryset): def generate_execute_time(self, queryset):
names = [i.name for i in queryset]
periodic_tasks = PeriodicTask.objects.filter(name__in=names)
periodic_task_dict = {task.name: task for task in periodic_tasks}
now = local_now() now = local_now()
for i in queryset: for i in queryset:
task = periodic_task_dict.get(i.name) task = getattr(i, 'periodic_obj', None)
if not task: if not task:
continue continue
i.exec_cycle = self.extract_schedule(str(task.scheduler)) i.exec_cycle = self.extract_schedule(str(task.scheduler))
last_run_at = task.last_run_at or now last_run_at = task.last_run_at or now
next_run_at = task.schedule.remaining_estimate(last_run_at) next_run_at = task.schedule.remaining_estimate(last_run_at)
if next_run_at.total_seconds() < 0: if next_run_at.total_seconds() < 0:
next_run_at = task.schedule.remaining_estimate(now) next_run_at = task.schedule.remaining_estimate(now)
i.next_exec_time = now + next_run_at i.next_exec_time = now + next_run_at
i.enabled = task.enabled
return queryset return queryset
def generate_summary_state(self, execution_qs): def generate_summary_state(self, execution_qs):
@ -209,6 +208,7 @@ class CeleryTaskViewSet(
def list(self, request, *args, **kwargs): def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset()) queryset = self.filter_queryset(self.get_queryset())
queryset = self.mark_periodic_and_sorted(queryset)
page = self.paginate_queryset(queryset) page = self.paginate_queryset(queryset)
if page is not None: if page is not None:
@ -222,6 +222,20 @@ class CeleryTaskViewSet(
serializer = self.get_serializer(queryset, many=True) serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data) return Response(serializer.data)
@staticmethod
def mark_periodic_and_sorted(queryset):
names = queryset.values_list('name', flat=True)
periodic_tasks = PeriodicTask.objects.filter(name__in=names)
periodic_task_dict = {task.task: task for task in periodic_tasks}
for q in queryset:
if q.name in periodic_task_dict:
q.periodic_obj = periodic_task_dict[q.name]
q.is_periodic = True
else:
q.is_periodic = False
queryset = sorted(queryset, key=lambda x: x.is_periodic, reverse=True)
return queryset
class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ModelViewSet): class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ModelViewSet):
serializer_class = CeleryTaskExecutionSerializer serializer_class = CeleryTaskExecutionSerializer

View File

@ -75,7 +75,6 @@ def create_or_update_celery_periodic_tasks(tasks):
crontab=crontab, crontab=crontab,
name=name, name=name,
task=detail['task'], task=detail['task'],
enabled=detail.get('enabled', True),
args=json.dumps(detail.get('args', [])), args=json.dumps(detail.get('args', [])),
kwargs=json.dumps(detail.get('kwargs', {})), kwargs=json.dumps(detail.get('kwargs', {})),
description=detail.get('description') or '', description=detail.get('description') or '',

View File

@ -23,21 +23,21 @@ class CeleryResultSerializer(serializers.Serializer):
class CeleryPeriodTaskSerializer(serializers.ModelSerializer): class CeleryPeriodTaskSerializer(serializers.ModelSerializer):
class Meta: class Meta:
model = PeriodicTask model = PeriodicTask
fields = [ read_only_fields = ['name', 'task', 'description',
'name', 'task', 'enabled', 'description', 'last_run_at', 'total_run_count']
'last_run_at', 'total_run_count' fields = ['enabled'] + read_only_fields
]
class CeleryTaskSerializer(serializers.ModelSerializer): class CeleryTaskSerializer(serializers.ModelSerializer):
exec_cycle = serializers.CharField(read_only=True) exec_cycle = serializers.CharField(read_only=True)
next_exec_time = serializers.DateTimeField(format="%Y/%m/%d %H:%M:%S", read_only=True) next_exec_time = serializers.DateTimeField(format="%Y/%m/%d %H:%M:%S", read_only=True)
enabled = serializers.BooleanField(default=True)
class Meta: class Meta:
model = CeleryTask model = CeleryTask
read_only_fields = [ read_only_fields = [
'id', 'name', 'meta', 'summary', 'state', 'id', 'name', 'meta', 'summary', 'state',
'date_last_publish', 'exec_cycle', 'next_exec_time' 'date_last_publish', 'exec_cycle', 'next_exec_time', 'enabled'
] ]
fields = read_only_fields fields = read_only_fields

View File

@ -135,7 +135,6 @@ def clean_up_unexpected_jobs():
@shared_task(verbose_name=_('Clean job_execution db record')) @shared_task(verbose_name=_('Clean job_execution db record'))
@register_as_period_task(crontab=CRONTAB_AT_AM_TWO) @register_as_period_task(crontab=CRONTAB_AT_AM_TWO)
@after_app_shutdown_clean_periodic
def clean_job_execution_period(): def clean_job_execution_period():
logger.info("Start clean job_execution db record") logger.info("Start clean job_execution db record")
now = timezone.now() now = timezone.now()

View File

@ -12,9 +12,7 @@ from django.utils.translation import gettext_lazy as _
from common.storage.replay import ReplayStorageHandler from common.storage.replay import ReplayStorageHandler
from ops.celery.decorator import ( from ops.celery.decorator import (
register_as_period_task, after_app_ready_start, register_as_period_task, after_app_ready_start)
after_app_shutdown_clean_periodic
)
from orgs.utils import tmp_to_builtin_org from orgs.utils import tmp_to_builtin_org
from orgs.utils import tmp_to_root_org from orgs.utils import tmp_to_root_org
from .backends import server_replay_storage from .backends import server_replay_storage
@ -33,7 +31,6 @@ logger = get_task_logger(__name__)
@shared_task(verbose_name=_('Periodic delete terminal status')) @shared_task(verbose_name=_('Periodic delete terminal status'))
@register_as_period_task(interval=3600) @register_as_period_task(interval=3600)
@after_app_ready_start @after_app_ready_start
@after_app_shutdown_clean_periodic
def delete_terminal_status_period(): def delete_terminal_status_period():
yesterday = timezone.now() - datetime.timedelta(days=7) yesterday = timezone.now() - datetime.timedelta(days=7)
Status.objects.filter(date_created__lt=yesterday).delete() Status.objects.filter(date_created__lt=yesterday).delete()
@ -42,7 +39,6 @@ def delete_terminal_status_period():
@shared_task(verbose_name=_('Clean orphan session')) @shared_task(verbose_name=_('Clean orphan session'))
@register_as_period_task(interval=600) @register_as_period_task(interval=600)
@after_app_ready_start @after_app_ready_start
@after_app_shutdown_clean_periodic
@tmp_to_root_org() @tmp_to_root_org()
def clean_orphan_session(): def clean_orphan_session():
active_sessions = Session.objects.filter(is_finished=False) active_sessions = Session.objects.filter(is_finished=False)