From 76f3f33c31b8e4729be974928de405f69dd787a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9B=B7=E4=BA=8C=E7=8C=9B?= Date: Fri, 29 Nov 2019 12:58:00 +0800 Subject: [PATCH] A api add schedule module --- spug_api/apps/schedule/__init__.py | 0 spug_api/apps/schedule/executors.py | 42 ++++++++++ .../management/commands/runscheduler.py | 10 +++ spug_api/apps/schedule/models.py | 47 +++++++++++ spug_api/apps/schedule/scheduler.py | 83 +++++++++++++++++++ spug_api/apps/schedule/urls.py | 7 ++ spug_api/apps/schedule/views.py | 69 +++++++++++++++ spug_api/requirements.txt | 1 + spug_api/spug/settings.py | 4 + spug_api/spug/urls.py | 1 + 10 files changed, 264 insertions(+) create mode 100644 spug_api/apps/schedule/__init__.py create mode 100644 spug_api/apps/schedule/executors.py create mode 100644 spug_api/apps/schedule/management/commands/runscheduler.py create mode 100644 spug_api/apps/schedule/models.py create mode 100644 spug_api/apps/schedule/scheduler.py create mode 100644 spug_api/apps/schedule/urls.py create mode 100644 spug_api/apps/schedule/views.py diff --git a/spug_api/apps/schedule/__init__.py b/spug_api/apps/schedule/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spug_api/apps/schedule/executors.py b/spug_api/apps/schedule/executors.py new file mode 100644 index 0000000..1b12207 --- /dev/null +++ b/spug_api/apps/schedule/executors.py @@ -0,0 +1,42 @@ +from queue import Queue +from threading import Thread +from libs.ssh import SSH +from apps.host.models import Host +from apps.setting.utils import AppSetting +import subprocess + + +def local_executor(q, command): + exit_code, out = -1, None + try: + task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + exit_code = task.wait() + out = task.stdout.read() + task.stderr.read() + finally: + q.put(('local', exit_code, out.decode())) + + +def host_executor(q, host, pkey, command): + exit_code, out = -1, None + try: + cli = SSH(host.hostname, host.port, host.username, pkey=pkey) + exit_code, out = cli.exec_command(command) + finally: + q.put((host.id, exit_code, out.decode())) + + +def dispatch(command, targets): + threads, pkey, q = [], AppSetting.get('private_key'), Queue() + for t in targets: + if t == 'local': + threads.append(Thread(target=local_executor, args=(q, command))) + elif isinstance(t, int): + host = Host.objects.filter(pk=t).first() + if not host: + raise ValueError(f'unknown host id: {t!r}') + threads.append(Thread(target=host_executor, args=(q, host, pkey, command))) + else: + raise ValueError(f'invalid target: {t!r}') + for t in threads: + t.start() + return [q.get() for _ in threads] diff --git a/spug_api/apps/schedule/management/commands/runscheduler.py b/spug_api/apps/schedule/management/commands/runscheduler.py new file mode 100644 index 0000000..216ae93 --- /dev/null +++ b/spug_api/apps/schedule/management/commands/runscheduler.py @@ -0,0 +1,10 @@ +from django.core.management.base import BaseCommand +from apps.schedule.scheduler import Scheduler + + +class Command(BaseCommand): + help = 'Start schedule process' + + def handle(self, *args, **options): + s = Scheduler() + s.run() diff --git a/spug_api/apps/schedule/models.py b/spug_api/apps/schedule/models.py new file mode 100644 index 0000000..e8db505 --- /dev/null +++ b/spug_api/apps/schedule/models.py @@ -0,0 +1,47 @@ +from django.db import models +from libs import ModelMixin, human_time +from apps.account.models import User +import json + + +class Task(models.Model, ModelMixin): + TRIGGERS = ( + ('date', '一次性'), + ('calendarinterval', '日历间隔'), + ('cron', 'UNIX cron'), + ('interval', '普通间隔') + ) + STATUS = ( + (0, '成功'), + (1, '异常'), + (2, '失败'), + ) + name = models.CharField(max_length=50) + type = models.CharField(max_length=50) + command = models.TextField() + targets = models.TextField() + trigger = models.CharField(max_length=20, choices=TRIGGERS) + trigger_args = models.CharField(max_length=255) + is_active = models.BooleanField(default=False) + desc = models.CharField(max_length=255, null=True) + latest_status = models.SmallIntegerField(choices=STATUS, null=True) + latest_run_time = models.CharField(max_length=20, null=True) + latest_output = models.TextField(null=True) + + created_at = models.CharField(max_length=20, default=human_time) + created_by = models.ForeignKey(User, models.PROTECT, related_name='+') + updated_at = models.CharField(max_length=20, null=True) + updated_by = models.ForeignKey(User, models.PROTECT, related_name='+', null=True) + + def to_dict(self, *args, **kwargs): + tmp = super().to_dict(*args, **kwargs) + tmp['targets'] = json.loads(self.targets) + tmp['latest_status_alias'] = self.get_latest_status_display() + return tmp + + def __repr__(self): + return '' % self.name + + class Meta: + db_table = 'tasks' + ordering = ('-id',) diff --git a/spug_api/apps/schedule/scheduler.py b/spug_api/apps/schedule/scheduler.py new file mode 100644 index 0000000..5ee92da --- /dev/null +++ b/spug_api/apps/schedule/scheduler.py @@ -0,0 +1,83 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.date import DateTrigger +from apscheduler import events +from django_redis import get_redis_connection +from apps.schedule.models import Task +from apps.schedule.executors import dispatch +from django.conf import settings +from libs import AttrDict, human_time +import logging +import json + +logger = logging.getLogger("django.apps.scheduler") + + +class Scheduler: + timezone = settings.TIME_ZONE + + def __init__(self): + self.scheduler = BackgroundScheduler(timezone=self.timezone) + self.scheduler.add_listener(self._handle_event, ) + + @classmethod + def parse_trigger(cls, trigger, trigger_args): + if trigger == 'interval': + return IntervalTrigger(seconds=int(trigger_args), timezone=cls.timezone) + elif trigger == 'date': + return DateTrigger(run_date=trigger_args, timezone=cls.timezone) + else: + raise TypeError(f'unknown schedule policy: {trigger!r}') + + def _handle_event(self, event): + if event.code == events.EVENT_SCHEDULER_SHUTDOWN: + logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}') + if event.code == events.EVENT_JOB_MAX_INSTANCES: + logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}') + if event.code == events.EVENT_JOB_ERROR: + logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') + if event.code == events.EVENT_JOB_MISSED: + logger.info(f'EVENT_JOB_MISSED: job_id {event.job_id}') + if event.code == events.EVENT_JOB_EXECUTED: + score = 0 + for item in event.retval: + score += 1 if item[1] else 0 + Task.objects.filter(pk=event.job_id).update( + latest_status=2 if score == len(event.retval) else 1 if score else 0, + latest_run_time=human_time(event.scheduled_run_time), + latest_output=json.dumps(event.retval) + ) + + def _init(self): + self.scheduler.start() + for task in Task.objects.filter(is_active=True): + trigger = self.parse_trigger(task.trigger, task.trigger_args) + self.scheduler.add_job( + dispatch, + trigger, + id=str(task.id), + args=(task.command, json.loads(task.targets)), + ) + + def run(self): + rds_cli = get_redis_connection() + self._init() + rds_cli.delete(settings.SCHEDULE_KEY) + logger.info('Running scheduler') + while True: + _, data = rds_cli.blpop(settings.SCHEDULE_KEY) + task = AttrDict(json.loads(data)) + print(f'queue: {task!r}') + if task.action in ('add', 'modify'): + trigger = self.parse_trigger(task.trigger, task.trigger_args) + self.scheduler.add_job( + dispatch, + trigger, + id=str(task.id), + args=(task.command, task.targets), + replace_existing=True + ) + elif task.action == 'remove': + job = self.scheduler.get_job(str(task.id)) + if job: + job.remove() diff --git a/spug_api/apps/schedule/urls.py b/spug_api/apps/schedule/urls.py new file mode 100644 index 0000000..99690a9 --- /dev/null +++ b/spug_api/apps/schedule/urls.py @@ -0,0 +1,7 @@ +from django.conf.urls import url + +from .views import * + +urlpatterns = [ + url(r'^$', Schedule.as_view()), +] diff --git a/spug_api/apps/schedule/views.py b/spug_api/apps/schedule/views.py new file mode 100644 index 0000000..b8f8108 --- /dev/null +++ b/spug_api/apps/schedule/views.py @@ -0,0 +1,69 @@ +from django.views.generic import View +from django_redis import get_redis_connection +from apps.schedule.models import Task +from django.conf import settings +from libs import json_response, JsonParser, Argument, human_time +import json + + +class Schedule(View): + def get(self, request): + tasks = Task.objects.all() + types = [x['type'] for x in tasks.order_by('type').values('type').distinct()] + return json_response({'types': types, 'tasks': [x.to_dict() for x in tasks]}) + + def post(self, request): + form, error = JsonParser( + Argument('id', type=int, required=False), + Argument('type', help='请输入任务类型'), + Argument('name', help='请输入任务名称'), + Argument('command', help='请输入任务内容'), + Argument('targets', type=list, filter=lambda x: len(x), help='请选择执行对象'), + Argument('trigger', filter=lambda x: x in dict(Task.TRIGGERS), help='请选择触发器类型'), + Argument('trigger_args', help='请输入触发器参数'), + Argument('desc', required=False), + ).parse(request.body) + if error is None: + form.targets = json.dumps(form.targets) + if form.id: + Task.objects.filter(pk=form.id).update( + updated_at=human_time(), + updated_by=request.user, + **form + ) + form.action = 'modify' + rds_cli = get_redis_connection() + rds_cli.rpush(settings.SCHEDULE_KEY, json.dumps(form)) + else: + Task.objects.create(created_by=request.user, **form) + return json_response(error=error) + + def patch(self, request): + form, error = JsonParser( + Argument('id', type=int, help='请指定操作对象'), + Argument('is_active', type=bool, required=False) + ).parse(request.body, True) + if error is None: + Task.objects.filter(pk=form.id).update(**form) + if form.get('is_active') is not None: + if form.is_active: + task = Task.objects.filter(pk=form.id).first() + message = {'id': form.id, 'action': 'add'} + message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets'))) + else: + message = {'id': form.id, 'action': 'remove'} + rds_cli = get_redis_connection() + rds_cli.rpush(settings.SCHEDULE_KEY, json.dumps(message)) + return json_response(error=error) + + def delete(self, request): + form, error = JsonParser( + Argument('id', type=int, help='请指定操作对象') + ).parse(request.GET) + if error is None: + task = Task.objects.filter(pk=form.id).first() + if task: + if task.is_active: + return json_response(error='该任务在运行中,请先停止任务再尝试删除') + task.delete() + return json_response(error=error) diff --git a/spug_api/requirements.txt b/spug_api/requirements.txt index ed53582..cb83f47 100644 --- a/spug_api/requirements.txt +++ b/spug_api/requirements.txt @@ -1,3 +1,4 @@ +apscheduler==3.6.3 Django==2.2.7 channels==2.3.1 paramiko==2.6.0 diff --git a/spug_api/spug/settings.py b/spug_api/spug/settings.py index ea4233b..b62e0dd 100644 --- a/spug_api/spug/settings.py +++ b/spug_api/spug/settings.py @@ -34,6 +34,7 @@ INSTALLED_APPS = [ 'apps.host', 'apps.setting', 'apps.exec', + 'apps.schedule', ] MIDDLEWARE = [ @@ -53,6 +54,7 @@ ASGI_APPLICATION = 'spug.routing.application' DATABASES = { 'default': { + 'ATOMIC_REQUESTS': True, 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), } @@ -77,6 +79,8 @@ CHANNEL_LAYERS = { }, } +SCHEDULE_KEY = 'spug:schedule' + # Internationalization # https://docs.djangoproject.com/en/2.2/topics/i18n/ diff --git a/spug_api/spug/urls.py b/spug_api/spug/urls.py index 135ffed..e6dc2fe 100644 --- a/spug_api/spug/urls.py +++ b/spug_api/spug/urls.py @@ -19,4 +19,5 @@ urlpatterns = [ path('account/', include('apps.account.urls')), path('host/', include('apps.host.urls')), path('exec/', include('apps.exec.urls')), + path('schedule/', include('apps.schedule.urls')), ]