mirror of https://github.com/openspug/spug
				
				
				
			A api add monitor scheduler
							parent
							
								
									76c2bcf922
								
							
						
					
					
						commit
						588395150a
					
				| 
						 | 
				
			
			@ -0,0 +1,48 @@
 | 
			
		|||
from libs.ssh import SSH
 | 
			
		||||
from apps.host.models import Host
 | 
			
		||||
from apps.setting.utils import AppSetting
 | 
			
		||||
from socket import socket
 | 
			
		||||
import requests
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
logging.captureWarnings(True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def site_check(url):
 | 
			
		||||
    status_code = -1
 | 
			
		||||
    try:
 | 
			
		||||
        res = requests.get(url, timeout=10, verify=False)
 | 
			
		||||
        status_code = res.status_code
 | 
			
		||||
    finally:
 | 
			
		||||
        return status_code == 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def port_check(addr, port):
 | 
			
		||||
    sock = socket()
 | 
			
		||||
    sock.settimeout(5)
 | 
			
		||||
    return sock.connect_ex((addr, int(port))) == 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def host_executor(host, pkey, command):
 | 
			
		||||
    exit_code = -1
 | 
			
		||||
    try:
 | 
			
		||||
        cli = SSH(host.hostname, host.port, host.username, pkey=pkey)
 | 
			
		||||
        exit_code, _ = cli.exec_command(command)
 | 
			
		||||
    finally:
 | 
			
		||||
        return exit_code == 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def dispatch(tp, addr, extra):
 | 
			
		||||
    if tp == '1':
 | 
			
		||||
        return site_check(addr)
 | 
			
		||||
    elif tp == '2':
 | 
			
		||||
        return port_check(addr, extra)
 | 
			
		||||
    elif tp == '3':
 | 
			
		||||
        command = f'ps -ef|grep -v grep|grep {extra!r}'
 | 
			
		||||
    elif tp == '4':
 | 
			
		||||
        command = extra
 | 
			
		||||
    else:
 | 
			
		||||
        raise TypeError(f'invalid monitor type: {tp!r}')
 | 
			
		||||
    pkey = AppSetting.get('private_key')
 | 
			
		||||
    host = Host.objects.filter(pk=addr).first()
 | 
			
		||||
    return host_executor(host, pkey, command)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,10 @@
 | 
			
		|||
from django.core.management.base import BaseCommand
 | 
			
		||||
from apps.monitor.scheduler import Scheduler
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Command(BaseCommand):
 | 
			
		||||
    help = 'Start monitor process'
 | 
			
		||||
 | 
			
		||||
    def handle(self, *args, **options):
 | 
			
		||||
        s = Scheduler()
 | 
			
		||||
        s.run()
 | 
			
		||||
| 
						 | 
				
			
			@ -10,6 +10,10 @@ class Detection(models.Model, ModelMixin):
 | 
			
		|||
        ('3', '进程检测'),
 | 
			
		||||
        ('4', '自定义脚本'),
 | 
			
		||||
    )
 | 
			
		||||
    STATUS = (
 | 
			
		||||
        (0, '成功'),
 | 
			
		||||
        (1, '失败'),
 | 
			
		||||
    )
 | 
			
		||||
    name = models.CharField(max_length=50)
 | 
			
		||||
    type = models.CharField(max_length=2, choices=TYPES)
 | 
			
		||||
    addr = models.CharField(max_length=255)
 | 
			
		||||
| 
						 | 
				
			
			@ -19,6 +23,11 @@ class Detection(models.Model, ModelMixin):
 | 
			
		|||
    rate = models.IntegerField(default=5)
 | 
			
		||||
    threshold = models.IntegerField(default=3)
 | 
			
		||||
    quiet = models.IntegerField(default=24 * 60)
 | 
			
		||||
    fault_times = models.SmallIntegerField(default=0)
 | 
			
		||||
    latest_status = models.SmallIntegerField(choices=STATUS, null=True)
 | 
			
		||||
    latest_run_time = models.CharField(max_length=20, null=True)
 | 
			
		||||
    latest_fault_time = models.IntegerField(null=True)
 | 
			
		||||
    latest_notify_time = models.IntegerField(default=0)
 | 
			
		||||
 | 
			
		||||
    created_at = models.CharField(max_length=20, default=human_time)
 | 
			
		||||
    created_by = models.ForeignKey(User, models.PROTECT, related_name='+')
 | 
			
		||||
| 
						 | 
				
			
			@ -28,6 +37,7 @@ class Detection(models.Model, ModelMixin):
 | 
			
		|||
    def to_dict(self, *args, **kwargs):
 | 
			
		||||
        tmp = super().to_dict(*args, **kwargs)
 | 
			
		||||
        tmp['type_alias'] = self.get_type_display()
 | 
			
		||||
        tmp['latest_status_alias'] = self.get_latest_status_display()
 | 
			
		||||
        return tmp
 | 
			
		||||
 | 
			
		||||
    def __repr__(self):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,90 @@
 | 
			
		|||
from apscheduler.schedulers.background import BackgroundScheduler
 | 
			
		||||
from apscheduler.triggers.interval import IntervalTrigger
 | 
			
		||||
from apscheduler import events
 | 
			
		||||
from django_redis import get_redis_connection
 | 
			
		||||
from apps.monitor.models import Detection
 | 
			
		||||
from apps.monitor.executors import dispatch
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from libs import AttrDict, human_time
 | 
			
		||||
import logging
 | 
			
		||||
import json
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger("django.apps.monitor")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Scheduler:
 | 
			
		||||
    timezone = settings.TIME_ZONE
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.scheduler = BackgroundScheduler(timezone=self.timezone)
 | 
			
		||||
        self.scheduler.add_listener(self._handle_event, )
 | 
			
		||||
 | 
			
		||||
    def _handle_notify(self, obj, old_status):
 | 
			
		||||
        if obj.latest_status == 0:
 | 
			
		||||
            if old_status == 1:
 | 
			
		||||
                logger.info(f'{human_time()} recover job_id: {obj.id}')
 | 
			
		||||
        else:
 | 
			
		||||
            if obj.fault_times >= obj.threshold:
 | 
			
		||||
                if time.time() - obj.latest_notify_time >= obj.quiet * 60:
 | 
			
		||||
                    obj.latest_notify_time = int(time.time())
 | 
			
		||||
                    obj.save()
 | 
			
		||||
                    logger.info(f'{human_time()} notify job_id: {obj.id}')
 | 
			
		||||
 | 
			
		||||
    def _handle_event(self, event):
 | 
			
		||||
        # TODO: notify to user
 | 
			
		||||
        if event.code == events.EVENT_SCHEDULER_SHUTDOWN:
 | 
			
		||||
            logger.info(f'EVENT_SCHEDULER_SHUTDOWN: {event}')
 | 
			
		||||
        if event.code == events.EVENT_JOB_MAX_INSTANCES:
 | 
			
		||||
            logger.info(f'EVENT_JOB_MAX_INSTANCES: {event}')
 | 
			
		||||
        if event.code == events.EVENT_JOB_ERROR:
 | 
			
		||||
            logger.info(f'EVENT_JOB_ERROR: job_id {event.job_id} exception: {event.exception}')
 | 
			
		||||
        if event.code == events.EVENT_JOB_MISSED:
 | 
			
		||||
            logger.info(f'EVENT_JOB_MISSED: job_id {event.job_id}')
 | 
			
		||||
        if event.code == events.EVENT_JOB_EXECUTED:
 | 
			
		||||
            obj = Detection.objects.filter(pk=event.job_id).first()
 | 
			
		||||
            old_status = obj.latest_status
 | 
			
		||||
            obj.latest_status = 0 if event.retval else 1
 | 
			
		||||
            obj.latest_run_time = human_time(event.scheduled_run_time)
 | 
			
		||||
            if old_status == 0 and event.retval is False:
 | 
			
		||||
                obj.latest_fault_time = int(time.time())
 | 
			
		||||
            if obj.latest_status == 0:
 | 
			
		||||
                obj.latest_notify_time = 0
 | 
			
		||||
                obj.fault_times = 0
 | 
			
		||||
            else:
 | 
			
		||||
                obj.fault_times += 1
 | 
			
		||||
            obj.save()
 | 
			
		||||
            self._handle_notify(obj, old_status)
 | 
			
		||||
 | 
			
		||||
    def _init(self):
 | 
			
		||||
        self.scheduler.start()
 | 
			
		||||
        for item in Detection.objects.filter(is_active=True):
 | 
			
		||||
            trigger = IntervalTrigger(minutes=int(item.rate), timezone=self.timezone)
 | 
			
		||||
            self.scheduler.add_job(
 | 
			
		||||
                dispatch,
 | 
			
		||||
                trigger,
 | 
			
		||||
                id=str(item.id),
 | 
			
		||||
                args=(item.type, item.addr, item.extra),
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
        rds_cli = get_redis_connection()
 | 
			
		||||
        self._init()
 | 
			
		||||
        rds_cli.delete(settings.MONITOR_KEY)
 | 
			
		||||
        logger.info('Running monitor')
 | 
			
		||||
        while True:
 | 
			
		||||
            _, data = rds_cli.blpop(settings.MONITOR_KEY)
 | 
			
		||||
            task = AttrDict(json.loads(data))
 | 
			
		||||
            if task.action in ('add', 'modify'):
 | 
			
		||||
                trigger = IntervalTrigger(minutes=int(task.rate), timezone=self.timezone)
 | 
			
		||||
                self.scheduler.add_job(
 | 
			
		||||
                    dispatch,
 | 
			
		||||
                    trigger,
 | 
			
		||||
                    id=str(task.id),
 | 
			
		||||
                    args=(task.type, task.addr, task.extra),
 | 
			
		||||
                    replace_existing=True
 | 
			
		||||
                )
 | 
			
		||||
            elif task.action == 'remove':
 | 
			
		||||
                job = self.scheduler.get_job(str(task.id))
 | 
			
		||||
                if job:
 | 
			
		||||
                    job.remove()
 | 
			
		||||
| 
						 | 
				
			
			@ -1,6 +1,9 @@
 | 
			
		|||
from django.views.generic import View
 | 
			
		||||
from libs import json_response, JsonParser, Argument, human_time
 | 
			
		||||
from apps.monitor.models import Detection
 | 
			
		||||
from django_redis import get_redis_connection
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DetectionView(View):
 | 
			
		||||
| 
						 | 
				
			
			@ -22,12 +25,38 @@ class DetectionView(View):
 | 
			
		|||
        ).parse(request.body)
 | 
			
		||||
        if error is None:
 | 
			
		||||
            if form.id:
 | 
			
		||||
                form.updated_at = human_time()
 | 
			
		||||
                form.updated_by = request.user
 | 
			
		||||
                Detection.objects.filter(pk=form.pop('id')).update(**form)
 | 
			
		||||
                Detection.objects.filter(pk=form.id).update(
 | 
			
		||||
                    updated_at=human_time(),
 | 
			
		||||
                    updated_by=request.user,
 | 
			
		||||
                    **form)
 | 
			
		||||
                task = Detection.objects.filter(pk=form.id).first()
 | 
			
		||||
                if task and task.is_active:
 | 
			
		||||
                    form.action = 'modify'
 | 
			
		||||
                    rds_cli = get_redis_connection()
 | 
			
		||||
                    rds_cli.rpush(settings.MONITOR_KEY, json.dumps(form))
 | 
			
		||||
            else:
 | 
			
		||||
                form.created_by = request.user
 | 
			
		||||
                Detection.objects.create(**form)
 | 
			
		||||
                Detection.objects.create(created_by=request.user, **form)
 | 
			
		||||
                form.action = 'add'
 | 
			
		||||
                rds_cli = get_redis_connection()
 | 
			
		||||
                rds_cli.rpush(settings.MONITOR_KEY, json.dumps(form))
 | 
			
		||||
        return json_response(error=error)
 | 
			
		||||
 | 
			
		||||
    def patch(self, request):
 | 
			
		||||
        form, error = JsonParser(
 | 
			
		||||
            Argument('id', type=int, help='请指定操作对象'),
 | 
			
		||||
            Argument('is_active', type=bool, required=False)
 | 
			
		||||
        ).parse(request.body, True)
 | 
			
		||||
        if error is None:
 | 
			
		||||
            Detection.objects.filter(pk=form.id).update(**form)
 | 
			
		||||
            if form.get('is_active') is not None:
 | 
			
		||||
                if form.is_active:
 | 
			
		||||
                    task = Detection.objects.filter(pk=form.id).first()
 | 
			
		||||
                    message = {'id': form.id, 'action': 'add'}
 | 
			
		||||
                    message.update(task.to_dict(selects=('addr', 'extra', 'rate', 'type')))
 | 
			
		||||
                else:
 | 
			
		||||
                    message = {'id': form.id, 'action': 'remove'}
 | 
			
		||||
                rds_cli = get_redis_connection()
 | 
			
		||||
                rds_cli.rpush(settings.MONITOR_KEY, json.dumps(message))
 | 
			
		||||
        return json_response(error=error)
 | 
			
		||||
 | 
			
		||||
    def delete(self, request):
 | 
			
		||||
| 
						 | 
				
			
			@ -35,5 +64,9 @@ class DetectionView(View):
 | 
			
		|||
            Argument('id', type=int, help='请指定操作对象')
 | 
			
		||||
        ).parse(request.GET)
 | 
			
		||||
        if error is None:
 | 
			
		||||
            Detection.objects.filter(pk=form.id).delete()
 | 
			
		||||
            task = Detection.objects.filter(pk=form.id).first()
 | 
			
		||||
            if task:
 | 
			
		||||
                if task.is_active:
 | 
			
		||||
                    return json_response(error='该监控项正在运行中,请先停止后再尝试删除')
 | 
			
		||||
                task.delete()
 | 
			
		||||
        return json_response(error=error)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,4 +2,5 @@ apscheduler==3.6.3
 | 
			
		|||
Django==2.2.7
 | 
			
		||||
channels==2.3.1
 | 
			
		||||
paramiko==2.6.0
 | 
			
		||||
django-redis==4.10.0
 | 
			
		||||
django-redis==4.10.0
 | 
			
		||||
requests==2.22.0
 | 
			
		||||
| 
						 | 
				
			
			@ -81,6 +81,7 @@ CHANNEL_LAYERS = {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
SCHEDULE_KEY = 'spug:schedule'
 | 
			
		||||
MONITOR_KEY = 'spug:monitor'
 | 
			
		||||
 | 
			
		||||
# Internationalization
 | 
			
		||||
# https://docs.djangoproject.com/en/2.2/topics/i18n/
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue