[Update] 修改task定时运行机制

pull/828/merge
ibuler 2017-12-22 02:08:29 +08:00
parent 47040a61c8
commit 4893c4664d
13 changed files with 174 additions and 53 deletions

View File

@ -2,6 +2,4 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
__version__ = "0.5.0"
if __name__ == '__main__':
pass

View File

@ -8,10 +8,8 @@ from django.db.models.signals import post_save
from common.utils import get_object_or_none, capacity_convert, \ from common.utils import get_object_or_none, capacity_convert, \
sum_capacity, encrypt_password, get_logger sum_capacity, encrypt_password, get_logger
from common.celery import app as celery_app
from .models import SystemUser, AdminUser, Asset from .models import SystemUser, AdminUser, Asset
from . import const from . import const
from .signals import on_app_ready
FORKS = 10 FORKS = 10
@ -402,22 +400,28 @@ def push_system_user_on_auth_change(sender, instance=None, update_fields=None, *
push_system_user_to_cluster_assets.delay(instance, task_name) push_system_user_to_cluster_assets.delay(instance, task_name)
celery_app.conf['CELERYBEAT_SCHEDULE'].update( periodic_tasks = (
{ {
'update_assets_hardware_period': { 'update_assets_hardware_period': {
'task': 'assets.tasks.update_assets_hardware_period', 'task': 'assets.tasks.update_assets_hardware_period',
'schedule': 60*60*24, 'schedule': 60*60*60*24,
'args': (), 'args': (),
}, },
'test-admin-user-connectability_period': { 'test-admin-user-connectability_period': {
'task': 'assets.tasks.test_admin_user_connectability_period', 'task': 'assets.tasks.test_admin_user_connectability_period',
'schedule': 60*60, 'schedule': 60*60*60,
'args': (), 'args': (),
}, },
'push_system_user_period': { 'push_system_user_period': {
'task': 'assets.tasks.push_system_user_period', 'task': 'assets.tasks.push_system_user_period',
'schedule': 60*60, 'schedule': 60*60*60*24,
'args': (), 'args': (),
} }
} }
) )
def initial_periodic_tasks():
from ops.utils import create_periodic_tasks
create_periodic_tasks(periodic_tasks)

View File

@ -7,7 +7,6 @@ from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
class NoDeleteQuerySet(models.query.QuerySet): class NoDeleteQuerySet(models.query.QuerySet):
def delete(self): def delete(self):

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
import json
import re import re
from collections import OrderedDict from collections import OrderedDict
from six import string_types from six import string_types

View File

@ -27,9 +27,7 @@ sys.path.append(PROJECT_DIR)
# Import project config setting # Import project config setting
try: try:
from config import config as env_config, env from config import config as CONFIG
CONFIG = env_config.get(env, 'default')()
except ImportError: except ImportError:
CONFIG = type('_', (), {'__getattr__': lambda arg1, arg2: None})() CONFIG = type('_', (), {'__getattr__': lambda arg1, arg2: None})()
@ -66,12 +64,12 @@ INSTALLED_APPS = [
'django_filters', 'django_filters',
'bootstrap3', 'bootstrap3',
'captcha', 'captcha',
'django_celery_beat',
'django.contrib.auth', 'django.contrib.auth',
'django.contrib.contenttypes', 'django.contrib.contenttypes',
'django.contrib.sessions', 'django.contrib.sessions',
'django.contrib.messages', 'django.contrib.messages',
'django.contrib.staticfiles', 'django.contrib.staticfiles',
] ]
MIDDLEWARE = [ MIDDLEWARE = [

View File

@ -3,4 +3,4 @@
from .callback import * from .callback import *
from .inventory import * from .inventory import *
from .runner import * from .runner import *
from .exceptions import *

View File

@ -1,6 +1,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
__all__ = [
'AnsibleError'
]
class AnsibleError(Exception): class AnsibleError(Exception):
pass pass

View File

@ -4,10 +4,15 @@ import logging
import json import json
import uuid import uuid
import time
from django.db import models from django.db import models
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core import serializers
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask
from common.utils import signer from common.utils import signer
from .ansible import AdHocRunner, AnsibleError
__all__ = ["Task", "AdHoc", "AdHocRunHistory"] __all__ = ["Task", "AdHoc", "AdHocRunHistory"]
@ -22,7 +27,17 @@ class Task(models.Model):
""" """
id = models.UUIDField(default=uuid.uuid4, primary_key=True) id = models.UUIDField(default=uuid.uuid4, primary_key=True)
name = models.CharField(max_length=128, unique=True, verbose_name=_('Name')) name = models.CharField(max_length=128, unique=True, verbose_name=_('Name'))
interval = models.ForeignKey(
IntervalSchedule, on_delete=models.CASCADE,
null=True, blank=True, verbose_name=_('Interval'),
)
crontab = models.ForeignKey(
CrontabSchedule, on_delete=models.CASCADE, null=True, blank=True,
verbose_name=_('Crontab'), help_text=_('Use one of Interval/Crontab'),
)
is_periodic = models.BooleanField(default=False)
is_deleted = models.BooleanField(default=False) is_deleted = models.BooleanField(default=False)
comment = models.TextField(blank=True, verbose_name=_("Comment"))
created_by = models.CharField(max_length=128, blank=True, null=True, default='') created_by = models.CharField(max_length=128, blank=True, null=True, default='')
date_created = models.DateTimeField(auto_now_add=True) date_created = models.DateTimeField(auto_now_add=True)
__latest_adhoc = None __latest_adhoc = None
@ -65,12 +80,32 @@ class Task(models.Model):
def get_run_history(self): def get_run_history(self):
return self.history.all() return self.history.all()
def run(self): def run(self, record=True):
if self.latest_adhoc: if self.latest_adhoc:
return self.latest_adhoc.run() return self.latest_adhoc.run(record=record)
else: else:
return {'error': 'No adhoc'} return {'error': 'No adhoc'}
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
instance = super().save(
force_insert=force_insert, force_update=force_update,
using=using, update_fields=update_fields,
)
if instance.is_periodic:
PeriodicTask.objects.update_or_create(
interval=instance.interval,
crontab=instance.crontab,
name=self.name,
task='ops.run_task',
args=serializers.serialize('json', [instance]),
)
else:
PeriodicTask.objects.filter(name=self.name).delete()
return instance
def __str__(self): def __str__(self):
return self.name return self.name
@ -128,9 +163,42 @@ class AdHoc(models.Model):
else: else:
return {} return {}
def run(self): def run(self, record=True):
from .utils import run_adhoc_object if record:
return run_adhoc_object(self, **self.options) return self._run_and_record()
else:
return self._run_only()
def _run_and_record(self):
history = AdHocRunHistory(adhoc=self, task=self.task)
time_start = time.time()
try:
result = self._run_only()
history.is_finished = True
if result.results_summary.get('dark'):
history.is_success = False
else:
history.is_success = True
history.result = result.results_raw
history.summary = result.results_summary
return result
finally:
history.date_finished = timezone.now()
history.timedelta = time.time() - time_start
history.save()
def _run_only(self):
from .utils import get_adhoc_inventory
inventory = get_adhoc_inventory(self)
runner = AdHocRunner(inventory)
for k, v in self.options.items():
runner.set_option(k, v)
try:
result = runner.run(self.tasks, self.pattern, self.task.name)
return result
except AnsibleError as e:
logger.error("Failed run adhoc {}, {}".format(self.task.name, e))
@become.setter @become.setter
def become(self, item): def become(self, item):

View File

@ -1,7 +1,6 @@
# coding: utf-8 # coding: utf-8
from celery import shared_task from celery import shared_task
from django.core import serializers
from .utils import run_adhoc
def rerun_task(): def rerun_task():
@ -9,5 +8,6 @@ def rerun_task():
@shared_task @shared_task
def run_add_hoc_and_record_async(adhoc, **options): def run_task(tasks_json):
return run_adhoc(adhoc, **options) for task in serializers.deserialize('json', tasks_json):
task.object.run()

View File

@ -3,6 +3,7 @@
import time import time
from django.utils import timezone from django.utils import timezone
from django.db import transaction from django.db import transaction
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from common.utils import get_logger, get_object_or_none, get_short_uuid_str from common.utils import get_logger, get_object_or_none, get_short_uuid_str
from .ansible import AdHocRunner, CommandResultCallback from .ansible import AdHocRunner, CommandResultCallback
@ -131,4 +132,19 @@ def create_or_update_task(
return task return task
def create_periodic_tasks(tasks):
for name, detail in tasks.items():
schedule, _ = IntervalSchedule.objects.get_or_create(
every=detail['schedule'],
period=IntervalSchedule.SECONDS,
)
task = PeriodicTask.objects.create(
interval=schedule,
name=name,
task=detail['task'],
args=json.dumps(detail.get('args', [])),
kwargs=json.dumps(detail.get('kwargs', {})),
)
print("Create periodic task: {}".format(task))

View File

@ -4,7 +4,7 @@
Jumpserver project setting file Jumpserver project setting file
:copyright: (c) 2014-2016 by Jumpserver Team. :copyright: (c) 2014-2017 by Jumpserver Team
:license: GPL v2, see LICENSE for more details. :license: GPL v2, see LICENSE for more details.
""" """
import os import os
@ -50,6 +50,11 @@ class Config:
# DB_PASSWORD = '' # DB_PASSWORD = ''
# DB_NAME = 'jumpserver' # DB_NAME = 'jumpserver'
# When Django start it will bind this host and port
# ./manage.py runserver 127.0.0.1:8080
HTTP_BIND_HOST = '0.0.0.0'
HTTP_LISTEN_PORT = 8080
# Use Redis as broker for celery and web socket # Use Redis as broker for celery and web socket
REDIS_HOST = '127.0.0.1' REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379 REDIS_PORT = 6379
@ -101,8 +106,18 @@ class Config:
return None return None
config = { class DevelopmentConfig(Config):
'default': Config, pass
}
class TestConfig(Config):
pass
class ProductionConfig(Config):
pass
# Default using Config settings, you can write if/else for different env
config = Config()
env = 'default'

View File

@ -57,3 +57,5 @@ uritemplate==3.0.0
urllib3==1.22 urllib3==1.22
vine==1.1.4 vine==1.1.4
gunicorn==19.7.1 gunicorn==19.7.1
django_celery_beat==1.1.0
ephem==3.7.6.0

View File

@ -1,48 +1,64 @@
#!/usr/bin/env python #!/usr/bin/env python
# ~*~ coding: utf-8 ~*~
from threading import Thread
import os import os
import subprocess import subprocess
import time
from threading import Thread
from apps import __version__
try: try:
from config import config as env_config, env from config import config as CONFIG
CONFIG = env_config.get(env, 'default')()
except ImportError: except ImportError:
CONFIG = type('_', (), {'__getattr__': None})() CONFIG = type('_', (), {'__getattr__': None})()
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.dirname(os.path.abspath(__file__))
APPS_DIR = os.path.join(BASE_DIR, 'apps')
apps_dir = os.path.join(BASE_DIR, 'apps') HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1'
HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080
LOG_LEVEL = CONFIG.LOG_LEVEL
WORKERS = 4
def start_django(): def start_gunicorn():
http_host = CONFIG.HTTP_BIND_HOST or '127.0.0.1' print("- Start Gunicorn WSGI HTTP Server")
http_port = CONFIG.HTTP_LISTEN_PORT or '8080' os.chdir(APPS_DIR)
os.chdir(apps_dir) cmd = "gunicorn jumpserver.wsgi -b {}:{} -w {}".format(HTTP_HOST, HTTP_PORT, WORKERS)
print('start django') subprocess.call(cmd, shell=True)
subprocess.call('python ./manage.py runserver %s:%s' % (http_host, http_port), shell=True)
def start_celery(): def start_celery():
os.chdir(apps_dir) print("- Start Celery as Distributed Task Queue")
os.environ.setdefault('C_FORCE_ROOT', '1') os.chdir(APPS_DIR)
os.environ.setdefault('PYTHONOPTIMIZE', '1') # os.environ.setdefault('PYTHONOPTIMIZE', '1')
print('start celery') cmd = 'celery -A common worker -l {}'.format(LOG_LEVEL.lower())
subprocess.call('celery -A common worker -B -s /tmp/celerybeat-schedule -l debug', shell=True) subprocess.call(cmd, shell=True)
def start_beat():
print("- Start Beat as Periodic Task Scheduler")
os.chdir(APPS_DIR)
# os.environ.setdefault('PYTHONOPTIMIZE', '1')
schduler = "django_celery_beat.schedulers:DatabaseScheduler"
cmd = 'celery -A common beat -l {} --scheduler {}'.format(LOG_LEVEL, schduler)
subprocess.call(cmd, shell=True)
def main(): def main():
t1 = Thread(target=start_django, args=()) print(time.ctime())
t2 = Thread(target=start_celery, args=()) print('Jumpserver version {}, more see https://www.jumpserver.org'.format(
__version__))
print('Quit the server with CONTROL-C.')
t1.start() threads = []
t2.start() for func in (start_gunicorn, start_celery, start_beat):
t = Thread(target=func, args=())
threads.append(t)
t.start()
t1.join() for t in threads:
t2.join() t.join()
if __name__ == '__main__': if __name__ == '__main__':