mirror of https://github.com/openspug/spug
U 添加worker队列监测
parent
8966567024
commit
6028391f21
|
@ -9,7 +9,10 @@ from concurrent.futures import ThreadPoolExecutor
|
||||||
from apps.schedule.executors import schedule_worker_handler
|
from apps.schedule.executors import schedule_worker_handler
|
||||||
from apps.monitor.executors import monitor_worker_handler
|
from apps.monitor.executors import monitor_worker_handler
|
||||||
from apps.exec.executors import exec_worker_handler
|
from apps.exec.executors import exec_worker_handler
|
||||||
|
from apps.notify.models import Notify
|
||||||
|
from threading import Thread
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
import os
|
import os
|
||||||
|
|
||||||
EXEC_WORKER_KEY = settings.EXEC_WORKER_KEY
|
EXEC_WORKER_KEY = settings.EXEC_WORKER_KEY
|
||||||
|
@ -22,13 +25,34 @@ logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(message)s')
|
||||||
class Worker:
|
class Worker:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.rds = get_redis_connection()
|
self.rds = get_redis_connection()
|
||||||
self._executor = ThreadPoolExecutor(max_workers=max(50, os.cpu_count() * 20))
|
self._executor = ThreadPoolExecutor(max_workers=max(100, os.cpu_count() * 50))
|
||||||
|
|
||||||
def job_done(self, future):
|
def job_done(self, future):
|
||||||
connections.close_all()
|
connections.close_all()
|
||||||
|
|
||||||
|
def queue_monitor(self):
|
||||||
|
counter = 0
|
||||||
|
while True:
|
||||||
|
time.sleep((counter or 1) ** 3 * 10)
|
||||||
|
qsize = self._executor._work_queue.qsize()
|
||||||
|
logging.warning(f'do check: {counter} qsize: {qsize}')
|
||||||
|
if qsize > 0:
|
||||||
|
if counter > 0:
|
||||||
|
content = '请检查监控、任务计划或批量执行等避免长耗时任务,必要时可重启服务清空队列。'
|
||||||
|
try:
|
||||||
|
Notify.make_system_notify(f'执行队列堆积({qsize})', content)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(e)
|
||||||
|
finally:
|
||||||
|
connections.close_all()
|
||||||
|
logging.warning(f'!!! 执行队列堆积({qsize})')
|
||||||
|
counter += 1
|
||||||
|
else:
|
||||||
|
counter = 0
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logging.warning('Running worker')
|
logging.warning('Running worker')
|
||||||
|
Thread(target=self.queue_monitor).start()
|
||||||
self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY)
|
self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY)
|
||||||
while True:
|
while True:
|
||||||
key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
|
key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
|
||||||
|
|
|
@ -5,7 +5,7 @@ from django.db import models
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from libs import ModelMixin, human_datetime
|
from libs import ModelMixin, human_datetime
|
||||||
from libs.channel import Channel
|
from libs.channel import Channel
|
||||||
import time
|
import hashlib
|
||||||
|
|
||||||
|
|
||||||
class Notify(models.Model, ModelMixin):
|
class Notify(models.Model, ModelMixin):
|
||||||
|
@ -17,6 +17,7 @@ class Notify(models.Model, ModelMixin):
|
||||||
('monitor', '监控中心'),
|
('monitor', '监控中心'),
|
||||||
('schedule', '任务计划'),
|
('schedule', '任务计划'),
|
||||||
('flag', '应用发布'),
|
('flag', '应用发布'),
|
||||||
|
('alert', '系统警告'),
|
||||||
)
|
)
|
||||||
title = models.CharField(max_length=255)
|
title = models.CharField(max_length=255)
|
||||||
source = models.CharField(max_length=10, choices=SOURCES)
|
source = models.CharField(max_length=10, choices=SOURCES)
|
||||||
|
@ -28,9 +29,28 @@ class Notify(models.Model, ModelMixin):
|
||||||
created_at = models.CharField(max_length=20, default=human_datetime)
|
created_at = models.CharField(max_length=20, default=human_datetime)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def make_notify(cls, source, type, title, content=None, with_quiet=True):
|
def make_system_notify(cls, title, content):
|
||||||
if not with_quiet or time.time() - cache.get('spug:notify_quiet', 0) > 3600:
|
cls._make_notify('alert', '1', title, content)
|
||||||
cache.set('spug:notify_quiet', time.time())
|
|
||||||
|
@classmethod
|
||||||
|
def make_monitor_notify(cls, title, content):
|
||||||
|
cls._make_notify('monitor', '1', title, content)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def make_schedule_notify(cls, title, content):
|
||||||
|
cls._make_notify('schedule', '1', title, content)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def make_deploy_notify(cls, title, content):
|
||||||
|
cls._make_notify('flag', '1', title, content)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _make_notify(cls, source, type, title, content):
|
||||||
|
tmp_str = f'{source},{type},{title},{content}'
|
||||||
|
digest = hashlib.md5(tmp_str.encode()).hexdigest()
|
||||||
|
unique_key = f'spug:notify:{digest}'
|
||||||
|
if not cache.get(unique_key): # 限制相同内容的发送频率
|
||||||
|
cache.set(unique_key, 1, 3600)
|
||||||
cls.objects.create(source=source, title=title, type=type, content=content)
|
cls.objects.create(source=source, title=title, type=type, content=content)
|
||||||
Channel.send_notify(title, content)
|
Channel.send_notify(title, content)
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,12 @@
|
||||||
import React, { useState, useEffect } from 'react';
|
import React, { useState, useEffect } from 'react';
|
||||||
import { Menu, List, Dropdown, Badge, Button, notification } from 'antd';
|
import { Menu, List, Dropdown, Badge, Button, notification } from 'antd';
|
||||||
import { NotificationOutlined, MonitorOutlined, FlagOutlined, ScheduleOutlined } from '@ant-design/icons';
|
import {
|
||||||
|
NotificationOutlined,
|
||||||
|
MonitorOutlined,
|
||||||
|
FlagOutlined,
|
||||||
|
ScheduleOutlined,
|
||||||
|
AlertOutlined
|
||||||
|
} from '@ant-design/icons';
|
||||||
import { http, X_TOKEN } from 'libs';
|
import { http, X_TOKEN } from 'libs';
|
||||||
import moment from 'moment';
|
import moment from 'moment';
|
||||||
import styles from './layout.module.less';
|
import styles from './layout.module.less';
|
||||||
|
@ -17,6 +23,8 @@ function Icon(props) {
|
||||||
return <ScheduleOutlined style={{fontSize: 24, color: '#1890ff'}}/>
|
return <ScheduleOutlined style={{fontSize: 24, color: '#1890ff'}}/>
|
||||||
case 'flag':
|
case 'flag':
|
||||||
return <FlagOutlined style={{fontSize: 24, color: '#1890ff'}}/>
|
return <FlagOutlined style={{fontSize: 24, color: '#1890ff'}}/>
|
||||||
|
case 'alert':
|
||||||
|
return <AlertOutlined style={{fontSize: 24, color: '#ff4d4f'}}/>
|
||||||
default:
|
default:
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue