mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			102 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			102 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
 | 
						|
from collections import defaultdict
 | 
						|
from celery import shared_task
 | 
						|
from django.utils.translation import ugettext as _
 | 
						|
 | 
						|
from common.utils import get_logger
 | 
						|
 | 
						|
from ..models import SystemUser
 | 
						|
from . import const
 | 
						|
from .utils import clean_hosts, clean_hosts_by_protocol
 | 
						|
 | 
						|
logger = get_logger(__name__)
 | 
						|
__all__ = [
 | 
						|
    'test_system_user_connectivity_util', 'test_system_user_connectivity_manual',
 | 
						|
    'test_system_user_connectivity_period', 'test_system_user_connectivity_a_asset',
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
@shared_task(queue="ansible")
 | 
						|
def test_system_user_connectivity_util(system_user, assets, task_name):
 | 
						|
    """
 | 
						|
    Test system cant connect his assets or not.
 | 
						|
    :param system_user:
 | 
						|
    :param assets:
 | 
						|
    :param task_name:
 | 
						|
    :return:
 | 
						|
    """
 | 
						|
    from ops.utils import update_or_create_ansible_task
 | 
						|
 | 
						|
    hosts = clean_hosts(assets)
 | 
						|
    if not hosts:
 | 
						|
        return {}
 | 
						|
 | 
						|
    hosts = clean_hosts_by_protocol(system_user, hosts)
 | 
						|
    if not hosts:
 | 
						|
        return {}
 | 
						|
 | 
						|
    hosts_category = {
 | 
						|
        'linux': {
 | 
						|
            'hosts': [],
 | 
						|
            'tasks': const.TEST_SYSTEM_USER_CONN_TASKS
 | 
						|
        },
 | 
						|
        'windows': {
 | 
						|
            'hosts': [],
 | 
						|
            'tasks': const.TEST_WINDOWS_SYSTEM_USER_CONN_TASKS
 | 
						|
        }
 | 
						|
    }
 | 
						|
    for host in hosts:
 | 
						|
        hosts_list = hosts_category['windows']['hosts'] if host.is_windows() \
 | 
						|
            else hosts_category['linux']['hosts']
 | 
						|
        hosts_list.append(host)
 | 
						|
 | 
						|
    results_summary = dict(
 | 
						|
        contacted=defaultdict(dict), dark=defaultdict(dict), success=True
 | 
						|
    )
 | 
						|
    for k, value in hosts_category.items():
 | 
						|
        if not value['hosts']:
 | 
						|
            continue
 | 
						|
        task, created = update_or_create_ansible_task(
 | 
						|
            task_name=task_name, hosts=value['hosts'], tasks=value['tasks'],
 | 
						|
            pattern='all', options=const.TASK_OPTIONS,
 | 
						|
            run_as=system_user.username, created_by=system_user.org_id,
 | 
						|
        )
 | 
						|
        raw, summary = task.run()
 | 
						|
        success = summary.get('success', False)
 | 
						|
        contacted = summary.get('contacted', {})
 | 
						|
        dark = summary.get('dark', {})
 | 
						|
 | 
						|
        results_summary['success'] &= success
 | 
						|
        results_summary['contacted'].update(contacted)
 | 
						|
        results_summary['dark'].update(dark)
 | 
						|
 | 
						|
    system_user.set_connectivity(results_summary)
 | 
						|
    return results_summary
 | 
						|
 | 
						|
 | 
						|
@shared_task(queue="ansible")
 | 
						|
def test_system_user_connectivity_manual(system_user):
 | 
						|
    task_name = _("Test system user connectivity: {}").format(system_user)
 | 
						|
    assets = system_user.get_all_assets()
 | 
						|
    return test_system_user_connectivity_util(system_user, assets, task_name)
 | 
						|
 | 
						|
 | 
						|
@shared_task(queue="ansible")
 | 
						|
def test_system_user_connectivity_a_asset(system_user, asset):
 | 
						|
    task_name = _("Test system user connectivity: {} => {}").format(
 | 
						|
        system_user, asset
 | 
						|
    )
 | 
						|
    return test_system_user_connectivity_util(system_user, [asset], task_name)
 | 
						|
 | 
						|
 | 
						|
@shared_task(queue="ansible")
 | 
						|
def test_system_user_connectivity_period():
 | 
						|
    if not const.PERIOD_TASK_ENABLED:
 | 
						|
        logger.debug("Period task disabled, test system user connectivity pass")
 | 
						|
        return
 | 
						|
    system_users = SystemUser.objects.all()
 | 
						|
    for system_user in system_users:
 | 
						|
        task_name = _("Test system user connectivity period: {}").format(system_user)
 | 
						|
        assets = system_user.get_all_assets()
 | 
						|
        test_system_user_connectivity_util(system_user, assets, task_name)
 |