diff --git a/spug_api/apps/account/utils.py b/spug_api/apps/account/utils.py deleted file mode 100644 index c6b2a5c..0000000 --- a/spug_api/apps/account/utils.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug -# Released under the AGPL-3.0 License. -from apps.account.models import History -from datetime import datetime, timedelta - - -def auto_clean_login_history(): - date = datetime.now() - timedelta(days=30) - History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() diff --git a/spug_api/apps/alarm/utils.py b/spug_api/apps/alarm/utils.py deleted file mode 100644 index 56eb5f9..0000000 --- a/spug_api/apps/alarm/utils.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug -# Released under the AGPL-3.0 License. -from apps.alarm.models import Alarm -from datetime import datetime, timedelta - - -def auto_clean_alarm_records(): - date = datetime.now() - timedelta(days=30) - Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() diff --git a/spug_api/apps/deploy/utils.py b/spug_api/apps/deploy/utils.py index 04c1a29..c562633 100644 --- a/spug_api/apps/deploy/utils.py +++ b/spug_api/apps/deploy/utils.py @@ -3,12 +3,10 @@ # Released under the AGPL-3.0 License. from django_redis import get_redis_connection from django.conf import settings -from libs.utils import AttrDict, human_time, human_datetime, parse_time +from libs.utils import AttrDict, human_time, human_datetime from apps.host.models import Host from apps.notify.models import Notify -from apps.deploy.models import DeployRequest from concurrent import futures -from datetime import datetime import requests import subprocess import json @@ -414,11 +412,3 @@ class Helper: self.send_info(key, out) if code != 0: self.send_error(key, f'exit code: {code}') - - -def auto_update_status(): - now = datetime.now() - for req in DeployRequest.objects.filter(status='2'): - if (now - parse_time(req.do_at)).seconds > 3600: - req.status = '-3' - req.save() diff --git a/spug_api/apps/exec/management/commands/runexecutor.py b/spug_api/apps/exec/management/commands/runexecutor.py new file mode 100644 index 0000000..d2a33f9 --- /dev/null +++ b/spug_api/apps/exec/management/commands/runexecutor.py @@ -0,0 +1,35 @@ +# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug +# Copyright: (c) +# Released under the AGPL-3.0 License. +from django.core.management.base import BaseCommand +from django.conf import settings +from django_redis import get_redis_connection +from concurrent.futures import ThreadPoolExecutor +from apps.schedule.executors import schedule_worker_handler +import logging + +MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY +SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY + + +class Worker: + def __init__(self): + self.rds = get_redis_connection() + self._executor = ThreadPoolExecutor(max_workers=100) + + def run(self): + logging.warning('Running worker') + while True: + key, job = self.rds.blpop([SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) + if key.decode() == SCHEDULE_WORKER_KEY: + self._executor.submit(schedule_worker_handler, job) + else: + pass + + +class Command(BaseCommand): + help = 'Start worker process' + + def handle(self, *args, **options): + w = Worker() + w.run() diff --git a/spug_api/apps/schedule/builtin.py b/spug_api/apps/schedule/builtin.py new file mode 100644 index 0000000..71836bf --- /dev/null +++ b/spug_api/apps/schedule/builtin.py @@ -0,0 +1,31 @@ +# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug +# Copyright: (c) +# Released under the AGPL-3.0 License. +from django.db import close_old_connections +from apps.account.models import History +from apps.alarm.models import Alarm +from apps.schedule.models import Task +from apps.deploy.models import DeployRequest +from libs.utils import parse_time +from datetime import datetime, timedelta + + +def auto_run_by_day(): + close_old_connections() + date = datetime.now() - timedelta(days=30) + History.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() + Alarm.objects.filter(created_at__lt=date.strftime('%Y-%m-%d')).delete() + for task in Task.objects.all(): + try: + record = History.objects.filter(task_id=task.id)[50] + History.objects.filter(task_id=task.id, id__lt=record.id).delete() + except IndexError: + pass + + +def auto_run_by_minute(): + now = datetime.now() + for req in DeployRequest.objects.filter(status='2'): + if (now - parse_time(req.do_at)).seconds > 3600: + req.status = '-3' + req.save() diff --git a/spug_api/apps/schedule/executors.py b/spug_api/apps/schedule/executors.py index c78b4b8..4a00efb 100644 --- a/spug_api/apps/schedule/executors.py +++ b/spug_api/apps/schedule/executors.py @@ -1,54 +1,62 @@ # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. -from queue import Queue -from threading import Thread from libs.ssh import AuthenticationException +from django.db import close_old_connections, transaction from apps.host.models import Host -from django.db import close_old_connections +from apps.schedule.models import History, Task +from apps.schedule.utils import send_fail_notify import subprocess import socket import time +import json -def local_executor(q, command): - exit_code, out, now = -1, None, time.time() +def local_executor(command): + code, out, now = 1, None, time.time() + task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: - task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - exit_code = task.wait() + code = task.wait(3600) out = task.stdout.read() + task.stderr.read() - finally: - q.put(('local', exit_code, round(time.time() - now, 3), out.decode())) + out = out.decode() + except subprocess.TimeoutExpired: + # task.kill() + out = 'timeout, wait more than 1 hour' + return code, round(time.time() - now, 3), out -def host_executor(q, host, command): - exit_code, out, now = -1, None, time.time() +def host_executor(host, command): + code, out, now = 1, None, time.time() try: cli = host.get_ssh() - exit_code, out = cli.exec_command(command) - out = out if out else None + code, out = cli.exec_command(command) except AuthenticationException: out = 'ssh authentication fail' except socket.error as e: out = f'network error {e}' - finally: - q.put((host.id, exit_code, round(time.time() - now, 3), out)) + return code, round(time.time() - now, 3), out -def dispatch(command, targets, in_view=False): - if not in_view: +def schedule_worker_handler(job): + history_id, host_id, command = json.loads(job) + if host_id == 'local': + code, duration, out = local_executor(command) + else: close_old_connections() - threads, q = [], Queue() - for t in targets: - if t == 'local': - threads.append(Thread(target=local_executor, args=(q, command))) - elif isinstance(t, int): - host = Host.objects.filter(pk=t).first() - if not host: - raise ValueError(f'unknown host id: {t!r}') - threads.append(Thread(target=host_executor, args=(q, host, command))) + host = Host.objects.filter(pk=host_id).first() + if not host: + code, duration, out = 1, 0, f'unknown host id for {host_id!r}' else: - raise ValueError(f'invalid target: {t!r}') - for t in threads: - t.start() - return [q.get() for _ in threads] + code, duration, out = host_executor(host, command) + close_old_connections() + with transaction.atomic(): + history = History.objects.select_for_update().get(pk=history_id) + output = json.loads(history.output) + output[str(host_id)] = [code, duration, out] + history.output = json.dumps(output) + if all(output.values()): + history.status = '1' if sum(x[0] for x in output.values()) == 0 else '2' + history.save() + if history.status == '2': + task = Task.objects.get(pk=history.task_id) + send_fail_notify(task) diff --git a/spug_api/apps/schedule/models.py b/spug_api/apps/schedule/models.py index d71f37e..b7ba64c 100644 --- a/spug_api/apps/schedule/models.py +++ b/spug_api/apps/schedule/models.py @@ -9,8 +9,8 @@ import json class History(models.Model, ModelMixin): STATUS = ( - (0, '成功'), - (1, '异常'), + (0, '执行中'), + (1, '成功'), (2, '失败'), ) task_id = models.IntegerField() diff --git a/spug_api/apps/schedule/scheduler.py b/spug_api/apps/schedule/scheduler.py index c0ce920..827d5e3 100644 --- a/spug_api/apps/schedule/scheduler.py +++ b/spug_api/apps/schedule/scheduler.py @@ -6,23 +6,21 @@ from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.cron import CronTrigger -from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR from django_redis import get_redis_connection from django.utils.functional import SimpleLazyObject from django.db import close_old_connections from apps.schedule.models import Task, History from apps.schedule.utils import send_fail_notify from apps.notify.models import Notify -from apps.schedule.executors import dispatch -from apps.schedule.utils import auto_clean_schedule_history -from apps.alarm.utils import auto_clean_alarm_records -from apps.account.utils import auto_clean_login_history -from apps.deploy.utils import auto_update_status +from apps.schedule.builtin import auto_run_by_day, auto_run_by_minute from django.conf import settings from libs import AttrDict, human_datetime import logging import json +SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY + class Scheduler: timezone = settings.TIME_ZONE @@ -42,7 +40,8 @@ class Scheduler: self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler.add_listener( self._handle_event, - EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED) + EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES + ) @classmethod def parse_trigger(cls, trigger, trigger_args): @@ -71,26 +70,24 @@ class Scheduler: elif event.code == EVENT_JOB_ERROR: logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') send_fail_notify(obj, f'执行异常:{event.exception}') - elif event.code == EVENT_JOB_EXECUTED: - if event.retval: - score = 0 - for item in event.retval: - score += 1 if item[1] else 0 - history = History.objects.create( - task_id=event.job_id, - status=2 if score == len(event.retval) else 1 if score else 0, - run_time=human_datetime(event.scheduled_run_time), - output=json.dumps(event.retval) - ) - Task.objects.filter(pk=event.job_id).update(latest=history) - if score != 0: - send_fail_notify(obj) def _init_builtin_jobs(self): - self.scheduler.add_job(auto_clean_alarm_records, 'cron', hour=0, minute=1) - self.scheduler.add_job(auto_clean_login_history, 'cron', hour=0, minute=2) - self.scheduler.add_job(auto_clean_schedule_history, 'cron', hour=0, minute=3) - self.scheduler.add_job(auto_update_status, 'interval', minutes=5) + self.scheduler.add_job(auto_run_by_day, 'cron', hour=1, minute=20) + self.scheduler.add_job(auto_run_by_minute, 'interval', minutes=5) + + def _dispatch(self, task_id, command, targets): + close_old_connections() + output = {x: None for x in targets} + history = History.objects.create( + task_id=task_id, + status='0', + run_time=human_datetime(), + output=json.dumps(output) + ) + Task.objects.filter(pk=task_id).update(latest_id=history.id) + rds_cli = get_redis_connection() + for t in targets: + rds_cli.rpush(SCHEDULE_WORKER_KEY, json.dumps([history.id, t, command])) def _init(self): self.scheduler.start() @@ -98,10 +95,10 @@ class Scheduler: for task in Task.objects.filter(is_active=True): trigger = self.parse_trigger(task.trigger, task.trigger_args) self.scheduler.add_job( - dispatch, + self._dispatch, trigger, id=str(task.id), - args=(task.command, json.loads(task.targets)), + args=(task.id, task.command, json.loads(task.targets)), ) def run(self): @@ -115,10 +112,10 @@ class Scheduler: if task.action in ('add', 'modify'): trigger = self.parse_trigger(task.trigger, task.trigger_args) self.scheduler.add_job( - dispatch, + self._dispatch, trigger, id=str(task.id), - args=(task.command, task.targets), + args=(task.id, task.command, task.targets), replace_existing=True ) elif task.action == 'remove': diff --git a/spug_api/apps/schedule/utils.py b/spug_api/apps/schedule/utils.py index cfd43ab..f175dda 100644 --- a/spug_api/apps/schedule/utils.py +++ b/spug_api/apps/schedule/utils.py @@ -1,7 +1,6 @@ # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. -from apps.schedule.models import Task, History from apps.notify.models import Notify from libs.utils import human_datetime from threading import Thread @@ -9,15 +8,6 @@ import requests import json -def auto_clean_schedule_history(): - for task in Task.objects.all(): - try: - record = History.objects.filter(task_id=task.id)[50] - History.objects.filter(task_id=task.id, id__lt=record.id).delete() - except IndexError: - pass - - def send_fail_notify(task, msg=None): rst_notify = json.loads(task.rst_notify) mode = rst_notify.get('mode') diff --git a/spug_api/apps/schedule/views.py b/spug_api/apps/schedule/views.py index 022dbfa..561b966 100644 --- a/spug_api/apps/schedule/views.py +++ b/spug_api/apps/schedule/views.py @@ -7,7 +7,7 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apps.schedule.scheduler import Scheduler from apps.schedule.models import Task, History -from apps.schedule.executors import dispatch +from apps.schedule.executors import local_executor, host_executor from apps.host.models import Host from django.conf import settings from libs import json_response, JsonParser, Argument, human_datetime @@ -110,30 +110,43 @@ class HistoryView(View): task = Task.objects.filter(pk=t_id).first() if not task: return json_response(error='未找到指定任务') - data = dispatch(task.command, json.loads(task.targets), True) - score = 0 - for item in data: - score += 1 if item[1] else 0 + outputs, status = {}, 1 + for host_id in json.loads(task.targets): + if host_id == 'local': + code, duration, out = local_executor(task.command) + else: + host = Host.objects.filter(pk=host_id).first() + if not host: + code, duration, out = 1, 0, f'unknown host id for {host_id!r}' + else: + code, duration, out = host_executor(host, task.command) + if code != 0: + status = 2 + outputs[host_id] = [code, duration, out] + history = History.objects.create( - task_id=t_id, - status=2 if score == len(data) else 1 if score else 0, + task_id=task.id, + status=status, run_time=human_datetime(), - output=json.dumps(data) + output=json.dumps(outputs) ) return json_response(history.id) def _fetch_detail(self, h_id): record = History.objects.filter(pk=h_id).first() outputs = json.loads(record.output) - host_ids = (x[0] for x in outputs if isinstance(x[0], int)) - hosts_info = {x.id: x.name for x in Host.objects.filter(id__in=host_ids)} + host_ids = (x for x in outputs.keys() if x != 'local') + hosts_info = {str(x.id): x.name for x in Host.objects.filter(id__in=host_ids)} data = {'run_time': record.run_time, 'success': 0, 'failure': 0, 'duration': 0, 'outputs': []} - for h_id, code, duration, out in outputs: + for host_id, value in outputs.items(): + if not value: + continue + code, duration, out = value key = 'success' if code == 0 else 'failure' data[key] += 1 data['duration'] += duration data['outputs'].append({ - 'name': hosts_info.get(h_id, '本机'), + 'name': hosts_info.get(host_id, '本机'), 'code': code, 'duration': duration, 'output': out}) diff --git a/spug_api/spug/settings.py b/spug_api/spug/settings.py index e5d77fb..3fed071 100644 --- a/spug_api/spug/settings.py +++ b/spug_api/spug/settings.py @@ -103,7 +103,9 @@ TEMPLATES = [ ] SCHEDULE_KEY = 'spug:schedule' +SCHEDULE_WORKER_KEY = 'spug:schedule:worker' MONITOR_KEY = 'spug:monitor' +MONITOR_WORKER_KEY = 'spug:monitor:worker' REQUEST_KEY = 'spug:request' BUILD_KEY = 'spug:build' REPOS_DIR = os.path.join(BASE_DIR, 'repos') diff --git a/spug_web/src/pages/schedule/Record.js b/spug_web/src/pages/schedule/Record.js index d5acb25..c37a602 100644 --- a/spug_web/src/pages/schedule/Record.js +++ b/spug_web/src/pages/schedule/Record.js @@ -26,7 +26,7 @@ class Record extends React.Component { .finally(() => this.setState({loading: false})) } - colors = ['green', 'orange', 'red']; + colors = ['orange', 'green', 'red']; columns = [{ title: '执行时间', diff --git a/spug_web/src/pages/schedule/Table.js b/spug_web/src/pages/schedule/Table.js index ce20ead..793d95d 100644 --- a/spug_web/src/pages/schedule/Table.js +++ b/spug_web/src/pages/schedule/Table.js @@ -17,7 +17,7 @@ class ComTable extends React.Component { store.fetchRecords() } - colors = ['green', 'orange', 'red']; + colors = ['orange', 'green', 'red']; moreMenus = (info) => ( @@ -25,8 +25,10 @@ class ComTable extends React.Component { this.handleTest(info)}>执行测试 - this.handleActive(info)}>{info.is_active ? '禁用任务' : '激活任务'} + this.handleActive(info)}> + {info.is_active ? '禁用任务' : '激活任务'} store.showRecord(info)}>历史记录 @@ -113,7 +115,7 @@ class ComTable extends React.Component { handleTest = (text) => { Modal.confirm({ title: '操作确认', - content: '立即执行该任务(不影响调度规则,且不会触发失败通知)?', + content: '立即以串行模式执行该任务(不影响调度规则,且不会触发失败通知)?', onOk: () => http.post(`/api/schedule/${text.id}/`, null, {timeout: 120000}) .then(res => store.showInfo(text, res)) })