mirror of https://github.com/openspug/spug
A api add schedule module
parent
cd3918c958
commit
76f3f33c31
|
@ -0,0 +1,42 @@
|
|||
from queue import Queue
|
||||
from threading import Thread
|
||||
from libs.ssh import SSH
|
||||
from apps.host.models import Host
|
||||
from apps.setting.utils import AppSetting
|
||||
import subprocess
|
||||
|
||||
|
||||
def local_executor(q, command):
|
||||
exit_code, out = -1, None
|
||||
try:
|
||||
task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
exit_code = task.wait()
|
||||
out = task.stdout.read() + task.stderr.read()
|
||||
finally:
|
||||
q.put(('local', exit_code, out.decode()))
|
||||
|
||||
|
||||
def host_executor(q, host, pkey, command):
|
||||
exit_code, out = -1, None
|
||||
try:
|
||||
cli = SSH(host.hostname, host.port, host.username, pkey=pkey)
|
||||
exit_code, out = cli.exec_command(command)
|
||||
finally:
|
||||
q.put((host.id, exit_code, out.decode()))
|
||||
|
||||
|
||||
def dispatch(command, targets):
|
||||
threads, pkey, q = [], AppSetting.get('private_key'), Queue()
|
||||
for t in targets:
|
||||
if t == 'local':
|
||||
threads.append(Thread(target=local_executor, args=(q, command)))
|
||||
elif isinstance(t, int):
|
||||
host = Host.objects.filter(pk=t).first()
|
||||
if not host:
|
||||
raise ValueError(f'unknown host id: {t!r}')
|
||||
threads.append(Thread(target=host_executor, args=(q, host, pkey, command)))
|
||||
else:
|
||||
raise ValueError(f'invalid target: {t!r}')
|
||||
for t in threads:
|
||||
t.start()
|
||||
return [q.get() for _ in threads]
|
|
@ -0,0 +1,10 @@
|
|||
from django.core.management.base import BaseCommand
|
||||
from apps.schedule.scheduler import Scheduler
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Start schedule process'
|
||||
|
||||
def handle(self, *args, **options):
|
||||
s = Scheduler()
|
||||
s.run()
|
|
@ -0,0 +1,47 @@
|
|||
from django.db import models
|
||||
from libs import ModelMixin, human_time
|
||||
from apps.account.models import User
|
||||
import json
|
||||
|
||||
|
||||
class Task(models.Model, ModelMixin):
|
||||
TRIGGERS = (
|
||||
('date', '一次性'),
|
||||
('calendarinterval', '日历间隔'),
|
||||
('cron', 'UNIX cron'),
|
||||
('interval', '普通间隔')
|
||||
)
|
||||
STATUS = (
|
||||
(0, '成功'),
|
||||
(1, '异常'),
|
||||
(2, '失败'),
|
||||
)
|
||||
name = models.CharField(max_length=50)
|
||||
type = models.CharField(max_length=50)
|
||||
command = models.TextField()
|
||||
targets = models.TextField()
|
||||
trigger = models.CharField(max_length=20, choices=TRIGGERS)
|
||||
trigger_args = models.CharField(max_length=255)
|
||||
is_active = models.BooleanField(default=False)
|
||||
desc = models.CharField(max_length=255, null=True)
|
||||
latest_status = models.SmallIntegerField(choices=STATUS, null=True)
|
||||
latest_run_time = models.CharField(max_length=20, null=True)
|
||||
latest_output = models.TextField(null=True)
|
||||
|
||||
created_at = models.CharField(max_length=20, default=human_time)
|
||||
created_by = models.ForeignKey(User, models.PROTECT, related_name='+')
|
||||
updated_at = models.CharField(max_length=20, null=True)
|
||||
updated_by = models.ForeignKey(User, models.PROTECT, related_name='+', null=True)
|
||||
|
||||
def to_dict(self, *args, **kwargs):
|
||||
tmp = super().to_dict(*args, **kwargs)
|
||||
tmp['targets'] = json.loads(self.targets)
|
||||
tmp['latest_status_alias'] = self.get_latest_status_display()
|
||||
return tmp
|
||||
|
||||
def __repr__(self):
|
||||
return '<Task %r>' % self.name
|
||||
|
||||
class Meta:
|
||||
db_table = 'tasks'
|
||||
ordering = ('-id',)
|
|
@ -0,0 +1,83 @@
|
|||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler.triggers.date import DateTrigger
|
||||
from apscheduler import events
|
||||
from django_redis import get_redis_connection
|
||||
from apps.schedule.models import Task
|
||||
from apps.schedule.executors import dispatch
|
||||
from django.conf import settings
|
||||
from libs import AttrDict, human_time
|
||||
import logging
|
||||
import json
|
||||
|
||||
logger = logging.getLogger("django.apps.scheduler")
|
||||
|
||||
|
||||
class Scheduler:
|
||||
timezone = settings.TIME_ZONE
|
||||
|
||||
def __init__(self):
|
||||
self.scheduler = BackgroundScheduler(timezone=self.timezone)
|
||||
self.scheduler.add_listener(self._handle_event, )
|
||||
|
||||
@classmethod
|
||||
def parse_trigger(cls, trigger, trigger_args):
|
||||
if trigger == 'interval':
|
||||
return IntervalTrigger(seconds=int(trigger_args), timezone=cls.timezone)
|
||||
elif trigger == 'date':
|
||||
return DateTrigger(run_date=trigger_args, timezone=cls.timezone)
|
||||
else:
|
||||
raise TypeError(f'unknown schedule policy: {trigger!r}')
|
||||
|
||||
def _handle_event(self, event):
|
||||
if event.code == events.EVENT_SCHEDULER_SHUTDOWN:
|
||||
logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
|
||||
if event.code == events.EVENT_JOB_MAX_INSTANCES:
|
||||
logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}')
|
||||
if event.code == events.EVENT_JOB_ERROR:
|
||||
logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
|
||||
if event.code == events.EVENT_JOB_MISSED:
|
||||
logger.info(f'EVENT_JOB_MISSED: job_id {event.job_id}')
|
||||
if event.code == events.EVENT_JOB_EXECUTED:
|
||||
score = 0
|
||||
for item in event.retval:
|
||||
score += 1 if item[1] else 0
|
||||
Task.objects.filter(pk=event.job_id).update(
|
||||
latest_status=2 if score == len(event.retval) else 1 if score else 0,
|
||||
latest_run_time=human_time(event.scheduled_run_time),
|
||||
latest_output=json.dumps(event.retval)
|
||||
)
|
||||
|
||||
def _init(self):
|
||||
self.scheduler.start()
|
||||
for task in Task.objects.filter(is_active=True):
|
||||
trigger = self.parse_trigger(task.trigger, task.trigger_args)
|
||||
self.scheduler.add_job(
|
||||
dispatch,
|
||||
trigger,
|
||||
id=str(task.id),
|
||||
args=(task.command, json.loads(task.targets)),
|
||||
)
|
||||
|
||||
def run(self):
|
||||
rds_cli = get_redis_connection()
|
||||
self._init()
|
||||
rds_cli.delete(settings.SCHEDULE_KEY)
|
||||
logger.info('Running scheduler')
|
||||
while True:
|
||||
_, data = rds_cli.blpop(settings.SCHEDULE_KEY)
|
||||
task = AttrDict(json.loads(data))
|
||||
print(f'queue: {task!r}')
|
||||
if task.action in ('add', 'modify'):
|
||||
trigger = self.parse_trigger(task.trigger, task.trigger_args)
|
||||
self.scheduler.add_job(
|
||||
dispatch,
|
||||
trigger,
|
||||
id=str(task.id),
|
||||
args=(task.command, task.targets),
|
||||
replace_existing=True
|
||||
)
|
||||
elif task.action == 'remove':
|
||||
job = self.scheduler.get_job(str(task.id))
|
||||
if job:
|
||||
job.remove()
|
|
@ -0,0 +1,7 @@
|
|||
from django.conf.urls import url
|
||||
|
||||
from .views import *
|
||||
|
||||
urlpatterns = [
|
||||
url(r'^$', Schedule.as_view()),
|
||||
]
|
|
@ -0,0 +1,69 @@
|
|||
from django.views.generic import View
|
||||
from django_redis import get_redis_connection
|
||||
from apps.schedule.models import Task
|
||||
from django.conf import settings
|
||||
from libs import json_response, JsonParser, Argument, human_time
|
||||
import json
|
||||
|
||||
|
||||
class Schedule(View):
|
||||
def get(self, request):
|
||||
tasks = Task.objects.all()
|
||||
types = [x['type'] for x in tasks.order_by('type').values('type').distinct()]
|
||||
return json_response({'types': types, 'tasks': [x.to_dict() for x in tasks]})
|
||||
|
||||
def post(self, request):
|
||||
form, error = JsonParser(
|
||||
Argument('id', type=int, required=False),
|
||||
Argument('type', help='请输入任务类型'),
|
||||
Argument('name', help='请输入任务名称'),
|
||||
Argument('command', help='请输入任务内容'),
|
||||
Argument('targets', type=list, filter=lambda x: len(x), help='请选择执行对象'),
|
||||
Argument('trigger', filter=lambda x: x in dict(Task.TRIGGERS), help='请选择触发器类型'),
|
||||
Argument('trigger_args', help='请输入触发器参数'),
|
||||
Argument('desc', required=False),
|
||||
).parse(request.body)
|
||||
if error is None:
|
||||
form.targets = json.dumps(form.targets)
|
||||
if form.id:
|
||||
Task.objects.filter(pk=form.id).update(
|
||||
updated_at=human_time(),
|
||||
updated_by=request.user,
|
||||
**form
|
||||
)
|
||||
form.action = 'modify'
|
||||
rds_cli = get_redis_connection()
|
||||
rds_cli.rpush(settings.SCHEDULE_KEY, json.dumps(form))
|
||||
else:
|
||||
Task.objects.create(created_by=request.user, **form)
|
||||
return json_response(error=error)
|
||||
|
||||
def patch(self, request):
|
||||
form, error = JsonParser(
|
||||
Argument('id', type=int, help='请指定操作对象'),
|
||||
Argument('is_active', type=bool, required=False)
|
||||
).parse(request.body, True)
|
||||
if error is None:
|
||||
Task.objects.filter(pk=form.id).update(**form)
|
||||
if form.get('is_active') is not None:
|
||||
if form.is_active:
|
||||
task = Task.objects.filter(pk=form.id).first()
|
||||
message = {'id': form.id, 'action': 'add'}
|
||||
message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets')))
|
||||
else:
|
||||
message = {'id': form.id, 'action': 'remove'}
|
||||
rds_cli = get_redis_connection()
|
||||
rds_cli.rpush(settings.SCHEDULE_KEY, json.dumps(message))
|
||||
return json_response(error=error)
|
||||
|
||||
def delete(self, request):
|
||||
form, error = JsonParser(
|
||||
Argument('id', type=int, help='请指定操作对象')
|
||||
).parse(request.GET)
|
||||
if error is None:
|
||||
task = Task.objects.filter(pk=form.id).first()
|
||||
if task:
|
||||
if task.is_active:
|
||||
return json_response(error='该任务在运行中,请先停止任务再尝试删除')
|
||||
task.delete()
|
||||
return json_response(error=error)
|
|
@ -1,3 +1,4 @@
|
|||
apscheduler==3.6.3
|
||||
Django==2.2.7
|
||||
channels==2.3.1
|
||||
paramiko==2.6.0
|
||||
|
|
|
@ -34,6 +34,7 @@ INSTALLED_APPS = [
|
|||
'apps.host',
|
||||
'apps.setting',
|
||||
'apps.exec',
|
||||
'apps.schedule',
|
||||
]
|
||||
|
||||
MIDDLEWARE = [
|
||||
|
@ -53,6 +54,7 @@ ASGI_APPLICATION = 'spug.routing.application'
|
|||
|
||||
DATABASES = {
|
||||
'default': {
|
||||
'ATOMIC_REQUESTS': True,
|
||||
'ENGINE': 'django.db.backends.sqlite3',
|
||||
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
|
||||
}
|
||||
|
@ -77,6 +79,8 @@ CHANNEL_LAYERS = {
|
|||
},
|
||||
}
|
||||
|
||||
SCHEDULE_KEY = 'spug:schedule'
|
||||
|
||||
# Internationalization
|
||||
# https://docs.djangoproject.com/en/2.2/topics/i18n/
|
||||
|
||||
|
|
|
@ -19,4 +19,5 @@ urlpatterns = [
|
|||
path('account/', include('apps.account.urls')),
|
||||
path('host/', include('apps.host.urls')),
|
||||
path('exec/', include('apps.exec.urls')),
|
||||
path('schedule/', include('apps.schedule.urls')),
|
||||
]
|
||||
|
|
Loading…
Reference in New Issue