# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. from django.views.generic import View from django_redis import get_redis_connection from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apps.schedule.scheduler import Scheduler from apps.schedule.models import Task, History from apps.schedule.executors import dispatch from apps.host.models import Host from django.conf import settings from libs import json_response, JsonParser, Argument, human_datetime import json class Schedule(View): def get(self, request): tasks = Task.objects.all() types = [x['type'] for x in tasks.order_by('type').values('type').distinct()] return json_response({'types': types, 'tasks': [x.to_dict() for x in tasks]}) def post(self, request): form, error = JsonParser( Argument('id', type=int, required=False), Argument('type', help='请输入任务类型'), Argument('name', help='请输入任务名称'), Argument('command', help='请输入任务内容'), Argument('rst_notify', type=dict, help='请选择执行失败通知方式'), Argument('targets', type=list, filter=lambda x: len(x), help='请选择执行对象'), Argument('trigger', filter=lambda x: x in dict(Task.TRIGGERS), help='请选择触发器类型'), Argument('trigger_args', help='请输入触发器参数'), Argument('desc', required=False), ).parse(request.body) if error is None: form.targets = json.dumps(form.targets) form.rst_notify = json.dumps(form.rst_notify) if form.trigger == 'cron': args = json.loads(form.trigger_args)['rule'].split() if len(args) != 5: return json_response(error='无效的执行规则,请更正后再试') minute, hour, day, month, week = args week = '0' if week == '7' else week try: CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week) except ValueError: return json_response(error='无效的执行规则,请更正后再试') if form.id: Task.objects.filter(pk=form.id).update( updated_at=human_datetime(), updated_by=request.user, **form ) task = Task.objects.filter(pk=form.id).first() if task and task.is_active: form.action = 'modify' form.targets = json.loads(form.targets) rds_cli = get_redis_connection() rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(form)) else: Task.objects.create(created_by=request.user, **form) return json_response(error=error) def patch(self, request): form, error = JsonParser( Argument('id', type=int, help='请指定操作对象'), Argument('is_active', type=bool, required=False) ).parse(request.body, True) if error is None: Task.objects.filter(pk=form.id).update(**form) if form.get('is_active') is not None: if form.is_active: task = Task.objects.filter(pk=form.id).first() message = {'id': form.id, 'action': 'add'} message.update(task.to_dict(selects=('trigger', 'trigger_args', 'command', 'targets'))) else: message = {'id': form.id, 'action': 'remove'} rds_cli = get_redis_connection() rds_cli.lpush(settings.SCHEDULE_KEY, json.dumps(message)) return json_response(error=error) def delete(self, request): form, error = JsonParser( Argument('id', type=int, help='请指定操作对象') ).parse(request.GET) if error is None: task = Task.objects.filter(pk=form.id).first() if task: if task.is_active: return json_response(error='该任务在运行中,请先停止任务再尝试删除') task.delete() History.objects.filter(task_id=task.id).delete() return json_response(error=error) class HistoryView(View): def get(self, request, t_id): task = Task.objects.filter(pk=t_id).first() if not task: return json_response(error='未找到指定任务') h_id = request.GET.get('id') if h_id: h_id = task.latest_id if h_id == 'latest' else h_id return json_response(self._fetch_detail(h_id)) histories = History.objects.filter(task_id=t_id) return json_response([x.to_list() for x in histories]) def post(self, request, t_id): task = Task.objects.filter(pk=t_id).first() if not task: return json_response(error='未找到指定任务') data = dispatch(task.command, json.loads(task.targets), True) score = 0 for item in data: score += 1 if item[1] else 0 history = History.objects.create( task_id=t_id, status=2 if score == len(data) else 1 if score else 0, run_time=human_datetime(), output=json.dumps(data) ) return json_response(history.id) def _fetch_detail(self, h_id): record = History.objects.filter(pk=h_id).first() outputs = json.loads(record.output) host_ids = (x[0] for x in outputs if isinstance(x[0], int)) hosts_info = {x.id: x.name for x in Host.objects.filter(id__in=host_ids)} data = {'run_time': record.run_time, 'success': 0, 'failure': 0, 'duration': 0, 'outputs': []} for h_id, code, duration, out in outputs: key = 'success' if code == 0 else 'failure' data[key] += 1 data['duration'] += duration data['outputs'].append({ 'name': hosts_info.get(h_id, '本机'), 'code': code, 'duration': duration, 'output': out}) data['duration'] = f"{data['duration'] / len(outputs):.3f}" return data def next_run_time(request): form, error = JsonParser( Argument('rule', help='参数错误'), Argument('start', required=False), Argument('stop', required=False) ).parse(request.body) if error is None: try: minute, hour, day, month, week = form.rule.split() week = Scheduler.covert_week(week) trigger = CronTrigger(minute=minute, hour=hour, day=day, month=month, day_of_week=week, start_date=form.start, end_date=form.stop) except (ValueError, KeyError): return json_response({'success': False, 'msg': '无效的执行规则'}) scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE) scheduler.start() job = scheduler.add_job(lambda: None, trigger) run_time = job.next_run_time scheduler.shutdown() if run_time: return json_response({'success': True, 'msg': run_time.strftime('%Y-%m-%d %H:%M:%S')}) else: return json_response({'success': False, 'msg': '无法被触发'}) return json_response(error=error)