from itertools import groupby from collections import defaultdict from celery import shared_task from django.utils.translation import ugettext as _ from assets.models import Asset from common.utils import get_logger from orgs.utils import tmp_to_org, org_aware_func from ..models import SystemUser, Connectivity, AuthBook from . import const from .utils import ( clean_ansible_task_hosts, group_asset_by_platform ) logger = get_logger(__name__) __all__ = [ 'test_system_user_connectivity_util', 'test_system_user_connectivity_manual', 'test_system_user_connectivity_period', 'test_system_user_connectivity_a_asset', 'test_system_users_connectivity_a_asset' ] def set_assets_accounts_connectivity(system_user, assets, results_summary): asset_ids_ok = set() asset_ids_failed = set() asset_hostnames_ok = results_summary.get('contacted', {}).keys() for asset in assets: if asset.hostname in asset_hostnames_ok: asset_ids_ok.add(asset.id) else: asset_ids_failed.add(asset.id) accounts_ok = AuthBook.objects.filter(asset_id__in=asset_ids_ok, systemuser=system_user) accounts_failed = AuthBook.objects.filter(asset_id__in=asset_ids_failed, systemuser=system_user) AuthBook.bulk_set_connectivity(accounts_ok, Connectivity.ok) AuthBook.bulk_set_connectivity(accounts_failed, Connectivity.failed) @org_aware_func("system_user") def test_system_user_connectivity_util(system_user, assets, task_name): """ Test system cant connect his assets or not. :param system_user: :param assets: :param task_name: :return: """ from ops.utils import update_or_create_ansible_task if system_user.username_same_with_user: logger.error(_("Dynamic system user not support test")) return # hosts = clean_ansible_task_hosts(assets, system_user=system_user) # TODO: 这里不传递系统用户,因为clean_ansible_task_hosts会通过system_user来判断是否可以推送, # 不符合测试可连接性逻辑, 后面需要优化此逻辑 hosts = clean_ansible_task_hosts(assets) if not hosts: return {} platform_hosts_map = {} hosts_sorted = sorted(hosts, key=group_asset_by_platform) platform_hosts = groupby(hosts_sorted, key=group_asset_by_platform) for i in platform_hosts: platform_hosts_map[i[0]] = list(i[1]) platform_tasks_map = { "unixlike": const.PING_UNIXLIKE_TASKS, "windows": const.PING_WINDOWS_TASKS } results_summary = dict( contacted=defaultdict(dict), dark=defaultdict(dict), success=True ) def run_task(_tasks, _hosts, _username): old_name = "{}".format(system_user) new_name = "{}({})".format(system_user.name, _username) _task_name = task_name.replace(old_name, new_name) _task, created = update_or_create_ansible_task( task_name=_task_name, hosts=_hosts, tasks=_tasks, pattern='all', options=const.TASK_OPTIONS, run_as=_username, system_user=system_user ) raw, summary = _task.run() success = summary.get('success', False) contacted = summary.get('contacted', {}) dark = summary.get('dark', {}) results_summary['success'] &= success results_summary['contacted'].update(contacted) results_summary['dark'].update(dark) for platform, _hosts in platform_hosts_map.items(): if not _hosts: continue if platform not in ["unixlike", "windows"]: continue tasks = platform_tasks_map[platform] print(_("Start test system user connectivity for platform: [{}]").format(platform)) print(_("Hosts count: {}").format(len(_hosts))) # 用户名不是动态的,用户名则是一个 logger.debug("System user not has special auth") run_task(tasks, _hosts, system_user.username) set_assets_accounts_connectivity(system_user, hosts, results_summary) return results_summary @shared_task(queue="ansible") @org_aware_func("system_user") def test_system_user_connectivity_manual(system_user, asset_ids=None): task_name = _("Test system user connectivity: {}").format(system_user) if asset_ids: assets = Asset.objects.filter(id__in=asset_ids) else: assets = system_user.get_related_assets() test_system_user_connectivity_util(system_user, assets, task_name) @shared_task(queue="ansible") @org_aware_func("system_user") def test_system_user_connectivity_a_asset(system_user, asset): task_name = _("Test system user connectivity: {} => {}").format( system_user, asset ) test_system_user_connectivity_util(system_user, [asset], task_name) @shared_task(queue="ansible") def test_system_users_connectivity_a_asset(system_users, asset): for system_user in system_users: test_system_user_connectivity_a_asset(system_user, asset) @shared_task(queue="ansible") def test_system_user_connectivity_period(): if not const.PERIOD_TASK_ENABLED: logger.debug("Period task disabled, test system user connectivity pass") return queryset_map = SystemUser.objects.all_group_by_org() for org, system_user in queryset_map.items(): task_name = _("Test system user connectivity period: {}").format(system_user) with tmp_to_org(org): assets = system_user.get_related_assets() test_system_user_connectivity_util(system_user, assets, task_name)