perf: 优化账号任务

pull/9649/head
ibuler 2023-02-21 13:00:04 +08:00
parent b49b7125b2
commit e67a876513
21 changed files with 147 additions and 118 deletions

View File

@ -1,4 +1,3 @@
from .account import * from .account import *
from .backup import * from .task import *
from .template import * from .template import *
from .gathered_account import *

View File

@ -1,12 +1,11 @@
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.generics import CreateAPIView, ListAPIView from rest_framework.generics import ListAPIView
from rest_framework.response import Response from rest_framework.response import Response
from accounts import serializers from accounts import serializers
from accounts.filters import AccountFilterSet from accounts.filters import AccountFilterSet
from accounts.models import Account from accounts.models import Account
from accounts.tasks import verify_accounts_connectivity_task, push_accounts_to_assets_task
from assets.models import Asset from assets.models import Asset
from authentication.const import ConfirmType from authentication.const import ConfirmType
from common.permissions import UserConfirmation from common.permissions import UserConfirmation
@ -15,7 +14,7 @@ from orgs.mixins.api import OrgBulkModelViewSet
__all__ = [ __all__ = [
'AccountViewSet', 'AccountSecretsViewSet', 'AccountViewSet', 'AccountSecretsViewSet',
'AccountsTaskCreateAPI', 'AccountHistoriesSecretAPI' 'AccountHistoriesSecretAPI'
] ]
from rbac.permissions import RBACPermission from rbac.permissions import RBACPermission
@ -37,6 +36,7 @@ class AccountViewSet(OrgBulkModelViewSet):
def su_from_accounts(self, request, *args, **kwargs): def su_from_accounts(self, request, *args, **kwargs):
account_id = request.query_params.get('account') account_id = request.query_params.get('account')
asset_id = request.query_params.get('asset') asset_id = request.query_params.get('asset')
if account_id: if account_id:
account = get_object_or_404(Account, pk=account_id) account = get_object_or_404(Account, pk=account_id)
accounts = account.get_su_from_accounts() accounts = account.get_su_from_accounts()
@ -75,39 +75,3 @@ class AccountHistoriesSecretAPI(RecordViewLogMixin, ListAPIView):
def get_queryset(self): def get_queryset(self):
return self.model.objects.filter(id=self.kwargs.get('pk')) return self.model.objects.filter(id=self.kwargs.get('pk'))
class AccountsTaskCreateAPI(CreateAPIView):
serializer_class = serializers.AccountTaskSerializer
search_fields = AccountViewSet.search_fields
filterset_class = AccountViewSet.filterset_class
def check_permissions(self, request):
return request.user.has_perm('assets.test_assetconnectivity')
def get_accounts(self):
queryset = Account.objects.all()
queryset = self.filter_queryset(queryset)
return queryset
def perform_create(self, serializer):
data = serializer.validated_data
accounts = data.get('accounts')
account_ids = accounts.values_list('id', flat=True)
asset_ids = [account.asset_id for account in accounts]
if data['action'] == 'push':
task = push_accounts_to_assets_task.delay(account_ids, asset_ids)
else:
task = verify_accounts_connectivity_task.delay(account_ids, asset_ids)
data = getattr(serializer, '_data', {})
data["task"] = task.id
setattr(serializer, '_data', data)
return task
def get_exception_handler(self):
def handler(e, context):
return Response({"error": str(e)}, status=400)
return handler

View File

@ -1,42 +0,0 @@
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.utils.translation import ugettext_lazy as _
from accounts import serializers
from accounts.const import Source
from accounts.models import GatheredAccount
from accounts.filters import GatheredAccountFilterSet
from orgs.mixins.api import OrgBulkModelViewSet
__all__ = [
'GatheredAccountViewSet',
]
class GatheredAccountViewSet(OrgBulkModelViewSet):
model = GatheredAccount
search_fields = ('username',)
filterset_class = GatheredAccountFilterSet
serializer_classes = {
'default': serializers.GatheredAccountSerializer,
}
rbac_perms = {
'sync_account': 'assets.add_gatheredaccount',
}
@action(methods=['post'], detail=True, url_path='sync')
def sync_account(self, request, *args, **kwargs):
gathered_account = super().get_object()
asset = gathered_account.asset
username = gathered_account.username
accounts = asset.accounts.filter(username=username)
if accounts.exists():
accounts.update(source=Source.COLLECTED)
else:
asset.accounts.model.objects.create(
asset=asset, username=username,
name=f'{username}-{_("Collected")}',
source=Source.COLLECTED
)
return Response(status=status.HTTP_201_CREATED)

View File

@ -0,0 +1,37 @@
from rest_framework.generics import CreateAPIView
from rest_framework.response import Response
from accounts import serializers
from accounts.tasks import verify_accounts_connectivity_task, push_accounts_to_assets_task
__all__ = [
'AccountsTaskCreateAPI',
]
class AccountsTaskCreateAPI(CreateAPIView):
serializer_class = serializers.AccountTaskSerializer
def check_permissions(self, request):
return request.user.has_perm('assets.test_assetconnectivity')
def perform_create(self, serializer):
data = serializer.validated_data
accounts = data.get('accounts', [])
account_ids = [a.id for a in accounts]
if data['action'] == 'push':
task = push_accounts_to_assets_task.delay(account_ids)
else:
task = verify_accounts_connectivity_task.delay(account_ids)
data = getattr(serializer, '_data', {})
data["task"] = task.id
setattr(serializer, '_data', data)
return task
def get_exception_handler(self):
def handler(e, context):
return Response({"error": str(e)}, status=400)
return handler

View File

@ -1,3 +1,4 @@
from .backup import *
from .base import * from .base import *
from .change_secret import * from .change_secret import *
from .gather_accounts import * from .gather_accounts import *

View File

@ -1,13 +1,22 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
from django.utils.translation import ugettext_lazy as _
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from accounts import serializers from accounts import serializers
from accounts.const import AutomationTypes from accounts.const import AutomationTypes
from accounts.const import Source
from accounts.filters import GatheredAccountFilterSet
from accounts.models import GatherAccountsAutomation from accounts.models import GatherAccountsAutomation
from accounts.models import GatheredAccount
from orgs.mixins.api import OrgBulkModelViewSet from orgs.mixins.api import OrgBulkModelViewSet
from .base import AutomationExecutionViewSet from .base import AutomationExecutionViewSet
__all__ = [ __all__ = [
'GatherAccountsAutomationViewSet', 'GatherAccountsExecutionViewSet' 'GatherAccountsAutomationViewSet', 'GatherAccountsExecutionViewSet',
'GatheredAccountViewSet'
] ]
@ -31,3 +40,32 @@ class GatherAccountsExecutionViewSet(AutomationExecutionViewSet):
queryset = super().get_queryset() queryset = super().get_queryset()
queryset = queryset.filter(automation__type=self.tp) queryset = queryset.filter(automation__type=self.tp)
return queryset return queryset
class GatheredAccountViewSet(OrgBulkModelViewSet):
model = GatheredAccount
search_fields = ('username',)
filterset_class = GatheredAccountFilterSet
serializer_classes = {
'default': serializers.GatheredAccountSerializer,
}
rbac_perms = {
'sync_account': 'assets.add_gatheredaccount',
}
@action(methods=['post'], detail=True, url_path='sync')
def sync_account(self, request, *args, **kwargs):
gathered_account = super().get_object()
asset = gathered_account.asset
username = gathered_account.username
accounts = asset.accounts.filter(username=username)
if accounts.exists():
accounts.update(source=Source.COLLECTED)
else:
asset.accounts.model.objects.create(
asset=asset, username=username,
name=f'{username}-{_("Collected")}',
source=Source.COLLECTED
)
return Response(status=status.HTTP_201_CREATED)

View File

@ -17,19 +17,19 @@ class VerifyHostCallbackMixin:
def host_callback(self, host, asset=None, account=None, automation=None, path_dir=None, **kwargs): def host_callback(self, host, asset=None, account=None, automation=None, path_dir=None, **kwargs):
host = super().host_callback( host = super().host_callback(
host, asset=asset, account=account, automation=automation, host, asset=asset, account=account,
path_dir=path_dir, **kwargs automation=automation, path_dir=path_dir, **kwargs
) )
if host.get('error'): if host.get('error'):
return host return host
accounts = asset.accounts.all() accounts = asset.accounts.all()
accounts = self.get_accounts(account, accounts) accounts = self.get_accounts(account, accounts)
inventory_hosts = [] inventory_hosts = []
for account in accounts: for account in accounts:
h = deepcopy(host) h = deepcopy(host)
h['name'] += '_' + account.username h['name'] += '(' + account.username + ')'
self.host_account_mapper[h['name']] = account self.host_account_mapper[h['name']] = account
secret = account.secret secret = account.secret

View File

@ -93,7 +93,7 @@ class ChangeSecretManager(AccountBasePlaybookManager):
host['secret_type'] = self.secret_type host['secret_type'] = self.secret_type
for account in accounts: for account in accounts:
h = deepcopy(host) h = deepcopy(host)
h['name'] += '_' + account.username h['name'] += '(' + account.username + ')'
new_secret = self.get_secret() new_secret = self.get_secret()
recorder = ChangeSecretRecord( recorder = ChangeSecretRecord(

View File

@ -63,7 +63,7 @@ class PushAccountManager(ChangeSecretManager, AccountBasePlaybookManager):
host['secret_type'] = self.secret_type host['secret_type'] = self.secret_type
for account in accounts: for account in accounts:
h = deepcopy(host) h = deepcopy(host)
h['name'] += '_' + account.username h['name'] += '(' + account.username + ')'
new_secret = self.get_secret() new_secret = self.get_secret()
self.name_recorder_mapper[h['name']] = { self.name_recorder_mapper[h['name']] = {

View File

@ -9,7 +9,7 @@ def quickstart_automation_by_snapshot(task_name, tp, task_snapshot=None):
data = generate_automation_execution_data(task_name, tp, task_snapshot) data = generate_automation_execution_data(task_name, tp, task_snapshot)
pk = data['id'] pk = data['id']
if AutomationExecution.objects.exists(id=pk): if AutomationExecution.objects.filter(id=pk).exists():
data['id'] = str(uuid.uuid4()) data['id'] = str(uuid.uuid4())
execution = AutomationExecution.objects.create( execution = AutomationExecution.objects.create(

View File

@ -1,5 +1,4 @@
from celery import shared_task from celery import shared_task
from collections import defaultdict
from django.utils.translation import gettext_noop, ugettext_lazy as _ from django.utils.translation import gettext_noop, ugettext_lazy as _
from accounts.const import AutomationTypes from accounts.const import AutomationTypes
@ -21,20 +20,15 @@ def push_accounts_to_assets_task(account_ids):
from accounts.models import Account from accounts.models import Account
accounts = Account.objects.filter(id__in=account_ids) accounts = Account.objects.filter(id__in=account_ids)
task_name = gettext_noop("Push accounts to assets") task_name = gettext_noop("Push accounts to assets")
task_name = PushAccountAutomation.generate_unique_name(task_name) task_name = PushAccountAutomation.generate_unique_name(task_name)
account_asset_mapper = defaultdict(set)
for account in accounts: for account in accounts:
account_asset_mapper[account.username].add(account.asset)
for username, assets in account_asset_mapper.items():
task_snapshot = { task_snapshot = {
'secret': account.secret, 'secret': account.secret,
'secret_type': account.secret_type, 'secret_type': account.secret_type,
'accounts': [account.username], 'accounts': [account.username],
'assets': asset_ids, 'assets': [str(account.asset_id)],
} }
tp = AutomationTypes.push_account tp = AutomationTypes.push_account
quickstart_automation_by_snapshot(task_name, tp, task_snapshot) quickstart_automation_by_snapshot(task_name, tp, task_snapshot)

View File

@ -26,15 +26,22 @@ def verify_connectivity_util(assets, tp, accounts, task_name):
@org_aware_func("assets") @org_aware_func("assets")
def verify_accounts_connectivity_util(accounts, assets, task_name): def verify_accounts_connectivity_util(accounts, task_name):
gateway_assets = assets.filter(platform__name=GATEWAY_NAME) from assets.models import Asset
asset_ids = [a.asset_id for a in accounts]
assets = Asset.objects.filter(id__in=asset_ids)
gateways = assets.filter(platform__name=GATEWAY_NAME)
verify_connectivity_util( verify_connectivity_util(
gateway_assets, AutomationTypes.verify_gateway_account, accounts, task_name gateways, AutomationTypes.verify_gateway_account,
accounts, task_name
) )
non_gateway_assets = assets.exclude(platform__name=GATEWAY_NAME) common_assets = assets.exclude(platform__name=GATEWAY_NAME)
verify_connectivity_util( verify_connectivity_util(
non_gateway_assets, AutomationTypes.verify_account, accounts, task_name common_assets, AutomationTypes.verify_account,
accounts, task_name
) )
@ -42,11 +49,9 @@ def verify_accounts_connectivity_util(accounts, assets, task_name):
queue="ansible", verbose_name=_('Verify asset account availability'), queue="ansible", verbose_name=_('Verify asset account availability'),
activity_callback=lambda self, account_ids, asset_ids: (account_ids, None) activity_callback=lambda self, account_ids, asset_ids: (account_ids, None)
) )
def verify_accounts_connectivity_task(account_ids, asset_ids): def verify_accounts_connectivity_task(account_ids):
from assets.models import Asset
from accounts.models import Account, VerifyAccountAutomation from accounts.models import Account, VerifyAccountAutomation
assets = Asset.objects.filter(id__in=asset_ids)
accounts = Account.objects.filter(id__in=account_ids) accounts = Account.objects.filter(id__in=account_ids)
task_name = gettext_noop("Verify accounts connectivity") task_name = gettext_noop("Verify accounts connectivity")
task_name = VerifyAccountAutomation.generate_unique_name(task_name) task_name = VerifyAccountAutomation.generate_unique_name(task_name)
return verify_accounts_connectivity_util(accounts, assets, task_name) return verify_accounts_connectivity_util(accounts, task_name)

View File

@ -103,7 +103,7 @@ class NodeAddAssetsApi(generics.UpdateAPIView):
instance = None instance = None
permission_classes = (RBACPermission,) permission_classes = (RBACPermission,)
rbac_perms = { rbac_perms = {
'PUT': 'assets.add_assettonode', 'PUT': 'assets.change_assettonode',
} }
def perform_update(self, serializer): def perform_update(self, serializer):
@ -118,7 +118,7 @@ class NodeRemoveAssetsApi(generics.UpdateAPIView):
instance = None instance = None
permission_classes = (RBACPermission,) permission_classes = (RBACPermission,)
rbac_perms = { rbac_perms = {
'PUT': 'assets.remove_assetfromnode', 'PUT': 'assets.change_assetfromnode',
} }
def perform_update(self, serializer): def perform_update(self, serializer):
@ -140,7 +140,7 @@ class MoveAssetsToNodeApi(generics.UpdateAPIView):
instance = None instance = None
permission_classes = (RBACPermission,) permission_classes = (RBACPermission,)
rbac_perms = { rbac_perms = {
'PUT': 'assets.move_assettonode', 'PUT': 'assets.change_assettonode',
} }
def perform_update(self, serializer): def perform_update(self, serializer):

View File

@ -62,6 +62,8 @@ class BasePlaybookManager:
) )
if not os.path.exists(path): if not os.path.exists(path):
os.makedirs(path, exist_ok=True, mode=0o755) os.makedirs(path, exist_ok=True, mode=0o755)
if settings.DEBUG_DEV:
logger.debug('Ansible runtime dir: {}'.format(path))
return path return path
@staticmethod @staticmethod

View File

@ -33,7 +33,7 @@ class PingGatewayManager:
err = _('No account') err = _('No account')
return False, err return False, err
print('Test account: {}'.format(account)) print('- ' + _('Asset, {}, using account {}').format(gateway, account))
try: try:
proxy.connect( proxy.connect(
gateway.address, gateway.address,

View File

@ -0,0 +1,17 @@
# Generated by Django 3.2.14 on 2023-02-21 04:55
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('assets', '0110_auto_20230220_1051'),
]
operations = [
migrations.AlterModelOptions(
name='asset',
options={'ordering': ['name'], 'permissions': [('refresh_assethardwareinfo', 'Can refresh asset hardware info'), ('test_assetconnectivity', 'Can test asset connectivity'), ('push_assetaccount', 'Can push account to asset'), ('test_account', 'Can verify account'), ('match_asset', 'Can match asset'), ('change_assettonode', 'Can change asset nodes')], 'verbose_name': 'Asset'},
),
]

View File

@ -284,7 +284,5 @@ class Asset(NodesRelationMixin, AbsConnectivity, JMSOrgBaseModel):
('push_assetaccount', _('Can push account to asset')), ('push_assetaccount', _('Can push account to asset')),
('test_account', _('Can verify account')), ('test_account', _('Can verify account')),
('match_asset', _('Can match asset')), ('match_asset', _('Can match asset')),
('add_assettonode', _('Add asset to node')), ('change_assettonode', _('Can change asset nodes')),
('move_assettonode', _('Move asset to node')),
('remove_assetfromnode', _('Remove asset from node'))
] ]

View File

@ -30,11 +30,11 @@ class Domain(JMSOrgBaseModel):
def random_gateway(self): def random_gateway(self):
gateways = [gw for gw in self.active_gateways if gw.is_connective] gateways = [gw for gw in self.active_gateways if gw.is_connective]
if not gateways: if not gateways:
logger.warn(f'Gateway all bad. domain={self}, gateway_num={len(gateways)}.')
gateways = self.active_gateways gateways = self.active_gateways
if not gateways: if not gateways:
logger.warn(f'Not active gateway. domain={self}') logger.warn(f'Not active gateway, domain={self}, pass')
return None return None
return random.choice(gateways) return random.choice(gateways)

View File

@ -101,7 +101,7 @@ class JMSInventory:
def asset_to_host(self, asset, account, automation, protocols, platform): def asset_to_host(self, asset, account, automation, protocols, platform):
host = { host = {
'name': '{}'.format(asset.name), 'name': '{}'.format(asset.name.replace(' ', '_')),
'jms_asset': { 'jms_asset': {
'id': str(asset.id), 'name': asset.name, 'address': asset.address, 'id': str(asset.id), 'name': asset.name, 'address': asset.address,
'type': asset.type, 'category': asset.category, 'type': asset.type, 'category': asset.category,

View File

@ -46,9 +46,25 @@ def sync_registered_tasks(*args, **kwargs):
@receiver(django_ready) @receiver(django_ready)
def check_registered_tasks(*args, **kwargs): def check_registered_tasks(*args, **kwargs):
attrs = ['verbose_name', 'activity_callback'] attrs = ['verbose_name', 'activity_callback']
ignores = [
'users.tasks.check_user_expired_periodic', 'ops.tasks.clean_celery_periodic_tasks',
'terminal.tasks.delete_terminal_status_period', 'ops.tasks.check_server_performance_period',
'settings.tasks.ldap.import_ldap_user', 'users.tasks.check_password_expired',
'assets.tasks.nodes_amount.check_node_assets_amount_task', 'notifications.notifications.publish_task',
'perms.tasks.check_asset_permission_will_expired',
'ops.tasks.create_or_update_registered_periodic_tasks', 'perms.tasks.check_asset_permission_expired',
'settings.tasks.ldap.import_ldap_user_periodic', 'users.tasks.check_password_expired_periodic',
'common.utils.verify_code.send_async', 'assets.tasks.nodes_amount.check_node_assets_amount_period_task',
'users.tasks.check_user_expired', 'orgs.tasks.refresh_org_cache_task',
'terminal.tasks.upload_session_replay_to_external_storage', 'terminal.tasks.clean_orphan_session',
'audits.tasks.clean_audits_log_period', 'authentication.tasks.clean_django_sessions'
]
for name, task in app.tasks.items(): for name, task in app.tasks.items():
if name.startswith('celery.'): if name.startswith('celery.'):
continue continue
if name in ignores:
continue
for attr in attrs: for attr in attrs:
if not hasattr(task, attr): if not hasattr(task, attr):
print('>>> Task {} has no attribute {}'.format(name, attr)) print('>>> Task {} has no attribute {}'.format(name, attr))