diff --git a/spug_api/apps/monitor/executors.py b/spug_api/apps/monitor/executors.py new file mode 100644 index 0000000..fe5814b --- /dev/null +++ b/spug_api/apps/monitor/executors.py @@ -0,0 +1,48 @@ +from libs.ssh import SSH +from apps.host.models import Host +from apps.setting.utils import AppSetting +from socket import socket +import requests +import logging + +logging.captureWarnings(True) + + +def site_check(url): + status_code = -1 + try: + res = requests.get(url, timeout=10, verify=False) + status_code = res.status_code + finally: + return status_code == 200 + + +def port_check(addr, port): + sock = socket() + sock.settimeout(5) + return sock.connect_ex((addr, int(port))) == 0 + + +def host_executor(host, pkey, command): + exit_code = -1 + try: + cli = SSH(host.hostname, host.port, host.username, pkey=pkey) + exit_code, _ = cli.exec_command(command) + finally: + return exit_code == 0 + + +def dispatch(tp, addr, extra): + if tp == '1': + return site_check(addr) + elif tp == '2': + return port_check(addr, extra) + elif tp == '3': + command = f'ps -ef|grep -v grep|grep {extra!r}' + elif tp == '4': + command = extra + else: + raise TypeError(f'invalid monitor type: {tp!r}') + pkey = AppSetting.get('private_key') + host = Host.objects.filter(pk=addr).first() + return host_executor(host, pkey, command) diff --git a/spug_api/apps/monitor/management/commands/runmonitor.py b/spug_api/apps/monitor/management/commands/runmonitor.py new file mode 100644 index 0000000..2ba3b7a --- /dev/null +++ b/spug_api/apps/monitor/management/commands/runmonitor.py @@ -0,0 +1,10 @@ +from django.core.management.base import BaseCommand +from apps.monitor.scheduler import Scheduler + + +class Command(BaseCommand): + help = 'Start monitor process' + + def handle(self, *args, **options): + s = Scheduler() + s.run() diff --git a/spug_api/apps/monitor/models.py b/spug_api/apps/monitor/models.py index 4ddcac6..aa5221e 100644 --- a/spug_api/apps/monitor/models.py +++ b/spug_api/apps/monitor/models.py @@ -10,6 +10,10 @@ class Detection(models.Model, ModelMixin): ('3', '进程检测'), ('4', '自定义脚本'), ) + STATUS = ( + (0, '成功'), + (1, '失败'), + ) name = models.CharField(max_length=50) type = models.CharField(max_length=2, choices=TYPES) addr = models.CharField(max_length=255) @@ -19,6 +23,11 @@ class Detection(models.Model, ModelMixin): rate = models.IntegerField(default=5) threshold = models.IntegerField(default=3) quiet = models.IntegerField(default=24 * 60) + fault_times = models.SmallIntegerField(default=0) + latest_status = models.SmallIntegerField(choices=STATUS, null=True) + latest_run_time = models.CharField(max_length=20, null=True) + latest_fault_time = models.IntegerField(null=True) + latest_notify_time = models.IntegerField(default=0) created_at = models.CharField(max_length=20, default=human_time) created_by = models.ForeignKey(User, models.PROTECT, related_name='+') @@ -28,6 +37,7 @@ class Detection(models.Model, ModelMixin): def to_dict(self, *args, **kwargs): tmp = super().to_dict(*args, **kwargs) tmp['type_alias'] = self.get_type_display() + tmp['latest_status_alias'] = self.get_latest_status_display() return tmp def __repr__(self): diff --git a/spug_api/apps/monitor/scheduler.py b/spug_api/apps/monitor/scheduler.py new file mode 100644 index 0000000..fe68788 --- /dev/null +++ b/spug_api/apps/monitor/scheduler.py @@ -0,0 +1,90 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler import events +from django_redis import get_redis_connection +from apps.monitor.models import Detection +from apps.monitor.executors import dispatch +from django.conf import settings +from libs import AttrDict, human_time +import logging +import json +import time + +logger = logging.getLogger("django.apps.monitor") + + +class Scheduler: + timezone = settings.TIME_ZONE + + def __init__(self): + self.scheduler = BackgroundScheduler(timezone=self.timezone) + self.scheduler.add_listener(self._handle_event, ) + + def _handle_notify(self, obj, old_status): + if obj.latest_status == 0: + if old_status == 1: + logger.info(f'{human_time()} recover job_id: {obj.id}') + else: + if obj.fault_times >= obj.threshold: + if time.time() - obj.latest_notify_time >= obj.quiet * 60: + obj.latest_notify_time = int(time.time()) + obj.save() + logger.info(f'{human_time()} notify job_id: {obj.id}') + + def _handle_event(self, event): + # TODO: notify to user + if event.code == events.EVENT_SCHEDULER_SHUTDOWN: + logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}') + if event.code == events.EVENT_JOB_MAX_INSTANCES: + logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}') + if event.code == events.EVENT_JOB_ERROR: + logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}') + if event.code == events.EVENT_JOB_MISSED: + logger.info(f'EVENT_JOB_MISSED: job_id {event.job_id}') + if event.code == events.EVENT_JOB_EXECUTED: + obj = Detection.objects.filter(pk=event.job_id).first() + old_status = obj.latest_status + obj.latest_status = 0 if event.retval else 1 + obj.latest_run_time = human_time(event.scheduled_run_time) + if old_status == 0 and event.retval is False: + obj.latest_fault_time = int(time.time()) + if obj.latest_status == 0: + obj.latest_notify_time = 0 + obj.fault_times = 0 + else: + obj.fault_times += 1 + obj.save() + self._handle_notify(obj, old_status) + + def _init(self): + self.scheduler.start() + for item in Detection.objects.filter(is_active=True): + trigger = IntervalTrigger(minutes=int(item.rate), timezone=self.timezone) + self.scheduler.add_job( + dispatch, + trigger, + id=str(item.id), + args=(item.type, item.addr, item.extra), + ) + + def run(self): + rds_cli = get_redis_connection() + self._init() + rds_cli.delete(settings.MONITOR_KEY) + logger.info('Running monitor') + while True: + _, data = rds_cli.blpop(settings.MONITOR_KEY) + task = AttrDict(json.loads(data)) + if task.action in ('add', 'modify'): + trigger = IntervalTrigger(minutes=int(task.rate), timezone=self.timezone) + self.scheduler.add_job( + dispatch, + trigger, + id=str(task.id), + args=(task.type, task.addr, task.extra), + replace_existing=True + ) + elif task.action == 'remove': + job = self.scheduler.get_job(str(task.id)) + if job: + job.remove() diff --git a/spug_api/apps/monitor/views.py b/spug_api/apps/monitor/views.py index c4d90fc..4c32ccc 100644 --- a/spug_api/apps/monitor/views.py +++ b/spug_api/apps/monitor/views.py @@ -1,6 +1,9 @@ from django.views.generic import View from libs import json_response, JsonParser, Argument, human_time from apps.monitor.models import Detection +from django_redis import get_redis_connection +from django.conf import settings +import json class DetectionView(View): @@ -22,12 +25,38 @@ class DetectionView(View): ).parse(request.body) if error is None: if form.id: - form.updated_at = human_time() - form.updated_by = request.user - Detection.objects.filter(pk=form.pop('id')).update(**form) + Detection.objects.filter(pk=form.id).update( + updated_at=human_time(), + updated_by=request.user, + **form) + task = Detection.objects.filter(pk=form.id).first() + if task and task.is_active: + form.action = 'modify' + rds_cli = get_redis_connection() + rds_cli.rpush(settings.MONITOR_KEY, json.dumps(form)) else: - form.created_by = request.user - Detection.objects.create(**form) + Detection.objects.create(created_by=request.user, **form) + form.action = 'add' + rds_cli = get_redis_connection() + rds_cli.rpush(settings.MONITOR_KEY, json.dumps(form)) + return json_response(error=error) + + def patch(self, request): + form, error = JsonParser( + Argument('id', type=int, help='请指定操作对象'), + Argument('is_active', type=bool, required=False) + ).parse(request.body, True) + if error is None: + Detection.objects.filter(pk=form.id).update(**form) + if form.get('is_active') is not None: + if form.is_active: + task = Detection.objects.filter(pk=form.id).first() + message = {'id': form.id, 'action': 'add'} + message.update(task.to_dict(selects=('addr', 'extra', 'rate', 'type'))) + else: + message = {'id': form.id, 'action': 'remove'} + rds_cli = get_redis_connection() + rds_cli.rpush(settings.MONITOR_KEY, json.dumps(message)) return json_response(error=error) def delete(self, request): @@ -35,5 +64,9 @@ class DetectionView(View): Argument('id', type=int, help='请指定操作对象') ).parse(request.GET) if error is None: - Detection.objects.filter(pk=form.id).delete() + task = Detection.objects.filter(pk=form.id).first() + if task: + if task.is_active: + return json_response(error='该监控项正在运行中,请先停止后再尝试删除') + task.delete() return json_response(error=error) diff --git a/spug_api/requirements.txt b/spug_api/requirements.txt index cb83f47..0f0a34f 100644 --- a/spug_api/requirements.txt +++ b/spug_api/requirements.txt @@ -2,4 +2,5 @@ apscheduler==3.6.3 Django==2.2.7 channels==2.3.1 paramiko==2.6.0 -django-redis==4.10.0 \ No newline at end of file +django-redis==4.10.0 +requests==2.22.0 \ No newline at end of file diff --git a/spug_api/spug/settings.py b/spug_api/spug/settings.py index 79c4dff..5895f4b 100644 --- a/spug_api/spug/settings.py +++ b/spug_api/spug/settings.py @@ -81,6 +81,7 @@ CHANNEL_LAYERS = { } SCHEDULE_KEY = 'spug:schedule' +MONITOR_KEY = 'spug:monitor' # Internationalization # https://docs.djangoproject.com/en/2.2/topics/i18n/