From 6028391f21ef1d08ba82d604274afaf13f28993f Mon Sep 17 00:00:00 2001 From: vapao Date: Wed, 8 Dec 2021 09:01:04 +0800 Subject: [PATCH] =?UTF-8?q?U=20=E6=B7=BB=E5=8A=A0worker=E9=98=9F=E5=88=97?= =?UTF-8?q?=E7=9B=91=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../exec/management/commands/runworker.py | 26 ++++++++++++++++- spug_api/apps/notify/models.py | 28 ++++++++++++++++--- spug_web/src/layout/Notification.js | 10 ++++++- 3 files changed, 58 insertions(+), 6 deletions(-) diff --git a/spug_api/apps/exec/management/commands/runworker.py b/spug_api/apps/exec/management/commands/runworker.py index 3356a69..c5fc071 100644 --- a/spug_api/apps/exec/management/commands/runworker.py +++ b/spug_api/apps/exec/management/commands/runworker.py @@ -9,7 +9,10 @@ from concurrent.futures import ThreadPoolExecutor from apps.schedule.executors import schedule_worker_handler from apps.monitor.executors import monitor_worker_handler from apps.exec.executors import exec_worker_handler +from apps.notify.models import Notify +from threading import Thread import logging +import time import os EXEC_WORKER_KEY = settings.EXEC_WORKER_KEY @@ -22,13 +25,34 @@ logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(message)s') class Worker: def __init__(self): self.rds = get_redis_connection() - self._executor = ThreadPoolExecutor(max_workers=max(50, os.cpu_count() * 20)) + self._executor = ThreadPoolExecutor(max_workers=max(100, os.cpu_count() * 50)) def job_done(self, future): connections.close_all() + def queue_monitor(self): + counter = 0 + while True: + time.sleep((counter or 1) ** 3 * 10) + qsize = self._executor._work_queue.qsize() + logging.warning(f'do check: {counter} qsize: {qsize}') + if qsize > 0: + if counter > 0: + content = '请检查监控、任务计划或批量执行等避免长耗时任务,必要时可重启服务清空队列。' + try: + Notify.make_system_notify(f'执行队列堆积({qsize})', content) + except Exception as e: + logging.warning(e) + finally: + connections.close_all() + logging.warning(f'!!! 执行队列堆积({qsize})') + counter += 1 + else: + counter = 0 + def run(self): logging.warning('Running worker') + Thread(target=self.queue_monitor).start() self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY) while True: key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) diff --git a/spug_api/apps/notify/models.py b/spug_api/apps/notify/models.py index ecb346a..935017c 100644 --- a/spug_api/apps/notify/models.py +++ b/spug_api/apps/notify/models.py @@ -5,7 +5,7 @@ from django.db import models from django.core.cache import cache from libs import ModelMixin, human_datetime from libs.channel import Channel -import time +import hashlib class Notify(models.Model, ModelMixin): @@ -17,6 +17,7 @@ class Notify(models.Model, ModelMixin): ('monitor', '监控中心'), ('schedule', '任务计划'), ('flag', '应用发布'), + ('alert', '系统警告'), ) title = models.CharField(max_length=255) source = models.CharField(max_length=10, choices=SOURCES) @@ -28,9 +29,28 @@ class Notify(models.Model, ModelMixin): created_at = models.CharField(max_length=20, default=human_datetime) @classmethod - def make_notify(cls, source, type, title, content=None, with_quiet=True): - if not with_quiet or time.time() - cache.get('spug:notify_quiet', 0) > 3600: - cache.set('spug:notify_quiet', time.time()) + def make_system_notify(cls, title, content): + cls._make_notify('alert', '1', title, content) + + @classmethod + def make_monitor_notify(cls, title, content): + cls._make_notify('monitor', '1', title, content) + + @classmethod + def make_schedule_notify(cls, title, content): + cls._make_notify('schedule', '1', title, content) + + @classmethod + def make_deploy_notify(cls, title, content): + cls._make_notify('flag', '1', title, content) + + @classmethod + def _make_notify(cls, source, type, title, content): + tmp_str = f'{source},{type},{title},{content}' + digest = hashlib.md5(tmp_str.encode()).hexdigest() + unique_key = f'spug:notify:{digest}' + if not cache.get(unique_key): # 限制相同内容的发送频率 + cache.set(unique_key, 1, 3600) cls.objects.create(source=source, title=title, type=type, content=content) Channel.send_notify(title, content) diff --git a/spug_web/src/layout/Notification.js b/spug_web/src/layout/Notification.js index 287d274..cc55226 100644 --- a/spug_web/src/layout/Notification.js +++ b/spug_web/src/layout/Notification.js @@ -1,6 +1,12 @@ import React, { useState, useEffect } from 'react'; import { Menu, List, Dropdown, Badge, Button, notification } from 'antd'; -import { NotificationOutlined, MonitorOutlined, FlagOutlined, ScheduleOutlined } from '@ant-design/icons'; +import { + NotificationOutlined, + MonitorOutlined, + FlagOutlined, + ScheduleOutlined, + AlertOutlined +} from '@ant-design/icons'; import { http, X_TOKEN } from 'libs'; import moment from 'moment'; import styles from './layout.module.less'; @@ -17,6 +23,8 @@ function Icon(props) { return case 'flag': return + case 'alert': + return default: return null }