From bf9bb1b973cb4e38969fe3fe035256aad8981fd2 Mon Sep 17 00:00:00 2001 From: ibuler Date: Sun, 24 Dec 2017 18:53:07 +0800 Subject: [PATCH] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9ops=20task?= =?UTF-8?q?=E8=BF=90=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/api.py | 16 +- apps/assets/const.py | 26 +- apps/assets/models/user.py | 3 +- apps/assets/tasks.py | 450 +++++++++--------- .../templates/assets/asset_modal_list.html | 3 - apps/assets/views/asset.py | 6 +- apps/common/celery.py | 173 ++++++- apps/common/tasks.py | 2 +- apps/common/utils.py | 74 +-- apps/jumpserver/settings.py | 3 +- apps/ops/decorators.py | 38 -- apps/ops/models.py | 15 +- apps/ops/tasks.py | 4 +- apps/ops/templates/ops/task_list.html | 16 +- apps/ops/utils.py | 98 +--- apps/static/js/jumpserver.js | 3 - apps/templates/_left_side_bar.html | 2 +- apps/templates/_user_profile.html | 6 +- apps/users/models/group.py | 7 - apps/users/models/user.py | 4 +- apps/users/serializers.py | 4 +- run_server.py | 2 +- 22 files changed, 503 insertions(+), 452 deletions(-) delete mode 100644 apps/ops/decorators.py diff --git a/apps/assets/api.py b/apps/assets/api.py index 1fe371dcd..064c2780e 100644 --- a/apps/assets/api.py +++ b/apps/assets/api.py @@ -25,9 +25,9 @@ from .hands import IsSuperUser, IsValidUser, IsSuperUserOrAppUser, \ get_user_granted_assets from .models import AssetGroup, Asset, Cluster, SystemUser, AdminUser from . import serializers -from .tasks import update_assets_hardware_info, test_admin_user_connectability, \ - test_admin_user_connectability_manual, push_system_user_to_cluster_assets, \ - test_system_user_connectability +from .tasks import update_assets_hardware_info_manual, test_admin_user_connectability_util, \ + test_asset_connectability_manual, push_system_user_to_cluster_assets_manual, \ + test_system_user_connectability_manual class AssetViewSet(IDInFilterMixin, BulkModelViewSet): @@ -222,7 +222,7 @@ class AssetRefreshHardwareApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') asset = get_object_or_404(Asset, pk=asset_id) - summary = update_assets_hardware_info([asset]) + summary = update_assets_hardware_info_manual([asset]) if summary.get('dark'): return Response(summary['dark'].values(), status=501) else: @@ -239,7 +239,7 @@ class AssetAdminUserTestApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') asset = get_object_or_404(Asset, pk=asset_id) - ok, msg = test_admin_user_connectability_manual(asset) + ok, msg = test_asset_connectability_manual(asset) if ok: return Response({"msg": "pong"}) else: @@ -255,7 +255,7 @@ class AdminUserTestConnectiveApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): admin_user = self.get_object() - test_admin_user_connectability.delay(admin_user, force=True) + test_admin_user_connectability_util.delay(admin_user, force=True) return Response({"msg": "Task created"}) @@ -268,7 +268,7 @@ class SystemUserPushApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): system_user = self.get_object() - push_system_user_to_cluster_assets.delay(system_user, force=True) + push_system_user_to_cluster_assets_manual.delay(system_user, force=True) return Response({"msg": "Task created"}) @@ -281,5 +281,5 @@ class SystemUserTestConnectiveApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): system_user = self.get_object() - test_system_user_connectability.delay(system_user, force=True) + test_system_user_connectability_manual.delay(system_user, force=True) return Response({"msg": "Task created"}) diff --git a/apps/assets/const.py b/apps/assets/const.py index adad03aeb..5944e1124 100644 --- a/apps/assets/const.py +++ b/apps/assets/const.py @@ -2,14 +2,20 @@ # from django.utils.translation import ugettext as _ -PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" -PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD TASK") +# PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" +PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD: {}") +PUSH_SYSTEM_USER_MANUAL_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER MANUALLY: {}") PUSH_SYSTEM_USER_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER: {}") -PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}" +# PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}" +PUSH_SYSTEM_USER_ON_CHANGE_TASK_NAME = _("PUSH SYSTEM USER ON CHANGE: {}") +PUSH_SYSTEM_USER_ON_CREATE_TASK_NAME = _("PUSH SYSTEM USER ON CREATE: {}") +PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME = _("PUSH SYSTEM USERS ON ASSET CREAT: {}") UPDATE_ASSETS_HARDWARE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO') -UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY" +UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO MANUALLY') +UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO ON CREATE') +# UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY" UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO PERIOD') UPDATE_ASSETS_HARDWARE_TASKS = [ { @@ -20,10 +26,10 @@ UPDATE_ASSETS_HARDWARE_TASKS = [ } ] -TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY" -TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD TASK") +# TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY" +TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD: {}") +TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME = _("TEST ADMIN USER CONN MANUALLY: {}") TEST_ADMIN_USER_CONN_TASK_NAME = _("TEST ADMIN USER CONN: {}") -TEST_ADMIN_USER_CONN_LOCK_KEY = TEST_ADMIN_USER_CONN_TASK_NAME ADMIN_USER_CONN_CACHE_KEY = "ADMIN_USER_CONN_{}" TEST_ADMIN_USER_CONN_TASKS = [ { @@ -38,10 +44,8 @@ ASSET_ADMIN_CONN_CACHE_KEY = "ASSET_ADMIN_USER_CONN_{}" TEST_ASSET_CONN_TASK_NAME = _("ASSET CONN TEST MANUAL") TEST_SYSTEM_USER_CONN_PERIOD_LOCK_KEY = "TEST_SYSTEM_USER_CONN_PERIOD_KEY" -TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD TASK") -TEST_SYSTEM_USER_CONN_CACHE_KEY_PREFIX = "SYSTEM_USER_CONN_" -TEST_SYSTEM_USER_CONN_TASK_NAME = _("TEST SYSTEM USER CONN: {}") -TEST_SYSTEM_USER_CONN_LOCK_KEY = "TEST_SYSTEM_USER_CONN_{}" +TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD: {}") +TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME = _("TEST SYSTEM USER CONN MANUALLY: {}") SYSTEM_USER_CONN_CACHE_KEY = "SYSTEM_USER_CONN_{}" TEST_SYSTEM_USER_CONN_TASKS = [ { diff --git a/apps/assets/models/user.py b/apps/assets/models/user.py index 674ffa008..3e5e41b8d 100644 --- a/apps/assets/models/user.py +++ b/apps/assets/models/user.py @@ -13,13 +13,14 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from django.conf import settings -from common.utils import signer, ssh_key_string_to_obj, ssh_key_gen +from common.utils import get_signer, ssh_key_string_to_obj, ssh_key_gen from .utils import private_key_validator from ..const import SYSTEM_USER_CONN_CACHE_KEY __all__ = ['AdminUser', 'SystemUser',] logger = logging.getLogger(__name__) +signer = get_signer() class AssetUser(models.Model): diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index 64de5423f..e83357f68 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -8,9 +8,11 @@ from django.db.models.signals import post_save from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger +from common.celery import register_as_period_task, after_app_shutdown_clean, \ + after_app_ready_start, app as celery_app + from .models import SystemUser, AdminUser, Asset from . import const -from ops.decorators import register_as_period_task FORKS = 10 @@ -19,7 +21,18 @@ logger = get_logger(__file__) CACHE_MAX_TIME = 60*60*60 -def _update_asset_info(result_raw): +@shared_task +def update_assets_hardware_info(result, **kwargs): + """ + Using ops task run result, to update asset info + + @shared_task must be exit, because we using it as a task callback, is must + be a celery task also + :param result: + :param kwargs: {task_name: ""} + :return: + """ + result_raw = result[0] assets_updated = [] for hostname, info in result_raw['ok'].items(): if info: @@ -65,173 +78,240 @@ def _update_asset_info(result_raw): @shared_task -def update_assets_hardware_info(assets, task_name=None): +def update_assets_hardware_info_util(assets, task_name): """ Using ansible api to update asset hardware info :param assets: asset seq :param task_name: task_name running :return: result summary ['contacted': {}, 'dark': {}] """ - from ops.utils import create_or_update_ansible_task - if task_name is None: - task_name = const.UPDATE_ASSETS_HARDWARE_TASK_NAME + from ops.utils import update_or_create_ansible_task tasks = const.UPDATE_ASSETS_HARDWARE_TASKS hostname_list = [asset.hostname for asset in assets] - task = create_or_update_ansible_task( + task, _ = update_or_create_ansible_task( task_name, hosts=hostname_list, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) result = task.run() - summary, result_raw = result.results_summary, result.results_raw - # TOdo: may be somewhere using - assets_updated = _update_asset_info(result_raw) - return summary + # Todo: may be somewhere using + # Manual run callback function + assets_updated = update_assets_hardware_info(result) + return result @shared_task -@register_as_period_task(interval=60*60*60*24) -def update_assets_hardware_period(): +def update_assets_hardware_info_manual(assets): + task_name = const.UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME + return update_assets_hardware_info_util(assets, task_name) + + +@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") +def update_asset_info_on_created(sender, instance=None, created=False, **kwargs): + if instance and created: + msg = "Receive asset {} create signal, update asset hardware info".format( + instance + ) + logger.debug(msg) + task_name = const.UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME + update_assets_hardware_info_util.delay([instance], task_name) + + +@celery_app.task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean +def update_assets_hardware_info_period(): """ Update asset hardware period task :return: """ - from ops.utils import create_or_update_ansible_task + from ops.utils import update_or_create_ansible_task task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME - if cache.get(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1: - msg = "Task {} is running or before long, passed this time".format( - task_name - ) - logger.debug(msg) - return {} - # Todo: set cache but not update, because we want also set it to as a - # minimum update time too - cache.set(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) - assets = Asset.objects.filter(type__in=['Server', 'VM']) - return update_assets_hardware_info(assets, task_name=task_name) + hostname_list = [asset.hostname for asset in Asset.objects.all()] + tasks = const.UPDATE_ASSETS_HARDWARE_TASKS + # Only create, schedule by celery beat + _ = update_or_create_ansible_task( + task_name, hosts=hostname_list, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + interval=60*60*24, is_periodic=True, callback=update_assets_hardware_info.name, + ) + + +## ADMIN USER CONNECTIVE ## @shared_task -def test_admin_user_connectability(admin_user, force=False): - """ - Test asset admin user can connect or not. Using ansible api do that - :param admin_user: - :param force: Force update - :return: - """ - from ops.utils import create_or_update_ansible_task +def update_admin_user_connectability_info(result, **kwargs): + admin_user = kwargs.get("admin_user") + task_name = kwargs.get("task_name") + if admin_user is None and task_name is not None: + admin_user = task_name.split(":")[-1] - task_name = const.TEST_ADMIN_USER_CONN_TASK_NAME.format(admin_user.name) - lock_key = const.TEST_ADMIN_USER_CONN_LOCK_KEY.format(admin_user.name) + _, summary = result + cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user) + cache.set(cache_key, summary, CACHE_MAX_TIME) - if cache.get(lock_key, 0) == 1 and not force: - logger.debug("Task {} is running or before along, passed this time") - return {} - - assets = admin_user.get_related_assets() - hosts = [asset.hostname for asset in assets] - tasks = const.TEST_ADMIN_USER_CONN_TASKS - task = create_or_update_ansible_task( - task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', - options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', - ) - cache.set(lock_key, 1, CACHE_MAX_TIME) - result = task.run() - cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user.name) - cache.set(cache_key, result.results_summary, CACHE_MAX_TIME) - - for i in result.results_summary.get('contacted', []): + for i in summary.get('contacted', []): asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i) cache.set(asset_conn_cache_key, 1, CACHE_MAX_TIME) - for i, msg in result.results_summary.get('dark', {}).items(): + for i, msg in summary.get('dark', {}).items(): asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i) cache.set(asset_conn_cache_key, 0, CACHE_MAX_TIME) logger.error(msg) - return result.results_summary - @shared_task -def test_admin_user_connectability_period(): - if cache.get(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY) == 1: - msg = "{} task is running or before long, passed this time".format( - const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME - ) - logger.debug(msg) - return +def test_admin_user_connectability_util(admin_user, task_name): + """ + Test asset admin user can connect or not. Using ansible api do that + :param admin_user: + :param task_name: + :param force: Force update + :return: + """ + from ops.utils import update_or_create_ansible_task - logger.debug("Task {} start".format(const.TEST_ADMIN_USER_CONN_TASK_NAME)) - cache.set(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) - admin_users = AdminUser.objects.all() - for admin_user in admin_users: - test_admin_user_connectability(admin_user) - - -@shared_task -def test_admin_user_connectability_manual(asset, task_name=None): - from ops.utils import create_or_update_ansible_task - if task_name is None: - task_name = const.TEST_ASSET_CONN_TASK_NAME - hosts = [asset.hostname] + assets = admin_user.get_related_assets() + hosts = [asset.hostname for asset in assets] tasks = const.TEST_ADMIN_USER_CONN_TASKS - task = create_or_update_ansible_task( - task_name, tasks=tasks, hosts=hosts, run_as_admin=True, - created_by='System', options=const.TASK_OPTIONS, pattern='all', + task, created = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) result = task.run() + update_admin_user_connectability_info(result, admin_user=admin_user.name) + return result - if result.results_summary['dark']: + +@celery_app.task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean +def test_admin_user_connectability_period(): + """ + A period task that update the ansible task period + """ + from ops.utils import update_or_create_ansible_task + admin_users = AdminUser.objects.all() + for admin_user in admin_users: + task_name = const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME.format(admin_user.name) + assets = admin_user.get_related_assets() + hosts = [asset.hostname for asset in assets] + tasks = const.TEST_ADMIN_USER_CONN_TASKS + _ = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + interval=3600, is_periodic=True, + callback=update_admin_user_connectability_info.name, + ) + + +@shared_task +def test_admin_user_connectability_manual(admin_user): + task_name = const.TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME.format(admin_user.name) + return test_admin_user_connectability_util.delay(admin_user, task_name) + + +@shared_task +def test_asset_connectability_manual(asset): + from ops.utils import update_or_create_ansible_task + + task_name = const.TEST_ASSET_CONN_TASK_NAME + assets = [asset] + hosts = [asset.hostname for asset in assets] + tasks = const.TEST_ADMIN_USER_CONN_TASKS + task, created = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + ) + result = task.run() + summary = result[1] + if summary.get('dark'): cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, CACHE_MAX_TIME) - return False, result.results_summary['dark'] + return False, summary['dark'] else: cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, CACHE_MAX_TIME) return True, "" +@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") +def update_asset_conn_info_on_created(sender, instance=None, created=False, + **kwargs): + if instance and created: + task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance) + msg = "Receive asset {} create signal, test asset connectability".format( + instance + ) + logger.debug(msg) + test_asset_connectability_manual.delay(instance, task_name) + + +## System user connective ## + + @shared_task -def test_system_user_connectability(system_user, force=False): +def update_system_user_connectablity_info(result, **kwargs): + summary = result[1] + task_name = kwargs.get("task_name") + system_user = kwargs.get("system_user") + if system_user is None: + system_user = task_name.split(":")[-1] + cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user) + cache.set(cache_key, summary, CACHE_MAX_TIME) + + +@shared_task +def test_system_user_connectability_util(system_user, task_name): """ Test system cant connect his assets or not. :param system_user: - :param force + :param task_name: :return: """ - from ops.utils import create_or_update_ansible_task - lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY.format(system_user.name) - task_name = const.TEST_SYSTEM_USER_CONN_TASK_NAME.format(system_user.name) - if cache.get(lock_key, 0) == 1 and not force: - logger.debug("Task {} is running or before long, passed this time".format(task_name)) - return {} + from ops.utils import update_or_create_ansible_task assets = system_user.get_clusters_assets() hosts = [asset.hostname for asset in assets] tasks = const.TEST_SYSTEM_USER_CONN_TASKS - task = create_or_update_ansible_task( + task, created = update_or_create_ansible_task( task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as=system_user.name, created_by="System", ) - cache.set(lock_key, 1, CACHE_MAX_TIME) result = task.run() - cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user.name) - print("Set cache: {} {}".format(cache_key, result.results_summary)) - cache.set(cache_key, result.results_summary, CACHE_MAX_TIME) - return result.results_summary + update_system_user_connectablity_info(result, system_user=system_user.name) + return result @shared_task +def test_system_user_connectability_manual(system_user): + task_name = const.TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME.format(system_user.name) + return test_system_user_connectability_util(system_user, task_name) + + +@shared_task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean def test_system_user_connectability_period(): - lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY - if cache.get(lock_key) == 1: - logger.debug("{} task is running, passed this time".format( - const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME - )) - return + from ops.utils import update_or_create_ansible_task + system_users = SystemUser.objects.all() + for system_user in system_users: + task_name = const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME.format( + system_user.name + ) + assets = system_user.get_clusters_assets() + hosts = [asset.hostname for asset in assets] + tasks = const.TEST_SYSTEM_USER_CONN_TASKS + _ = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=False, run_as=system_user.name, + created_by='System', interval=3600, is_periodic=True, + callback=update_admin_user_connectability_info.name, + ) - logger.debug("Task {} start".format(const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME)) - cache.set(lock_key, 1, CACHE_MAX_TIME) - for system_user in SystemUser.objects.all(): - test_system_user_connectability(system_user) +#### Push system user tasks #### def get_push_system_user_tasks(system_user): tasks = [ @@ -271,75 +351,48 @@ def get_push_system_user_tasks(system_user): @shared_task -def push_system_user(system_user, assets, task_name=None): - from ops.utils import create_or_update_ansible_task +def push_system_user_util(system_user, task_name): + from ops.utils import update_or_create_ansible_task - if system_user.auto_push and assets: - if task_name is None: - task_name = 'PUSH-SYSTEM-USER-{}'.format(system_user.name) + tasks = get_push_system_user_tasks(system_user) + assets = system_user.get_clusters_assets() + hosts = [asset.hostname for asset in assets] + task, _ = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' + ) + return task.run() + +@shared_task +def push_system_user_to_cluster_assets_manual(system_user): + task_name = const.PUSH_SYSTEM_USER_MANUAL_TASK_NAME.format(system_user.name) + return push_system_user_util(system_user, task_name) + + +@shared_task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean +def push_system_user_period(): + from ops.utils import update_or_create_ansible_task + + for system_user in SystemUser.objects.filter(auto_push=True): + assets = system_user.get_clusters_assets() + task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME.format(system_user.name) hosts = [asset.hostname for asset in assets] tasks = get_push_system_user_tasks(system_user) - task = create_or_update_ansible_task( + _ = update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', - options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + interval=60*60*24, is_periodic=True, ) - result = task.run() - for i in result.results_summary.get('contacted'): - logger.debug("Push system user {} to {} [OK]".format( - system_user.name, i - )) - for i in result.results_summary.get('dark'): - logger.error("Push system user {} to {} [FAILED]".format( - system_user.name, i - )) - return result.results_summary - else: - msg = "Task {} does'nt execute, because auto_push " \ - "is not True, or not assets".format(task_name) - logger.debug(msg) - return {} @shared_task -def push_system_user_to_cluster_assets(system_user, force=False): - lock_key = const.PUSH_SYSTEM_USER_LOCK_KEY - task_name = const.PUSH_SYSTEM_USER_TASK_NAME.format(system_user.name) - if cache.get(lock_key, 0) == 1 and not force: - msg = "Task {} is running or before long, passed this time".format( - task_name - ) - logger.debug(msg) - return {} - - logger.debug("Task {} start".format(task_name)) - assets = system_user.get_clusters_assets() - summary = push_system_user(system_user, assets, task_name) - return summary - - -@shared_task -def push_system_user_period(): - task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME - if cache.get(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY) == 1: - msg = "Task {} is running or before long, passed this time".format( - task_name - ) - logger.debug(msg) - return - logger.debug("Task {} start".format(task_name)) - cache.set(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY, 1, timeout=CACHE_MAX_TIME) - - for system_user in SystemUser.objects.filter(auto_push=True): - push_system_user_to_cluster_assets(system_user) - - -@shared_task -def push_asset_system_users(asset, system_users=None, task_name=None): - from ops.utils import create_or_update_ansible_task - if task_name is None: - task_name = "PUSH-ASSET-SYSTEM-USER-{}".format(asset.hostname) +def push_asset_system_users_util(asset, task_name, system_users=None): + from ops.utils import update_or_create_ansible_task if system_users is None: system_users = asset.cluster.systemuser_set.all() @@ -350,81 +403,38 @@ def push_asset_system_users(asset, system_users=None, task_name=None): tasks.extend(get_push_system_user_tasks(system_user)) hosts = [asset.hostname] - - task = create_or_update_ansible_task( + task, _ = update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' ) - result = task.run() - return result.results_summary - - -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_info_when_created(sender, instance=None, created=False, **kwargs): - if instance and created: - msg = "Receive asset {} create signal, update asset hardware info".format( - instance - ) - logger.debug(msg) - task_name = "UPDATE-ASSET-HARDWARE-INFO-WHEN-CREATED" - update_assets_hardware_info.delay([instance], task_name) - - -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_conn_info_on_created(sender, instance=None, created=False, **kwargs): - if instance and created: - task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance) - msg = "Receive asset {} create signal, test asset connectability".format( - instance - ) - logger.debug(msg) - test_admin_user_connectability_manual.delay(instance, task_name) + return task.run() @receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") def push_system_user_on_created(sender, instance=None, created=False, **kwargs): if instance and created: - task_name = 'PUSH-SYSTEM-USER-WHEN-ASSET-CREATED-{}'.format(instance) + task_name = const.PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME system_users = instance.cluster.systemuser_set.all() msg = "Receive asset {} create signal, push system users".format( instance ) logger.debug(msg) - push_asset_system_users.delay(instance, system_users, task_name=task_name) + push_asset_system_users_util.delay(instance, system_users, task_name=task_name) @receiver(post_save, sender=SystemUser) -def push_system_user_on_auth_change(sender, instance=None, update_fields=None, **kwargs): - fields_check = {'_password', '_private_key', '_public_key'} - auth_changed = update_fields & fields_check if update_fields else None - if instance and instance.auto_push and auth_changed: - logger.debug("System user `{}` auth changed, push it".format(instance.name)) - task_name = "PUSH-SYSTEM-USER-ON-CREATED-{}".format(instance.name) - push_system_user_to_cluster_assets.delay(instance, task_name) +def push_system_user_on_change(sender, instance=None, update_fields=None, **kwargs): + if instance and instance.auto_push: + logger.debug("System user `{}` changed, push it".format(instance.name)) + task_name = "PUSH SYSTEM USER ON CREATED: {}".format(instance.name) + push_system_user_util.delay(instance, task_name) + + + + + -periodic_tasks = ( - { - 'update_assets_hardware_period': { - 'task': 'assets.tasks.update_assets_hardware_period', - 'schedule': 60*60*60*24, - 'args': (), - }, - 'test-admin-user-connectability_period': { - 'task': 'assets.tasks.test_admin_user_connectability_period', - 'schedule': 60*60*60, - 'args': (), - }, - 'push_system_user_period': { - 'task': 'assets.tasks.push_system_user_period', - 'schedule': 60*60*60*24, - 'args': (), - } - } -) -def initial_periodic_tasks(): - from ops.utils import create_periodic_tasks - create_periodic_tasks(periodic_tasks) diff --git a/apps/assets/templates/assets/asset_modal_list.html b/apps/assets/templates/assets/asset_modal_list.html index bd0147fb6..ee82d530e 100644 --- a/apps/assets/templates/assets/asset_modal_list.html +++ b/apps/assets/templates/assets/asset_modal_list.html @@ -49,9 +49,6 @@ $(document).ready(function(){ "aaSorting": [[2, "asc"]], "aoColumnDefs": [ { "bSortable": false, "aTargets": [ 0 ] }], "bAutoWidth": false, - "language": { - "url": "/static/js/plugins/dataTables/i18n/zh-hans.json" - }, columns: [ {data: "checkbox"}, {data: "id"}, diff --git a/apps/assets/views/asset.py b/apps/assets/views/asset.py index 431b643e2..6cf917c0a 100644 --- a/apps/assets/views/asset.py +++ b/apps/assets/views/asset.py @@ -28,7 +28,7 @@ from common.utils import get_object_or_none, get_logger, is_uuid from .. import forms from ..models import Asset, AssetGroup, AdminUser, Cluster, SystemUser from ..hands import AdminUserRequiredMixin -from ..tasks import update_assets_hardware_info +from ..tasks import update_assets_hardware_info_util __all__ = [ @@ -314,10 +314,6 @@ class BulkImportAssetView(AdminUserRequiredMixin, JSONResponseMixin, FormView): except Exception as e: failed.append('%s: %s' % (asset_dict['hostname'], str(e))) - if assets: - update_assets_hardware_info.delay([asset._to_secret_json() for asset in assets]) - - data = { 'created': created, 'created_info': 'Created {}'.format(len(created)), diff --git a/apps/common/celery.py b/apps/common/celery.py index 6a08e1bd6..d05a36c25 100644 --- a/apps/common/celery.py +++ b/apps/common/celery.py @@ -1,8 +1,15 @@ # ~*~ coding: utf-8 ~*~ import os +import json +from functools import wraps -from celery import Celery +from celery import Celery, subtask +from celery.signals import worker_ready, worker_shutdown + +from .utils import get_logger + +logger = get_logger(__file__) # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings') @@ -15,3 +22,167 @@ app = Celery('jumpserver') # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) + + +def create_or_update_celery_periodic_tasks(tasks): + from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule + """ + :param tasks: { + 'add-every-monday-morning': { + 'task': 'tasks.add' # A registered celery task, + 'interval': 30, + 'crontab': "30 7 * * *", + 'args': (16, 16), + 'kwargs': {}, + 'enabled': False, + }, + } + :return: + """ + # Todo: check task valid, task and callback must be a celery task + for name, detail in tasks.items(): + interval = None + crontab = None + if isinstance(detail.get("interval"), int): + intervals = IntervalSchedule.objects.filter( + every=detail["interval"], period=IntervalSchedule.SECONDS + ) + if intervals: + interval = intervals[0] + else: + interval = IntervalSchedule.objects.create( + every=detail['interval'], + period=IntervalSchedule.SECONDS, + ) + elif isinstance(detail.get("crontab"), str): + try: + minute, hour, day, month, week = detail["crontab"].split() + except ValueError: + raise SyntaxError("crontab is not valid") + kwargs = dict( + minute=minute, hour=hour, day_of_week=week, + day_of_month=day, month_of_year=month, + ) + contabs = CrontabSchedule.objects.filter( + **kwargs + ) + if contabs: + crontab = contabs[0] + else: + crontab = CrontabSchedule.objects.create(**kwargs) + else: + raise SyntaxError("Schedule is not valid") + + defaults = dict( + interval=interval, + crontab=crontab, + name=name, + task=detail['task'], + args=json.dumps(detail.get('args', [])), + kwargs=json.dumps(detail.get('kwargs', {})), + enabled=detail.get('enabled', True), + ) + + task = PeriodicTask.objects.update_or_create( + defaults=defaults, name=name, + ) + return task + + +def disable_celery_periodic_task(task_name): + from django_celery_beat.models import PeriodicTask + PeriodicTask.objects.filter(name=task_name).update(enabled=False) + + +def delete_celery_periodic_task(task_name): + from django_celery_beat.models import PeriodicTask + PeriodicTask.objects.filter(name=task_name).delete() + + +__REGISTER_PERIODIC_TASKS = [] +__AFTER_APP_SHUTDOWN_CLEAN_TASKS = [] +__AFTER_APP_READY_RUN_TASKS = [] + + +def register_as_period_task(crontab=None, interval=None): + """ + Warning: Task must be have not any args and kwargs + :param crontab: "* * * * *" + :param interval: 60*60*60 + :return: + """ + if crontab is None and interval is None: + raise SyntaxError("Must set crontab or interval one") + + def decorate(func): + if crontab is None and interval is None: + raise SyntaxError("Interval and crontab must set one") + + # Because when this decorator run, the task was not created, + # So we can't use func.name + name = '{func.__module__}.{func.__name__}'.format(func=func) + if name not in __REGISTER_PERIODIC_TASKS: + create_or_update_celery_periodic_tasks({ + name: { + 'task': name, + 'interval': interval, + 'crontab': crontab, + 'args': (), + 'enabled': True, + } + }) + __REGISTER_PERIODIC_TASKS.append(name) + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + return wrapper + return decorate + + +def after_app_ready_start(func): + # Because when this decorator run, the task was not created, + # So we can't use func.name + name = '{func.__module__}.{func.__name__}'.format(func=func) + if name not in __AFTER_APP_READY_RUN_TASKS: + __AFTER_APP_READY_RUN_TASKS.append(name) + + @wraps(func) + def decorate(*args, **kwargs): + return func(*args, **kwargs) + + return decorate + + +def after_app_shutdown_clean(func): + # Because when this decorator run, the task was not created, + # So we can't use func.name + name = '{func.__module__}.{func.__name__}'.format(func=func) + if name not in __AFTER_APP_READY_RUN_TASKS: + __AFTER_APP_SHUTDOWN_CLEAN_TASKS.append(name) + + @wraps(func) + def decorate(*args, **kwargs): + return func(*args, **kwargs) + + return decorate + + +@worker_ready.connect +def on_app_ready(sender=None, headers=None, body=None, **kwargs): + logger.debug("App ready signal recv") + logger.debug("Start need start task: [{}]".format( + ", ".join(__AFTER_APP_READY_RUN_TASKS)) + ) + for task in __AFTER_APP_READY_RUN_TASKS: + subtask(task).delay() + + +@worker_shutdown.connect +def after_app_shutdown(sender=None, headers=None, body=None, **kwargs): + from django_celery_beat.models import PeriodicTask + logger.debug("App shutdown signal recv") + logger.debug("Clean need cleaned period tasks: [{}]".format( + ', '.join(__AFTER_APP_SHUTDOWN_CLEAN_TASKS)) + ) + PeriodicTask.objects.filter(name__in=__AFTER_APP_SHUTDOWN_CLEAN_TASKS).delete() diff --git a/apps/common/tasks.py b/apps/common/tasks.py index 4e6e33fc4..e8d6ba8b0 100644 --- a/apps/common/tasks.py +++ b/apps/common/tasks.py @@ -1,6 +1,6 @@ from django.core.mail import send_mail from django.conf import settings -from common import celery_app as app +from .celery import app from .utils import get_logger diff --git a/apps/common/utils.py b/apps/common/utils.py index f1edce12e..9af801c1c 100644 --- a/apps/common/utils.py +++ b/apps/common/utils.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- # -import json import re from collections import OrderedDict from six import string_types import base64 import os from itertools import chain -import string import logging import datetime import time @@ -27,9 +25,6 @@ from django.conf import settings from django.utils import timezone -from .compat import to_bytes, to_string - -SECRET_KEY = settings.SECRET_KEY UUID_PATTERN = re.compile(r'[0-9a-zA-Z\-]{36}') @@ -51,9 +46,22 @@ def get_object_or_none(model, **kwargs): return obj -class Signer(object): +class Singleton(type): + def __init__(cls, *args, **kwargs): + cls.__instance = None + super().__init__(*args, **kwargs) + + def __call__(cls, *args, **kwargs): + if cls.__instance is None: + cls.__instance = super().__call__(*args, **kwargs) + return cls.__instance + else: + return cls.__instance + + +class Signer(metaclass=Singleton): """用来加密,解密,和基于时间戳的方式验证token""" - def __init__(self, secret_key=SECRET_KEY): + def __init__(self, secret_key=None): self.secret_key = secret_key def sign(self, value): @@ -100,58 +108,10 @@ def combine_seq(s1, s2, callback=None): return seq -def search_object_attr(obj, value='', attr_list=None, ignore_case=False): - """It's provide a method to search a object attribute equal some value - - If object some attribute equal :param: value, return True else return False - - class A(): - name = 'admin' - age = 7 - - :param obj: A object - :param value: A string match object attribute - :param attr_list: Only match attribute in attr_list - :param ignore_case: Ignore case - :return: Boolean - """ - if value == '': - return True - - try: - object_attr = obj.__dict__ - except AttributeError: - return False - - if attr_list is not None: - new_object_attr = {} - for attr in attr_list: - new_object_attr[attr] = object_attr.pop(attr) - object_attr = new_object_attr - - if ignore_case: - if not isinstance(value, string_types): - return False - - if value.lower() in map(string.lower, map(str, object_attr.values())): - return True - else: - if value in object_attr.values(): - return True - return False - - def get_logger(name=None): return logging.getLogger('jumpserver.%s' % name) -def int_seq(seq): - try: - return map(int, seq) - except ValueError: - return seq - - def timesince(dt, since='', default="just now"): """ Returns string representing "time since" e.g. @@ -391,4 +351,6 @@ def is_uuid(s): return False -signer = Signer() +def get_signer(): + signer = Signer(settings.SECRET_KEY) + return signer diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index e17ce5d99..049ca6f18 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -337,8 +337,9 @@ CELERY_ACCEPT_CONTENT = ['json', 'pickle'] CELERY_RESULT_EXPIRES = 3600 CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_TASK_EAGER_PROPAGATES = True CELERY_TIMEZONE = TIME_ZONE -# TERMINAL_HEATBEAT_INTERVAL = CONFIG.TERMINAL_HEATBEAT_INTERVAL or 30 +# CELERY_ENABLE_UTC = True # Cache use redis diff --git a/apps/ops/decorators.py b/apps/ops/decorators.py deleted file mode 100644 index 88b96c6d7..000000000 --- a/apps/ops/decorators.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# -from functools import wraps - - -TASK_PREFIX = "TOOT" -CALLBACK_PREFIX = "COC" - - -def register_as_period_task(crontab=None, interval=None): - """ - :param crontab: "* * * * *" - :param interval: 60*60*60 - :return: - """ - from .utils import create_or_update_celery_periodic_tasks - if crontab is None and interval is None: - raise SyntaxError("Must set crontab or interval one") - - def decorate(func): - @wraps(func) - def wrapper(*args, **kwargs): - tasks = { - func.__name__: { - 'task': func.__name__, - 'args': args, - 'kwargs': kwargs, - 'interval': interval, - 'crontab': crontab, - 'enabled': True, - } - } - create_or_update_celery_periodic_tasks(tasks) - return func(*args, **kwargs) - return wrapper - return decorate - - diff --git a/apps/ops/models.py b/apps/ops/models.py index 67fd7e052..04dc9aa0a 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -9,7 +9,9 @@ from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask -from common.utils import signer, get_logger +from common.utils import get_signer, get_logger +from common.celery import delete_celery_periodic_task, create_or_update_celery_periodic_tasks, \ + disable_celery_periodic_task from .ansible import AdHocRunner, AnsibleError from .inventory import JMSInventory @@ -17,6 +19,7 @@ __all__ = ["Task", "AdHoc", "AdHocRunHistory"] logger = get_logger(__file__) +signer = get_signer() class Task(models.Model): @@ -82,8 +85,6 @@ class Task(models.Model): def save(self, force_insert=False, force_update=False, using=None, update_fields=None): - from .utils import create_or_update_celery_periodic_tasks, \ - disable_celery_periodic_task from .tasks import run_ansible_task super().save( force_insert=force_insert, force_update=force_update, @@ -114,7 +115,6 @@ class Task(models.Model): disable_celery_periodic_task(self.name) def delete(self, using=None, keep_parents=False): - from .utils import delete_celery_periodic_task super().delete(using=using, keep_parents=keep_parents) delete_celery_periodic_task(self.name) @@ -246,7 +246,7 @@ class AdHoc(models.Model): } :return: """ - self._become = signer.sign(json.dumps(item)) + self._become = signer.sign(json.dumps(item)).decode('utf-8') @property def options(self): @@ -271,6 +271,11 @@ class AdHoc(models.Model): except AdHocRunHistory.DoesNotExist: return None + def save(self, force_insert=False, force_update=False, using=None, + update_fields=None): + super().save(force_insert=force_insert, force_update=force_update, + using=using, update_fields=update_fields) + def __str__(self): return "{} of {}".format(self.task.name, self.short_id) diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index be891919d..41f60f20c 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -21,9 +21,9 @@ def run_ansible_task(task_id, callback=None, **kwargs): task = get_object_or_none(Task, id=task_id) if task: - result = task.object.run() + result = task.run() if callback is not None: - subtask(callback).delay(result) + subtask(callback).delay(result, task_name=task.name) return result else: logger.error("No task found") diff --git a/apps/ops/templates/ops/task_list.html b/apps/ops/templates/ops/task_list.html index 503f1046a..161a48ace 100644 --- a/apps/ops/templates/ops/task_list.html +++ b/apps/ops/templates/ops/task_list.html @@ -57,14 +57,20 @@ {{ object.adhoc.all | length}} {{ object.latest_adhoc.hosts | length}} - {% if object.latest_history.is_success %} - - {% else %} - + {% if object.latest_history %} + {% if object.latest_history.is_success %} + + {% else %} + + {% endif %} {% endif %} {{ object.latest_history.date_start }} - {{ object.latest_history.timedelta|floatformat }} s + + {% if object.latest_history %} + {{ object.latest_history.timedelta|floatformat }} s + {% endif %} + {% trans "Run" %} {% trans "Delete" %} diff --git a/apps/ops/utils.py b/apps/ops/utils.py index 55b5761b6..0a9dce8c9 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -1,8 +1,4 @@ # ~*~ coding: utf-8 ~*~ -import json -from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule - - from common.utils import get_logger, get_object_or_none from .models import Task, AdHoc @@ -13,22 +9,27 @@ def get_task_by_id(task_id): return get_object_or_none(Task, id=task_id) -def create_or_update_ansible_task( - task_name, hosts, tasks, pattern='all', options=None, +def update_or_create_ansible_task( + task_name, hosts, tasks, + interval=None, crontab=None, is_periodic=False, + callback=None, pattern='all', options=None, run_as_admin=False, run_as="", become_info=None, - created_by=None, interval=None, crontab=None, - is_periodic=False, callback=None, + created_by=None, ): - task = get_object_or_none(Task, name=task_name) + defaults = { + 'name': task_name, + 'interval': interval, + 'crontab': crontab, + 'is_periodic': is_periodic, + 'callback': callback, + 'created_by': created_by, + } - if task is None: - task = Task( - name=task_name, interval=interval, - crontab=crontab, is_periodic=is_periodic, - callback=callback, created_by=created_by - ) - task.save() + created = False + task, _ = Task.objects.update_or_create( + defaults=defaults, name=task_name, + ) adhoc = task.latest_adhoc new_adhoc = AdHoc(task=task, pattern=pattern, @@ -38,70 +39,13 @@ def create_or_update_ansible_task( new_adhoc.tasks = tasks new_adhoc.options = options new_adhoc.become = become_info + if not adhoc or adhoc != new_adhoc: + logger.debug("Task create new adhoc: {}".format(task_name)) new_adhoc.save() task.latest_adhoc = new_adhoc - return task + created = True + return task, created -def create_or_update_celery_periodic_tasks(tasks): - """ - :param tasks: { - 'add-every-monday-morning': { - 'task': 'tasks.add' # A registered celery task, - 'interval': 30, - 'crontab': "30 7 * * *", - 'args': (16, 16), - 'kwargs': {}, - 'enabled': False, - }, - } - :return: - """ - # Todo: check task valid, task and callback must be a celery task - for name, detail in tasks.items(): - interval = None - crontab = None - if isinstance(detail.get("interval"), int): - interval, _ = IntervalSchedule.objects.get_or_create( - every=detail['interval'], - period=IntervalSchedule.SECONDS, - ) - elif isinstance(detail.get("crontab"), str): - try: - minute, hour, day, month, week = detail["crontab"].split() - except ValueError: - raise SyntaxError("crontab is not valid") - - crontab, _ = CrontabSchedule.objects.get_or_create( - minute=minute, hour=hour, day_of_week=week, - day_of_month=day, month_of_year=month, - ) - else: - raise SyntaxError("Schedule is not valid") - - defaults = dict( - interval=interval, - crontab=crontab, - name=name, - task=detail['task'], - args=json.dumps(detail.get('args', [])), - kwargs=json.dumps(detail.get('kwargs', {})), - enabled=detail['enabled'] - ) - - task = PeriodicTask.objects.update_or_create( - defaults=defaults, name=name, - ) - logger.info("Create periodic task: {}".format(task)) - return task - - -def disable_celery_periodic_task(task_name): - PeriodicTask.objects.filter(name=task_name).update(enabled=False) - - -def delete_celery_periodic_task(task_name): - PeriodicTask.objects.filter(name=task_name).delete() - diff --git a/apps/static/js/jumpserver.js b/apps/static/js/jumpserver.js index 04b081dcb..6992f06e9 100644 --- a/apps/static/js/jumpserver.js +++ b/apps/static/js/jumpserver.js @@ -262,9 +262,6 @@ jumpserver.initDataTable = function (options) { var table = ele.DataTable({ pageLength: options.pageLength || 15, dom: options.dom || '<"#uc.pull-left">flt<"row m-t"<"col-md-8"<"#op.col-md-6"><"col-md-6 text-center"i>><"col-md-4"p>>', - language: { - url: options.i18n_url || "/static/js/plugins/dataTables/i18n/zh-hans.json" - }, order: options.order || [], select: options.select || 'multi', buttons: [], diff --git a/apps/templates/_left_side_bar.html b/apps/templates/_left_side_bar.html index 04aa89c3c..1e0392325 100644 --- a/apps/templates/_left_side_bar.html +++ b/apps/templates/_left_side_bar.html @@ -2,7 +2,7 @@