diff --git a/spug_api/apps/monitor/executors.py b/spug_api/apps/monitor/executors.py index ee689d2..5630d68 100644 --- a/spug_api/apps/monitor/executors.py +++ b/spug_api/apps/monitor/executors.py @@ -1,7 +1,6 @@ # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. -from django.db import close_old_connections from django_redis import get_redis_connection from apps.host.models import Host from apps.monitor.utils import handle_notify @@ -84,7 +83,6 @@ def monitor_worker_handler(job): elif tp not in ('3', '4'): is_ok, message = False, f'invalid monitor type for {tp!r}' else: - close_old_connections() command = f'ps -ef|grep -v grep|grep {extra!r}' if tp == '3' else extra host = Host.objects.filter(pk=addr).first() if not host: diff --git a/spug_api/apps/monitor/scheduler.py b/spug_api/apps/monitor/scheduler.py index a4c21d9..148fdbf 100644 --- a/spug_api/apps/monitor/scheduler.py +++ b/spug_api/apps/monitor/scheduler.py @@ -4,13 +4,10 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from django_redis import get_redis_connection -from django.utils.functional import SimpleLazyObject +from django.conf import settings from django.db import connections from apps.monitor.models import Detection -from apps.notify.models import Notify -from django.conf import settings from libs import AttrDict, human_datetime from datetime import datetime, timedelta from random import randint @@ -25,29 +22,13 @@ class Scheduler: def __init__(self): self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) - self.scheduler.add_listener( - self._handle_event, - EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED - ) - - def _handle_event(self, event): - obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first()) - if event.code == EVENT_SCHEDULER_SHUTDOWN: - logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}') - Notify.make_notify('monitor', '1', '调度器已关闭', '调度器意外关闭,你可以在github上提交issue', False) - elif event.code == EVENT_JOB_MAX_INSTANCES: - logging.warning(f'EVENT_JOB_MAX_INSTANCES: {event}') - Notify.make_notify('monitor', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时') - elif event.code == EVENT_JOB_ERROR: - logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') - Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}') - connections.close_all() def _dispatch(self, task_id, tp, targets, extra, threshold, quiet): Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime()) rds_cli = get_redis_connection() for t in json.loads(targets): rds_cli.rpush(MONITOR_WORKER_KEY, json.dumps([task_id, tp, t, extra, threshold, quiet])) + connections.close_all() def _init(self): self.scheduler.start() @@ -61,6 +42,7 @@ class Scheduler: args=(item.id, item.type, item.targets, item.extra, item.threshold, item.quiet), next_run_time=now + timedelta(seconds=randint(0, 60)) ) + connections.close_all() def run(self): rds_cli = get_redis_connection() diff --git a/spug_api/apps/monitor/utils.py b/spug_api/apps/monitor/utils.py index 2fe9e5d..2231e10 100644 --- a/spug_api/apps/monitor/utils.py +++ b/spug_api/apps/monitor/utils.py @@ -40,4 +40,4 @@ def handle_notify(task_id, target, is_ok, out, fault_times): _record_alarm(det, target, duration, event) grp = json.loads(det.notify_grp) notify = Notification(grp, event, target, det.name, out, duration) - notify.dispatch(json.loads(det.notify_mode)) + notify.dispatch_monitor(json.loads(det.notify_mode)) diff --git a/spug_api/libs/spug.py b/spug_api/libs/spug.py index 622de62..1144d33 100644 --- a/spug_api/libs/spug.py +++ b/spug_api/libs/spug.py @@ -43,130 +43,133 @@ class Notification: try: res = requests.post(url, json=data, timeout=30) except Exception as e: - Notify.make_notify(notify_source, '1', '通知发送失败', f'接口调用异常:{e}') - return + return Notify.make_system_notify('通知发送失败', f'接口调用异常: {e}') if res.status_code != 200: - Notify.make_notify(notify_source, '1', '通知发送失败', f'返回状态码:{res.status_code}, 请求URL:{res.url}') + return Notify.make_system_notify('通知发送失败', f'返回状态码:{res.status_code}, 请求URL:{res.url}') + if mode in ['dd', 'wx']: res = res.json() - if res.get('errcode') != 0: - Notify.make_notify(notify_source, '1', '通知发送失败', f'返回数据:{res}') + if res.get('errcode') == 0: + return elif mode == 'spug': res = res.json() - if res.get('error'): - Notify.make_notify(notify_source, '1', '通知发送失败', f'错误信息:{res}') + if not res.get('error'): + return elif mode == 'fs': res = res.json() - if res.get('StatusCode') != 0: - Notify.make_notify(notify_source, '1', '通知发送失败', f'错误信息:{res}') + if res.get('StatusCode') == 0: + return + else: + raise NotImplementedError + Notify.make_system_notify('通知发送失败', f'返回数据:{res}') - def _by_wx(self): + def _monitor_by_wx(self, users): if not self.spug_key: - Notify.make_notify(notify_source, '1', '发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/基本设置/调用凭据中配置。') + Notify.make_monitor_notify('发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/基本设置/调用凭据中配置。') return - users = set(x.wx_token for x in Contact.objects.filter(id__in=self.u_ids, wx_token__isnull=False)) - if users: + data = { + 'token': self.spug_key, + 'event': self.event, + 'subject': f'{self.title} >> {self.target}', + 'desc': self.message, + 'remark': f'故障持续{self.duration}' if self.event == '2' else None, + 'users': list(users) + } + self.handle_request(f'{spug_server}/apis/notify/wx/', data, 'spug') + + def _monitor_by_email(self, users): + mail_service = AppSetting.get_default('mail_service', {}) + body = [ + f'告警名称:{self.title}', + f'告警对象:{self.target}', + f'{"告警" if self.event == "1" else "恢复"}时间:{human_datetime()}', + f'告警描述:{self.message}' + ] + if self.event == '2': + body.append('故障持续:' + self.duration) + if mail_service.get('server'): + event_map = {'1': '监控告警通知', '2': '告警恢复通知'} + subject = f'{event_map[self.event]}-{self.title}' + mail = Mail(**mail_service) + mail.send_text_mail(users, subject, '\r\n'.join(body) + '\r\n\r\n自动发送,请勿回复。') + elif self.spug_key: data = { 'token': self.spug_key, 'event': self.event, - 'subject': f'{self.title} >> {self.target}', - 'desc': self.message, - 'remark': f'故障持续{self.duration}' if self.event == '2' else None, + 'subject': self.title, + 'body': '\r\n'.join(body), 'users': list(users) } - self.handle_request(f'{spug_server}/apis/notify/wx/', data, 'spug') + self.handle_request(f'{spug_server}/apis/notify/mail/', data, 'spug') else: - Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的微信Token。') + Notify.make_monitor_notify('发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/报警服务设置中配置。') - def _by_email(self): - users = set(x.email for x in Contact.objects.filter(id__in=self.u_ids, email__isnull=False)) - if users: - mail_service = AppSetting.get_default('mail_service', {}) - body = [ - f'告警名称:{self.title}', - f'告警对象:{self.target}', - f'{"告警" if self.event == "1" else "恢复"}时间:{human_datetime()}', - f'告警描述:{self.message}' - ] - if self.event == '2': - body.append('故障持续:' + self.duration) - if mail_service.get('server'): - event_map = {'1': '监控告警通知', '2': '告警恢复通知'} - subject = f'{event_map[self.event]}-{self.title}' - mail = Mail(**mail_service) - mail.send_text_mail(users, subject, '\r\n'.join(body) + '\r\n\r\n自动发送,请勿回复。') - elif self.spug_key: - data = { - 'token': self.spug_key, - 'event': self.event, - 'subject': self.title, - 'body': '\r\n'.join(body), - 'users': list(users) - } - self.handle_request(f'{spug_server}/apis/notify/mail/', data, 'spug') - else: - Notify.make_notify(notify_source, '1', '发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/报警服务设置中配置。') - else: - Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的邮件地址。') - - def _by_dd(self): - users = set(x.ding for x in Contact.objects.filter(id__in=self.u_ids, ding__isnull=False)) - if users: - texts = [ - '## %s ## ' % ('监控告警通知' if self.event == '1' else '告警恢复通知'), - f'**告警名称:** {self.title} ', - f'**告警对象:** {self.target} ', - f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ', - f'**告警描述:** {self.message} ', - ] - if self.event == '2': - texts.append(f'**持续时间:** {self.duration} ') - data = { - 'msgtype': 'markdown', - 'markdown': { - 'title': '监控告警通知', - 'text': '\n\n'.join(texts) + '\n\n> ###### 来自 Spug运维平台' - }, - 'at': { - 'isAtAll': True - } + def _monitor_by_dd(self, users): + texts = [ + '## %s ## ' % ('监控告警通知' if self.event == '1' else '告警恢复通知'), + f'**告警名称:** {self.title} ', + f'**告警对象:** {self.target} ', + f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ', + f'**告警描述:** {self.message} ', + ] + if self.event == '2': + texts.append(f'**持续时间:** {self.duration} ') + data = { + 'msgtype': 'markdown', + 'markdown': { + 'title': '监控告警通知', + 'text': '\n\n'.join(texts) + '\n\n> ###### 来自 Spug运维平台' + }, + 'at': { + 'isAtAll': True } - for url in users: - self.handle_request(url, data, 'dd') - else: - Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的钉钉。') + } + for url in users: + self.handle_request(url, data, 'dd') - def _by_qy_wx(self): - users = set(x.qy_wx for x in Contact.objects.filter(id__in=self.u_ids, qy_wx__isnull=False)) - if users: - color, title = ('warning', '监控告警通知') if self.event == '1' else ('info', '告警恢复通知') - texts = [ - f'## {title}', - f'**告警名称:** {self.title} ', - f'**告警对象:** {self.target}', - f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ', - f'**告警描述:** {self.message} ', - ] - if self.event == '2': - texts.append(f'**持续时间:** {self.duration} ') - data = { - 'msgtype': 'markdown', - 'markdown': { - 'content': '\n'.join(texts) + '\n> 来自 Spug运维平台' - } + def _monitor_by_qy_wx(self, users): + color, title = ('warning', '监控告警通知') if self.event == '1' else ('info', '告警恢复通知') + texts = [ + f'## {title}', + f'**告警名称:** {self.title} ', + f'**告警对象:** {self.target}', + f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ', + f'**告警描述:** {self.message} ', + ] + if self.event == '2': + texts.append(f'**持续时间:** {self.duration} ') + data = { + 'msgtype': 'markdown', + 'markdown': { + 'content': '\n'.join(texts) + '\n> 来自 Spug运维平台' } - for url in users: - self.handle_request(url, data, 'wx') - else: - Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的企业微信。') + } + for url in users: + self.handle_request(url, data, 'wx') - def dispatch(self, modes): + def dispatch_monitor(self, modes): for mode in modes: if mode == '1': - Thread(target=self._by_wx).start() + users = set(x.wx_token for x in Contact.objects.filter(id__in=self.u_ids, wx_token__isnull=False)) + if not users: + Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的微信Token。') + continue + self._monitor_by_wx(users) elif mode == '3': - Thread(target=self._by_dd).start() + users = set(x.ding for x in Contact.objects.filter(id__in=self.u_ids, ding__isnull=False)) + if not users: + Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的钉钉。') + continue + self._monitor_by_dd(users) elif mode == '4': - Thread(target=self._by_email).start() + users = set(x.email for x in Contact.objects.filter(id__in=self.u_ids, email__isnull=False)) + if not users: + Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的邮件地址。') + continue + self._monitor_by_email(users) elif mode == '5': - Thread(target=self._by_qy_wx).start() + users = set(x.qy_wx for x in Contact.objects.filter(id__in=self.u_ids, qy_wx__isnull=False)) + if not users: + Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的企业微信。') + continue + self._monitor_by_qy_wx(users)