mirror of https://github.com/jumpserver/jumpserver
145 lines
5.1 KiB
Python
145 lines
5.1 KiB
Python
|
||
from itertools import groupby
|
||
from collections import defaultdict
|
||
|
||
from celery import shared_task
|
||
from django.utils.translation import ugettext as _
|
||
|
||
from assets.models import Asset
|
||
from common.utils import get_logger
|
||
from orgs.utils import tmp_to_org, org_aware_func
|
||
from ..models import SystemUser, Connectivity, AuthBook
|
||
from . import const
|
||
from .utils import (
|
||
clean_ansible_task_hosts, group_asset_by_platform
|
||
)
|
||
|
||
# Module-level logger used by every task defined in this file.
logger = get_logger(__name__)

# Names exported by this module.
__all__ = [
    'test_system_user_connectivity_util',
    'test_system_user_connectivity_manual',
    'test_system_user_connectivity_period',
    'test_system_user_connectivity_a_asset',
]
|
||
|
||
|
||
def set_assets_accounts_connectivity(system_user, assets, results_summary):
    """
    Store the outcome of a connectivity test on a system user's accounts.

    An asset counts as reachable when its hostname is a key of the
    ``contacted`` mapping in ``results_summary``; every other asset is
    counted as failed.

    :param system_user: system user whose AuthBook accounts are updated
    :param assets: iterable of assets; ``id`` and ``hostname`` are read from each
    :param results_summary: mapping that may contain a ``contacted`` mapping keyed by hostname
    :return: None
    """
    contacted_hostnames = results_summary.get('contacted', {}).keys()
    reachable_ids, unreachable_ids = set(), set()

    # Single pass: drop each asset id into the bucket matching its outcome.
    for asset in assets:
        bucket = reachable_ids if asset.hostname in contacted_hostnames else unreachable_ids
        bucket.add(asset.id)

    # Hand each bucket's accounts to AuthBook.bulk_set_connectivity,
    # reachable ones first, then the failed ones.
    for asset_ids, status in (
        (reachable_ids, Connectivity.ok),
        (unreachable_ids, Connectivity.failed),
    ):
        accounts = AuthBook.objects.filter(asset_id__in=asset_ids, systemuser=system_user)
        AuthBook.bulk_set_connectivity(accounts, status)
|
||
|
||
|
||
@org_aware_func("system_user")
def test_system_user_connectivity_util(system_user, assets, task_name):
    """
    Test whether a system user can connect to the given assets.

    The cleaned hosts are grouped by platform and the matching ping tasks
    are run once per supported platform ("unixlike" and "windows"). The
    merged outcome is then written back to the system user's accounts.

    :param system_user: system user to test; one with ``username_same_with_user`` set is rejected
    :param assets: assets to test against
    :param task_name: base name for the ansible task
    :return: ``None`` when the system user is dynamic, ``{}`` when no host
        remains after cleaning, otherwise a dict with the keys
        ``contacted``, ``dark`` and ``success``
    """
    from ops.utils import update_or_create_ansible_task

    if system_user.username_same_with_user:
        logger.error(_("Dynamic system user not support test"))
        return

    # TODO: the system user is deliberately not passed here
    # (i.e. not clean_ansible_task_hosts(assets, system_user=system_user)),
    # because clean_ansible_task_hosts would use it to decide whether a push
    # is possible, which does not fit the connectivity-test logic. This
    # should be reworked later.
    hosts = clean_ansible_task_hosts(assets)
    if not hosts:
        return {}

    # groupby only merges adjacent items, so sort by the same key first.
    hosts_by_platform = {
        platform: list(members)
        for platform, members in groupby(
            sorted(hosts, key=group_asset_by_platform),
            key=group_asset_by_platform,
        )
    }
    ping_tasks_by_platform = {
        "unixlike": const.PING_UNIXLIKE_TASKS,
        "windows": const.PING_WINDOWS_TASKS,
    }
    # Accumulator shared by all per-platform runs.
    results_summary = {
        'contacted': defaultdict(dict),
        'dark': defaultdict(dict),
        'success': True,
    }

    def run_task(ping_tasks, target_hosts, username):
        # Swap the system user's string form inside the task name
        # for "name(username)".
        display_name = task_name.replace(
            "{}".format(system_user),
            "{}({})".format(system_user.name, username),
        )
        ansible_task, _created = update_or_create_ansible_task(
            task_name=display_name, hosts=target_hosts, tasks=ping_tasks,
            pattern='all', options=const.TASK_OPTIONS,
            run_as=username,
        )
        _raw, summary = ansible_task.run()

        # Fold this run into the shared summary; a single failed run
        # turns the overall 'success' flag off.
        results_summary['success'] &= summary.get('success', False)
        results_summary['contacted'].update(summary.get('contacted', {}))
        results_summary['dark'].update(summary.get('dark', {}))

    for platform, platform_hosts in hosts_by_platform.items():
        # Skip empty groups and platforms that have no ping tasks defined.
        if not platform_hosts or platform not in ping_tasks_by_platform:
            continue

        print(_("Start test system user connectivity for platform: [{}]").format(platform))
        print(_("Hosts count: {}").format(len(platform_hosts)))
        # The username is not dynamic, so there is exactly one username.
        logger.debug("System user not has special auth")
        run_task(ping_tasks_by_platform[platform], platform_hosts, system_user.username)

    set_assets_accounts_connectivity(system_user, hosts, results_summary)
    return results_summary
|
||
|
||
|
||
@shared_task(queue="ansible")
@org_aware_func("system_user")
def test_system_user_connectivity_manual(system_user, asset_ids=None):
    """
    Run the connectivity test for a system user on request.

    :param system_user: system user to test
    :param asset_ids: optional ids restricting the test to those assets;
        when empty or omitted, the system user's related assets are used
    :return: None
    """
    task_name = _("Test system user connectivity: {}").format(system_user)
    assets = (
        Asset.objects.filter(id__in=asset_ids)
        if asset_ids
        else system_user.get_related_assets()
    )
    test_system_user_connectivity_util(system_user, assets, task_name)
|
||
|
||
|
||
@shared_task(queue="ansible")
@org_aware_func("system_user")
def test_system_user_connectivity_a_asset(system_user, asset):
    """
    Run the connectivity test for a system user against one asset.

    :param system_user: system user to test
    :param asset: the single asset to test against
    :return: None
    """
    name_template = _("Test system user connectivity: {} => {}")
    task_name = name_template.format(system_user, asset)
    # The shared helper takes a collection of assets, so wrap the single one.
    test_system_user_connectivity_util(system_user, [asset], task_name)
|
||
|
||
|
||
@shared_task(queue="ansible")
def test_system_user_connectivity_period():
    """
    Periodic connectivity test over the entries returned by
    ``SystemUser.objects.all_group_by_org()``.

    Does nothing when ``const.PERIOD_TASK_ENABLED`` is falsy.

    :return: None
    """
    if not const.PERIOD_TASK_ENABLED:
        logger.debug("Period task disabled, test system user connectivity pass")
        return

    # NOTE(review): each value is used below as one system user
    # (``get_related_assets`` is called on it). The manager method's name
    # suggests the values might be querysets instead - confirm against
    # ``all_group_by_org``.
    grouped_by_org = SystemUser.objects.all_group_by_org()
    for org, system_user in grouped_by_org.items():
        task_name = _("Test system user connectivity period: {}").format(system_user)
        # Asset lookup and the test itself both run inside the org context.
        with tmp_to_org(org):
            test_system_user_connectivity_util(
                system_user, system_user.get_related_assets(), task_name
            )
|