fix: 解决Activity保存因为参数出错问题

pull/9759/head
jiangweidong 2023-02-24 17:59:32 +08:00
parent 9895ae73bc
commit 824e4c9e81
8 changed files with 25 additions and 19 deletions

View File

@ -8,7 +8,7 @@ from orgs.utils import tmp_to_org, tmp_to_root_org
logger = get_logger(__file__) logger = get_logger(__file__)
def task_activity_callback(self, pid, trigger, tp): def task_activity_callback(self, pid, trigger, tp, *args, **kwargs):
model = AutomationTypes.get_type_model(tp) model = AutomationTypes.get_type_model(tp)
with tmp_to_root_org(): with tmp_to_root_org():
instance = get_object_or_none(model, pk=pid) instance = get_object_or_none(model, pk=pid)

View File

@ -9,7 +9,7 @@ from orgs.utils import tmp_to_org, tmp_to_root_org
logger = get_logger(__file__) logger = get_logger(__file__)
def task_activity_callback(self, pid, trigger): def task_activity_callback(self, pid, trigger, *args, **kwargs):
from accounts.models import AccountBackupAutomation from accounts.models import AccountBackupAutomation
with tmp_to_root_org(): with tmp_to_root_org():
plan = get_object_or_none(AccountBackupAutomation, pk=pid) plan = get_object_or_none(AccountBackupAutomation, pk=pid)

View File

@ -27,7 +27,7 @@ def gather_asset_accounts_util(nodes, task_name):
@shared_task( @shared_task(
queue="ansible", verbose_name=_('Gather asset accounts'), queue="ansible", verbose_name=_('Gather asset accounts'),
activity_callback=lambda self, node_ids, task_name=None: (node_ids, None) activity_callback=lambda self, node_ids, task_name=None, *args, **kwargs: (node_ids, None)
) )
def gather_asset_accounts_task(node_ids, task_name=None): def gather_asset_accounts_task(node_ids, task_name=None):
if task_name is None: if task_name is None:

View File

@ -13,7 +13,7 @@ __all__ = [
@shared_task( @shared_task(
queue="ansible", verbose_name=_('Push accounts to assets'), queue="ansible", verbose_name=_('Push accounts to assets'),
activity_callback=lambda self, account_ids, asset_ids: (account_ids, None) activity_callback=lambda self, account_ids, *args, **kwargs: (account_ids, None)
) )
def push_accounts_to_assets_task(account_ids): def push_accounts_to_assets_task(account_ids):
from accounts.models import PushAccountAutomation from accounts.models import PushAccountAutomation

View File

@ -8,7 +8,7 @@ from orgs.utils import tmp_to_root_org, tmp_to_org
logger = get_logger(__file__) logger = get_logger(__file__)
def task_activity_callback(self, pid, trigger, tp): def task_activity_callback(self, pid, trigger, tp, *args, **kwargs):
model = AutomationTypes.get_type_model(tp) model = AutomationTypes.get_type_model(tp)
with tmp_to_root_org(): with tmp_to_root_org():
instance = get_object_or_none(model, pk=pid) instance = get_object_or_none(model, pk=pid)

View File

@ -10,7 +10,7 @@ from .utils import get_logger
logger = get_logger(__file__) logger = get_logger(__file__)
def task_activity_callback(self, subject, message, recipient_list, **kwargs): def task_activity_callback(self, subject, message, recipient_list, *args, **kwargs):
from users.models import User from users.models import User
email_list = recipient_list email_list = recipient_list
resource_ids = list(User.objects.filter(email__in=email_list).values_list('id', flat=True)) resource_ids = list(User.objects.filter(email__in=email_list).values_list('id', flat=True))

View File

@ -3,8 +3,10 @@
from celery import shared_task from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded from celery.exceptions import SoftTimeLimitExceeded
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django_celery_beat.models import PeriodicTask
from common.utils import get_logger, get_object_or_none from common.utils import get_logger, get_object_or_none
from ops.celery import app
from orgs.utils import tmp_to_org, tmp_to_root_org from orgs.utils import tmp_to_org, tmp_to_root_org
from .celery.decorator import ( from .celery.decorator import (
register_as_period_task, after_app_ready_start register_as_period_task, after_app_ready_start
@ -19,7 +21,7 @@ from .notifications import ServerPerformanceCheckUtil
logger = get_logger(__file__) logger = get_logger(__file__)
def job_task_activity_callback(self, job_id, trigger): def job_task_activity_callback(self, job_id, *args, **kwargs):
job = get_object_or_none(Job, id=job_id) job = get_object_or_none(Job, id=job_id)
if not job: if not job:
return return
@ -48,7 +50,7 @@ def run_ops_job(job_id):
logger.error("Start adhoc execution error: {}".format(e)) logger.error("Start adhoc execution error: {}".format(e))
def job_execution_task_activity_callback(self, execution_id, trigger): def job_execution_task_activity_callback(self, execution_id, *args, **kwargs):
execution = get_object_or_none(JobExecution, id=execution_id) execution = get_object_or_none(JobExecution, id=execution_id)
if not execution: if not execution:
return return
@ -78,16 +80,14 @@ def run_ops_job_execution(execution_id, **kwargs):
@after_app_ready_start @after_app_ready_start
def clean_celery_periodic_tasks(): def clean_celery_periodic_tasks():
"""清除celery定时任务""" """清除celery定时任务"""
need_cleaned_tasks = [ logger.info('Start clean celery periodic tasks.')
'handle_be_interrupted_change_auth_task_periodic', register_tasks = PeriodicTask.objects.all()
] for task in register_tasks:
logger.info('Start clean celery periodic tasks: {}'.format(need_cleaned_tasks)) if task.task in app.tasks:
for task_name in need_cleaned_tasks:
logger.info('Start clean task: {}'.format(task_name))
task = get_celery_periodic_task(task_name)
if task is None:
logger.info('Task does not exist: {}'.format(task_name))
continue continue
task_name = task.name
logger.info('Start clean task: {}'.format(task_name))
disable_celery_periodic_task(task_name) disable_celery_periodic_task(task_name)
delete_celery_periodic_task(task_name) delete_celery_periodic_task(task_name)
task = get_celery_periodic_task(task_name) task = get_celery_periodic_task(task_name)

View File

@ -80,14 +80,20 @@ def upload_session_replay_to_external_storage(session_id):
return return
@shared_task(verbose_name=_('Run applet host deployment'), activity_callback=lambda did: ([did], )) @shared_task(
verbose_name=_('Run applet host deployment'),
activity_callback=lambda self, did, *args, **kwargs: ([did], )
)
def run_applet_host_deployment(did): def run_applet_host_deployment(did):
with tmp_to_builtin_org(system=1): with tmp_to_builtin_org(system=1):
deployment = AppletHostDeployment.objects.get(id=did) deployment = AppletHostDeployment.objects.get(id=did)
deployment.start() deployment.start()
@shared_task(verbose_name=_('Install applet'), activity_callback=lambda did, applet_id: ([did],)) @shared_task(
verbose_name=_('Install applet'),
activity_callback=lambda self, did, applet_id, *args, **kwargs: ([did],)
)
def run_applet_host_deployment_install_applet(did, applet_id): def run_applet_host_deployment_install_applet(did, applet_id):
with tmp_to_builtin_org(system=1): with tmp_to_builtin_org(system=1):
deployment = AppletHostDeployment.objects.get(id=did) deployment = AppletHostDeployment.objects.get(id=did)