perf: 还原任务参数

pull/9449/head
jiangweidong 2023-02-08 11:43:35 +08:00
parent fb285adcce
commit fa54df6d99
12 changed files with 39 additions and 54 deletions

View File

@ -53,9 +53,7 @@ class AccountViewSet(OrgBulkModelViewSet):
account = super().get_object()
account_ids = [account.id]
asset_ids = [account.asset_id]
task = verify_accounts_connectivity.delay(
account_ids, asset_ids, user=request.user
)
task = verify_accounts_connectivity.delay(account_ids, asset_ids)
return Response(data={'task': task.id})

View File

@ -110,7 +110,6 @@ class AutomationExecutionViewSet(
serializer.is_valid(raise_exception=True)
automation = serializer.validated_data.get('automation')
task = execute_automation.delay(
pid=automation.pk, trigger=Trigger.manual,
tp=self.tp, user=request.user
pid=automation.pk, trigger=Trigger.manual, tp=self.tp
)
return Response({'task': task.id}, status=status.HTTP_201_CREATED)

View File

@ -35,7 +35,7 @@ class AutomationExecution(AssetAutomationExecution):
('add_pushaccountexecution', _('Can add push account execution')),
]
def start(self, **kwargs):
def start(self):
from accounts.automations.endpoint import ExecutionManager
manager = ExecutionManager(execution=self)
return manager.run(**kwargs)
return manager.run()

View File

@ -9,7 +9,7 @@ logger = get_logger(__file__)
@shared_task(queue='ansible', verbose_name=_('Account execute automation'))
def execute_automation(pid, trigger, tp, **kwargs):
def execute_automation(pid, trigger, tp):
model = AutomationTypes.get_type_model(tp)
with tmp_to_root_org():
instance = get_object_or_none(model, pk=pid)
@ -17,4 +17,4 @@ def execute_automation(pid, trigger, tp, **kwargs):
logger.error("No automation task found: {}".format(pid))
return
with tmp_to_org(instance.org):
instance.execute(trigger, **kwargs)
instance.execute(trigger)

View File

@ -5,7 +5,7 @@ from assets.tasks.common import generate_data
from common.const.choices import Trigger
def automation_execute_start(task_name, tp, child_snapshot=None, **kwargs):
def automation_execute_start(task_name, tp, child_snapshot=None):
from accounts.models import AutomationExecution
data = generate_data(task_name, tp, child_snapshot)
@ -19,4 +19,4 @@ def automation_execute_start(task_name, tp, child_snapshot=None, **kwargs):
execution = AutomationExecution.objects.create(
trigger=Trigger.manual, **data
)
execution.start(**kwargs)
execution.start()

View File

@ -14,7 +14,7 @@ __all__ = [
]
def verify_connectivity_util(assets, tp, accounts, task_name, **kwargs):
def verify_connectivity_util(assets, tp, accounts, task_name):
if not assets or not accounts:
return
account_usernames = list(accounts.values_list('username', flat=True))
@ -22,30 +22,28 @@ def verify_connectivity_util(assets, tp, accounts, task_name, **kwargs):
'accounts': account_usernames,
'assets': [str(asset.id) for asset in assets],
}
automation_execute_start(task_name, tp, child_snapshot, **kwargs)
automation_execute_start(task_name, tp, child_snapshot)
@org_aware_func("assets")
def verify_accounts_connectivity_util(accounts, assets, task_name, **kwargs):
def verify_accounts_connectivity_util(accounts, assets, task_name):
gateway_assets = assets.filter(platform__name=GATEWAY_NAME)
verify_connectivity_util(
gateway_assets, AutomationTypes.verify_gateway_account,
accounts, task_name, **kwargs
gateway_assets, AutomationTypes.verify_gateway_account, accounts, task_name
)
non_gateway_assets = assets.exclude(platform__name=GATEWAY_NAME)
verify_connectivity_util(
non_gateway_assets, AutomationTypes.verify_account,
accounts, task_name, **kwargs
non_gateway_assets, AutomationTypes.verify_account, accounts, task_name
)
@shared_task(queue="ansible", verbose_name=_('Verify asset account availability'))
def verify_accounts_connectivity(account_ids, asset_ids, **kwargs):
def verify_accounts_connectivity(account_ids, asset_ids):
from assets.models import Asset
from accounts.models import Account, VerifyAccountAutomation
assets = Asset.objects.filter(id__in=asset_ids)
accounts = Account.objects.filter(id__in=account_ids)
task_name = gettext_noop("Verify accounts connectivity")
task_name = VerifyAccountAutomation.generate_unique_name(task_name)
return verify_accounts_connectivity_util(accounts, assets, task_name, **kwargs)
return verify_accounts_connectivity_util(accounts, assets, task_name)

View File

@ -2,8 +2,7 @@
#
import django_filters
from django.db.models import Q
from django.utils.translation import ugettext_lazy as _
from rest_framework.request import Request
from django.utils.translation import gettext as _
from rest_framework.decorators import action
from rest_framework.response import Response
@ -139,21 +138,14 @@ class AssetViewSet(SuggestionMixin, NodeFilterMixin, OrgBulkModelViewSet):
class AssetsTaskMixin:
request: Request
def perform_assets_task(self, serializer):
data = serializer.validated_data
assets = data.get("assets", [])
asset_ids = [asset.id for asset in assets]
user = self.request.user
if data["action"] == "refresh":
task = update_assets_hardware_info_manual.delay(
asset_ids, user=user
)
task = update_assets_hardware_info_manual.delay(asset_ids)
else:
task = test_assets_connectivity_manual.delay(
asset_ids, user=user
)
task = test_assets_connectivity_manual.delay(asset_ids)
return task
def perform_create(self, serializer):

View File

@ -147,7 +147,7 @@ class BasePlaybookManager:
yaml.safe_dump(plays, f)
return sub_playbook_path
def get_runners(self, **kwargs):
def get_runners(self):
runners = []
for platform, assets in self.get_assets_group_by_platform().items():
assets_bulked = [assets[i:i + self.bulk_size] for i in range(0, len(assets), self.bulk_size)]
@ -195,7 +195,7 @@ class BasePlaybookManager:
pass
def run(self, *args, **kwargs):
runners = self.get_runners(user=kwargs.pop('user', None))
runners = self.get_runners()
if len(runners) > 1:
print("### 分批次执行开始任务, 总共 {}\n".format(len(runners)))
else:

View File

@ -76,7 +76,7 @@ class BaseAutomation(PeriodTaskModelMixin, JMSOrgBaseModel):
def executed_amount(self):
return self.executions.count()
def execute(self, trigger=Trigger.manual, **kwargs):
def execute(self, trigger=Trigger.manual):
try:
eid = current_task.request.id
except AttributeError:
@ -86,7 +86,7 @@ class BaseAutomation(PeriodTaskModelMixin, JMSOrgBaseModel):
id=eid, trigger=trigger, automation=self,
snapshot=self.to_attr_json(),
)
return execution.start(**kwargs)
return execution.start()
class AssetBaseAutomation(BaseAutomation):
@ -140,7 +140,7 @@ class AutomationExecution(OrgModelMixin):
return {}
return recipients
def start(self, **kwargs):
def start(self):
from assets.automations.endpoint import ExecutionManager
manager = ExecutionManager(execution=self)
return manager.run(**kwargs)
return manager.run()

View File

@ -29,7 +29,7 @@ def generate_data(task_name, tp, child_snapshot=None):
return {'id': eid, 'snapshot': snapshot}
def automation_execute_start(task_name, tp, child_snapshot=None, **kwargs):
def automation_execute_start(task_name, tp, child_snapshot=None):
from assets.models import AutomationExecution
data = generate_data(task_name, tp, child_snapshot)
@ -43,4 +43,4 @@ def automation_execute_start(task_name, tp, child_snapshot=None, **kwargs):
execution = AutomationExecution.objects.create(
trigger=Trigger.manual, **data
)
execution.start(**kwargs)
execution.start()

View File

@ -17,7 +17,7 @@ __all__ = [
]
def update_fact_util(assets=None, nodes=None, task_name=None, **kwargs):
def update_fact_util(assets=None, nodes=None, task_name=None):
from assets.models import GatherFactsAutomation
if task_name is None:
task_name = gettext_noop("Update some assets hardware info. ")
@ -30,16 +30,16 @@ def update_fact_util(assets=None, nodes=None, task_name=None, **kwargs):
'nodes': [str(node.id) for node in nodes],
}
tp = AutomationTypes.gather_facts
automation_execute_start(task_name, tp, child_snapshot, **kwargs)
automation_execute_start(task_name, tp, child_snapshot)
@org_aware_func('assets')
def update_assets_fact_util(assets=None, task_name=None, **kwargs):
def update_assets_fact_util(assets=None, task_name=None):
if assets is None:
logger.info("No assets to update hardware info")
return
update_fact_util(assets=assets, task_name=task_name, **kwargs)
update_fact_util(assets=assets, task_name=task_name)
@org_aware_func('nodes')
@ -51,11 +51,11 @@ def update_nodes_fact_util(nodes=None, task_name=None):
@shared_task(queue="ansible", verbose_name=_('Manually update the hardware information of assets'))
def update_assets_hardware_info_manual(asset_ids, **kwargs):
def update_assets_hardware_info_manual(asset_ids):
from assets.models import Asset
assets = Asset.objects.filter(id__in=asset_ids)
task_name = gettext_noop("Update assets hardware info: ")
update_assets_fact_util(assets=assets, task_name=task_name, **kwargs)
update_assets_fact_util(assets=assets, task_name=task_name)
@shared_task(queue="ansible", verbose_name=_('Manually update the hardware information of assets under a node'))

View File

@ -17,7 +17,7 @@ __all__ = [
]
def test_connectivity_util(assets, tp, task_name, local_port=None, **kwargs):
def test_connectivity_util(assets, tp, task_name, local_port=None):
if not assets:
return
@ -27,11 +27,11 @@ def test_connectivity_util(assets, tp, task_name, local_port=None, **kwargs):
child_snapshot = {'local_port': local_port}
child_snapshot['assets'] = [str(asset.id) for asset in assets]
automation_execute_start(task_name, tp, child_snapshot, **kwargs)
automation_execute_start(task_name, tp, child_snapshot)
@org_aware_func('assets')
def test_asset_connectivity_util(assets, task_name=None, local_port=None, **kwargs):
def test_asset_connectivity_util(assets, task_name=None, local_port=None):
from assets.models import PingAutomation
if task_name is None:
task_name = gettext_noop("Test assets connectivity ")
@ -40,23 +40,21 @@ def test_asset_connectivity_util(assets, task_name=None, local_port=None, **kwar
gateway_assets = assets.filter(platform__name=GATEWAY_NAME)
test_connectivity_util(
gateway_assets, AutomationTypes.ping_gateway,
task_name, local_port, **kwargs
gateway_assets, AutomationTypes.ping_gateway, task_name, local_port
)
non_gateway_assets = assets.exclude(platform__name=GATEWAY_NAME)
test_connectivity_util(
non_gateway_assets, AutomationTypes.ping,
task_name, **kwargs
non_gateway_assets, AutomationTypes.ping, task_name
)
@shared_task(queue="ansible", verbose_name=_('Manually test the connectivity of a asset'))
def test_assets_connectivity_manual(asset_ids, local_port=None, **kwargs):
def test_assets_connectivity_manual(asset_ids, local_port=None):
from assets.models import Asset
assets = Asset.objects.filter(id__in=asset_ids)
task_name = gettext_noop("Test assets connectivity ")
test_asset_connectivity_util(assets, task_name, local_port, **kwargs)
test_asset_connectivity_util(assets, task_name, local_port)
@shared_task(queue="ansible", verbose_name=_('Manually test the connectivity of assets under a node'))