from flask import Blueprint from flask_restful import reqparse, Resource, Api from flask_apscheduler import APScheduler from config import vendors,regions from units import token_auth,consul_kv import json blueprint = Blueprint('jobs',__name__) api = Api(blueprint) parser = reqparse.RequestParser() parser.add_argument('job_id',type=str) parser.add_argument('job_dict',type=dict) parser.add_argument('query_dict',type=str) def init(): global Scheduler Scheduler = APScheduler() return Scheduler def deljob(jobid): Scheduler.remove_job(jobid) def addjob(job_id,job_func,job_args,job_interval): Scheduler.add_job(id=job_id, func=job_func, args=job_args, trigger='interval', minutes=job_interval, replace_existing=True) def runjob(jobid): Scheduler.run_job(jobid) class Jobs(Resource): decorators = [token_auth.auth.login_required] def get(self): args = parser.parse_args() query_dict = json.loads(args['query_dict']) if query_dict['vendor'] != '': query_dict['vendor'] = {v : k for k, v in vendors.items()}[query_dict['vendor']] query_set = set({k:v for k,v in query_dict.items() if v != ''}.items()) job_list = list(consul_kv.get_kv_dict(f'ConsulManager/jobs').values()) job_run_dict = {job.id:job.next_run_time.strftime("%m%d/%H:%M") for job in Scheduler.get_jobs()} job_count_dict = consul_kv.get_kv_dict('ConsulManager/record/jobs') jobs = [] for i in job_list: vendor,account,itype = i['id'].split('/')[0:3] job_info_dict = {'vendor':vendor,'account':account,'itype':itype} if query_set.issubset(job_info_dict.items()): pass else: continue region = i['args'][-1] if len(i['args']) == 2 else 'none' interval = i['minutes'] if f'ConsulManager/record/jobs/{i["id"]}' in job_count_dict: count = job_count_dict[f'ConsulManager/record/jobs/{i["id"]}']['count'] runtime = job_count_dict[f'ConsulManager/record/jobs/{i["id"]}']['update'] on = job_count_dict[f'ConsulManager/record/jobs/{i["id"]}'].get('on',0) off = job_count_dict[f'ConsulManager/record/jobs/{i["id"]}'].get('off',0) else: count = '无' runtime = '无' on,off = 0,0 jobs.append({'region':regions[vendor][region],'vendor':vendors[vendor],'account':account,'itype':itype, 'interval':interval,'jobid':i['id'],'nextime':job_run_dict[i['id']],'on':on,'off':off, 'count':count, 'runtime':runtime}) vendor_list = sorted(list(set([i['vendor'] for i in jobs]))) account_list = sorted(list(set([i['account'] for i in jobs]))) itype_list = sorted(list(set([i['itype'] for i in jobs]))) return {'code': 20000,'all_jobs':jobs,'vendor_list':vendor_list,'account_list':account_list,'itype_list':itype_list} def post(self): args = parser.parse_args() job_dict = args['job_dict'] job_status = job_dict['dialogStatus'] if job_status == 'create': ak = job_dict['ak'] sk = job_dict['sk'] consul_kv.put_aksk(job_dict['vendor'],job_dict['account'],ak,sk) proj_job_id = f"{job_dict['vendor']}/{job_dict['account']}/group" proj_job_func = f"__main__:{job_dict['vendor']}.group" proj_job_args = [job_dict['account']] proj_job_interval = int(job_dict['proj_interval']) ecs_job_id = f"{job_dict['vendor']}/{job_dict['account']}/ecs/{job_dict['region']}" ecs_job_func = f"__main__:{job_dict['vendor']}.ecs" ecs_job_args = [job_dict['account'],job_dict['region']] ecs_job_interval = int(job_dict['ecs_interval']) Scheduler.add_job(id=proj_job_id, func=proj_job_func, args=proj_job_args, trigger='interval', minutes=proj_job_interval, replace_existing=True) Scheduler.add_job(id=ecs_job_id, func=ecs_job_func, args=ecs_job_args, trigger='interval', minutes=ecs_job_interval, replace_existing=True) proj_job_dict = {'id':proj_job_id,'func':proj_job_func,'args':proj_job_args,'minutes':proj_job_interval, "trigger": "interval","replace_existing": True} Scheduler.run_job(proj_job_id) ecs_job_dict = {'id':ecs_job_id,'func':ecs_job_func,'args':ecs_job_args,'minutes':ecs_job_interval, "trigger": "interval","replace_existing": True} record_dict = consul_kv.get_value(f"ConsulManager/record/jobs/{proj_job_id}") if record_dict['status'] == 20000: consul_kv.put_kv(f'ConsulManager/jobs/{proj_job_id}',proj_job_dict) consul_kv.put_kv(f'ConsulManager/jobs/{ecs_job_id}',ecs_job_dict) else: Scheduler.remove_job(proj_job_id) Scheduler.remove_job(ecs_job_id) return {'code': record_dict['status'], 'data': f"{record_dict['update']}:{record_dict['msg']}"} elif job_status == 'update': jobid = job_dict['jobid'] interval = int(job_dict['interval']) Scheduler.modify_job(jobid,trigger='interval',minutes=interval) upjob_dict = consul_kv.get_value(f'ConsulManager/jobs/{jobid}') upjob_dict['minutes'] = interval consul_kv.put_kv(f'ConsulManager/jobs/{jobid}',upjob_dict) return {'code': 20000, 'data': '更新成功!'} elif job_status == 'run': Scheduler.run_job(job_dict['jobid']) record_dict = consul_kv.get_value(f"ConsulManager/record/jobs/{job_dict['jobid']}") return {'code': record_dict['status'], 'data': f"{record_dict['update']}:{record_dict['msg']}"} def delete(self): args = parser.parse_args() job_id = args['job_id'] Scheduler.remove_job(job_id) del_job = consul_kv.del_key(f'ConsulManager/jobs/{job_id}') return {'code': 20000, 'data': '删除成功!'} api.add_resource(Jobs, '/api/jobs')