jumpserver/apps/assets/tasks/system_user_connectivity.py

122 lines
4.2 KiB
Python

from itertools import groupby
from collections import defaultdict
from celery import shared_task
from django.utils.translation import ugettext as _
from common.utils import get_logger
from orgs.utils import tmp_to_org, org_aware_func
from ..models import SystemUser
from . import const
from .utils import (
clean_ansible_task_hosts, group_asset_by_platform
)
logger = get_logger(__name__)
__all__ = [
'test_system_user_connectivity_util', 'test_system_user_connectivity_manual',
'test_system_user_connectivity_period', 'test_system_user_connectivity_a_asset',
]
@org_aware_func("system_user")
def test_system_user_connectivity_util(system_user, assets, task_name):
"""
Test system cant connect his assets or not.
:param system_user:
:param assets:
:param task_name:
:return:
"""
from ops.utils import update_or_create_ansible_task
hosts = clean_ansible_task_hosts(assets, system_user=system_user)
if not hosts:
return {}
platform_hosts_map = {}
hosts_sorted = sorted(hosts, key=group_asset_by_platform)
platform_hosts = groupby(hosts_sorted, key=group_asset_by_platform)
for i in platform_hosts:
platform_hosts_map[i[0]] = list(i[1])
platform_tasks_map = {
"unixlike": const.PING_UNIXLIKE_TASKS,
"windows": const.PING_WINDOWS_TASKS
}
results_summary = dict(
contacted=defaultdict(dict), dark=defaultdict(dict), success=True
)
def run_task(_tasks, _hosts, _username):
old_name = "{}".format(system_user)
new_name = "{}({})".format(system_user.name, _username)
_task_name = task_name.replace(old_name, new_name)
_task, created = update_or_create_ansible_task(
task_name=_task_name, hosts=_hosts, tasks=_tasks,
pattern='all', options=const.TASK_OPTIONS,
run_as=_username,
)
raw, summary = _task.run()
success = summary.get('success', False)
contacted = summary.get('contacted', {})
dark = summary.get('dark', {})
results_summary['success'] &= success
results_summary['contacted'].update(contacted)
results_summary['dark'].update(dark)
for platform, _hosts in platform_hosts_map.items():
if not _hosts:
continue
if platform not in ["unixlike", "windows"]:
continue
tasks = platform_tasks_map[platform]
print(_("Start test system user connectivity for platform: [{}]").format(platform))
print(_("Hosts count: {}").format(len(_hosts)))
# 用户名不是动态的,用户名则是一个
if not system_user.username_same_with_user:
logger.debug("System user not has special auth")
run_task(tasks, _hosts, system_user.username)
# 否则需要多个任务
else:
users = system_user.users.all().values_list('username', flat=True)
print(_("System user is dynamic: {}").format(list(users)))
for username in users:
run_task(tasks, _hosts, username)
system_user.set_connectivity(results_summary)
return results_summary
@shared_task(queue="ansible")
@org_aware_func("system_user")
def test_system_user_connectivity_manual(system_user):
task_name = _("Test system user connectivity: {}").format(system_user)
assets = system_user.get_related_assets()
test_system_user_connectivity_util(system_user, assets, task_name)
@shared_task(queue="ansible")
@org_aware_func("system_user")
def test_system_user_connectivity_a_asset(system_user, asset):
task_name = _("Test system user connectivity: {} => {}").format(
system_user, asset
)
test_system_user_connectivity_util(system_user, [asset], task_name)
@shared_task(queue="ansible")
def test_system_user_connectivity_period():
if not const.PERIOD_TASK_ENABLED:
logger.debug("Period task disabled, test system user connectivity pass")
return
queryset_map = SystemUser.objects.all_group_by_org()
for org, system_user in queryset_map.items():
task_name = _("Test system user connectivity period: {}").format(system_user)
with tmp_to_org(org):
assets = system_user.get_related_assets()
test_system_user_connectivity_util(system_user, assets, task_name)