From 4893c4664d7c8057874d428c09d5545582647624 Mon Sep 17 00:00:00 2001 From: ibuler Date: Fri, 22 Dec 2017 02:08:29 +0800 Subject: [PATCH 01/18] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9task?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E8=BF=90=E8=A1=8C=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/__init__.py | 4 +- apps/assets/tasks.py | 16 ++++--- apps/common/mixins.py | 1 - apps/common/utils.py | 1 + apps/jumpserver/settings.py | 6 +-- apps/ops/ansible/__init__.py | 2 +- apps/ops/ansible/exceptions.py | 4 ++ apps/ops/models.py | 78 +++++++++++++++++++++++++++++++--- apps/ops/tasks.py | 8 ++-- apps/ops/utils.py | 16 +++++++ config_example.py | 25 ++++++++--- requirements/requirements.txt | 2 + run_server.py | 64 +++++++++++++++++----------- 13 files changed, 174 insertions(+), 53 deletions(-) diff --git a/apps/__init__.py b/apps/__init__.py index f93d0bec7..5c66b357e 100644 --- a/apps/__init__.py +++ b/apps/__init__.py @@ -2,6 +2,4 @@ # -*- coding: utf-8 -*- # - -if __name__ == '__main__': - pass +__version__ = "0.5.0" diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index a46e697fc..494162eb4 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -8,10 +8,8 @@ from django.db.models.signals import post_save from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger -from common.celery import app as celery_app from .models import SystemUser, AdminUser, Asset from . import const -from .signals import on_app_ready FORKS = 10 @@ -402,22 +400,28 @@ def push_system_user_on_auth_change(sender, instance=None, update_fields=None, * push_system_user_to_cluster_assets.delay(instance, task_name) -celery_app.conf['CELERYBEAT_SCHEDULE'].update( +periodic_tasks = ( { 'update_assets_hardware_period': { 'task': 'assets.tasks.update_assets_hardware_period', - 'schedule': 60*60*24, + 'schedule': 60*60*60*24, 'args': (), }, 'test-admin-user-connectability_period': { 'task': 'assets.tasks.test_admin_user_connectability_period', - 'schedule': 60*60, + 'schedule': 60*60*60, 'args': (), }, 'push_system_user_period': { 'task': 'assets.tasks.push_system_user_period', - 'schedule': 60*60, + 'schedule': 60*60*60*24, 'args': (), } } ) + + +def initial_periodic_tasks(): + from ops.utils import create_periodic_tasks + create_periodic_tasks(periodic_tasks) + diff --git a/apps/common/mixins.py b/apps/common/mixins.py index 764d8e50a..8f6076e4a 100644 --- a/apps/common/mixins.py +++ b/apps/common/mixins.py @@ -7,7 +7,6 @@ from django.utils.timezone import now from django.utils.translation import ugettext_lazy as _ - class NoDeleteQuerySet(models.query.QuerySet): def delete(self): diff --git a/apps/common/utils.py b/apps/common/utils.py index e7b07860b..f1edce12e 100644 --- a/apps/common/utils.py +++ b/apps/common/utils.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # +import json import re from collections import OrderedDict from six import string_types diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index 040a92cc4..a6da1124a 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -27,9 +27,7 @@ sys.path.append(PROJECT_DIR) # Import project config setting try: - from config import config as env_config, env - - CONFIG = env_config.get(env, 'default')() + from config import config as CONFIG except ImportError: CONFIG = type('_', (), {'__getattr__': lambda arg1, arg2: None})() @@ -66,12 +64,12 @@ INSTALLED_APPS = [ 'django_filters', 'bootstrap3', 'captcha', + 'django_celery_beat', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', - ] MIDDLEWARE = [ diff --git a/apps/ops/ansible/__init__.py b/apps/ops/ansible/__init__.py index d59972354..a175387eb 100644 --- a/apps/ops/ansible/__init__.py +++ b/apps/ops/ansible/__init__.py @@ -3,4 +3,4 @@ from .callback import * from .inventory import * from .runner import * - +from .exceptions import * diff --git a/apps/ops/ansible/exceptions.py b/apps/ops/ansible/exceptions.py index e49afd34a..12061bf9d 100644 --- a/apps/ops/ansible/exceptions.py +++ b/apps/ops/ansible/exceptions.py @@ -1,6 +1,10 @@ # -*- coding: utf-8 -*- # +__all__ = [ + 'AnsibleError' +] + class AnsibleError(Exception): pass diff --git a/apps/ops/models.py b/apps/ops/models.py index e6b346334..b9ff7b460 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -4,10 +4,15 @@ import logging import json import uuid +import time from django.db import models +from django.utils import timezone from django.utils.translation import ugettext_lazy as _ +from django.core import serializers +from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask from common.utils import signer +from .ansible import AdHocRunner, AnsibleError __all__ = ["Task", "AdHoc", "AdHocRunHistory"] @@ -22,7 +27,17 @@ class Task(models.Model): """ id = models.UUIDField(default=uuid.uuid4, primary_key=True) name = models.CharField(max_length=128, unique=True, verbose_name=_('Name')) + interval = models.ForeignKey( + IntervalSchedule, on_delete=models.CASCADE, + null=True, blank=True, verbose_name=_('Interval'), + ) + crontab = models.ForeignKey( + CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True, + verbose_name=_('Crontab'), help_text=_('Use one of Interval/Crontab'), + ) + is_periodic = models.BooleanField(default=False) is_deleted = models.BooleanField(default=False) + comment = models.TextField(blank=True, verbose_name=_("Comment")) created_by = models.CharField(max_length=128, blank=True, null=True, default='') date_created = models.DateTimeField(auto_now_add=True) __latest_adhoc = None @@ -65,12 +80,32 @@ class Task(models.Model): def get_run_history(self): return self.history.all() - def run(self): + def run(self, record=True): if self.latest_adhoc: - return self.latest_adhoc.run() + return self.latest_adhoc.run(record=record) else: return {'error': 'No adhoc'} + def save(self, force_insert=False, force_update=False, using=None, + update_fields=None): + instance = super().save( + force_insert=force_insert, force_update=force_update, + using=using, update_fields=update_fields, + ) + + if instance.is_periodic: + PeriodicTask.objects.update_or_create( + interval=instance.interval, + crontab=instance.crontab, + name=self.name, + task='ops.run_task', + args=serializers.serialize('json', [instance]), + ) + else: + PeriodicTask.objects.filter(name=self.name).delete() + + return instance + def __str__(self): return self.name @@ -128,9 +163,42 @@ class AdHoc(models.Model): else: return {} - def run(self): - from .utils import run_adhoc_object - return run_adhoc_object(self, **self.options) + def run(self, record=True): + if record: + return self._run_and_record() + else: + return self._run_only() + + def _run_and_record(self): + history = AdHocRunHistory(adhoc=self, task=self.task) + time_start = time.time() + try: + result = self._run_only() + history.is_finished = True + if result.results_summary.get('dark'): + history.is_success = False + else: + history.is_success = True + history.result = result.results_raw + history.summary = result.results_summary + return result + finally: + history.date_finished = timezone.now() + history.timedelta = time.time() - time_start + history.save() + + def _run_only(self): + from .utils import get_adhoc_inventory + inventory = get_adhoc_inventory(self) + runner = AdHocRunner(inventory) + for k, v in self.options.items(): + runner.set_option(k, v) + + try: + result = runner.run(self.tasks, self.pattern, self.task.name) + return result + except AnsibleError as e: + logger.error("Failed run adhoc {}, {}".format(self.task.name, e)) @become.setter def become(self, item): diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index c5298377d..b2647c465 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -1,7 +1,6 @@ # coding: utf-8 from celery import shared_task - -from .utils import run_adhoc +from django.core import serializers def rerun_task(): @@ -9,5 +8,6 @@ def rerun_task(): @shared_task -def run_add_hoc_and_record_async(adhoc, **options): - return run_adhoc(adhoc, **options) +def run_task(tasks_json): + for task in serializers.deserialize('json', tasks_json): + task.object.run() diff --git a/apps/ops/utils.py b/apps/ops/utils.py index 3c09d598f..f015f8392 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -3,6 +3,7 @@ import time from django.utils import timezone from django.db import transaction +from django_celery_beat.models import PeriodicTask, IntervalSchedule from common.utils import get_logger, get_object_or_none, get_short_uuid_str from .ansible import AdHocRunner, CommandResultCallback @@ -131,4 +132,19 @@ def create_or_update_task( return task +def create_periodic_tasks(tasks): + for name, detail in tasks.items(): + schedule, _ = IntervalSchedule.objects.get_or_create( + every=detail['schedule'], + period=IntervalSchedule.SECONDS, + ) + + task = PeriodicTask.objects.create( + interval=schedule, + name=name, + task=detail['task'], + args=json.dumps(detail.get('args', [])), + kwargs=json.dumps(detail.get('kwargs', {})), + ) + print("Create periodic task: {}".format(task)) diff --git a/config_example.py b/config_example.py index 95c6414c6..10cf063e4 100644 --- a/config_example.py +++ b/config_example.py @@ -4,7 +4,7 @@ Jumpserver project setting file - :copyright: (c) 2014-2016 by Jumpserver Team. + :copyright: (c) 2014-2017 by Jumpserver Team :license: GPL v2, see LICENSE for more details. """ import os @@ -50,6 +50,11 @@ class Config: # DB_PASSWORD = '' # DB_NAME = 'jumpserver' + # When Django start it will bind this host and port + # ./manage.py runserver 127.0.0.1:8080 + HTTP_BIND_HOST = '0.0.0.0' + HTTP_LISTEN_PORT = 8080 + # Use Redis as broker for celery and web socket REDIS_HOST = '127.0.0.1' REDIS_PORT = 6379 @@ -101,8 +106,18 @@ class Config: return None -config = { - 'default': Config, -} +class DevelopmentConfig(Config): + pass + + +class TestConfig(Config): + pass + + +class ProductionConfig(Config): + pass + + +# Default using Config settings, you can write if/else for different env +config = Config() -env = 'default' diff --git a/requirements/requirements.txt b/requirements/requirements.txt index b25089107..552fd1802 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -57,3 +57,5 @@ uritemplate==3.0.0 urllib3==1.22 vine==1.1.4 gunicorn==19.7.1 +django_celery_beat==1.1.0 +ephem==3.7.6.0 diff --git a/run_server.py b/run_server.py index 1e9bed6a3..72905c0d6 100644 --- a/run_server.py +++ b/run_server.py @@ -1,48 +1,64 @@ #!/usr/bin/env python -# ~*~ coding: utf-8 ~*~ -from threading import Thread import os import subprocess +import time +from threading import Thread + +from apps import __version__ try: - from config import config as env_config, env - - CONFIG = env_config.get(env, 'default')() + from config import config as CONFIG except ImportError: CONFIG = type('_', (), {'__getattr__': None})() BASE_DIR = os.path.dirname(os.path.abspath(__file__)) - -apps_dir = os.path.join(BASE_DIR, 'apps') +APPS_DIR = os.path.join(BASE_DIR, 'apps') +HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1' +HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080 +LOG_LEVEL = CONFIG.LOG_LEVEL +WORKERS = 4 -def start_django(): - http_host = CONFIG.HTTP_BIND_HOST or '127.0.0.1' - http_port = CONFIG.HTTP_LISTEN_PORT or '8080' - os.chdir(apps_dir) - print('start django') - subprocess.call('python ./manage.py runserver %s:%s' % (http_host, http_port), shell=True) +def start_gunicorn(): + print("- Start Gunicorn WSGI HTTP Server") + os.chdir(APPS_DIR) + cmd = "gunicorn jumpserver.wsgi -b {}:{} -w {}".format(HTTP_HOST, HTTP_PORT, WORKERS) + subprocess.call(cmd, shell=True) def start_celery(): - os.chdir(apps_dir) - os.environ.setdefault('C_FORCE_ROOT', '1') - os.environ.setdefault('PYTHONOPTIMIZE', '1') - print('start celery') - subprocess.call('celery -A common worker -B -s /tmp/celerybeat-schedule -l debug', shell=True) + print("- Start Celery as Distributed Task Queue") + os.chdir(APPS_DIR) + # os.environ.setdefault('PYTHONOPTIMIZE', '1') + cmd = 'celery -A common worker -l {}'.format(LOG_LEVEL.lower()) + subprocess.call(cmd, shell=True) + + +def start_beat(): + print("- Start Beat as Periodic Task Scheduler") + os.chdir(APPS_DIR) + # os.environ.setdefault('PYTHONOPTIMIZE', '1') + schduler = "django_celery_beat.schedulers:DatabaseScheduler" + cmd = 'celery -A common beat -l {} --scheduler {}'.format(LOG_LEVEL, schduler) + subprocess.call(cmd, shell=True) def main(): - t1 = Thread(target=start_django, args=()) - t2 = Thread(target=start_celery, args=()) + print(time.ctime()) + print('Jumpserver version {}, more see https://www.jumpserver.org'.format( + __version__)) + print('Quit the server with CONTROL-C.') - t1.start() - t2.start() + threads = [] + for func in (start_gunicorn, start_celery, start_beat): + t = Thread(target=func, args=()) + threads.append(t) + t.start() - t1.join() - t2.join() + for t in threads: + t.join() if __name__ == '__main__': From 30efec1b0905f580c3beba635a14b28dfeff0f1c Mon Sep 17 00:00:00 2001 From: ibuler Date: Fri, 22 Dec 2017 21:42:12 +0800 Subject: [PATCH 02/18] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9=20task=20?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 + apps/assets/tasks.py | 27 ++--- apps/common/celery.py | 7 +- apps/jumpserver/settings.py | 16 +-- apps/jumpserver/urls.py | 5 +- apps/ops/ansible/callback.py | 2 + apps/ops/decorators.py | 38 +++++++ apps/ops/models.py | 102 +++++++++++++------ apps/ops/tasks.py | 39 +++++++- apps/ops/utils.py | 183 +++++++++++++--------------------- requirements/requirements.txt | 2 +- run_server.py | 12 ++- 12 files changed, 253 insertions(+), 183 deletions(-) create mode 100644 apps/ops/decorators.py diff --git a/.gitignore b/.gitignore index 1cc61cd36..04bf6bec7 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ jumpserver.iml tmp/* sessions/* media +celerybeat.pid +django.db +celerybeat-schedule.db diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index 494162eb4..64de5423f 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -10,6 +10,7 @@ from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger from .models import SystemUser, AdminUser, Asset from . import const +from ops.decorators import register_as_period_task FORKS = 10 @@ -71,12 +72,12 @@ def update_assets_hardware_info(assets, task_name=None): :param task_name: task_name running :return: result summary ['contacted': {}, 'dark': {}] """ - from ops.utils import create_or_update_task + from ops.utils import create_or_update_ansible_task if task_name is None: task_name = const.UPDATE_ASSETS_HARDWARE_TASK_NAME tasks = const.UPDATE_ASSETS_HARDWARE_TASKS hostname_list = [asset.hostname for asset in assets] - task = create_or_update_task( + task = create_or_update_ansible_task( task_name, hosts=hostname_list, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) @@ -88,11 +89,13 @@ def update_assets_hardware_info(assets, task_name=None): @shared_task +@register_as_period_task(interval=60*60*60*24) def update_assets_hardware_period(): """ Update asset hardware period task :return: """ + from ops.utils import create_or_update_ansible_task task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME if cache.get(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1: msg = "Task {} is running or before long, passed this time".format( @@ -115,7 +118,7 @@ def test_admin_user_connectability(admin_user, force=False): :param force: Force update :return: """ - from ops.utils import create_or_update_task + from ops.utils import create_or_update_ansible_task task_name = const.TEST_ADMIN_USER_CONN_TASK_NAME.format(admin_user.name) lock_key = const.TEST_ADMIN_USER_CONN_LOCK_KEY.format(admin_user.name) @@ -127,7 +130,7 @@ def test_admin_user_connectability(admin_user, force=False): assets = admin_user.get_related_assets() hosts = [asset.hostname for asset in assets] tasks = const.TEST_ADMIN_USER_CONN_TASKS - task = create_or_update_task( + task = create_or_update_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) @@ -166,12 +169,12 @@ def test_admin_user_connectability_period(): @shared_task def test_admin_user_connectability_manual(asset, task_name=None): - from ops.utils import create_or_update_task + from ops.utils import create_or_update_ansible_task if task_name is None: task_name = const.TEST_ASSET_CONN_TASK_NAME hosts = [asset.hostname] tasks = const.TEST_ADMIN_USER_CONN_TASKS - task = create_or_update_task( + task = create_or_update_ansible_task( task_name, tasks=tasks, hosts=hosts, run_as_admin=True, created_by='System', options=const.TASK_OPTIONS, pattern='all', ) @@ -193,7 +196,7 @@ def test_system_user_connectability(system_user, force=False): :param force :return: """ - from ops.utils import create_or_update_task + from ops.utils import create_or_update_ansible_task lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY.format(system_user.name) task_name = const.TEST_SYSTEM_USER_CONN_TASK_NAME.format(system_user.name) if cache.get(lock_key, 0) == 1 and not force: @@ -202,7 +205,7 @@ def test_system_user_connectability(system_user, force=False): assets = system_user.get_clusters_assets() hosts = [asset.hostname for asset in assets] tasks = const.TEST_SYSTEM_USER_CONN_TASKS - task = create_or_update_task( + task = create_or_update_ansible_task( task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as=system_user.name, created_by="System", @@ -269,7 +272,7 @@ def get_push_system_user_tasks(system_user): @shared_task def push_system_user(system_user, assets, task_name=None): - from ops.utils import create_or_update_task + from ops.utils import create_or_update_ansible_task if system_user.auto_push and assets: if task_name is None: @@ -278,7 +281,7 @@ def push_system_user(system_user, assets, task_name=None): hosts = [asset.hostname for asset in assets] tasks = get_push_system_user_tasks(system_user) - task = create_or_update_task( + task = create_or_update_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' ) @@ -334,7 +337,7 @@ def push_system_user_period(): @shared_task def push_asset_system_users(asset, system_users=None, task_name=None): - from ops.utils import create_or_update_task + from ops.utils import create_or_update_ansible_task if task_name is None: task_name = "PUSH-ASSET-SYSTEM-USER-{}".format(asset.hostname) @@ -348,7 +351,7 @@ def push_asset_system_users(asset, system_users=None, task_name=None): hosts = [asset.hostname] - task = create_or_update_task( + task = create_or_update_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' ) diff --git a/apps/common/celery.py b/apps/common/celery.py index 735230ae2..6a08e1bd6 100644 --- a/apps/common/celery.py +++ b/apps/common/celery.py @@ -13,10 +13,5 @@ app = Celery('jumpserver') # Using a string here means the worker will not have to # pickle the object when using Windows. -app.config_from_object('django.conf:settings') +app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) - -app.conf.update( - CELERYBEAT_SCHEDULE={ - } -) diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index a6da1124a..e17ce5d99 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -130,7 +130,6 @@ MESSAGE_STORAGE = 'django.contrib.messages.storage.cookie.CookieStorage' # } # } -print(CONFIG.DB_ENGINE) DATABASES = { 'default': { 'ENGINE': 'django.db.backends.{}'.format(CONFIG.DB_ENGINE), @@ -243,7 +242,8 @@ LOGGING = { # https://docs.djangoproject.com/en/1.10/topics/i18n/ LANGUAGE_CODE = 'en-us' -TIME_ZONE = 'Asia/Shanghai' +TIME_ZONE = 'UTC' +# TIME_ZONE = 'Asia/Shanghai' USE_I18N = True @@ -258,6 +258,8 @@ LOCALE_PATHS = [os.path.join(BASE_DIR, 'locale'), ] # https://docs.djangoproject.com/en/1.10/howto/static-files/ STATIC_URL = '/static/' +STATIC_ROOT = os.path.join(BASE_DIR, "static") + STATICFILES_DIRS = ( os.path.join(BASE_DIR, "static"), @@ -323,18 +325,18 @@ if CONFIG.AUTH_LDAP: AUTH_LDAP_USER_ATTR_MAP = CONFIG.AUTH_LDAP_USER_ATTR_MAP # Celery using redis as broker -BROKER_URL = 'redis://:%(password)s@%(host)s:%(port)s/3' % { +CELERY_BROKER_URL = 'redis://:%(password)s@%(host)s:%(port)s/3' % { 'password': CONFIG.REDIS_PASSWORD if CONFIG.REDIS_PASSWORD else '', 'host': CONFIG.REDIS_HOST or '127.0.0.1', 'port': CONFIG.REDIS_PORT or 6379, } CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' -CELERY_RESULT_BACKEND = BROKER_URL +CELERY_RESULT_BACKEND = CELERY_BROKER_URL CELERY_ACCEPT_CONTENT = ['json', 'pickle'] -CELERY_TASK_RESULT_EXPIRES = 3600 -CELERYD_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' -CELERYD_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_RESULT_EXPIRES = 3600 +CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' CELERY_TIMEZONE = TIME_ZONE # TERMINAL_HEATBEAT_INTERVAL = CONFIG.TERMINAL_HEATBEAT_INTERVAL or 30 diff --git a/apps/jumpserver/urls.py b/apps/jumpserver/urls.py index fe25d82b5..839492e7a 100644 --- a/apps/jumpserver/urls.py +++ b/apps/jumpserver/urls.py @@ -4,6 +4,7 @@ from __future__ import unicode_literals from django.conf.urls import url, include from django.conf import settings from django.conf.urls.static import static +from django.views.static import serve as static_serve from rest_framework.schemas import get_schema_view from rest_framework_swagger.renderers import SwaggerUIRenderer, OpenAPIRenderer @@ -33,8 +34,8 @@ urlpatterns = [ if settings.DEBUG: - urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) urlpatterns += [ url(r'^docs/', schema_view, name="docs"), - ] + ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) \ + + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/apps/ops/ansible/callback.py b/apps/ops/ansible/callback.py index 810b14c51..2a7ce5a6d 100644 --- a/apps/ops/ansible/callback.py +++ b/apps/ops/ansible/callback.py @@ -28,6 +28,7 @@ class AdHocResultCallback(CallbackModule): host = res._host.get_name() task_name = res.task_name task_result = res._result + print(task_result) if self.results_raw[t].get(host): self.results_raw[t][host][task_name] = task_result @@ -50,6 +51,7 @@ class AdHocResultCallback(CallbackModule): contacted.remove(host) def v2_runner_on_failed(self, result, ignore_errors=False): + print("#######RUN FAILED" * 19) self.gather_result("failed", result) super().v2_runner_on_failed(result, ignore_errors=ignore_errors) diff --git a/apps/ops/decorators.py b/apps/ops/decorators.py new file mode 100644 index 000000000..88b96c6d7 --- /dev/null +++ b/apps/ops/decorators.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# +from functools import wraps + + +TASK_PREFIX = "TOOT" +CALLBACK_PREFIX = "COC" + + +def register_as_period_task(crontab=None, interval=None): + """ + :param crontab: "* * * * *" + :param interval: 60*60*60 + :return: + """ + from .utils import create_or_update_celery_periodic_tasks + if crontab is None and interval is None: + raise SyntaxError("Must set crontab or interval one") + + def decorate(func): + @wraps(func) + def wrapper(*args, **kwargs): + tasks = { + func.__name__: { + 'task': func.__name__, + 'args': args, + 'kwargs': kwargs, + 'interval': interval, + 'crontab': crontab, + 'enabled': True, + } + } + create_or_update_celery_periodic_tasks(tasks) + return func(*args, **kwargs) + return wrapper + return decorate + + diff --git a/apps/ops/models.py b/apps/ops/models.py index b9ff7b460..67fd7e052 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -1,6 +1,5 @@ # ~*~ coding: utf-8 ~*~ -import logging import json import uuid @@ -8,16 +7,16 @@ import time from django.db import models from django.utils import timezone from django.utils.translation import ugettext_lazy as _ -from django.core import serializers from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask -from common.utils import signer +from common.utils import signer, get_logger from .ansible import AdHocRunner, AnsibleError +from .inventory import JMSInventory __all__ = ["Task", "AdHoc", "AdHocRunHistory"] -logger = logging.getLogger(__name__) +logger = get_logger(__file__) class Task(models.Model): @@ -27,15 +26,10 @@ class Task(models.Model): """ id = models.UUIDField(default=uuid.uuid4, primary_key=True) name = models.CharField(max_length=128, unique=True, verbose_name=_('Name')) - interval = models.ForeignKey( - IntervalSchedule, on_delete=models.CASCADE, - null=True, blank=True, verbose_name=_('Interval'), - ) - crontab = models.ForeignKey( - CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True, - verbose_name=_('Crontab'), help_text=_('Use one of Interval/Crontab'), - ) + interval = models.IntegerField(verbose_name=_("Interval"), null=True, blank=True, help_text=_("Units: seconds")) + crontab = models.CharField(verbose_name=_("Crontab"), null=True, blank=True, max_length=128, help_text=_("5 * * * *")) is_periodic = models.BooleanField(default=False) + callback = models.CharField(max_length=128, blank=True, null=True, verbose_name=_("Callback")) # Callback must be a registered celery task is_deleted = models.BooleanField(default=False) comment = models.TextField(blank=True, verbose_name=_("Comment")) created_by = models.CharField(max_length=128, blank=True, null=True, default='') @@ -88,23 +82,48 @@ class Task(models.Model): def save(self, force_insert=False, force_update=False, using=None, update_fields=None): - instance = super().save( + from .utils import create_or_update_celery_periodic_tasks, \ + disable_celery_periodic_task + from .tasks import run_ansible_task + super().save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields, ) - if instance.is_periodic: - PeriodicTask.objects.update_or_create( - interval=instance.interval, - crontab=instance.crontab, - name=self.name, - task='ops.run_task', - args=serializers.serialize('json', [instance]), - ) - else: - PeriodicTask.objects.filter(name=self.name).delete() + if self.is_periodic: + interval = None + crontab = None - return instance + if self.interval: + interval = self.interval + elif self.crontab: + crontab = self.crontab + + tasks = { + self.name: { + "task": run_ansible_task.name, + "interval": interval, + "crontab": crontab, + "args": (str(self.id),), + "kwargs": {"callback": self.callback}, + "enabled": True, + } + } + create_or_update_celery_periodic_tasks(tasks) + else: + disable_celery_periodic_task(self.name) + + def delete(self, using=None, keep_parents=False): + from .utils import delete_celery_periodic_task + super().delete(using=using, keep_parents=keep_parents) + delete_celery_periodic_task(self.name) + + @property + def schedule(self): + try: + return PeriodicTask.objects.get(name=self.name) + except PeriodicTask.DoesNotExist: + return None def __str__(self): return self.name @@ -156,6 +175,23 @@ class AdHoc(models.Model): def hosts(self, item): self._hosts = json.dumps(item) + @property + def inventory(self): + if self.become: + become_info = { + 'become': { + self.become + } + } + else: + become_info = None + + inventory = JMSInventory( + self.hosts, run_as_admin=self.run_as_admin, + run_as=self.run_as, become_info=become_info + ) + return inventory + @property def become(self): if self._become: @@ -173,30 +209,30 @@ class AdHoc(models.Model): history = AdHocRunHistory(adhoc=self, task=self.task) time_start = time.time() try: - result = self._run_only() + raw, summary = self._run_only() history.is_finished = True - if result.results_summary.get('dark'): + if summary.get('dark'): history.is_success = False else: history.is_success = True - history.result = result.results_raw - history.summary = result.results_summary - return result + history.result = raw + history.summary = summary + return raw, summary + except: + return {}, {} finally: history.date_finished = timezone.now() history.timedelta = time.time() - time_start history.save() def _run_only(self): - from .utils import get_adhoc_inventory - inventory = get_adhoc_inventory(self) - runner = AdHocRunner(inventory) + runner = AdHocRunner(self.inventory) for k, v in self.options.items(): runner.set_option(k, v) try: result = runner.run(self.tasks, self.pattern, self.task.name) - return result + return result.results_raw, result.results_summary except AnsibleError as e: logger.error("Failed run adhoc {}, {}".format(self.task.name, e)) diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index b2647c465..be891919d 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -1,6 +1,10 @@ # coding: utf-8 -from celery import shared_task -from django.core import serializers +from celery import shared_task, subtask + +from common.utils import get_logger, get_object_or_none +from .models import Task + +logger = get_logger(__file__) def rerun_task(): @@ -8,6 +12,31 @@ def rerun_task(): @shared_task -def run_task(tasks_json): - for task in serializers.deserialize('json', tasks_json): - task.object.run() +def run_ansible_task(task_id, callback=None, **kwargs): + """ + :param task_id: is the tasks serialized data + :param callback: callback function name + :return: + """ + + task = get_object_or_none(Task, id=task_id) + if task: + result = task.object.run() + if callback is not None: + subtask(callback).delay(result) + return result + else: + logger.error("No task found") + + +@shared_task +def hello(name, callback=None): + print("Hello {}".format(name)) + if callback is not None: + subtask(callback).delay("Guahongwei") + + +@shared_task +def hello_callback(result): + print(result) + print("Hello callback") diff --git a/apps/ops/utils.py b/apps/ops/utils.py index f015f8392..55b5761b6 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -1,123 +1,36 @@ # ~*~ coding: utf-8 ~*~ +import json +from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule -import time -from django.utils import timezone -from django.db import transaction -from django_celery_beat.models import PeriodicTask, IntervalSchedule -from common.utils import get_logger, get_object_or_none, get_short_uuid_str -from .ansible import AdHocRunner, CommandResultCallback -from .inventory import JMSInventory -from .ansible.exceptions import AnsibleError -from .models import AdHocRunHistory, Task, AdHoc +from common.utils import get_logger, get_object_or_none +from .models import Task, AdHoc logger = get_logger(__file__) -def record_adhoc(func): - def _deco(adhoc, **options): - record = AdHocRunHistory(adhoc=adhoc, task=adhoc.task) - time_start = time.time() - try: - result = func(adhoc, **options) - record.is_finished = True - if result.results_summary.get('dark'): - record.is_success = False - else: - record.is_success = True - record.result = result.results_raw - record.summary = result.results_summary - return result - finally: - record.date_finished = timezone.now() - record.timedelta = time.time() - time_start - record.save() - return _deco +def get_task_by_id(task_id): + return get_object_or_none(Task, id=task_id) -def get_adhoc_inventory(adhoc): - if adhoc.become: - become_info = { - 'become': { - adhoc.become - } - } - else: - become_info = None - - inventory = JMSInventory( - adhoc.hosts, run_as_admin=adhoc.run_as_admin, - run_as=adhoc.run_as, become_info=become_info - ) - return inventory - - -def get_inventory(hostname_list, run_as_admin=False, run_as=None, become_info=None): - return JMSInventory( - hostname_list, run_as_admin=run_as_admin, - run_as=run_as, become_info=become_info - ) - - -def get_adhoc_runner(hostname_list, run_as_admin=False, run_as=None, become_info=None): - inventory = get_inventory( - hostname_list, run_as_admin=run_as_admin, - run_as=run_as, become_info=become_info - ) - runner = AdHocRunner(inventory) - return runner - - -@record_adhoc -def run_adhoc_object(adhoc, **options): - """ - :param adhoc: Instance of AdHoc - :param options: ansible support option, like forks ... - :return: - """ - name = adhoc.task.name - inventory = get_adhoc_inventory(adhoc) - runner = AdHocRunner(inventory) - for k, v in options.items(): - runner.set_option(k, v) - - try: - result = runner.run(adhoc.tasks, adhoc.pattern, name) - return result - except AnsibleError as e: - logger.error("Failed run adhoc {}, {}".format(name, e)) - raise - - -def run_adhoc(hostname_list, pattern, tasks, name=None, - run_as_admin=False, run_as=None, become_info=None): - if name is None: - name = "Adhoc-task-{}-{}".format( - get_short_uuid_str(), - timezone.now().strftime("%Y-%m-%d %H:%M:%S"), - ) - - inventory = get_inventory( - hostname_list, run_as_admin=run_as_admin, - run_as=run_as, become_info=become_info - ) - runner = AdHocRunner(inventory) - return runner.run(tasks, pattern, play_name=name) - - -def create_or_update_task( +def create_or_update_ansible_task( task_name, hosts, tasks, pattern='all', options=None, run_as_admin=False, run_as="", become_info=None, - created_by=None + created_by=None, interval=None, crontab=None, + is_periodic=False, callback=None, ): - print(options) - print(task_name) + task = get_object_or_none(Task, name=task_name) + if task is None: - task = Task(name=task_name, created_by=created_by) + task = Task( + name=task_name, interval=interval, + crontab=crontab, is_periodic=is_periodic, + callback=callback, created_by=created_by + ) task.save() - adhoc = task.get_latest_adhoc() + adhoc = task.latest_adhoc new_adhoc = AdHoc(task=task, pattern=pattern, run_as_admin=run_as_admin, run_as=run_as) @@ -128,23 +41,67 @@ def create_or_update_task( if not adhoc or adhoc != new_adhoc: new_adhoc.save() task.latest_adhoc = new_adhoc - print("Return task") return task -def create_periodic_tasks(tasks): +def create_or_update_celery_periodic_tasks(tasks): + """ + :param tasks: { + 'add-every-monday-morning': { + 'task': 'tasks.add' # A registered celery task, + 'interval': 30, + 'crontab': "30 7 * * *", + 'args': (16, 16), + 'kwargs': {}, + 'enabled': False, + }, + } + :return: + """ + # Todo: check task valid, task and callback must be a celery task for name, detail in tasks.items(): - schedule, _ = IntervalSchedule.objects.get_or_create( - every=detail['schedule'], - period=IntervalSchedule.SECONDS, - ) + interval = None + crontab = None + if isinstance(detail.get("interval"), int): + interval, _ = IntervalSchedule.objects.get_or_create( + every=detail['interval'], + period=IntervalSchedule.SECONDS, + ) + elif isinstance(detail.get("crontab"), str): + try: + minute, hour, day, month, week = detail["crontab"].split() + except ValueError: + raise SyntaxError("crontab is not valid") - task = PeriodicTask.objects.create( - interval=schedule, + crontab, _ = CrontabSchedule.objects.get_or_create( + minute=minute, hour=hour, day_of_week=week, + day_of_month=day, month_of_year=month, + ) + else: + raise SyntaxError("Schedule is not valid") + + defaults = dict( + interval=interval, + crontab=crontab, name=name, task=detail['task'], args=json.dumps(detail.get('args', [])), kwargs=json.dumps(detail.get('kwargs', {})), + enabled=detail['enabled'] ) - print("Create periodic task: {}".format(task)) + + task = PeriodicTask.objects.update_or_create( + defaults=defaults, name=name, + ) + logger.info("Create periodic task: {}".format(task)) + return task + + +def disable_celery_periodic_task(task_name): + PeriodicTask.objects.filter(name=task_name).update(enabled=False) + + +def delete_celery_periodic_task(task_name): + PeriodicTask.objects.filter(name=task_name).delete() + diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 552fd1802..6ee4c6e6d 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,7 +3,7 @@ ansible==2.4.2.0 asn1crypto==0.24.0 bcrypt==3.1.4 billiard==3.5.0.3 -celery==4.0.2 +celery==4.1.0 certifi==2017.11.5 cffi==1.11.2 chardet==3.0.4 diff --git a/run_server.py b/run_server.py index 72905c0d6..15f765107 100644 --- a/run_server.py +++ b/run_server.py @@ -17,6 +17,7 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__)) APPS_DIR = os.path.join(BASE_DIR, 'apps') HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1' HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080 +DEBUG = CONFIG.DEBUG LOG_LEVEL = CONFIG.LOG_LEVEL WORKERS = 4 @@ -25,13 +26,16 @@ def start_gunicorn(): print("- Start Gunicorn WSGI HTTP Server") os.chdir(APPS_DIR) cmd = "gunicorn jumpserver.wsgi -b {}:{} -w {}".format(HTTP_HOST, HTTP_PORT, WORKERS) + if DEBUG: + cmd += " --reload" subprocess.call(cmd, shell=True) def start_celery(): print("- Start Celery as Distributed Task Queue") os.chdir(APPS_DIR) - # os.environ.setdefault('PYTHONOPTIMIZE', '1') + # Todo: Must set this environment, if not no ansible result return + os.environ.setdefault('PYTHONOPTIMIZE', '1') cmd = 'celery -A common worker -l {}'.format(LOG_LEVEL.lower()) subprocess.call(cmd, shell=True) @@ -39,9 +43,9 @@ def start_celery(): def start_beat(): print("- Start Beat as Periodic Task Scheduler") os.chdir(APPS_DIR) - # os.environ.setdefault('PYTHONOPTIMIZE', '1') - schduler = "django_celery_beat.schedulers:DatabaseScheduler" - cmd = 'celery -A common beat -l {} --scheduler {}'.format(LOG_LEVEL, schduler) + os.environ.setdefault('PYTHONOPTIMIZE', '1') + scheduler = "django_celery_beat.schedulers:DatabaseScheduler" + cmd = 'celery -A common beat -l {} --scheduler {} --max-interval 30 '.format(LOG_LEVEL, scheduler) subprocess.call(cmd, shell=True) From bf9bb1b973cb4e38969fe3fe035256aad8981fd2 Mon Sep 17 00:00:00 2001 From: ibuler Date: Sun, 24 Dec 2017 18:53:07 +0800 Subject: [PATCH 03/18] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9ops=20task?= =?UTF-8?q?=E8=BF=90=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/api.py | 16 +- apps/assets/const.py | 26 +- apps/assets/models/user.py | 3 +- apps/assets/tasks.py | 450 +++++++++--------- .../templates/assets/asset_modal_list.html | 3 - apps/assets/views/asset.py | 6 +- apps/common/celery.py | 173 ++++++- apps/common/tasks.py | 2 +- apps/common/utils.py | 74 +-- apps/jumpserver/settings.py | 3 +- apps/ops/decorators.py | 38 -- apps/ops/models.py | 15 +- apps/ops/tasks.py | 4 +- apps/ops/templates/ops/task_list.html | 16 +- apps/ops/utils.py | 98 +--- apps/static/js/jumpserver.js | 3 - apps/templates/_left_side_bar.html | 2 +- apps/templates/_user_profile.html | 6 +- apps/users/models/group.py | 7 - apps/users/models/user.py | 4 +- apps/users/serializers.py | 4 +- run_server.py | 2 +- 22 files changed, 503 insertions(+), 452 deletions(-) delete mode 100644 apps/ops/decorators.py diff --git a/apps/assets/api.py b/apps/assets/api.py index 1fe371dcd..064c2780e 100644 --- a/apps/assets/api.py +++ b/apps/assets/api.py @@ -25,9 +25,9 @@ from .hands import IsSuperUser, IsValidUser, IsSuperUserOrAppUser, \ get_user_granted_assets from .models import AssetGroup, Asset, Cluster, SystemUser, AdminUser from . import serializers -from .tasks import update_assets_hardware_info, test_admin_user_connectability, \ - test_admin_user_connectability_manual, push_system_user_to_cluster_assets, \ - test_system_user_connectability +from .tasks import update_assets_hardware_info_manual, test_admin_user_connectability_util, \ + test_asset_connectability_manual, push_system_user_to_cluster_assets_manual, \ + test_system_user_connectability_manual class AssetViewSet(IDInFilterMixin, BulkModelViewSet): @@ -222,7 +222,7 @@ class AssetRefreshHardwareApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') asset = get_object_or_404(Asset, pk=asset_id) - summary = update_assets_hardware_info([asset]) + summary = update_assets_hardware_info_manual([asset]) if summary.get('dark'): return Response(summary['dark'].values(), status=501) else: @@ -239,7 +239,7 @@ class AssetAdminUserTestApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): asset_id = kwargs.get('pk') asset = get_object_or_404(Asset, pk=asset_id) - ok, msg = test_admin_user_connectability_manual(asset) + ok, msg = test_asset_connectability_manual(asset) if ok: return Response({"msg": "pong"}) else: @@ -255,7 +255,7 @@ class AdminUserTestConnectiveApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): admin_user = self.get_object() - test_admin_user_connectability.delay(admin_user, force=True) + test_admin_user_connectability_util.delay(admin_user, force=True) return Response({"msg": "Task created"}) @@ -268,7 +268,7 @@ class SystemUserPushApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): system_user = self.get_object() - push_system_user_to_cluster_assets.delay(system_user, force=True) + push_system_user_to_cluster_assets_manual.delay(system_user, force=True) return Response({"msg": "Task created"}) @@ -281,5 +281,5 @@ class SystemUserTestConnectiveApi(generics.RetrieveAPIView): def retrieve(self, request, *args, **kwargs): system_user = self.get_object() - test_system_user_connectability.delay(system_user, force=True) + test_system_user_connectability_manual.delay(system_user, force=True) return Response({"msg": "Task created"}) diff --git a/apps/assets/const.py b/apps/assets/const.py index adad03aeb..5944e1124 100644 --- a/apps/assets/const.py +++ b/apps/assets/const.py @@ -2,14 +2,20 @@ # from django.utils.translation import ugettext as _ -PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" -PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD TASK") +# PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY" +PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD: {}") +PUSH_SYSTEM_USER_MANUAL_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER MANUALLY: {}") PUSH_SYSTEM_USER_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER: {}") -PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}" +# PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}" +PUSH_SYSTEM_USER_ON_CHANGE_TASK_NAME = _("PUSH SYSTEM USER ON CHANGE: {}") +PUSH_SYSTEM_USER_ON_CREATE_TASK_NAME = _("PUSH SYSTEM USER ON CREATE: {}") +PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME = _("PUSH SYSTEM USERS ON ASSET CREAT: {}") UPDATE_ASSETS_HARDWARE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO') -UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY" +UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO MANUALLY') +UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO ON CREATE') +# UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY" UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO PERIOD') UPDATE_ASSETS_HARDWARE_TASKS = [ { @@ -20,10 +26,10 @@ UPDATE_ASSETS_HARDWARE_TASKS = [ } ] -TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY" -TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD TASK") +# TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY" +TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD: {}") +TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME = _("TEST ADMIN USER CONN MANUALLY: {}") TEST_ADMIN_USER_CONN_TASK_NAME = _("TEST ADMIN USER CONN: {}") -TEST_ADMIN_USER_CONN_LOCK_KEY = TEST_ADMIN_USER_CONN_TASK_NAME ADMIN_USER_CONN_CACHE_KEY = "ADMIN_USER_CONN_{}" TEST_ADMIN_USER_CONN_TASKS = [ { @@ -38,10 +44,8 @@ ASSET_ADMIN_CONN_CACHE_KEY = "ASSET_ADMIN_USER_CONN_{}" TEST_ASSET_CONN_TASK_NAME = _("ASSET CONN TEST MANUAL") TEST_SYSTEM_USER_CONN_PERIOD_LOCK_KEY = "TEST_SYSTEM_USER_CONN_PERIOD_KEY" -TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD TASK") -TEST_SYSTEM_USER_CONN_CACHE_KEY_PREFIX = "SYSTEM_USER_CONN_" -TEST_SYSTEM_USER_CONN_TASK_NAME = _("TEST SYSTEM USER CONN: {}") -TEST_SYSTEM_USER_CONN_LOCK_KEY = "TEST_SYSTEM_USER_CONN_{}" +TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD: {}") +TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME = _("TEST SYSTEM USER CONN MANUALLY: {}") SYSTEM_USER_CONN_CACHE_KEY = "SYSTEM_USER_CONN_{}" TEST_SYSTEM_USER_CONN_TASKS = [ { diff --git a/apps/assets/models/user.py b/apps/assets/models/user.py index 674ffa008..3e5e41b8d 100644 --- a/apps/assets/models/user.py +++ b/apps/assets/models/user.py @@ -13,13 +13,14 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from django.conf import settings -from common.utils import signer, ssh_key_string_to_obj, ssh_key_gen +from common.utils import get_signer, ssh_key_string_to_obj, ssh_key_gen from .utils import private_key_validator from ..const import SYSTEM_USER_CONN_CACHE_KEY __all__ = ['AdminUser', 'SystemUser',] logger = logging.getLogger(__name__) +signer = get_signer() class AssetUser(models.Model): diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index 64de5423f..e83357f68 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -8,9 +8,11 @@ from django.db.models.signals import post_save from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger +from common.celery import register_as_period_task, after_app_shutdown_clean, \ + after_app_ready_start, app as celery_app + from .models import SystemUser, AdminUser, Asset from . import const -from ops.decorators import register_as_period_task FORKS = 10 @@ -19,7 +21,18 @@ logger = get_logger(__file__) CACHE_MAX_TIME = 60*60*60 -def _update_asset_info(result_raw): +@shared_task +def update_assets_hardware_info(result, **kwargs): + """ + Using ops task run result, to update asset info + + @shared_task must be exit, because we using it as a task callback, is must + be a celery task also + :param result: + :param kwargs: {task_name: ""} + :return: + """ + result_raw = result[0] assets_updated = [] for hostname, info in result_raw['ok'].items(): if info: @@ -65,173 +78,240 @@ def _update_asset_info(result_raw): @shared_task -def update_assets_hardware_info(assets, task_name=None): +def update_assets_hardware_info_util(assets, task_name): """ Using ansible api to update asset hardware info :param assets: asset seq :param task_name: task_name running :return: result summary ['contacted': {}, 'dark': {}] """ - from ops.utils import create_or_update_ansible_task - if task_name is None: - task_name = const.UPDATE_ASSETS_HARDWARE_TASK_NAME + from ops.utils import update_or_create_ansible_task tasks = const.UPDATE_ASSETS_HARDWARE_TASKS hostname_list = [asset.hostname for asset in assets] - task = create_or_update_ansible_task( + task, _ = update_or_create_ansible_task( task_name, hosts=hostname_list, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) result = task.run() - summary, result_raw = result.results_summary, result.results_raw - # TOdo: may be somewhere using - assets_updated = _update_asset_info(result_raw) - return summary + # Todo: may be somewhere using + # Manual run callback function + assets_updated = update_assets_hardware_info(result) + return result @shared_task -@register_as_period_task(interval=60*60*60*24) -def update_assets_hardware_period(): +def update_assets_hardware_info_manual(assets): + task_name = const.UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME + return update_assets_hardware_info_util(assets, task_name) + + +@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") +def update_asset_info_on_created(sender, instance=None, created=False, **kwargs): + if instance and created: + msg = "Receive asset {} create signal, update asset hardware info".format( + instance + ) + logger.debug(msg) + task_name = const.UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME + update_assets_hardware_info_util.delay([instance], task_name) + + +@celery_app.task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean +def update_assets_hardware_info_period(): """ Update asset hardware period task :return: """ - from ops.utils import create_or_update_ansible_task + from ops.utils import update_or_create_ansible_task task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME - if cache.get(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1: - msg = "Task {} is running or before long, passed this time".format( - task_name - ) - logger.debug(msg) - return {} - # Todo: set cache but not update, because we want also set it to as a - # minimum update time too - cache.set(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) - assets = Asset.objects.filter(type__in=['Server', 'VM']) - return update_assets_hardware_info(assets, task_name=task_name) + hostname_list = [asset.hostname for asset in Asset.objects.all()] + tasks = const.UPDATE_ASSETS_HARDWARE_TASKS + # Only create, schedule by celery beat + _ = update_or_create_ansible_task( + task_name, hosts=hostname_list, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + interval=60*60*24, is_periodic=True, callback=update_assets_hardware_info.name, + ) + + +## ADMIN USER CONNECTIVE ## @shared_task -def test_admin_user_connectability(admin_user, force=False): - """ - Test asset admin user can connect or not. Using ansible api do that - :param admin_user: - :param force: Force update - :return: - """ - from ops.utils import create_or_update_ansible_task +def update_admin_user_connectability_info(result, **kwargs): + admin_user = kwargs.get("admin_user") + task_name = kwargs.get("task_name") + if admin_user is None and task_name is not None: + admin_user = task_name.split(":")[-1] - task_name = const.TEST_ADMIN_USER_CONN_TASK_NAME.format(admin_user.name) - lock_key = const.TEST_ADMIN_USER_CONN_LOCK_KEY.format(admin_user.name) + _, summary = result + cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user) + cache.set(cache_key, summary, CACHE_MAX_TIME) - if cache.get(lock_key, 0) == 1 and not force: - logger.debug("Task {} is running or before along, passed this time") - return {} - - assets = admin_user.get_related_assets() - hosts = [asset.hostname for asset in assets] - tasks = const.TEST_ADMIN_USER_CONN_TASKS - task = create_or_update_ansible_task( - task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', - options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', - ) - cache.set(lock_key, 1, CACHE_MAX_TIME) - result = task.run() - cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user.name) - cache.set(cache_key, result.results_summary, CACHE_MAX_TIME) - - for i in result.results_summary.get('contacted', []): + for i in summary.get('contacted', []): asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i) cache.set(asset_conn_cache_key, 1, CACHE_MAX_TIME) - for i, msg in result.results_summary.get('dark', {}).items(): + for i, msg in summary.get('dark', {}).items(): asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i) cache.set(asset_conn_cache_key, 0, CACHE_MAX_TIME) logger.error(msg) - return result.results_summary - @shared_task -def test_admin_user_connectability_period(): - if cache.get(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY) == 1: - msg = "{} task is running or before long, passed this time".format( - const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME - ) - logger.debug(msg) - return +def test_admin_user_connectability_util(admin_user, task_name): + """ + Test asset admin user can connect or not. Using ansible api do that + :param admin_user: + :param task_name: + :param force: Force update + :return: + """ + from ops.utils import update_or_create_ansible_task - logger.debug("Task {} start".format(const.TEST_ADMIN_USER_CONN_TASK_NAME)) - cache.set(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME) - admin_users = AdminUser.objects.all() - for admin_user in admin_users: - test_admin_user_connectability(admin_user) - - -@shared_task -def test_admin_user_connectability_manual(asset, task_name=None): - from ops.utils import create_or_update_ansible_task - if task_name is None: - task_name = const.TEST_ASSET_CONN_TASK_NAME - hosts = [asset.hostname] + assets = admin_user.get_related_assets() + hosts = [asset.hostname for asset in assets] tasks = const.TEST_ADMIN_USER_CONN_TASKS - task = create_or_update_ansible_task( - task_name, tasks=tasks, hosts=hosts, run_as_admin=True, - created_by='System', options=const.TASK_OPTIONS, pattern='all', + task, created = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', ) result = task.run() + update_admin_user_connectability_info(result, admin_user=admin_user.name) + return result - if result.results_summary['dark']: + +@celery_app.task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean +def test_admin_user_connectability_period(): + """ + A period task that update the ansible task period + """ + from ops.utils import update_or_create_ansible_task + admin_users = AdminUser.objects.all() + for admin_user in admin_users: + task_name = const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME.format(admin_user.name) + assets = admin_user.get_related_assets() + hosts = [asset.hostname for asset in assets] + tasks = const.TEST_ADMIN_USER_CONN_TASKS + _ = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + interval=3600, is_periodic=True, + callback=update_admin_user_connectability_info.name, + ) + + +@shared_task +def test_admin_user_connectability_manual(admin_user): + task_name = const.TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME.format(admin_user.name) + return test_admin_user_connectability_util.delay(admin_user, task_name) + + +@shared_task +def test_asset_connectability_manual(asset): + from ops.utils import update_or_create_ansible_task + + task_name = const.TEST_ASSET_CONN_TASK_NAME + assets = [asset] + hosts = [asset.hostname for asset in assets] + tasks = const.TEST_ADMIN_USER_CONN_TASKS + task, created = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + ) + result = task.run() + summary = result[1] + if summary.get('dark'): cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, CACHE_MAX_TIME) - return False, result.results_summary['dark'] + return False, summary['dark'] else: cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, CACHE_MAX_TIME) return True, "" +@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") +def update_asset_conn_info_on_created(sender, instance=None, created=False, + **kwargs): + if instance and created: + task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance) + msg = "Receive asset {} create signal, test asset connectability".format( + instance + ) + logger.debug(msg) + test_asset_connectability_manual.delay(instance, task_name) + + +## System user connective ## + + @shared_task -def test_system_user_connectability(system_user, force=False): +def update_system_user_connectablity_info(result, **kwargs): + summary = result[1] + task_name = kwargs.get("task_name") + system_user = kwargs.get("system_user") + if system_user is None: + system_user = task_name.split(":")[-1] + cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user) + cache.set(cache_key, summary, CACHE_MAX_TIME) + + +@shared_task +def test_system_user_connectability_util(system_user, task_name): """ Test system cant connect his assets or not. :param system_user: - :param force + :param task_name: :return: """ - from ops.utils import create_or_update_ansible_task - lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY.format(system_user.name) - task_name = const.TEST_SYSTEM_USER_CONN_TASK_NAME.format(system_user.name) - if cache.get(lock_key, 0) == 1 and not force: - logger.debug("Task {} is running or before long, passed this time".format(task_name)) - return {} + from ops.utils import update_or_create_ansible_task assets = system_user.get_clusters_assets() hosts = [asset.hostname for asset in assets] tasks = const.TEST_SYSTEM_USER_CONN_TASKS - task = create_or_update_ansible_task( + task, created = update_or_create_ansible_task( task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as=system_user.name, created_by="System", ) - cache.set(lock_key, 1, CACHE_MAX_TIME) result = task.run() - cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user.name) - print("Set cache: {} {}".format(cache_key, result.results_summary)) - cache.set(cache_key, result.results_summary, CACHE_MAX_TIME) - return result.results_summary + update_system_user_connectablity_info(result, system_user=system_user.name) + return result @shared_task +def test_system_user_connectability_manual(system_user): + task_name = const.TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME.format(system_user.name) + return test_system_user_connectability_util(system_user, task_name) + + +@shared_task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean def test_system_user_connectability_period(): - lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY - if cache.get(lock_key) == 1: - logger.debug("{} task is running, passed this time".format( - const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME - )) - return + from ops.utils import update_or_create_ansible_task + system_users = SystemUser.objects.all() + for system_user in system_users: + task_name = const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME.format( + system_user.name + ) + assets = system_user.get_clusters_assets() + hosts = [asset.hostname for asset in assets] + tasks = const.TEST_SYSTEM_USER_CONN_TASKS + _ = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=False, run_as=system_user.name, + created_by='System', interval=3600, is_periodic=True, + callback=update_admin_user_connectability_info.name, + ) - logger.debug("Task {} start".format(const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME)) - cache.set(lock_key, 1, CACHE_MAX_TIME) - for system_user in SystemUser.objects.all(): - test_system_user_connectability(system_user) +#### Push system user tasks #### def get_push_system_user_tasks(system_user): tasks = [ @@ -271,75 +351,48 @@ def get_push_system_user_tasks(system_user): @shared_task -def push_system_user(system_user, assets, task_name=None): - from ops.utils import create_or_update_ansible_task +def push_system_user_util(system_user, task_name): + from ops.utils import update_or_create_ansible_task - if system_user.auto_push and assets: - if task_name is None: - task_name = 'PUSH-SYSTEM-USER-{}'.format(system_user.name) + tasks = get_push_system_user_tasks(system_user) + assets = system_user.get_clusters_assets() + hosts = [asset.hostname for asset in assets] + task, _ = update_or_create_ansible_task( + task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' + ) + return task.run() + +@shared_task +def push_system_user_to_cluster_assets_manual(system_user): + task_name = const.PUSH_SYSTEM_USER_MANUAL_TASK_NAME.format(system_user.name) + return push_system_user_util(system_user, task_name) + + +@shared_task +@register_as_period_task(interval=3600) +@after_app_ready_start +@after_app_shutdown_clean +def push_system_user_period(): + from ops.utils import update_or_create_ansible_task + + for system_user in SystemUser.objects.filter(auto_push=True): + assets = system_user.get_clusters_assets() + task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME.format(system_user.name) hosts = [asset.hostname for asset in assets] tasks = get_push_system_user_tasks(system_user) - task = create_or_update_ansible_task( + _ = update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', - options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' + options=const.TASK_OPTIONS, run_as_admin=True, created_by='System', + interval=60*60*24, is_periodic=True, ) - result = task.run() - for i in result.results_summary.get('contacted'): - logger.debug("Push system user {} to {} [OK]".format( - system_user.name, i - )) - for i in result.results_summary.get('dark'): - logger.error("Push system user {} to {} [FAILED]".format( - system_user.name, i - )) - return result.results_summary - else: - msg = "Task {} does'nt execute, because auto_push " \ - "is not True, or not assets".format(task_name) - logger.debug(msg) - return {} @shared_task -def push_system_user_to_cluster_assets(system_user, force=False): - lock_key = const.PUSH_SYSTEM_USER_LOCK_KEY - task_name = const.PUSH_SYSTEM_USER_TASK_NAME.format(system_user.name) - if cache.get(lock_key, 0) == 1 and not force: - msg = "Task {} is running or before long, passed this time".format( - task_name - ) - logger.debug(msg) - return {} - - logger.debug("Task {} start".format(task_name)) - assets = system_user.get_clusters_assets() - summary = push_system_user(system_user, assets, task_name) - return summary - - -@shared_task -def push_system_user_period(): - task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME - if cache.get(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY) == 1: - msg = "Task {} is running or before long, passed this time".format( - task_name - ) - logger.debug(msg) - return - logger.debug("Task {} start".format(task_name)) - cache.set(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY, 1, timeout=CACHE_MAX_TIME) - - for system_user in SystemUser.objects.filter(auto_push=True): - push_system_user_to_cluster_assets(system_user) - - -@shared_task -def push_asset_system_users(asset, system_users=None, task_name=None): - from ops.utils import create_or_update_ansible_task - if task_name is None: - task_name = "PUSH-ASSET-SYSTEM-USER-{}".format(asset.hostname) +def push_asset_system_users_util(asset, task_name, system_users=None): + from ops.utils import update_or_create_ansible_task if system_users is None: system_users = asset.cluster.systemuser_set.all() @@ -350,81 +403,38 @@ def push_asset_system_users(asset, system_users=None, task_name=None): tasks.extend(get_push_system_user_tasks(system_user)) hosts = [asset.hostname] - - task = create_or_update_ansible_task( + task, _ = update_or_create_ansible_task( task_name=task_name, hosts=hosts, tasks=tasks, pattern='all', options=const.TASK_OPTIONS, run_as_admin=True, created_by='System' ) - result = task.run() - return result.results_summary - - -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_info_when_created(sender, instance=None, created=False, **kwargs): - if instance and created: - msg = "Receive asset {} create signal, update asset hardware info".format( - instance - ) - logger.debug(msg) - task_name = "UPDATE-ASSET-HARDWARE-INFO-WHEN-CREATED" - update_assets_hardware_info.delay([instance], task_name) - - -@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") -def update_asset_conn_info_on_created(sender, instance=None, created=False, **kwargs): - if instance and created: - task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance) - msg = "Receive asset {} create signal, test asset connectability".format( - instance - ) - logger.debug(msg) - test_admin_user_connectability_manual.delay(instance, task_name) + return task.run() @receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier") def push_system_user_on_created(sender, instance=None, created=False, **kwargs): if instance and created: - task_name = 'PUSH-SYSTEM-USER-WHEN-ASSET-CREATED-{}'.format(instance) + task_name = const.PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME system_users = instance.cluster.systemuser_set.all() msg = "Receive asset {} create signal, push system users".format( instance ) logger.debug(msg) - push_asset_system_users.delay(instance, system_users, task_name=task_name) + push_asset_system_users_util.delay(instance, system_users, task_name=task_name) @receiver(post_save, sender=SystemUser) -def push_system_user_on_auth_change(sender, instance=None, update_fields=None, **kwargs): - fields_check = {'_password', '_private_key', '_public_key'} - auth_changed = update_fields & fields_check if update_fields else None - if instance and instance.auto_push and auth_changed: - logger.debug("System user `{}` auth changed, push it".format(instance.name)) - task_name = "PUSH-SYSTEM-USER-ON-CREATED-{}".format(instance.name) - push_system_user_to_cluster_assets.delay(instance, task_name) +def push_system_user_on_change(sender, instance=None, update_fields=None, **kwargs): + if instance and instance.auto_push: + logger.debug("System user `{}` changed, push it".format(instance.name)) + task_name = "PUSH SYSTEM USER ON CREATED: {}".format(instance.name) + push_system_user_util.delay(instance, task_name) + + + + + -periodic_tasks = ( - { - 'update_assets_hardware_period': { - 'task': 'assets.tasks.update_assets_hardware_period', - 'schedule': 60*60*60*24, - 'args': (), - }, - 'test-admin-user-connectability_period': { - 'task': 'assets.tasks.test_admin_user_connectability_period', - 'schedule': 60*60*60, - 'args': (), - }, - 'push_system_user_period': { - 'task': 'assets.tasks.push_system_user_period', - 'schedule': 60*60*60*24, - 'args': (), - } - } -) -def initial_periodic_tasks(): - from ops.utils import create_periodic_tasks - create_periodic_tasks(periodic_tasks) diff --git a/apps/assets/templates/assets/asset_modal_list.html b/apps/assets/templates/assets/asset_modal_list.html index bd0147fb6..ee82d530e 100644 --- a/apps/assets/templates/assets/asset_modal_list.html +++ b/apps/assets/templates/assets/asset_modal_list.html @@ -49,9 +49,6 @@ $(document).ready(function(){ "aaSorting": [[2, "asc"]], "aoColumnDefs": [ { "bSortable": false, "aTargets": [ 0 ] }], "bAutoWidth": false, - "language": { - "url": "/static/js/plugins/dataTables/i18n/zh-hans.json" - }, columns: [ {data: "checkbox"}, {data: "id"}, diff --git a/apps/assets/views/asset.py b/apps/assets/views/asset.py index 431b643e2..6cf917c0a 100644 --- a/apps/assets/views/asset.py +++ b/apps/assets/views/asset.py @@ -28,7 +28,7 @@ from common.utils import get_object_or_none, get_logger, is_uuid from .. import forms from ..models import Asset, AssetGroup, AdminUser, Cluster, SystemUser from ..hands import AdminUserRequiredMixin -from ..tasks import update_assets_hardware_info +from ..tasks import update_assets_hardware_info_util __all__ = [ @@ -314,10 +314,6 @@ class BulkImportAssetView(AdminUserRequiredMixin, JSONResponseMixin, FormView): except Exception as e: failed.append('%s: %s' % (asset_dict['hostname'], str(e))) - if assets: - update_assets_hardware_info.delay([asset._to_secret_json() for asset in assets]) - - data = { 'created': created, 'created_info': 'Created {}'.format(len(created)), diff --git a/apps/common/celery.py b/apps/common/celery.py index 6a08e1bd6..d05a36c25 100644 --- a/apps/common/celery.py +++ b/apps/common/celery.py @@ -1,8 +1,15 @@ # ~*~ coding: utf-8 ~*~ import os +import json +from functools import wraps -from celery import Celery +from celery import Celery, subtask +from celery.signals import worker_ready, worker_shutdown + +from .utils import get_logger + +logger = get_logger(__file__) # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings') @@ -15,3 +22,167 @@ app = Celery('jumpserver') # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) + + +def create_or_update_celery_periodic_tasks(tasks): + from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule + """ + :param tasks: { + 'add-every-monday-morning': { + 'task': 'tasks.add' # A registered celery task, + 'interval': 30, + 'crontab': "30 7 * * *", + 'args': (16, 16), + 'kwargs': {}, + 'enabled': False, + }, + } + :return: + """ + # Todo: check task valid, task and callback must be a celery task + for name, detail in tasks.items(): + interval = None + crontab = None + if isinstance(detail.get("interval"), int): + intervals = IntervalSchedule.objects.filter( + every=detail["interval"], period=IntervalSchedule.SECONDS + ) + if intervals: + interval = intervals[0] + else: + interval = IntervalSchedule.objects.create( + every=detail['interval'], + period=IntervalSchedule.SECONDS, + ) + elif isinstance(detail.get("crontab"), str): + try: + minute, hour, day, month, week = detail["crontab"].split() + except ValueError: + raise SyntaxError("crontab is not valid") + kwargs = dict( + minute=minute, hour=hour, day_of_week=week, + day_of_month=day, month_of_year=month, + ) + contabs = CrontabSchedule.objects.filter( + **kwargs + ) + if contabs: + crontab = contabs[0] + else: + crontab = CrontabSchedule.objects.create(**kwargs) + else: + raise SyntaxError("Schedule is not valid") + + defaults = dict( + interval=interval, + crontab=crontab, + name=name, + task=detail['task'], + args=json.dumps(detail.get('args', [])), + kwargs=json.dumps(detail.get('kwargs', {})), + enabled=detail.get('enabled', True), + ) + + task = PeriodicTask.objects.update_or_create( + defaults=defaults, name=name, + ) + return task + + +def disable_celery_periodic_task(task_name): + from django_celery_beat.models import PeriodicTask + PeriodicTask.objects.filter(name=task_name).update(enabled=False) + + +def delete_celery_periodic_task(task_name): + from django_celery_beat.models import PeriodicTask + PeriodicTask.objects.filter(name=task_name).delete() + + +__REGISTER_PERIODIC_TASKS = [] +__AFTER_APP_SHUTDOWN_CLEAN_TASKS = [] +__AFTER_APP_READY_RUN_TASKS = [] + + +def register_as_period_task(crontab=None, interval=None): + """ + Warning: Task must be have not any args and kwargs + :param crontab: "* * * * *" + :param interval: 60*60*60 + :return: + """ + if crontab is None and interval is None: + raise SyntaxError("Must set crontab or interval one") + + def decorate(func): + if crontab is None and interval is None: + raise SyntaxError("Interval and crontab must set one") + + # Because when this decorator run, the task was not created, + # So we can't use func.name + name = '{func.__module__}.{func.__name__}'.format(func=func) + if name not in __REGISTER_PERIODIC_TASKS: + create_or_update_celery_periodic_tasks({ + name: { + 'task': name, + 'interval': interval, + 'crontab': crontab, + 'args': (), + 'enabled': True, + } + }) + __REGISTER_PERIODIC_TASKS.append(name) + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + return wrapper + return decorate + + +def after_app_ready_start(func): + # Because when this decorator run, the task was not created, + # So we can't use func.name + name = '{func.__module__}.{func.__name__}'.format(func=func) + if name not in __AFTER_APP_READY_RUN_TASKS: + __AFTER_APP_READY_RUN_TASKS.append(name) + + @wraps(func) + def decorate(*args, **kwargs): + return func(*args, **kwargs) + + return decorate + + +def after_app_shutdown_clean(func): + # Because when this decorator run, the task was not created, + # So we can't use func.name + name = '{func.__module__}.{func.__name__}'.format(func=func) + if name not in __AFTER_APP_READY_RUN_TASKS: + __AFTER_APP_SHUTDOWN_CLEAN_TASKS.append(name) + + @wraps(func) + def decorate(*args, **kwargs): + return func(*args, **kwargs) + + return decorate + + +@worker_ready.connect +def on_app_ready(sender=None, headers=None, body=None, **kwargs): + logger.debug("App ready signal recv") + logger.debug("Start need start task: [{}]".format( + ", ".join(__AFTER_APP_READY_RUN_TASKS)) + ) + for task in __AFTER_APP_READY_RUN_TASKS: + subtask(task).delay() + + +@worker_shutdown.connect +def after_app_shutdown(sender=None, headers=None, body=None, **kwargs): + from django_celery_beat.models import PeriodicTask + logger.debug("App shutdown signal recv") + logger.debug("Clean need cleaned period tasks: [{}]".format( + ', '.join(__AFTER_APP_SHUTDOWN_CLEAN_TASKS)) + ) + PeriodicTask.objects.filter(name__in=__AFTER_APP_SHUTDOWN_CLEAN_TASKS).delete() diff --git a/apps/common/tasks.py b/apps/common/tasks.py index 4e6e33fc4..e8d6ba8b0 100644 --- a/apps/common/tasks.py +++ b/apps/common/tasks.py @@ -1,6 +1,6 @@ from django.core.mail import send_mail from django.conf import settings -from common import celery_app as app +from .celery import app from .utils import get_logger diff --git a/apps/common/utils.py b/apps/common/utils.py index f1edce12e..9af801c1c 100644 --- a/apps/common/utils.py +++ b/apps/common/utils.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- # -import json import re from collections import OrderedDict from six import string_types import base64 import os from itertools import chain -import string import logging import datetime import time @@ -27,9 +25,6 @@ from django.conf import settings from django.utils import timezone -from .compat import to_bytes, to_string - -SECRET_KEY = settings.SECRET_KEY UUID_PATTERN = re.compile(r'[0-9a-zA-Z\-]{36}') @@ -51,9 +46,22 @@ def get_object_or_none(model, **kwargs): return obj -class Signer(object): +class Singleton(type): + def __init__(cls, *args, **kwargs): + cls.__instance = None + super().__init__(*args, **kwargs) + + def __call__(cls, *args, **kwargs): + if cls.__instance is None: + cls.__instance = super().__call__(*args, **kwargs) + return cls.__instance + else: + return cls.__instance + + +class Signer(metaclass=Singleton): """用来加密,解密,和基于时间戳的方式验证token""" - def __init__(self, secret_key=SECRET_KEY): + def __init__(self, secret_key=None): self.secret_key = secret_key def sign(self, value): @@ -100,58 +108,10 @@ def combine_seq(s1, s2, callback=None): return seq -def search_object_attr(obj, value='', attr_list=None, ignore_case=False): - """It's provide a method to search a object attribute equal some value - - If object some attribute equal :param: value, return True else return False - - class A(): - name = 'admin' - age = 7 - - :param obj: A object - :param value: A string match object attribute - :param attr_list: Only match attribute in attr_list - :param ignore_case: Ignore case - :return: Boolean - """ - if value == '': - return True - - try: - object_attr = obj.__dict__ - except AttributeError: - return False - - if attr_list is not None: - new_object_attr = {} - for attr in attr_list: - new_object_attr[attr] = object_attr.pop(attr) - object_attr = new_object_attr - - if ignore_case: - if not isinstance(value, string_types): - return False - - if value.lower() in map(string.lower, map(str, object_attr.values())): - return True - else: - if value in object_attr.values(): - return True - return False - - def get_logger(name=None): return logging.getLogger('jumpserver.%s' % name) -def int_seq(seq): - try: - return map(int, seq) - except ValueError: - return seq - - def timesince(dt, since='', default="just now"): """ Returns string representing "time since" e.g. @@ -391,4 +351,6 @@ def is_uuid(s): return False -signer = Signer() +def get_signer(): + signer = Signer(settings.SECRET_KEY) + return signer diff --git a/apps/jumpserver/settings.py b/apps/jumpserver/settings.py index e17ce5d99..049ca6f18 100644 --- a/apps/jumpserver/settings.py +++ b/apps/jumpserver/settings.py @@ -337,8 +337,9 @@ CELERY_ACCEPT_CONTENT = ['json', 'pickle'] CELERY_RESULT_EXPIRES = 3600 CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s' +CELERY_TASK_EAGER_PROPAGATES = True CELERY_TIMEZONE = TIME_ZONE -# TERMINAL_HEATBEAT_INTERVAL = CONFIG.TERMINAL_HEATBEAT_INTERVAL or 30 +# CELERY_ENABLE_UTC = True # Cache use redis diff --git a/apps/ops/decorators.py b/apps/ops/decorators.py deleted file mode 100644 index 88b96c6d7..000000000 --- a/apps/ops/decorators.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# -from functools import wraps - - -TASK_PREFIX = "TOOT" -CALLBACK_PREFIX = "COC" - - -def register_as_period_task(crontab=None, interval=None): - """ - :param crontab: "* * * * *" - :param interval: 60*60*60 - :return: - """ - from .utils import create_or_update_celery_periodic_tasks - if crontab is None and interval is None: - raise SyntaxError("Must set crontab or interval one") - - def decorate(func): - @wraps(func) - def wrapper(*args, **kwargs): - tasks = { - func.__name__: { - 'task': func.__name__, - 'args': args, - 'kwargs': kwargs, - 'interval': interval, - 'crontab': crontab, - 'enabled': True, - } - } - create_or_update_celery_periodic_tasks(tasks) - return func(*args, **kwargs) - return wrapper - return decorate - - diff --git a/apps/ops/models.py b/apps/ops/models.py index 67fd7e052..04dc9aa0a 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -9,7 +9,9 @@ from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask -from common.utils import signer, get_logger +from common.utils import get_signer, get_logger +from common.celery import delete_celery_periodic_task, create_or_update_celery_periodic_tasks, \ + disable_celery_periodic_task from .ansible import AdHocRunner, AnsibleError from .inventory import JMSInventory @@ -17,6 +19,7 @@ __all__ = ["Task", "AdHoc", "AdHocRunHistory"] logger = get_logger(__file__) +signer = get_signer() class Task(models.Model): @@ -82,8 +85,6 @@ class Task(models.Model): def save(self, force_insert=False, force_update=False, using=None, update_fields=None): - from .utils import create_or_update_celery_periodic_tasks, \ - disable_celery_periodic_task from .tasks import run_ansible_task super().save( force_insert=force_insert, force_update=force_update, @@ -114,7 +115,6 @@ class Task(models.Model): disable_celery_periodic_task(self.name) def delete(self, using=None, keep_parents=False): - from .utils import delete_celery_periodic_task super().delete(using=using, keep_parents=keep_parents) delete_celery_periodic_task(self.name) @@ -246,7 +246,7 @@ class AdHoc(models.Model): } :return: """ - self._become = signer.sign(json.dumps(item)) + self._become = signer.sign(json.dumps(item)).decode('utf-8') @property def options(self): @@ -271,6 +271,11 @@ class AdHoc(models.Model): except AdHocRunHistory.DoesNotExist: return None + def save(self, force_insert=False, force_update=False, using=None, + update_fields=None): + super().save(force_insert=force_insert, force_update=force_update, + using=using, update_fields=update_fields) + def __str__(self): return "{} of {}".format(self.task.name, self.short_id) diff --git a/apps/ops/tasks.py b/apps/ops/tasks.py index be891919d..41f60f20c 100644 --- a/apps/ops/tasks.py +++ b/apps/ops/tasks.py @@ -21,9 +21,9 @@ def run_ansible_task(task_id, callback=None, **kwargs): task = get_object_or_none(Task, id=task_id) if task: - result = task.object.run() + result = task.run() if callback is not None: - subtask(callback).delay(result) + subtask(callback).delay(result, task_name=task.name) return result else: logger.error("No task found") diff --git a/apps/ops/templates/ops/task_list.html b/apps/ops/templates/ops/task_list.html index 503f1046a..161a48ace 100644 --- a/apps/ops/templates/ops/task_list.html +++ b/apps/ops/templates/ops/task_list.html @@ -57,14 +57,20 @@ {{ object.adhoc.all | length}} {{ object.latest_adhoc.hosts | length}} - {% if object.latest_history.is_success %} - - {% else %} - + {% if object.latest_history %} + {% if object.latest_history.is_success %} + + {% else %} + + {% endif %} {% endif %} {{ object.latest_history.date_start }} - {{ object.latest_history.timedelta|floatformat }} s + + {% if object.latest_history %} + {{ object.latest_history.timedelta|floatformat }} s + {% endif %} + {% trans "Run" %} {% trans "Delete" %} diff --git a/apps/ops/utils.py b/apps/ops/utils.py index 55b5761b6..0a9dce8c9 100644 --- a/apps/ops/utils.py +++ b/apps/ops/utils.py @@ -1,8 +1,4 @@ # ~*~ coding: utf-8 ~*~ -import json -from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule - - from common.utils import get_logger, get_object_or_none from .models import Task, AdHoc @@ -13,22 +9,27 @@ def get_task_by_id(task_id): return get_object_or_none(Task, id=task_id) -def create_or_update_ansible_task( - task_name, hosts, tasks, pattern='all', options=None, +def update_or_create_ansible_task( + task_name, hosts, tasks, + interval=None, crontab=None, is_periodic=False, + callback=None, pattern='all', options=None, run_as_admin=False, run_as="", become_info=None, - created_by=None, interval=None, crontab=None, - is_periodic=False, callback=None, + created_by=None, ): - task = get_object_or_none(Task, name=task_name) + defaults = { + 'name': task_name, + 'interval': interval, + 'crontab': crontab, + 'is_periodic': is_periodic, + 'callback': callback, + 'created_by': created_by, + } - if task is None: - task = Task( - name=task_name, interval=interval, - crontab=crontab, is_periodic=is_periodic, - callback=callback, created_by=created_by - ) - task.save() + created = False + task, _ = Task.objects.update_or_create( + defaults=defaults, name=task_name, + ) adhoc = task.latest_adhoc new_adhoc = AdHoc(task=task, pattern=pattern, @@ -38,70 +39,13 @@ def create_or_update_ansible_task( new_adhoc.tasks = tasks new_adhoc.options = options new_adhoc.become = become_info + if not adhoc or adhoc != new_adhoc: + logger.debug("Task create new adhoc: {}".format(task_name)) new_adhoc.save() task.latest_adhoc = new_adhoc - return task + created = True + return task, created -def create_or_update_celery_periodic_tasks(tasks): - """ - :param tasks: { - 'add-every-monday-morning': { - 'task': 'tasks.add' # A registered celery task, - 'interval': 30, - 'crontab': "30 7 * * *", - 'args': (16, 16), - 'kwargs': {}, - 'enabled': False, - }, - } - :return: - """ - # Todo: check task valid, task and callback must be a celery task - for name, detail in tasks.items(): - interval = None - crontab = None - if isinstance(detail.get("interval"), int): - interval, _ = IntervalSchedule.objects.get_or_create( - every=detail['interval'], - period=IntervalSchedule.SECONDS, - ) - elif isinstance(detail.get("crontab"), str): - try: - minute, hour, day, month, week = detail["crontab"].split() - except ValueError: - raise SyntaxError("crontab is not valid") - - crontab, _ = CrontabSchedule.objects.get_or_create( - minute=minute, hour=hour, day_of_week=week, - day_of_month=day, month_of_year=month, - ) - else: - raise SyntaxError("Schedule is not valid") - - defaults = dict( - interval=interval, - crontab=crontab, - name=name, - task=detail['task'], - args=json.dumps(detail.get('args', [])), - kwargs=json.dumps(detail.get('kwargs', {})), - enabled=detail['enabled'] - ) - - task = PeriodicTask.objects.update_or_create( - defaults=defaults, name=name, - ) - logger.info("Create periodic task: {}".format(task)) - return task - - -def disable_celery_periodic_task(task_name): - PeriodicTask.objects.filter(name=task_name).update(enabled=False) - - -def delete_celery_periodic_task(task_name): - PeriodicTask.objects.filter(name=task_name).delete() - diff --git a/apps/static/js/jumpserver.js b/apps/static/js/jumpserver.js index 04b081dcb..6992f06e9 100644 --- a/apps/static/js/jumpserver.js +++ b/apps/static/js/jumpserver.js @@ -262,9 +262,6 @@ jumpserver.initDataTable = function (options) { var table = ele.DataTable({ pageLength: options.pageLength || 15, dom: options.dom || '<"#uc.pull-left">flt<"row m-t"<"col-md-8"<"#op.col-md-6"><"col-md-6 text-center"i>><"col-md-4"p>>', - language: { - url: options.i18n_url || "/static/js/plugins/dataTables/i18n/zh-hans.json" - }, order: options.order || [], select: options.select || 'multi', buttons: [], diff --git a/apps/templates/_left_side_bar.html b/apps/templates/_left_side_bar.html index 04aa89c3c..1e0392325 100644 --- a/apps/templates/_left_side_bar.html +++ b/apps/templates/_left_side_bar.html @@ -2,7 +2,7 @@