From bff6f397ce913114bd2ceafc268a403cbd6bfb8c Mon Sep 17 00:00:00 2001
From: jiangweidong <weidong.jiang@fit2cloud.com>
Date: Fri, 24 Feb 2023 18:10:42 +0800
Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E5=86=B3Activity=E4=BF=9D?=
 =?UTF-8?q?=E5=AD=98=E5=9B=A0=E4=B8=BA=E5=8F=82=E6=95=B0=E5=87=BA=E9=94=99?=
 =?UTF-8?q?=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 apps/accounts/tasks/automation.py      |  2 +-
 apps/accounts/tasks/backup_account.py  |  2 +-
 apps/accounts/tasks/gather_accounts.py |  2 +-
 apps/accounts/tasks/push_account.py    |  2 +-
 apps/assets/tasks/automation.py        |  2 +-
 apps/common/tasks.py                   |  2 +-
 apps/ops/tasks.py                      | 22 +++++++++++-----------
 apps/terminal/tasks.py                 | 10 ++++++++--
 8 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/apps/accounts/tasks/automation.py b/apps/accounts/tasks/automation.py
index 2b9f99235..359a15913 100644
--- a/apps/accounts/tasks/automation.py
+++ b/apps/accounts/tasks/automation.py
@@ -8,7 +8,7 @@ from orgs.utils import tmp_to_org, tmp_to_root_org
 logger = get_logger(__file__)
 
 
-def task_activity_callback(self, pid, trigger, tp):
+def task_activity_callback(self, pid, trigger, tp, *args, **kwargs):
     model = AutomationTypes.get_type_model(tp)
     with tmp_to_root_org():
         instance = get_object_or_none(model, pk=pid)
diff --git a/apps/accounts/tasks/backup_account.py b/apps/accounts/tasks/backup_account.py
index 1f4e46d83..16eafaf5e 100644
--- a/apps/accounts/tasks/backup_account.py
+++ b/apps/accounts/tasks/backup_account.py
@@ -9,7 +9,7 @@ from orgs.utils import tmp_to_org, tmp_to_root_org
 logger = get_logger(__file__)
 
 
-def task_activity_callback(self, pid, trigger):
+def task_activity_callback(self, pid, trigger, *args, **kwargs):
     from accounts.models import AccountBackupAutomation
     with tmp_to_root_org():
         plan = get_object_or_none(AccountBackupAutomation, pk=pid)
diff --git a/apps/accounts/tasks/gather_accounts.py b/apps/accounts/tasks/gather_accounts.py
index ceead3f9d..42f8641bb 100644
--- a/apps/accounts/tasks/gather_accounts.py
+++ b/apps/accounts/tasks/gather_accounts.py
@@ -27,7 +27,7 @@ def gather_asset_accounts_util(nodes, task_name):
 
 @shared_task(
     queue="ansible", verbose_name=_('Gather asset accounts'),
-    activity_callback=lambda self, node_ids, task_name=None: (node_ids, None)
+    activity_callback=lambda self, node_ids, task_name=None, *args, **kwargs: (node_ids, None)
 )
 def gather_asset_accounts_task(node_ids, task_name=None):
     if task_name is None:
diff --git a/apps/accounts/tasks/push_account.py b/apps/accounts/tasks/push_account.py
index 10dd4ce64..26014bdec 100644
--- a/apps/accounts/tasks/push_account.py
+++ b/apps/accounts/tasks/push_account.py
@@ -13,7 +13,7 @@ __all__ = [
 
 @shared_task(
     queue="ansible", verbose_name=_('Push accounts to assets'),
-    activity_callback=lambda self, account_ids, asset_ids: (account_ids, None)
+    activity_callback=lambda self, account_ids, *args, **kwargs: (account_ids, None)
 )
 def push_accounts_to_assets_task(account_ids):
     from accounts.models import PushAccountAutomation
diff --git a/apps/assets/tasks/automation.py b/apps/assets/tasks/automation.py
index 02e946ede..20426451c 100644
--- a/apps/assets/tasks/automation.py
+++ b/apps/assets/tasks/automation.py
@@ -8,7 +8,7 @@ from orgs.utils import tmp_to_root_org, tmp_to_org
 logger = get_logger(__file__)
 
 
-def task_activity_callback(self, pid, trigger, tp):
+def task_activity_callback(self, pid, trigger, tp, *args, **kwargs):
     model = AutomationTypes.get_type_model(tp)
     with tmp_to_root_org():
         instance = get_object_or_none(model, pk=pid)
diff --git a/apps/common/tasks.py b/apps/common/tasks.py
index b4c3d3488..ee4880ee7 100644
--- a/apps/common/tasks.py
+++ b/apps/common/tasks.py
@@ -10,7 +10,7 @@ from .utils import get_logger
 logger = get_logger(__file__)
 
 
-def task_activity_callback(self, subject, message, recipient_list, **kwargs):
+def task_activity_callback(self, subject, message, recipient_list, *args, **kwargs):
     from users.models import User
     email_list = recipient_list
     resource_ids = list(User.objects.filter(email__in=email_list).values_list('id', flat=True))
diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py
index 833e20ea0..11ad27aa4 100644
--- a/apps/ops/tasks.py
+++ b/apps/ops/tasks.py
@@ -3,8 +3,10 @@
 from celery import shared_task
 from celery.exceptions import SoftTimeLimitExceeded
 from django.utils.translation import ugettext_lazy as _
+from django_celery_beat.models import PeriodicTask
 
 from common.utils import get_logger, get_object_or_none
+from ops.celery import app
 from orgs.utils import tmp_to_org, tmp_to_root_org
 from .celery.decorator import (
     register_as_period_task, after_app_ready_start
@@ -19,7 +21,7 @@ from .notifications import ServerPerformanceCheckUtil
 logger = get_logger(__file__)
 
 
-def job_task_activity_callback(self, job_id, trigger):
+def job_task_activity_callback(self, job_id, *args, **kwargs):
     job = get_object_or_none(Job, id=job_id)
     if not job:
         return
@@ -48,7 +50,7 @@ def run_ops_job(job_id):
             logger.error("Start adhoc execution error: {}".format(e))
 
 
-def job_execution_task_activity_callback(self, execution_id, trigger):
+def job_execution_task_activity_callback(self, execution_id, *args, **kwargs):
     execution = get_object_or_none(JobExecution, id=execution_id)
     if not execution:
         return
@@ -78,16 +80,14 @@ def run_ops_job_execution(execution_id, **kwargs):
 @after_app_ready_start
 def clean_celery_periodic_tasks():
     """清除celery定时任务"""
-    need_cleaned_tasks = [
-        'handle_be_interrupted_change_auth_task_periodic',
-    ]
-    logger.info('Start clean celery periodic tasks: {}'.format(need_cleaned_tasks))
-    for task_name in need_cleaned_tasks:
-        logger.info('Start clean task: {}'.format(task_name))
-        task = get_celery_periodic_task(task_name)
-        if task is None:
-            logger.info('Task does not exist: {}'.format(task_name))
+    logger.info('Start clean celery periodic tasks.')
+    register_tasks = PeriodicTask.objects.all()
+    for task in register_tasks:
+        if task.task in app.tasks:
             continue
+
+        task_name = task.name
+        logger.info('Start clean task: {}'.format(task_name))
         disable_celery_periodic_task(task_name)
         delete_celery_periodic_task(task_name)
         task = get_celery_periodic_task(task_name)
diff --git a/apps/terminal/tasks.py b/apps/terminal/tasks.py
index 4b9fd9af3..28356972f 100644
--- a/apps/terminal/tasks.py
+++ b/apps/terminal/tasks.py
@@ -80,14 +80,20 @@ def upload_session_replay_to_external_storage(session_id):
     return
 
 
-@shared_task(verbose_name=_('Run applet host deployment'), activity_callback=lambda did: ([did], ))
+@shared_task(
+    verbose_name=_('Run applet host deployment'),
+    activity_callback=lambda self, did, *args, **kwargs: ([did], )
+)
 def run_applet_host_deployment(did):
     with tmp_to_builtin_org(system=1):
         deployment = AppletHostDeployment.objects.get(id=did)
         deployment.start()
 
 
-@shared_task(verbose_name=_('Install applet'), activity_callback=lambda did, applet_id: ([did],))
+@shared_task(
+    verbose_name=_('Install applet'),
+    activity_callback=lambda self, did, applet_id, *args, **kwargs: ([did],)
+)
 def run_applet_host_deployment_install_applet(did, applet_id):
     with tmp_to_builtin_org(system=1):
         deployment = AppletHostDeployment.objects.get(id=did)