pull/418/head
vapao 2021-12-08 15:03:30 +08:00
parent 6028391f21
commit 69a8940b5e
4 changed files with 108 additions and 125 deletions

View File

@ -1,7 +1,6 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com> # Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from django.db import close_old_connections
from django_redis import get_redis_connection from django_redis import get_redis_connection
from apps.host.models import Host from apps.host.models import Host
from apps.monitor.utils import handle_notify from apps.monitor.utils import handle_notify
@ -84,7 +83,6 @@ def monitor_worker_handler(job):
elif tp not in ('3', '4'): elif tp not in ('3', '4'):
is_ok, message = False, f'invalid monitor type for {tp!r}' is_ok, message = False, f'invalid monitor type for {tp!r}'
else: else:
close_old_connections()
command = f'ps -ef|grep -v grep|grep {extra!r}' if tp == '3' else extra command = f'ps -ef|grep -v grep|grep {extra!r}' if tp == '3' else extra
host = Host.objects.filter(pk=addr).first() host = Host.objects.filter(pk=addr).first()
if not host: if not host:

View File

@ -4,13 +4,10 @@
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.events import EVENT_SCHEDULER_SHUTDOWN, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from django_redis import get_redis_connection from django_redis import get_redis_connection
from django.utils.functional import SimpleLazyObject from django.conf import settings
from django.db import connections from django.db import connections
from apps.monitor.models import Detection from apps.monitor.models import Detection
from apps.notify.models import Notify
from django.conf import settings
from libs import AttrDict, human_datetime from libs import AttrDict, human_datetime
from datetime import datetime, timedelta from datetime import datetime, timedelta
from random import randint from random import randint
@ -25,29 +22,13 @@ class Scheduler:
def __init__(self): def __init__(self):
self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)}) self.scheduler = BackgroundScheduler(timezone=self.timezone, executors={'default': ThreadPoolExecutor(30)})
self.scheduler.add_listener(
self._handle_event,
EVENT_SCHEDULER_SHUTDOWN | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_EXECUTED
)
def _handle_event(self, event):
obj = SimpleLazyObject(lambda: Detection.objects.filter(pk=event.job_id).first())
if event.code == EVENT_SCHEDULER_SHUTDOWN:
logging.warning(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
Notify.make_notify('monitor', '1', '调度器已关闭', '调度器意外关闭你可以在github上提交issue', False)
elif event.code == EVENT_JOB_MAX_INSTANCES:
logging.warning(f'EVENT_JOB_MAX_INSTANCES: {event}')
Notify.make_notify('monitor', '1', f'{obj.name} - 达到调度实例上限', '一般为上个周期的执行任务还未结束,请增加调度间隔或减少任务执行耗时')
elif event.code == EVENT_JOB_ERROR:
logging.warning(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
Notify.make_notify('monitor', '1', f'{obj.name} - 执行异常', f'{event.exception}')
connections.close_all()
def _dispatch(self, task_id, tp, targets, extra, threshold, quiet): def _dispatch(self, task_id, tp, targets, extra, threshold, quiet):
Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime()) Detection.objects.filter(pk=task_id).update(latest_run_time=human_datetime())
rds_cli = get_redis_connection() rds_cli = get_redis_connection()
for t in json.loads(targets): for t in json.loads(targets):
rds_cli.rpush(MONITOR_WORKER_KEY, json.dumps([task_id, tp, t, extra, threshold, quiet])) rds_cli.rpush(MONITOR_WORKER_KEY, json.dumps([task_id, tp, t, extra, threshold, quiet]))
connections.close_all()
def _init(self): def _init(self):
self.scheduler.start() self.scheduler.start()
@ -61,6 +42,7 @@ class Scheduler:
args=(item.id, item.type, item.targets, item.extra, item.threshold, item.quiet), args=(item.id, item.type, item.targets, item.extra, item.threshold, item.quiet),
next_run_time=now + timedelta(seconds=randint(0, 60)) next_run_time=now + timedelta(seconds=randint(0, 60))
) )
connections.close_all()
def run(self): def run(self):
rds_cli = get_redis_connection() rds_cli = get_redis_connection()

View File

@ -40,4 +40,4 @@ def handle_notify(task_id, target, is_ok, out, fault_times):
_record_alarm(det, target, duration, event) _record_alarm(det, target, duration, event)
grp = json.loads(det.notify_grp) grp = json.loads(det.notify_grp)
notify = Notification(grp, event, target, det.name, out, duration) notify = Notification(grp, event, target, det.name, out, duration)
notify.dispatch(json.loads(det.notify_mode)) notify.dispatch_monitor(json.loads(det.notify_mode))

View File

@ -43,130 +43,133 @@ class Notification:
try: try:
res = requests.post(url, json=data, timeout=30) res = requests.post(url, json=data, timeout=30)
except Exception as e: except Exception as e:
Notify.make_notify(notify_source, '1', '通知发送失败', f'接口调用异常:{e}') return Notify.make_system_notify('通知发送失败', f'接口调用异常: {e}')
return
if res.status_code != 200: if res.status_code != 200:
Notify.make_notify(notify_source, '1', '通知发送失败', f'返回状态码:{res.status_code}, 请求URL{res.url}') return Notify.make_system_notify('通知发送失败', f'返回状态码:{res.status_code}, 请求URL{res.url}')
if mode in ['dd', 'wx']: if mode in ['dd', 'wx']:
res = res.json() res = res.json()
if res.get('errcode') != 0: if res.get('errcode') == 0:
Notify.make_notify(notify_source, '1', '通知发送失败', f'返回数据:{res}') return
elif mode == 'spug': elif mode == 'spug':
res = res.json() res = res.json()
if res.get('error'): if not res.get('error'):
Notify.make_notify(notify_source, '1', '通知发送失败', f'错误信息:{res}') return
elif mode == 'fs': elif mode == 'fs':
res = res.json() res = res.json()
if res.get('StatusCode') != 0: if res.get('StatusCode') == 0:
Notify.make_notify(notify_source, '1', '通知发送失败', f'错误信息:{res}') return
else:
raise NotImplementedError
Notify.make_system_notify('通知发送失败', f'返回数据:{res}')
def _by_wx(self): def _monitor_by_wx(self, users):
if not self.spug_key: if not self.spug_key:
Notify.make_notify(notify_source, '1', '发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/基本设置/调用凭据中配置。') Notify.make_monitor_notify('发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/基本设置/调用凭据中配置。')
return return
users = set(x.wx_token for x in Contact.objects.filter(id__in=self.u_ids, wx_token__isnull=False)) data = {
if users: 'token': self.spug_key,
'event': self.event,
'subject': f'{self.title} >> {self.target}',
'desc': self.message,
'remark': f'故障持续{self.duration}' if self.event == '2' else None,
'users': list(users)
}
self.handle_request(f'{spug_server}/apis/notify/wx/', data, 'spug')
def _monitor_by_email(self, users):
mail_service = AppSetting.get_default('mail_service', {})
body = [
f'告警名称:{self.title}',
f'告警对象:{self.target}',
f'{"告警" if self.event == "1" else "恢复"}时间:{human_datetime()}',
f'告警描述:{self.message}'
]
if self.event == '2':
body.append('故障持续:' + self.duration)
if mail_service.get('server'):
event_map = {'1': '监控告警通知', '2': '告警恢复通知'}
subject = f'{event_map[self.event]}-{self.title}'
mail = Mail(**mail_service)
mail.send_text_mail(users, subject, '\r\n'.join(body) + '\r\n\r\n自动发送,请勿回复。')
elif self.spug_key:
data = { data = {
'token': self.spug_key, 'token': self.spug_key,
'event': self.event, 'event': self.event,
'subject': f'{self.title} >> {self.target}', 'subject': self.title,
'desc': self.message, 'body': '\r\n'.join(body),
'remark': f'故障持续{self.duration}' if self.event == '2' else None,
'users': list(users) 'users': list(users)
} }
self.handle_request(f'{spug_server}/apis/notify/wx/', data, 'spug') self.handle_request(f'{spug_server}/apis/notify/mail/', data, 'spug')
else: else:
Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象请确保设置了相关报警联系人的微信Token。') Notify.make_monitor_notify('发送报警信息失败', '配置报警服务调用凭据,请在系统管理/系统设置/报警服务设置中配置')
def _by_email(self): def _monitor_by_dd(self, users):
users = set(x.email for x in Contact.objects.filter(id__in=self.u_ids, email__isnull=False)) texts = [
if users: '## %s ## ' % ('监控告警通知' if self.event == '1' else '告警恢复通知'),
mail_service = AppSetting.get_default('mail_service', {}) f'**告警名称:** <font color="#{"f90202" if self.event == "1" else "008000"}">{self.title}</font> ',
body = [ f'**告警对象:** {self.target} ',
f'告警名称:{self.title}', f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ',
f'告警对象:{self.target}', f'**告警描述:** {self.message} ',
f'{"告警" if self.event == "1" else "恢复"}时间:{human_datetime()}', ]
f'告警描述:{self.message}' if self.event == '2':
] texts.append(f'**持续时间:** {self.duration} ')
if self.event == '2': data = {
body.append('故障持续:' + self.duration) 'msgtype': 'markdown',
if mail_service.get('server'): 'markdown': {
event_map = {'1': '监控告警通知', '2': '告警恢复通知'} 'title': '监控告警通知',
subject = f'{event_map[self.event]}-{self.title}' 'text': '\n\n'.join(texts) + '\n\n> ###### 来自 Spug运维平台'
mail = Mail(**mail_service) },
mail.send_text_mail(users, subject, '\r\n'.join(body) + '\r\n\r\n自动发送,请勿回复。') 'at': {
elif self.spug_key: 'isAtAll': True
data = {
'token': self.spug_key,
'event': self.event,
'subject': self.title,
'body': '\r\n'.join(body),
'users': list(users)
}
self.handle_request(f'{spug_server}/apis/notify/mail/', data, 'spug')
else:
Notify.make_notify(notify_source, '1', '发送报警信息失败', '未配置报警服务调用凭据,请在系统管理/系统设置/报警服务设置中配置。')
else:
Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的邮件地址。')
def _by_dd(self):
users = set(x.ding for x in Contact.objects.filter(id__in=self.u_ids, ding__isnull=False))
if users:
texts = [
'## %s ## ' % ('监控告警通知' if self.event == '1' else '告警恢复通知'),
f'**告警名称:** <font color="#{"f90202" if self.event == "1" else "008000"}">{self.title}</font> ',
f'**告警对象:** {self.target} ',
f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ',
f'**告警描述:** {self.message} ',
]
if self.event == '2':
texts.append(f'**持续时间:** {self.duration} ')
data = {
'msgtype': 'markdown',
'markdown': {
'title': '监控告警通知',
'text': '\n\n'.join(texts) + '\n\n> ###### 来自 Spug运维平台'
},
'at': {
'isAtAll': True
}
} }
for url in users: }
self.handle_request(url, data, 'dd') for url in users:
else: self.handle_request(url, data, 'dd')
Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的钉钉。')
def _by_qy_wx(self): def _monitor_by_qy_wx(self, users):
users = set(x.qy_wx for x in Contact.objects.filter(id__in=self.u_ids, qy_wx__isnull=False)) color, title = ('warning', '监控告警通知') if self.event == '1' else ('info', '告警恢复通知')
if users: texts = [
color, title = ('warning', '监控告警通知') if self.event == '1' else ('info', '告警恢复通知') f'## {title}',
texts = [ f'**告警名称:** <font color="{color}">{self.title}</font> ',
f'## {title}', f'**告警对象:** {self.target}',
f'**告警名称:** <font color="{color}">{self.title}</font> ', f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ',
f'**告警对象:** {self.target}', f'**告警描述:** {self.message} ',
f'**{"告警" if self.event == "1" else "恢复"}时间:** {human_datetime()} ', ]
f'**告警描述:** {self.message} ', if self.event == '2':
] texts.append(f'**持续时间:** {self.duration} ')
if self.event == '2': data = {
texts.append(f'**持续时间:** {self.duration} ') 'msgtype': 'markdown',
data = { 'markdown': {
'msgtype': 'markdown', 'content': '\n'.join(texts) + '\n> 来自 Spug运维平台'
'markdown': {
'content': '\n'.join(texts) + '\n> 来自 Spug运维平台'
}
} }
for url in users: }
self.handle_request(url, data, 'wx') for url in users:
else: self.handle_request(url, data, 'wx')
Notify.make_notify(notify_source, '1', '发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的企业微信。')
def dispatch(self, modes): def dispatch_monitor(self, modes):
for mode in modes: for mode in modes:
if mode == '1': if mode == '1':
Thread(target=self._by_wx).start() users = set(x.wx_token for x in Contact.objects.filter(id__in=self.u_ids, wx_token__isnull=False))
if not users:
Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象请确保设置了相关报警联系人的微信Token。')
continue
self._monitor_by_wx(users)
elif mode == '3': elif mode == '3':
Thread(target=self._by_dd).start() users = set(x.ding for x in Contact.objects.filter(id__in=self.u_ids, ding__isnull=False))
if not users:
Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的钉钉。')
continue
self._monitor_by_dd(users)
elif mode == '4': elif mode == '4':
Thread(target=self._by_email).start() users = set(x.email for x in Contact.objects.filter(id__in=self.u_ids, email__isnull=False))
if not users:
Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的邮件地址。')
continue
self._monitor_by_email(users)
elif mode == '5': elif mode == '5':
Thread(target=self._by_qy_wx).start() users = set(x.qy_wx for x in Contact.objects.filter(id__in=self.u_ids, qy_wx__isnull=False))
if not users:
Notify.make_monitor_notify('发送报警信息失败', '未找到可用的通知对象,请确保设置了相关报警联系人的企业微信。')
continue
self._monitor_by_qy_wx(users)