From b97d5b096023dce5143a39fdd5766348a008f122 Mon Sep 17 00:00:00 2001 From: ibuler Date: Fri, 15 Dec 2017 15:50:15 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]=20assets=20task=20=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/const.py | 58 +++- apps/assets/models/asset.py | 4 +- apps/assets/serializers.py | 6 +- apps/assets/tasks.py | 368 ++++++++++++++------------ apps/assets/views/asset.py | 77 ++++-- apps/common/imexp.py | 331 ----------------------- apps/ops/ansible/callback.py | 12 +- apps/ops/models.py | 90 +++++-- apps/ops/templates/ops/task_list.html | 10 +- apps/ops/utils.py | 98 ++++--- 10 files changed, 460 insertions(+), 594 deletions(-) delete mode 100644 apps/common/imexp.py diff --git a/apps/assets/const.py b/apps/assets/const.py index 820d07b65..1dc9e5a20 100644 --- a/apps/assets/const.py +++ b/apps/assets/const.py @@ -1,9 +1,57 @@ # -*- coding: utf-8 -*- # -ADMIN_USER_CONN_CACHE_KEY_PREFIX = "ADMIN_USER_CONN_" -SYSTEM_USER_CONN_CACHE_KEY_PREFIX = "SYSTEM_USER_CONN_" +PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" +PUSH_SYSTEM_USER_PERIOD_TASK_NAME = "PUSH-SYSTEM-USER-PERIOD" +PUSH_SYSTEM_USER_TASK_NAME = "PUSH-SYSTEM-USER-TO-CLUSTER-{}" +PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}" + + +UPDATE_ASSETS_HARDWARE_TASK_NAME = 'UPDATE-ASSETS-HARDWARE-INFO' UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY" -TEST_ADMIN_USER_CONNECTABILITY_PEROID_KEY = "TEST_ADMIN_USER_CONNECTABILITY_KEY" -TEST_SYSTEM_USER_CONNECTABILITY_PEROID_KEY = "TEST_SYSTEM_USER_CONNECTABILITY_PEROID_KEY" -PUSH_SYSTEM_USER_PERIOD_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" +UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME = 'UPDATE-ASSETS-HARDWARE-INFO-PERIOD' +UPDATE_ASSETS_HARDWARE_TASKS = [ + { + 'name': UPDATE_ASSETS_HARDWARE_TASK_NAME, + 'action': { + 'module': 'setup' + } + } +] + +TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY" +TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = "TEST_ADMIN_USER_CONN_PERIOD_TASK" +TEST_ADMIN_USER_CONN_TASK_NAME = "TEST-ADMIN-USER-CONN-{}" +TEST_ADMIN_USER_CONN_LOCK_KEY = TEST_ADMIN_USER_CONN_TASK_NAME +ADMIN_USER_CONN_CACHE_KEY = "ADMIN_USER_CONN_{}" +TEST_ADMIN_USER_CONN_TASKS = [ + { + "name": "TEST_ADMIN_CONNECTIVE", + "action": { + "module": "ping", + } + } +] + +ASSET_ADMIN_CONN_CACHE_KEY = "ASSET_ADMIN_USER_CONN_{}" +TEST_ASSET_CONN_TASK_NAME = "ASSET_CONN_TEST_MANUAL" + +TEST_SYSTEM_USER_CONN_PERIOD_LOCK_KEY = "TEST_SYSTEM_USER_CONN_PERIOD_KEY" +TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = "TEST-SYSTEM-USER-CONN-PERIOD-TASK" +TEST_SYSTEM_USER_CONN_CACHE_KEY_PREFIX = "SYSTEM_USER_CONN_" +TEST_SYSTEM_USER_CONN_TASK_NAME = "TEST-ADMIN-USER-CONN-{}" +TEST_SYSTEM_USER_CONN_LOCK_KEY = "TEST_SYSTEM_USER_CONN_{}" +SYSTEM_USER_CONN_CACHE_KEY = "SYSTEM_USER_CONN_{}" +TEST_SYSTEM_USER_CONN_TASKS = [ + { + "name": "TEST_SYSTEM_USER_CONNECTIVE", + "action": { + "module": "ping", + } + } +] + +TASK_OPTIONS = { + 'timeout': 60, + 'forks': 10, +} diff --git a/apps/assets/models/asset.py b/apps/assets/models/asset.py index c2622511b..f606870de 100644 --- a/apps/assets/models/asset.py +++ b/apps/assets/models/asset.py @@ -9,7 +9,7 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from django.core.cache import cache -from ..const import ADMIN_USER_CONN_CACHE_KEY_PREFIX +from ..const import ASSET_ADMIN_CONN_CACHE_KEY from .cluster import Cluster from .group import AssetGroup from .user import AdminUser, SystemUser @@ -110,7 +110,7 @@ class Asset(models.Model): @property def is_connective(self): - val = cache.get(ADMIN_USER_CONN_CACHE_KEY_PREFIX + self.hostname) + val = cache.get(ASSET_ADMIN_CONN_CACHE_KEY.format(self.hostname)) if val == 1: return True else: diff --git a/apps/assets/serializers.py b/apps/assets/serializers.py index 50cc091e1..4f5e5aa5f 100644 --- a/apps/assets/serializers.py +++ b/apps/assets/serializers.py @@ -5,7 +5,7 @@ from rest_framework_bulk.serializers import BulkListSerializer from common.mixins import BulkSerializerMixin from .models import AssetGroup, Asset, Cluster, AdminUser, SystemUser -from .tasks import SYSTEM_USER_CONN_CACHE_KEY_PREFIX, ADMIN_USER_CONN_CACHE_KEY_PREFIX +from .const import ADMIN_USER_CONN_CACHE_KEY, SYSTEM_USER_CONN_CACHE_KEY class AssetGroupSerializer(BulkSerializerMixin, serializers.ModelSerializer): @@ -73,7 +73,7 @@ class AdminUserSerializer(serializers.ModelSerializer): @staticmethod def get_unreachable_amount(obj): - data = cache.get(ADMIN_USER_CONN_CACHE_KEY_PREFIX + obj.name) + data = cache.get(ADMIN_USER_CONN_CACHE_KEY.format(obj.name)) if data: return len(data.get('dark')) else: @@ -98,7 +98,7 @@ class SystemUserSerializer(serializers.ModelSerializer): @staticmethod def get_unreachable_amount(obj): - data = cache.get(SYSTEM_USER_CONN_CACHE_KEY_PREFIX + obj.name) + data = cache.get(SYSTEM_USER_CONN_CACHE_KEY.format(obj.name)) if data: return len(data.get('dark')) else: diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index e459039c6..ddd5436d1 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -10,9 +10,7 @@ from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger from common.celery import app as celery_app from .models import SystemUser, AdminUser, Asset -from .const import ADMIN_USER_CONN_CACHE_KEY_PREFIX, SYSTEM_USER_CONN_CACHE_KEY_PREFIX, \ - UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, TEST_ADMIN_USER_CONNECTABILITY_PEROID_KEY, \ - TEST_SYSTEM_USER_CONNECTABILITY_PEROID_KEY, PUSH_SYSTEM_USER_PERIOD_KEY +from . import const from .signals import on_app_ready @@ -22,32 +20,14 @@ logger = get_logger(__file__) CACHE_MAX_TIME = 60*60*60 -@shared_task -def update_assets_hardware_info(assets): - """ - Using ansible api to update asset hardware info - :param assets: asset seq - :return: result summary ['contacted': {}, 'dark': {}] - """ - from ops.utils import run_adhoc - name = "GET_ASSETS_HARDWARE_INFO" - tasks = [ - { - 'name': name, - 'action': { - 'module': 'setup' - } - } - ] - hostname_list = [asset.hostname for asset in assets] - result = run_adhoc(hostname_list, pattern='all', tasks=tasks, - name=name, run_as_admin=True) - summary, result_raw = result.results_summary, result.results_raw +def _update_asset_info(result_raw): + assets_updated = [] for hostname, info in result_raw['ok'].items(): if info: - info = info[name]['ansible_facts'] + info = info[const.UPDATE_ASSETS_HARDWARE_TASK_NAME]['ansible_facts'] else: continue + asset = get_object_or_none(Asset, hostname=hostname) if not asset: continue @@ -81,12 +61,31 @@ def update_assets_hardware_info(assets): if k.startswith('___'): setattr(asset, k.strip('_'), v) asset.save() + assets_updated.append(asset) + return assets_updated - for hostname, task in summary['dark'].items(): - logger.error("Update {} hardware info error: {}".format( - hostname, task[name], - )) +@shared_task +def update_assets_hardware_info(assets, task_name=None): + """ + Using ansible api to update asset hardware info + :param assets: asset seq + :param task_name: task_name running + :return: result summary ['contacted': {}, 'dark': {}] + """ + from ops.utils import create_or_update_task + if task_name is None: + task_name = const.UPDATE_ASSETS_HARDWARE_TASK_NAME + tasks = const.UPDATE_ASSETS_HARDWARE_TASKS + hostname_list = [asset.hostname for asset in assets] + task = create_or_update_task( + task_name, hosts=hostname_list, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + ) + result = task.run() + summary, result_raw = result.results_summary, result.results_raw + # TOdo: may be somewhere using + assets_updated = _update_asset_info(result_raw) return summary @@ -96,129 +95,142 @@ def update_assets_hardware_period(): Update asset hardware period task :return: """ - if cache.get(UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1: - logger.debug("Update asset hardware period task is running, passed") + task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME + if cache.get(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1: + msg = "Task {} is running or before long, passed this time".format( + task_name + ) + logger.debug(msg) return {} - try: - cache.set(UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) - assets = Asset.objects.filter(type__in=['Server', 'VM']) - return update_assets_hardware_info(assets) - finally: - cache.set(UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, 0) + # Todo: set cache but not update, because we want also set it to as a + # minimum update time too + cache.set(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) + assets = Asset.objects.filter(type__in=['Server', 'VM']) + return update_assets_hardware_info(assets, task_name=task_name) @shared_task -def test_admin_user_connectability(admin_user): +def test_admin_user_connectability(admin_user, force=False): """ Test asset admin user can connect or not. Using ansible api do that :param admin_user: + :param force: Force update :return: """ - from ops.utils import run_adhoc + from ops.utils import create_or_update_task + + task_name = const.TEST_ADMIN_USER_CONN_TASK_NAME.format(admin_user.name) + lock_key = const.TEST_ADMIN_USER_CONN_LOCK_KEY.format(admin_user.name) + + if cache.get(lock_key, 0) == 1 and not force: + logger.debug("Task {} is running or before along, passed this time") + return {} assets = admin_user.get_related_assets() hosts = [asset.hostname for asset in assets] - tasks = [ - { - "name": "TEST_ADMIN_CONNECTIVE", - "action": { - "module": "ping", - } - } - ] - result = run_adhoc(hosts, tasks=tasks, pattern="all", run_as_admin=True) + tasks = const.TEST_ADMIN_USER_CONN_TASKS + task = create_or_update_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + ) + cache.set(lock_key, 1, CACHE_MAX_TIME) + result = task.run() + cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user.name) + cache.set(cache_key, result.results_summary, CACHE_MAX_TIME) + + for i in result.results_summary.get('contacted', []): + asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i) + cache.set(asset_conn_cache_key, 1, CACHE_MAX_TIME) + + for i, msg in result.results_summary.get('dark', {}).items(): + asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i) + cache.set(asset_conn_cache_key, 0, CACHE_MAX_TIME) + logger.error(msg) + return result.results_summary @shared_task def test_admin_user_connectability_period(): - # assets = Asset.objects.filter(type__in=['Server', 'VM']) - if cache.get(TEST_ADMIN_USER_CONNECTABILITY_PEROID_KEY) == 1: - logger.debug("Test admin user connectablity period task is running, passed") + if cache.get(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY) == 1: + msg = "{} task is running or before long, passed this time".format( + const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME + ) + logger.debug(msg) return - logger.debug("Test admin user connectablity period task start") - try: - cache.set(TEST_ADMIN_USER_CONNECTABILITY_PEROID_KEY, 1, CACHE_MAX_TIME) - admin_users = AdminUser.objects.all() - for admin_user in admin_users: - summary = test_admin_user_connectability(admin_user) - - cache.set(ADMIN_USER_CONN_CACHE_KEY_PREFIX + admin_user.name, summary, 60*60*60) - for i in summary['contacted']: - cache.set(ADMIN_USER_CONN_CACHE_KEY_PREFIX + i, 1, 60*60*60) - - for i, error in summary['dark'].items(): - cache.set(ADMIN_USER_CONN_CACHE_KEY_PREFIX + i, 0, 60*60*60) - logger.error(error) - finally: - cache.set(TEST_ADMIN_USER_CONNECTABILITY_PEROID_KEY, 0) + logger.debug("Task {} start".format(const.TEST_ADMIN_USER_CONN_TASK_NAME)) + cache.set(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) + admin_users = AdminUser.objects.all() + for admin_user in admin_users: + test_admin_user_connectability(admin_user) @shared_task -def test_admin_user_connectability_manual(asset): - from ops.utils import run_adhoc - # assets = Asset.objects.filter(type__in=['Server', 'VM']) +def test_admin_user_connectability_manual(asset, task_name=None): + from ops.utils import create_or_update_task + if task_name is None: + task_name = const.TEST_ASSET_CONN_TASK_NAME hosts = [asset.hostname] - tasks = [ - { - "name": "TEST_ADMIN_CONNECTIVE", - "action": { - "module": "ping", - } - } - ] - result = run_adhoc(hosts, tasks=tasks, pattern="all", run_as_admin=True) + tasks = const.TEST_ADMIN_USER_CONN_TASKS + task = create_or_update_task(task_name, tasks=tasks, hosts=hosts) + result = task.run() + if result.results_summary['dark']: - cache.set(ADMIN_USER_CONN_CACHE_KEY_PREFIX + asset.hostname, 0, 60*60*60) + cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, CACHE_MAX_TIME) return False else: - cache.set(ADMIN_USER_CONN_CACHE_KEY_PREFIX + asset.hostname, 1, 60*60* 60) + cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, CACHE_MAX_TIME) return True @shared_task -def test_system_user_connectability(system_user): +def test_system_user_connectability(system_user, force=False): """ Test system cant connect his assets or not. :param system_user: + :param force :return: """ - from ops.utils import run_adhoc + from ops.utils import create_or_update_task + lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY.format(system_user.name) + task_name = const.TEST_SYSTEM_USER_CONN_TASK_NAME + if cache.get(lock_key, 0) == 1 and not force: + logger.debug("Task {} is running or before long, passed this time".format(task_name)) + return {} assets = system_user.get_clusters_assets() hosts = [asset.hostname for asset in assets] - tasks = [ - { - "name": "TEST_SYSTEM_USER_CONNECTIVE", - "action": { - "module": "ping", - } - } - ] - result = run_adhoc(hosts, tasks=tasks, pattern="all", run_as=system_user.name) + tasks = const.TEST_SYSTEM_USER_CONN_TASKS + task = create_or_update_task( + task_name, hosts=hosts, tasks=tasks, options=const.TASK_OPTIONS, + run_as=system_user.name, created_by="System", + ) + cache.set(lock_key, 1, CACHE_MAX_TIME) + result = task.run() + cache_key = const.SYSTEM_USER_CONN_CACHE_KEY + cache.set(cache_key, result.results_summary, CACHE_MAX_TIME) return result.results_summary @shared_task def test_system_user_connectability_period(): - if cache.get(TEST_SYSTEM_USER_CONNECTABILITY_PEROID_KEY) == 1: - logger.debug("Test admin user connectablity period task is running, passed") + lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY + if cache.get(lock_key) == 1: + logger.debug("{} task is running, passed this time".format( + const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME + )) return - logger.debug("Test system user connectablity period task start") - try: - cache.set(TEST_SYSTEM_USER_CONNECTABILITY_PEROID_KEY, 1, CACHE_MAX_TIME) - for system_user in SystemUser.objects.all(): - summary = test_system_user_connectability(system_user) - cache.set(SYSTEM_USER_CONN_CACHE_KEY_PREFIX + system_user.name, summary, 60*60*60) - finally: - cache.set(TEST_SYSTEM_USER_CONNECTABILITY_PEROID_KEY, 0) + logger.debug("Task {} start".format(const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME)) + cache.set(lock_key, 1, CACHE_MAX_TIME) + for system_user in SystemUser.objects.all(): + test_system_user_connectability(system_user) def get_push_system_user_tasks(system_user): tasks = [ { - 'name': 'Add user', + 'name': 'Add user {}'.format(system_user.username), 'action': { 'module': 'user', 'args': 'name={} shell={} state=present password={}'.format( @@ -228,7 +240,7 @@ def get_push_system_user_tasks(system_user): } }, { - 'name': 'Set authorized key', + 'name': 'Set {} authorized key'.format(system_user.username), 'action': { 'module': 'authorized_key', 'args': "user={} state=present key='{}'".format( @@ -237,7 +249,7 @@ def get_push_system_user_tasks(system_user): } }, { - 'name': 'Set sudoers', + 'name': 'Set {} sudo setting'.format(system_user.username), 'action': { 'module': 'lineinfile', 'args': "dest=/etc/sudoers state=present regexp='^{0} ALL=' " @@ -252,101 +264,127 @@ def get_push_system_user_tasks(system_user): return tasks +@shared_task def push_system_user(system_user, assets, task_name=None): - from ops.utils import get_task_by_name, run_adhoc_object, \ - create_task, create_adhoc + from ops.utils import create_or_update_task if system_user.auto_push and assets: if task_name is None: task_name = 'PUSH-SYSTEM-USER-{}'.format(system_user.name) - task = get_task_by_name(task_name) - if not task: - logger.debug("Doesn't get task {}, create it".format(task_name)) - task = create_task(task_name, created_by="System") - task.save() - tasks = get_push_system_user_tasks(system_user) hosts = [asset.hostname for asset in assets] - options = {'forks': FORKS, 'timeout': TIMEOUT} + tasks = get_push_system_user_tasks(system_user) - adhoc = task.get_latest_adhoc() - if not adhoc or adhoc.task != tasks or adhoc.hosts != hosts: - logger.debug("Task {} not exit or changed, create new version".format(task_name)) - adhoc = create_adhoc(task=task, tasks=tasks, pattern='all', - options=options, hosts=hosts, run_as_admin=True) - logger.debug("Task {} start execute".format(task_name)) - result = run_adhoc_object(adhoc) + task = create_or_update_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' + ) + result = task.run() + for i in result.results_summary.get('contacted'): + logger.debug("Push system user {} to {} [OK]".format( + system_user.name, i + )) + for i in result.results_summary.get('dark'): + logger.error("Push system user {} to {} [FAILED]".format( + system_user.name, i + )) return result.results_summary else: - msg = "Task {} does'nt execute, because not auto_push " \ + msg = "Task {} does'nt execute, because auto_push " \ "is not True, or not assets".format(task_name) logger.debug(msg) return {} @shared_task -def push_system_user_to_cluster_assets(system_user, task_name=None): - logger.debug("{} task start".format(task_name)) +def push_system_user_to_cluster_assets(system_user, force=False): + lock_key = const.PUSH_SYSTEM_USER_LOCK_KEY + task_name = const.PUSH_SYSTEM_USER_TASK_NAME.format(system_user.name) + if cache.get(lock_key, 0) == 1 and not force: + msg = "Task {} is running or before long, passed this time".format( + task_name + ) + logger.debug(msg) + return {} + + logger.debug("Task {} start".format(task_name)) assets = system_user.get_clusters_assets() summary = push_system_user(system_user, assets, task_name) - - for h in summary.get("contacted", []): - logger.debug("Push system user {} to {} success".format(system_user.name, h)) - for h, msg in summary.get('dark', {}).items(): - logger.error('Push system user {} to {} failed: {}'.format( - system_user.name, h, msg - )) return summary @shared_task def push_system_user_period(): - if cache.get(PUSH_SYSTEM_USER_PERIOD_KEY) == 1: - logger.debug("push system user period task is running, passed") + task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME + if cache.get(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY) == 1: + msg = "Task {} is running or before long, passed this time".format( + task_name + ) + logger.debug(msg) return + logger.debug("Task {} start".format(task_name)) + cache.set(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY, 1, timeout=CACHE_MAX_TIME) - logger.debug("Push system user period task start") - try: - cache.set(PUSH_SYSTEM_USER_PERIOD_KEY, 1, timeout=CACHE_MAX_TIME) - for system_user in SystemUser.objects.filter(auto_push=True): - task_name = 'PUSH-SYSTEM-USER-PERIOD' - push_system_user_to_cluster_assets(system_user, task_name) - finally: - cache.set(PUSH_SYSTEM_USER_PERIOD_KEY, 0) + for system_user in SystemUser.objects.filter(auto_push=True): + push_system_user_to_cluster_assets(system_user) -# def push_system_user_to_assets_if_need(system_user, assets=None, asset_groups=None): -# assets_to_push = [] -# system_user_assets = system_user.assets.all() -# if assets: -# assets_to_push.extend(assets) -# if asset_groups: -# for group in asset_groups: -# assets_to_push.extend(group.assets.all()) -# -# assets_need_push = set(assets_to_push) - set(system_user_assets) -# if not assets_need_push: -# return -# logger.debug("Push system user {} to {} assets".format( -# system_user.name, ', '.join([asset.hostname for asset in assets_need_push]) -# )) -# result = push_system_user(system_user, assets_need_push, PUSH_SYSTEM_USER_TASK_NAME) -# system_user.assets.add(*tuple(assets_need_push)) -# return result +@shared_task +def push_asset_system_users(asset, system_users=None, task_name=None): + from ops.utils import create_or_update_task + if task_name is None: + task_name = "PUSH-ASSET-SYSTEM-USER-{}".format(asset.hostname) + + if system_users is None: + system_users = asset.cluster.systemuser_set.all() + + tasks = [] + for system_user in system_users: + if system_user.auto_push: + tasks.extend(get_push_system_user_tasks(system_user)) + + hosts = [asset.hostname] + + task = create_or_update_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' + ) + result = task.run() + return result.results_summary @receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_info(sender, instance=None, created=False, **kwargs): +def update_asset_info_when_created(sender, instance=None, created=False, **kwargs): if instance and created: - logger.debug("Receive asset create signal, update asset hardware info") - update_assets_hardware_info.delay([instance]) + msg = "Receive asset {} create signal, update asset hardware info".format( + instance + ) + logger.debug(msg) + task_name = "UPDATE-ASSET-HARDWARE-INFO-WHEN-CREATED" + update_assets_hardware_info.delay([instance], task_name) @receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def test_admin_user_connective(sender, instance=None, created=False, **kwargs): +def update_asset_conn_info_when_created(sender, instance=None, created=False, **kwargs): if instance and created: - logger.debug("Receive asset create signal, test admin user connectability") - test_admin_user_connectability_manual.delay(instance) + task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance) + msg = "Receive asset {} create signal, test asset connectability".format( + instance + ) + logger.debug(msg) + test_admin_user_connectability_manual.delay(instance, task_name) + + +@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") +def push_system_user_when_created(sender, instance=None, created=False, **kwargs): + if instance and created: + task_name = 'PUSH-SYSTEM-USER-WHEN-ASSET-CREATED-{}'.format(instance) + system_users = instance.cluster.systemuser_set.all() + msg = "Receive asset {} create signal, push system users".format( + instance + ) + logger.debug(msg) + push_asset_system_users.delay(instance, system_users, task_name=task_name) @receiver(post_save, sender=SystemUser) diff --git a/apps/assets/views/asset.py b/apps/assets/views/asset.py index 23a16e6cb..355feb936 100644 --- a/apps/assets/views/asset.py +++ b/apps/assets/views/asset.py @@ -9,24 +9,22 @@ import chardet from io import StringIO from django.conf import settings -from django.core.exceptions import ImproperlyConfigured, FieldDoesNotExist from django.utils.translation import ugettext_lazy as _ from django.views.generic import TemplateView, ListView, View from django.views.generic.edit import CreateView, DeleteView, FormView, UpdateView from django.urls import reverse_lazy -from django.views.generic.detail import DetailView, SingleObjectMixin -from django.http import HttpResponse, JsonResponse, HttpResponseRedirect, Http404 -from django.views.decorators.csrf import csrf_protect, csrf_exempt +from django.views.generic.detail import DetailView +from django.http import HttpResponse, JsonResponse +from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from django.core.cache import cache from django.utils import timezone from django.contrib.auth.mixins import LoginRequiredMixin -from django.shortcuts import get_object_or_404, redirect, reverse +from django.shortcuts import redirect from common.mixins import JSONResponseMixin -from common.utils import get_object_or_none -from common.imexp import ModelExportView +from common.utils import get_object_or_none, get_logger from .. import forms from ..models import Asset, AssetGroup, AdminUser, Cluster, SystemUser from ..hands import AdminUserRequiredMixin @@ -39,6 +37,7 @@ __all__ = [ 'AssetModalListView', 'AssetDeleteView', 'AssetExportView', 'BulkImportAssetView', ] +logger = get_logger(__file__) class AssetListView(AdminUserRequiredMixin, TemplateView): @@ -48,12 +47,11 @@ class AssetListView(AdminUserRequiredMixin, TemplateView): context = { 'app': 'Assets', 'action': 'Asset list', - 'groups': AssetGroup.objects.all(), + # 'groups': AssetGroup.objects.all(), 'system_users': SystemUser.objects.all(), - # 'form': forms.AssetBulkUpdateForm(), } kwargs.update(context) - return super(AssetListView, self).get_context_data(**kwargs) + return super().get_context_data(**kwargs) class UserAssetListView(LoginRequiredMixin, TemplateView): @@ -64,10 +62,9 @@ class UserAssetListView(LoginRequiredMixin, TemplateView): 'app': 'Assets', 'action': 'Asset list', 'system_users': SystemUser.objects.all(), - 'default_pk': '00000000-0000-0000-0000-000000000000', } kwargs.update(context) - return super(UserAssetListView, self).get_context_data(**kwargs) + return super().get_context_data(**kwargs) class AssetCreateView(AdminUserRequiredMixin, CreateView): @@ -107,7 +104,7 @@ class AssetModalListView(AdminUserRequiredMixin, ListView): 'assets': assets } kwargs.update(context) - return super(AssetModalListView, self).get_context_data(**kwargs) + return super().get_context_data(**kwargs) class AssetBulkUpdateView(AdminUserRequiredMixin, ListView): @@ -128,7 +125,7 @@ class AssetBulkUpdateView(AdminUserRequiredMixin, ListView): ) else: self.form = self.form_class() - return super(AssetBulkUpdateView, self).get(request, *args, **kwargs) + return super().get(request, *args, **kwargs) def post(self, request, *args, **kwargs): form = self.form_class(request.POST) @@ -148,7 +145,7 @@ class AssetBulkUpdateView(AdminUserRequiredMixin, ListView): 'assets': Asset.objects.all(), } kwargs.update(context) - return super(AssetBulkUpdateView, self).get_context_data(**kwargs) + return super().get_context_data(**kwargs) class AssetUpdateView(AdminUserRequiredMixin, UpdateView): @@ -166,8 +163,8 @@ class AssetUpdateView(AdminUserRequiredMixin, UpdateView): return super(AssetUpdateView, self).get_context_data(**kwargs) def form_invalid(self, form): - print(form.errors) - return super(AssetUpdateView, self).form_invalid(form) + logger.error(form.errors) + return super().form_invalid(form) class AssetDeleteView(AdminUserRequiredMixin, DeleteView): @@ -196,11 +193,46 @@ class AssetDetailView(DetailView): @method_decorator(csrf_exempt, name='dispatch') -class AssetExportView(ModelExportView): - filename_prefix = 'jumpserver' - redirect_url = reverse_lazy('assets:asset-export') - model = Asset - fields = ('hostname', 'ip') +class AssetExportView(View): + def get(self, request): + spm = request.GET.get('spm', '') + assets_id_default = [Asset.objects.first().id] if Asset.objects.first() else [1] + assets_id = cache.get(spm, assets_id_default) + fields = [ + field for field in Asset._meta.fields + if field.name not in [ + 'date_created' + ] + ] + filename = 'assets-{}.csv'.format( + timezone.localtime(timezone.now()).strftime('%Y-%m-%d_%H-%M-%S')) + response = HttpResponse(content_type='text/csv') + response['Content-Disposition'] = 'attachment; filename="%s"' % filename + response.write(codecs.BOM_UTF8) + assets = Asset.objects.filter(id__in=assets_id) + writer = csv.writer(response, dialect='excel', + quoting=csv.QUOTE_MINIMAL) + + header = [field.verbose_name for field in fields] + header.append(_('Asset groups')) + writer.writerow(header) + + for asset in assets: + groups = ','.join([group.name for group in asset.groups.all()]) + data = [getattr(asset, field.name) for field in fields] + data.append(groups) + writer.writerow(data) + return response + + def post(self, request, *args, **kwargs): + try: + assets_id = json.loads(request.body).get('assets_id', []) + except ValueError: + return HttpResponse('Json object not valid', status=400) + spm = uuid.uuid4().hex + cache.set(spm, assets_id, 300) + url = reverse_lazy('assets:asset-export') + '?spm=%s' % spm + return JsonResponse({'redirect': url}) class BulkImportAssetView(AdminUserRequiredMixin, JSONResponseMixin, FormView): @@ -305,4 +337,3 @@ class BulkImportAssetView(AdminUserRequiredMixin, JSONResponseMixin, FormView): } return self.render_json_response(data) - diff --git a/apps/common/imexp.py b/apps/common/imexp.py deleted file mode 100644 index 9bbe46fe1..000000000 --- a/apps/common/imexp.py +++ /dev/null @@ -1,331 +0,0 @@ -# -*- coding: utf-8 -*- -# -import codecs -import csv -import uuid -import json -from io import StringIO -import warnings -import chardet - -from django import forms -from django.utils import timezone -from django.views import View -from django.core.cache import cache -from django.core.exceptions import ImproperlyConfigured, FieldDoesNotExist -from django.utils.encoding import force_text -from django.http import Http404, HttpResponseRedirect, HttpResponse, JsonResponse - - -class ModelExportPostMixin: - """ - 将用户post上来的数据转存到cache, 生成一个uuid, redirect 到GET URL - """ - redirect_url = None - error_message = 'Json object not valid' - keyword = 'spm' - cache_key = None - request = None - - def get_redirect_url(self): - if self.redirect_url: - # Forcing possible reverse_lazy evaluation - url = force_text(self.redirect_url) - else: - msg = "No URL to redirect to. Provide a redirect_url." - raise ImproperlyConfigured(msg) - sep = "?" if url.find('?') else '&' - url = '{}{}{}={}'.format(url, sep, self.keyword, self.cache_key) - return url - - def save_objects_id_to_cache(self, objects_id): - self.cache_key = uuid.uuid4().hex - cache.set(self.cache_key, objects_id, 300) - return self.cache_key - - def get_objects_id_from_request(self): - try: - objects_id = json.loads(self.request.body) - except ValueError: - raise Http404(self.error_message) - return objects_id - - def get_redirect_response(self): - objects_id = self.get_objects_id_from_request() - self.save_objects_id_to_cache(objects_id) - url = self.get_redirect_url() - return HttpResponseRedirect(redirect_to=url) - - # View need implement it - # def post(self, request, *args, **kwargs): - # self.request = request - # return self.get_redirect_response() - - -class MethodField: - def __init__(self, name, verbose_name=None): - self.name = name - self.verbose_name = verbose_name - - if self.verbose_name is None: - self.verbose_name = name - - -class FieldCheckMeta(type): - - def __new__(cls, name, bases, attrs): - error = cls.validate_fields(attrs) - if not error: - return super().__new__(cls, name, bases, attrs) - else: - raise AttributeError(error) - - @staticmethod - def validate_fields(attrs): - model = attrs.get('model') - fields = attrs.get('fields') - if model is None or fields in ('__all__', None): - return None - - all_attr = [attr for attr in dir(model) if not attr.startswith('_')] - invalid_fields = [] - - for field in fields: - if field not in all_attr: - invalid_fields.append(field) - - if not invalid_fields: - return None - - error = 'model {} is not have `{}` attr, check `fields` setting'.format( - model._meta.model_name, ', '.join(invalid_fields) - ) - return error - - -class ModelFieldsMixin(metaclass=FieldCheckMeta): - model = None - fields = None - exclude = None - errors = None - __cleaned_fields_name = None - __is_valid = False - __defined_fields_name = None - - def get_define_fields_name(self): - """ - Calculate fields, fields may be `__all__`, `(field1, field2)` or - set `exclude` so do that - :return: => list - """ - if self.__defined_fields_name: - return self.__defined_fields_name - - all_fields = [field.name for field in self.model._meta.fields] - if self.fields == '__all__': - return all_fields - elif self.fields: - return self.fields - elif self.exclude: - return list(set(all_fields) - set(self.exclude)) - else: - return [] - - def get_field(self, field_name): - try: - return self.model._meta.get_field(field_name) - except FieldDoesNotExist: - attr = getattr(self.model, field_name) - if hasattr(attr, 'verbose_name'): - verbose_name = getattr(attr, 'verbose_name') - else: - verbose_name = field_name - return MethodField(field_name, verbose_name) - - def get_fields(self, cleaned_fields_name): - """ - Get fields by fields name - :param cleaned_fields_name: - :return: - """ - fields = [] - for name in cleaned_fields_name: - fields.append(self.get_field(name)) - return fields - - def get_define_fields(self): - fields_name = self.get_define_fields_name() - return self.get_fields(fields_name) - - def valid_field_name(self, field_name): - if not hasattr(self.model, field_name): - msg = "{} not has `{}` attr".format(self.model._meta.model_name, field_name) - raise AttributeError(msg) - elif field_name not in self.get_define_fields_name(): - msg = '{} not allowed by server'.format(field_name) - raise AttributeError(msg) - - def is_valid(self, fields, ignore_exception=True): - self.__cleaned_fields_name = [] - self.errors = {} - - for field_name in fields: - try: - self.valid_field_name(field_name) - self.__cleaned_fields_name.append(field_name) - except AttributeError as e: - if not ignore_exception: - self.errors[field_name] = str(e) - - if self.errors: - self.__is_valid = False - return False - else: - self.__is_valid = True - return True - - @property - def field_verbose_name_mapping(self): - mapping = {} - for field in self.get_define_fields(): - mapping[field.verbose_name] = field.name - return mapping - - @property - def cleaned_fields(self): - if self.__cleaned_fields_name is None: - raise AttributeError("Run `is_valid` first") - - if not self.__is_valid: - warnings.warn("Is not valid, result may be not complete") - - return self.get_fields(self.__cleaned_fields_name) - - -class ModelExportGetMixin(ModelFieldsMixin): - filename_prefix = 'jumpserver' - response = None - writer = None - model = None - objects_id = None - queryset = None - keyword = 'spm' - - def get_filename(self): - now = timezone.localtime(timezone.now()).strftime('%Y-%m-%d_%H-%M-%S') - filename = '{}-{}-{}.csv'.format( - self.filename_prefix, self.model._meta.model_name, now - ) - return filename - - def get_objects_id(self): - cache_key = self.request.GET.get(self.keyword) - self.objects_id = cache.get(cache_key, []) - return self.objects_id - - def get_queryset(self): - queryset = None - - if self.queryset: - queryset = self.queryset - elif self.queryset is None: - queryset = self.model._meta.default_manager.all() - - if queryset is None: - raise AttributeError("Get queryset failed, set `queryset` or `model`") - - objects_id = self.get_objects_id() - queryset_filtered = queryset.filter(id__in=objects_id) - return queryset_filtered - - def initial_csv_response(self): - filename = self.get_filename() - self.response = HttpResponse(content_type='text/csv') - self.response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename) - self.response.write(codecs.BOM_UTF8) - self.writer = csv.writer(self.response, dialect='excel', quoting=csv.QUOTE_MINIMAL) - header = [] - for field in self.get_define_fields(): - if hasattr(field, 'verbose_name'): - header.append(getattr(field, 'verbose_name')) - else: - header.append(getattr(field, 'name')) - self.writer.writerow(header) - - def make_csv_response(self): - self.initial_csv_response() - queryset = self.get_queryset() - - for instance in queryset: - data = [getattr(instance, field.name) for field in self.get_define_fields()] - self.writer.writerow(data) - return self.response - - -class FileForm(forms.Form): - file = forms.FileField() - - -class ModelImportPostMixin(ModelFieldsMixin): - form_context = "file" - csv_data = None - form_class = FileForm - stream = None - - def get_form(self): - form = self.form_class(self.request.POST) - if form.is_valid(): - raise Http404("Form is not valid") - return form - - def get_stream(self): - self.stream = self.get_form().cleaned_data[self.form_context] - return self.stream - - def get_csv_data(self, stream=None): - if stream is None: - stream = self.stream - result = chardet.detect(stream.read()) - stream.seek(0) - raw_data = stream.read().decode(result['encoding'])\ - .strip(codecs.BOM_UTF8.decode()) - csv_file = StringIO(raw_data) - reader = csv.reader(csv_file) - csv_data = [row for row in reader] - self.csv_data = csv_data - return csv_data - - def cleaned_post_fields(self): - fields = [] - header = self.csv_data[0] - fields_name = [self.field_verbose_name_mapping.get(v) for v in header] - for name in fields_name: - if name in self.get_define_fields(): - fields.append(self.get_field(name)) - else: - fields.append(None) - return fields - - def create_or_update(self): - stream = self.get_stream() - csv_data = self.get_csv_data(stream) - cleaned_fields = self.cleaned_post_fields() - - - -class ModelImportView(ModelImportPostMixin): - def post(self, request, *args, **kwargs): - return self.create_or_update() - - -class ModelExportView(ModelExportPostMixin, ModelExportGetMixin, View): - model = None - filename_prefix = 'jumpserver' - - def post(self, request, *args, **kwargs): - return self.get_redirect_response() - - def get(self, request, *args, **kwargs): - self.request = request - response = self.make_csv_response() - return response diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py index cc4bc6215..81d39c2e7 100644 --- a/apps/ops/ansible/callback.py +++ b/apps/ops/ansible/callback.py @@ -1,13 +1,14 @@ # ~*~ coding: utf-8 ~*~ from ansible.plugins.callback import CallbackBase +from ansible.plugins.callback.default import CallbackModule -class AdHocResultCallback(CallbackBase): +class AdHocResultCallback(CallbackModule): """ Task result Callback """ - def __init__(self, display=None): + def __init__(self, display=None, options=None): # result_raw example: { # "ok": {"hostname": {"task_name": {},...},..}, # "failed": {"hostname": {"task_name": {}..}, ..}, @@ -20,9 +21,10 @@ class AdHocResultCallback(CallbackBase): # } self.results_raw = dict(ok={}, failed={}, unreachable={}, skipped={}) self.results_summary = dict(contacted=[], dark={}) - super().__init__(display) + super().__init__() def gather_result(self, t, res): + self._clean_results(res._result, res._task.action) host = res._host.get_name() task_name = res.task_name task_result = res._result @@ -49,15 +51,19 @@ class AdHocResultCallback(CallbackBase): def v2_runner_on_failed(self, result, ignore_errors=False): self.gather_result("failed", result) + super().v2_runner_on_failed(result, ignore_errors=ignore_errors) def v2_runner_on_ok(self, result): self.gather_result("ok", result) + super().v2_runner_on_ok(result) def v2_runner_on_skipped(self, result): self.gather_result("skipped", result) + super().v2_runner_on_skipped(result) def v2_runner_on_unreachable(self, result): self.gather_result("unreachable", result) + super().v2_runner_on_unreachable(result) class CommandResultCallback(AdHocResultCallback): diff --git a/apps/ops/models.py b/apps/ops/models.py index e242a0867..efc7e3a6e 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -21,37 +21,62 @@ class Task(models.Model): One task can have some versions of adhoc, run a task only run the latest version adhoc """ id = models.UUIDField(default=uuid.uuid4, primary_key=True) - name = models.CharField(max_length=128, blank=True, verbose_name=_('Name')) + name = models.CharField(max_length=128, unique=True, verbose_name=_('Name')) is_deleted = models.BooleanField(default=False) created_by = models.CharField(max_length=128, blank=True, default='') date_created = models.DateTimeField(auto_now_add=True) + __latest_adhoc = None @property def short_id(self): return str(self.id).split('-')[-1] + @property + def latest_adhoc(self): + if not self.__latest_adhoc: + self.__latest_adhoc = self.get_latest_adhoc() + return self.__latest_adhoc + + @latest_adhoc.setter + def latest_adhoc(self, item): + self.__latest_adhoc = item + + @property + def latest_history(self): + try: + return self.history.all().latest() + except AdHocRunHistory.DoesNotExist: + return None + + def get_latest_adhoc(self): + try: + return self.adhoc.all().latest() + except AdHoc.DoesNotExist: + return None + + @property + def history_summary(self): + history = self.get_run_history() + total = len(history) + success = len([history for history in history if history.is_success]) + failed = len([history for history in history if not history.is_success]) + return {'total': total, 'success': success, 'failed': failed} + + def get_run_history(self): + return self.history.all() + + def run(self): + if self.latest_adhoc: + return self.latest_adhoc.run() + else: + return {'error': 'No adhoc'} + def __str__(self): return self.name - def get_latest_adhoc(self): - return self.adhoc.all().order_by('date_created').last() - - def get_latest_history(self): - return self.get_latest_adhoc().get_latest_history() - - def get_all_run_history(self): - adhocs = self.adhoc.all() - return AdHocRunHistory.objects.filter(adhoc__in=adhocs) - - def get_all_run_times(self): - history_all = self.get_all_run_history() - total = len(history_all) - success = len([history for history in history_all if history.is_success]) - failed = len([history for history in history_all if not history.is_success]) - return {'total': total, 'success': success, 'failed': failed} - class Meta: db_table = 'ops_task' + get_latest_by = 'date_created' class AdHoc(models.Model): @@ -103,6 +128,10 @@ class AdHoc(models.Model): else: return {} + def run(self): + from .utils import run_adhoc_object + return run_adhoc_object(self, **self.options) + @become.setter def become(self, item): """ @@ -130,14 +159,31 @@ class AdHoc(models.Model): def short_id(self): return str(self.id).split('-')[-1] - def get_latest_history(self): - return self.history.all().order_by('date_start').last() + @property + def latest_history(self): + try: + return self.history.all().latest() + except AdHocRunHistory.DoesNotExist: + return None def __str__(self): return "{} of {}".format(self.task.name, self.short_id) + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + fields_check = [] + for field in self.__class__._meta.fields: + if field.name not in ['id', 'date_created']: + fields_check.append(field) + for field in fields_check: + if getattr(self, field.name) != getattr(other, field.name): + return False + return True + class Meta: db_table = "ops_adhoc" + get_latest_by = 'date_created' class AdHocRunHistory(models.Model): @@ -145,7 +191,8 @@ class AdHocRunHistory(models.Model): AdHoc running history. """ id = models.UUIDField(default=uuid.uuid4, primary_key=True) - adhoc = models.ForeignKey(AdHoc, related_name='history', on_delete=models.CASCADE) + task = models.ForeignKey(Task, related_name='history', on_delete=models.SET_NULL, null=True) + adhoc = models.ForeignKey(AdHoc, related_name='history', on_delete=models.SET_NULL, null=True) date_start = models.DateTimeField(auto_now_add=True, verbose_name=_('Start time')) date_finished = models.DateTimeField(blank=True, null=True, verbose_name=_('End time')) timedelta = models.FloatField(default=0.0, verbose_name=_('Time'), null=True) @@ -179,3 +226,4 @@ class AdHocRunHistory(models.Model): class Meta: db_table = "ops_adhoc_history" + get_latest_by = 'date_start' diff --git a/apps/ops/templates/ops/task_list.html b/apps/ops/templates/ops/task_list.html index 7b1c2cde3..bc5a432c5 100644 --- a/apps/ops/templates/ops/task_list.html +++ b/apps/ops/templates/ops/task_list.html @@ -52,19 +52,19 @@ {{ object.name }} - {{ object.get_all_run_times.failed }}/{{ object.get_all_run_times.success}}/{{ object.get_all_run_times.total}} + {{ object.history_summary.failed }}/{{ object.history_summary.success}}/{{ object.history_summary.total}} {{ object.adhoc.all | length}} - {{ object.get_latest_adhoc.hosts | length}} + {{ object.latest_adhoc.hosts | length}} - {% if object.get_latest_history.is_success %} + {% if object.latest_history.is_success %} {% else %} {% endif %} - {{ object.get_latest_history.date_start }} - {{ object.get_latest_history.timedelta|floatformat }} s + {{ object.latest_history.date_start }} + {{ object.latest_history.timedelta|floatformat }} s {% trans "Run" %} {% trans "Delete" %} diff --git a/apps/ops/utils.py b/apps/ops/utils.py index 61feca1e4..1e2977af4 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -21,10 +21,9 @@ def is_uuid(s): return False - def record_adhoc(func): def _deco(adhoc, **options): - record = AdHocRunHistory(adhoc=adhoc) + record = AdHocRunHistory(adhoc=adhoc, task=adhoc.task) time_start = time.time() try: result = func(adhoc, **options) @@ -86,7 +85,7 @@ def run_adhoc_object(adhoc, **options): name = adhoc.task.name inventory = get_adhoc_inventory(adhoc) runner = AdHocRunner(inventory) - for k, v in options: + for k, v in options.items(): runner.set_option(k, v) try: @@ -113,42 +112,69 @@ def run_adhoc(hostname_list, pattern, tasks, name=None, return runner.run(tasks, pattern, play_name=name) -def create_and_run_adhoc(hostname_list, pattern, tasks, name=None, - run_as_admin=False, run_as=None, become_info=None): - if name is None: - name = "Adhoc-task-{}-{}".format( - get_short_uuid_str(), - timezone.now().strftime("%Y-%m-%d %H:%M:%S"), - ) - task = Task(name=name) - task.save() - adhoc = AdHoc( - task=task, pattern=pattern, name=name, - run_as_admin=run_as_admin, run_as=run_as - ) - adhoc.hosts = hostname_list - adhoc.tasks = tasks - adhoc.become = become_info - adhoc.save() +# def create_and_run_adhoc(hostname_list, pattern, tasks, name=None, +# run_as_admin=False, run_as=None, become_info=None): +# if name is None: +# name = "Adhoc-task-{}-{}".format( +# get_short_uuid_str(), +# timezone.now().strftime("%Y-%m-%d %H:%M:%S"), +# ) +# task = Task(name=name) +# task.save() +# adhoc = AdHoc( +# task=task, pattern=pattern, name=name, +# run_as_admin=run_as_admin, run_as=run_as +# ) +# adhoc.hosts = hostname_list +# adhoc.tasks = tasks +# adhoc.become = become_info +# adhoc.save() -def get_task_by_name(name): - task = get_object_or_none(Task, name=name) +# def get_task_by_name(name): +# task = get_object_or_none(Task, name=name) +# return task + + +# def create_task(name, created_by=""): +# return Task.objects.create(name=name, created_by=created_by) +# +# +# def create_adhoc(task, hosts, tasks, pattern='all', options=None, +# run_as_admin=False, run_as="", +# become_info=None, created_by=""): +# adhoc = AdHoc(task=task, pattern=pattern, run_as_admin=run_as_admin, +# run_as=run_as, created_by=created_by) +# adhoc.hosts = hosts +# adhoc.tasks = tasks +# adhoc.options = options +# adhoc.become = become_info +# adhoc.save() +# return adhoc + + +def create_or_update_task( + task_name, hosts, tasks, pattern='all', options=None, + run_as_admin=False, run_as="", become_info=None, + created_by=None + ): + task = get_object_or_none(Task, name=task_name) + if task is None: + task = Task(name=task_name, created_by=created_by) + task.save() + + adhoc = task.get_latest_adhoc() + new_adhoc = AdHoc(task=task, pattern=pattern, + run_as_admin=run_as_admin, + run_as=run_as) + new_adhoc.hosts = hosts + new_adhoc.tasks = tasks + new_adhoc.options = options + new_adhoc.become = become_info + if not adhoc or adhoc != new_adhoc: + new_adhoc.save() + task.latest_adhoc = new_adhoc return task -def create_task(name, created_by=""): - return Task.objects.create(name=name, created_by=created_by) - -def create_adhoc(task, hosts, tasks, pattern='all', options=None, - run_as_admin=False, run_as="", - become_info=None, created_by=""): - adhoc = AdHoc(task=task, pattern=pattern, run_as_admin=run_as_admin, - run_as=run_as, created_by=created_by) - adhoc.hosts = hosts - adhoc.tasks = tasks - adhoc.options = options - adhoc.become = become_info - adhoc.save() - return adhoc