diff --git a/apps/audits/tasks.py b/apps/audits/tasks.py index 97f46adc5..ac28e91cb 100644 --- a/apps/audits/tasks.py +++ b/apps/audits/tasks.py @@ -14,9 +14,7 @@ from django.utils.translation import gettext_lazy as _ from common.const.crontab import CRONTAB_AT_AM_TWO from common.storage.ftp_file import FTPFileStorageHandler from common.utils import get_log_keep_day, get_logger -from ops.celery.decorator import ( - register_as_period_task, after_app_shutdown_clean_periodic -) +from ops.celery.decorator import register_as_period_task from ops.models import CeleryTaskExecution from terminal.backends import server_replay_storage from terminal.models import Session, Command @@ -118,7 +116,6 @@ def clean_expired_session_period(): @shared_task(verbose_name=_('Clean audits session task log')) @register_as_period_task(crontab=CRONTAB_AT_AM_TWO) -@after_app_shutdown_clean_periodic def clean_audits_log_period(): print("Start clean audit session task log") clean_login_log_period() diff --git a/apps/ops/api/celery.py b/apps/ops/api/celery.py index dd66a886e..43b63ce5b 100644 --- a/apps/ops/api/celery.py +++ b/apps/ops/api/celery.py @@ -94,11 +94,13 @@ class CeleryPeriodTaskViewSet(CommonApiMixin, viewsets.ModelViewSet): queryset = PeriodicTask.objects.all() serializer_class = CeleryPeriodTaskSerializer http_method_names = ('get', 'head', 'options', 'patch') + lookup_field = 'name' + lookup_value_regex = '[\w.@]+' - def get_queryset(self): - queryset = super().get_queryset() - queryset = queryset.exclude(description='') - return queryset + def get_object(self): + name = self.kwargs.get('name') + obj = get_object_or_404(PeriodicTask, name=name) + return obj class CelerySummaryAPIView(generics.RetrieveAPIView): @@ -150,21 +152,18 @@ class CeleryTaskViewSet( return input_string def generate_execute_time(self, queryset): - names = [i.name for i in queryset] - periodic_tasks = PeriodicTask.objects.filter(name__in=names) - periodic_task_dict = {task.name: task for task in periodic_tasks} now = local_now() for i in queryset: - task = periodic_task_dict.get(i.name) + task = getattr(i, 'periodic_obj', None) if not task: continue i.exec_cycle = self.extract_schedule(str(task.scheduler)) - last_run_at = task.last_run_at or now next_run_at = task.schedule.remaining_estimate(last_run_at) if next_run_at.total_seconds() < 0: next_run_at = task.schedule.remaining_estimate(now) i.next_exec_time = now + next_run_at + i.enabled = task.enabled return queryset def generate_summary_state(self, execution_qs): @@ -209,6 +208,7 @@ class CeleryTaskViewSet( def list(self, request, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) + queryset = self.mark_periodic_and_sorted(queryset) page = self.paginate_queryset(queryset) if page is not None: @@ -222,6 +222,20 @@ class CeleryTaskViewSet( serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) + @staticmethod + def mark_periodic_and_sorted(queryset): + names = queryset.values_list('name', flat=True) + periodic_tasks = PeriodicTask.objects.filter(name__in=names) + periodic_task_dict = {task.task: task for task in periodic_tasks} + for q in queryset: + if q.name in periodic_task_dict: + q.periodic_obj = periodic_task_dict[q.name] + q.is_periodic = True + else: + q.is_periodic = False + queryset = sorted(queryset, key=lambda x: x.is_periodic, reverse=True) + return queryset + class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ModelViewSet): serializer_class = CeleryTaskExecutionSerializer diff --git a/apps/ops/celery/utils.py b/apps/ops/celery/utils.py index 3c38ee287..580a256fe 100644 --- a/apps/ops/celery/utils.py +++ b/apps/ops/celery/utils.py @@ -75,7 +75,6 @@ def create_or_update_celery_periodic_tasks(tasks): crontab=crontab, name=name, task=detail['task'], - enabled=detail.get('enabled', True), args=json.dumps(detail.get('args', [])), kwargs=json.dumps(detail.get('kwargs', {})), description=detail.get('description') or '', diff --git a/apps/ops/serializers/celery.py b/apps/ops/serializers/celery.py index a4e9c7e8d..b0802f6b4 100644 --- a/apps/ops/serializers/celery.py +++ b/apps/ops/serializers/celery.py @@ -23,21 +23,21 @@ class CeleryResultSerializer(serializers.Serializer): class CeleryPeriodTaskSerializer(serializers.ModelSerializer): class Meta: model = PeriodicTask - fields = [ - 'name', 'task', 'enabled', 'description', - 'last_run_at', 'total_run_count' - ] + read_only_fields = ['name', 'task', 'description', + 'last_run_at', 'total_run_count'] + fields = ['enabled'] + read_only_fields class CeleryTaskSerializer(serializers.ModelSerializer): exec_cycle = serializers.CharField(read_only=True) next_exec_time = serializers.DateTimeField(format="%Y/%m/%d %H:%M:%S", read_only=True) + enabled = serializers.BooleanField(default=True) class Meta: model = CeleryTask read_only_fields = [ 'id', 'name', 'meta', 'summary', 'state', - 'date_last_publish', 'exec_cycle', 'next_exec_time' + 'date_last_publish', 'exec_cycle', 'next_exec_time', 'enabled' ] fields = read_only_fields diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index 57c57a0d8..3da3b726e 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -135,7 +135,6 @@ def clean_up_unexpected_jobs(): @shared_task(verbose_name=_('Clean job_execution db record')) @register_as_period_task(crontab=CRONTAB_AT_AM_TWO) -@after_app_shutdown_clean_periodic def clean_job_execution_period(): logger.info("Start clean job_execution db record") now = timezone.now() diff --git a/apps/terminal/tasks.py b/apps/terminal/tasks.py index b8bd89f73..0892aeac5 100644 --- a/apps/terminal/tasks.py +++ b/apps/terminal/tasks.py @@ -12,9 +12,7 @@ from django.utils.translation import gettext_lazy as _ from common.storage.replay import ReplayStorageHandler from ops.celery.decorator import ( - register_as_period_task, after_app_ready_start, - after_app_shutdown_clean_periodic -) + register_as_period_task, after_app_ready_start) from orgs.utils import tmp_to_builtin_org from orgs.utils import tmp_to_root_org from .backends import server_replay_storage @@ -33,7 +31,6 @@ logger = get_task_logger(__name__) @shared_task(verbose_name=_('Periodic delete terminal status')) @register_as_period_task(interval=3600) @after_app_ready_start -@after_app_shutdown_clean_periodic def delete_terminal_status_period(): yesterday = timezone.now() - datetime.timedelta(days=7) Status.objects.filter(date_created__lt=yesterday).delete() @@ -42,7 +39,6 @@ def delete_terminal_status_period(): @shared_task(verbose_name=_('Clean orphan session')) @register_as_period_task(interval=600) @after_app_ready_start -@after_app_shutdown_clean_periodic @tmp_to_root_org() def clean_orphan_session(): active_sessions = Session.objects.filter(is_finished=False)