upgrade schedule module

pull/330/head
vapao 2021-04-22 12:55:36 +08:00
parent 00bbfea616
commit 2ed651bd52
13 changed files with 167 additions and 117 deletions

View File

@ -1,9 +0,0 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Released under the AGPL-3.0 License.
from apps.account.models import History
from datetime import datetime, timedelta
def auto_clean_login_history():
date = datetime.now() - timedelta(days=30)
History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete()

View File

@ -1,9 +0,0 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Released under the AGPL-3.0 License.
from apps.alarm.models import Alarm
from datetime import datetime, timedelta
def auto_clean_alarm_records():
date = datetime.now() - timedelta(days=30)
Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete()

View File

@ -3,12 +3,10 @@
# Released under the AGPL-3.0 License.
from django_redis import get_redis_connection
from django.conf import settings
from libs.utils import AttrDict, human_time, human_datetime, parse_time
from libs.utils import AttrDict, human_time, human_datetime
from apps.host.models import Host
from apps.notify.models import Notify
from apps.deploy.models import DeployRequest
from concurrent import futures
from datetime import datetime
import requests
import subprocess
import json
@ -414,11 +412,3 @@ class Helper:
self.send_info(key, out)
if code != 0:
self.send_error(key, f'exit code: {code}')
def auto_update_status():
now = datetime.now()
for req in DeployRequest.objects.filter(status='2'):
if (now - parse_time(req.do_at)).seconds > 3600:
req.status = '-3'
req.save()

View File

@ -0,0 +1,35 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.core.management.base import BaseCommand
from django.conf import settings
from django_redis import get_redis_connection
from concurrent.futures import ThreadPoolExecutor
from apps.schedule.executors import schedule_worker_handler
import logging
MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY
SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY
class Worker:
def __init__(self):
self.rds = get_redis_connection()
self._executor = ThreadPoolExecutor(max_workers=100)
def run(self):
logging.warning('Running worker')
while True:
key, job = self.rds.blpop([SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
if key.decode() == SCHEDULE_WORKER_KEY:
self._executor.submit(schedule_worker_handler, job)
else:
pass
class Command(BaseCommand):
help = 'Start worker process'
def handle(self, *args, **options):
w = Worker()
w.run()

View File

@ -0,0 +1,31 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.db import close_old_connections
from apps.account.models import History
from apps.alarm.models import Alarm
from apps.schedule.models import Task
from apps.deploy.models import DeployRequest
from libs.utils import parse_time
from datetime import datetime, timedelta
def auto_run_by_day():
close_old_connections()
date = datetime.now() - timedelta(days=30)
History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete()
Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete()
for task in Task.objects.all():
try:
record = History.objects.filter(task_id=task.id)[50]
History.objects.filter(task_id=task.id, id__lt=record.id).delete()
except IndexError:
pass
def auto_run_by_minute():
now = datetime.now()
for req in DeployRequest.objects.filter(status='2'):
if (now - parse_time(req.do_at)).seconds > 3600:
req.status = '-3'
req.save()

View File

@ -1,54 +1,62 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from queue import Queue
from threading import Thread
from libs.ssh import AuthenticationException
from django.db import close_old_connections, transaction
from apps.host.models import Host
from django.db import close_old_connections
from apps.schedule.models import History, Task
from apps.schedule.utils import send_fail_notify
import subprocess
import socket
import time
import json
def local_executor(q, command):
exit_code, out, now = -1, None, time.time()
def local_executor(command):
code, out, now = 1, None, time.time()
task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
exit_code = task.wait()
code = task.wait(3600)
out = task.stdout.read() + task.stderr.read()
finally:
q.put(('local', exit_code, round(time.time() - now, 3), out.decode()))
out = out.decode()
except subprocess.TimeoutExpired:
# task.kill()
out = 'timeout, wait more than 1 hour'
return code, round(time.time() - now, 3), out
def host_executor(q, host, command):
exit_code, out, now = -1, None, time.time()
def host_executor(host, command):
code, out, now = 1, None, time.time()
try:
cli = host.get_ssh()
exit_code, out = cli.exec_command(command)
out = out if out else None
code, out = cli.exec_command(command)
except AuthenticationException:
out = 'ssh authentication fail'
except socket.error as e:
out = f'network error {e}'
finally:
q.put((host.id, exit_code, round(time.time() - now, 3), out))
return code, round(time.time() - now, 3), out
def dispatch(command, targets, in_view=False):
if not in_view:
def schedule_worker_handler(job):
history_id, host_id, command = json.loads(job)
if host_id == 'local':
code, duration, out = local_executor(command)
else:
close_old_connections()
threads, q = [], Queue()
for t in targets:
if t == 'local':
threads.append(Thread(target=local_executor, args=(q, command)))
elif isinstance(t, int):
host = Host.objects.filter(pk=t).first()
if not host:
raise ValueError(f'unknown host id: {t!r}')
threads.append(Thread(target=host_executor, args=(q, host, command)))
host = Host.objects.filter(pk=host_id).first()
if not host:
code, duration, out = 1, 0, f'unknown host id for {host_id!r}'
else:
raise ValueError(f'invalid target: {t!r}')
for t in threads:
t.start()
return [q.get() for _ in threads]
code, duration, out = host_executor(host, command)
close_old_connections()
with transaction.atomic():
history = History.objects.select_for_update().get(pk=history_id)
output = json.loads(history.output)
output[str(host_id)] = [code, duration, out]
history.output = json.dumps(output)
if all(output.values()):
history.status = '1' if sum(x[0] for x in output.values()) == 0 else '2'
history.save()
if history.status == '2':
task = Task.objects.get(pk=history.task_id)
send_fail_notify(task)

View File

@ -9,8 +9,8 @@ import json
class History(models.Model, ModelMixin):
STATUS = (
(0, '成功'),
(1, '异常'),
(0, '执行中'),
(1, '成功'),
(2, '失败'),
)
task_id = models.IntegerField()

View File

@ -6,23 +6,21 @@ from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR
from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject
from django.db import close_old_connections
from apps.schedule.models import Task, History
from apps.schedule.utils import send_fail_notify
from apps.notify.models import Notify
from apps.schedule.executors import dispatch
from apps.schedule.utils import auto_clean_schedule_history
from apps.alarm.utils import auto_clean_alarm_records
from apps.account.utils import auto_clean_login_history
from apps.deploy.utils import auto_update_status
from apps.schedule.builtin import auto_run_by_day, auto_run_by_minute
from django.conf import settings
from libs import AttrDict, human_datetime
import logging
import json
SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY
class Scheduler:
timezone = settings.TIME_ZONE
@ -42,7 +40,8 @@ class Scheduler:
self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
self.scheduler.add_listener(
self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED)
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES
)
@classmethod
def parse_trigger(cls, trigger, trigger_args):
@ -71,26 +70,24 @@ class Scheduler:
elif event.code == EVENT_JOB_ERROR:
logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
send_fail_notify(obj, f'执行异常:{event.exception}')
elif event.code == EVENT_JOB_EXECUTED:
if event.retval:
score = 0
for item in event.retval:
score += 1 if item[1] else 0
history = History.objects.create(
task_id=event.job_id,
status=2 if score == len(event.retval) else 1 if score else 0,
run_time=human_datetime(event.scheduled_run_time),
output=json.dumps(event.retval)
)
Task.objects.filter(pk=event.job_id).update(latest=history)
if score != 0:
send_fail_notify(obj)
def _init_builtin_jobs(self):
self.scheduler.add_job(auto_clean_alarm_records, 'cron', hour=0, minute=1)
self.scheduler.add_job(auto_clean_login_history, 'cron', hour=0, minute=2)
self.scheduler.add_job(auto_clean_schedule_history, 'cron', hour=0, minute=3)
self.scheduler.add_job(auto_update_status, 'interval', minutes=5)
self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20)
self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=5)
def _dispatch(self, task_id, command, targets):
close_old_connections()
output = {x: None for x in targets}
history = History.objects.create(
task_id=task_id,
status='0',
run_time=human_datetime(),
output=json.dumps(output)
)
Task.objects.filter(pk=task_id).update(latest_id=history.id)
rds_cli = get_redis_connection()
for t in targets:
rds_cli.rpush(SCHEDULE_WORKER_KEY, json.dumps([history.id, t, command]))
def _init(self):
self.scheduler.start()
@ -98,10 +95,10 @@ class Scheduler:
for task in Task.objects.filter(is_active=True):
trigger = self.parse_trigger(task.trigger, task.trigger_args)
self.scheduler.add_job(
dispatch,
self._dispatch,
trigger,
id=str(task.id),
args=(task.command, json.loads(task.targets)),
args=(task.id, task.command, json.loads(task.targets)),
)
def run(self):
@ -115,10 +112,10 @@ class Scheduler:
if task.action in ('add', 'modify'):
trigger = self.parse_trigger(task.trigger, task.trigger_args)
self.scheduler.add_job(
dispatch,
self._dispatch,
trigger,
id=str(task.id),
args=(task.command, task.targets),
args=(task.id, task.command, task.targets),
replace_existing=True
)
elif task.action == 'remove':

View File

@ -1,7 +1,6 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from apps.schedule.models import Task, History
from apps.notify.models import Notify
from libs.utils import human_datetime
from threading import Thread
@ -9,15 +8,6 @@ import requests
import json
def auto_clean_schedule_history():
for task in Task.objects.all():
try:
record = History.objects.filter(task_id=task.id)[50]
History.objects.filter(task_id=task.id, id__lt=record.id).delete()
except IndexError:
pass
def send_fail_notify(task, msg=None):
rst_notify = json.loads(task.rst_notify)
mode = rst_notify.get('mode')

View File

@ -7,7 +7,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apps.schedule.scheduler import Scheduler
from apps.schedule.models import Task, History
from apps.schedule.executors import dispatch
from apps.schedule.executors import local_executor, host_executor
from apps.host.models import Host
from django.conf import settings
from libs import json_response, JsonParser, Argument, human_datetime
@ -110,30 +110,43 @@ class HistoryView(View):
task = Task.objects.filter(pk=t_id).first()
if not task:
return json_response(error='未找到指定任务')
data = dispatch(task.command, json.loads(task.targets), True)
score = 0
for item in data:
score += 1 if item[1] else 0
outputs, status = {}, 1
for host_id in json.loads(task.targets):
if host_id == 'local':
code, duration, out = local_executor(task.command)
else:
host = Host.objects.filter(pk=host_id).first()
if not host:
code, duration, out = 1, 0, f'unknown host id for {host_id!r}'
else:
code, duration, out = host_executor(host, task.command)
if code != 0:
status = 2
outputs[host_id] = [code, duration, out]
history = History.objects.create(
task_id=t_id,
status=2 if score == len(data) else 1 if score else 0,
task_id=task.id,
status=status,
run_time=human_datetime(),
output=json.dumps(data)
output=json.dumps(outputs)
)
return json_response(history.id)
def _fetch_detail(self, h_id):
record = History.objects.filter(pk=h_id).first()
outputs = json.loads(record.output)
host_ids = (x[0] for x in outputs if isinstance(x[0], int))
hosts_info = {x.id: x.name for x in Host.objects.filter(id__in=host_ids)}
host_ids = (x for x in outputs.keys() if x != 'local')
hosts_info = {str(x.id): x.name for x in Host.objects.filter(id__in=host_ids)}
data = {'run_time': record.run_time, 'success': 0, 'failure': 0, 'duration': 0, 'outputs': []}
for h_id, code, duration, out in outputs:
for host_id, value in outputs.items():
if not value:
continue
code, duration, out = value
key = 'success' if code == 0 else 'failure'
data[key] += 1
data['duration'] += duration
data['outputs'].append({
'name': hosts_info.get(h_id, '本机'),
'name': hosts_info.get(host_id, '本机'),
'code': code,
'duration': duration,
'output': out})

View File

@ -103,7 +103,9 @@ TEMPLATES = [
]
SCHEDULE_KEY = 'spug:schedule'
SCHEDULE_WORKER_KEY = 'spug:schedule:worker'
MONITOR_KEY = 'spug:monitor'
MONITOR_WORKER_KEY = 'spug:monitor:worker'
REQUEST_KEY = 'spug:request'
BUILD_KEY = 'spug:build'
REPOS_DIR = os.path.join(BASE_DIR, 'repos')

View File

@ -26,7 +26,7 @@ class Record extends React.Component {
.finally(() => this.setState({loading: false}))
}
colors = ['green', 'orange', 'red'];
colors = ['orange', 'green', 'red'];
columns = [{
title: '执行时间',

View File

@ -17,7 +17,7 @@ class ComTable extends React.Component {
store.fetchRecords()
}
colors = ['green', 'orange', 'red'];
colors = ['orange', 'green', 'red'];
moreMenus = (info) => (
<Menu>
@ -25,8 +25,10 @@ class ComTable extends React.Component {
<LinkButton onClick={() => this.handleTest(info)}>执行测试</LinkButton>
</Menu.Item>
<Menu.Item>
<LinkButton auth="schedule.schedule.edit"
onClick={() => this.handleActive(info)}>{info.is_active ? '禁用任务' : '激活任务'}</LinkButton>
<LinkButton
auth="schedule.schedule.edit"
onClick={() => this.handleActive(info)}>
{info.is_active ? '禁用任务' : '激活任务'}</LinkButton>
</Menu.Item>
<Menu.Item>
<LinkButton onClick={() => store.showRecord(info)}>历史记录</LinkButton>
@ -113,7 +115,7 @@ class ComTable extends React.Component {
handleTest = (text) => {
Modal.confirm({
title: '操作确认',
content: '立即执行该任务(不影响调度规则,且不会触发失败通知)?',
content: '立即以串行模式执行该任务(不影响调度规则,且不会触发失败通知)?',
onOk: () => http.post(`/api/schedule/${text.id}/`, null, {timeout: 120000})
.then(res => store.showInfo(text, res))
})