From 949166eaed9791be5bb198a18767ea5d31cba547 Mon Sep 17 00:00:00 2001 From: jiangweidong Date: Thu, 17 Nov 2022 09:16:48 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20Redis=E5=93=A8=E5=85=B5=E5=88=87?= =?UTF-8?q?=E6=8D=A2=E6=97=B6=E5=AF=BC=E8=87=B4core=E5=92=8Ccelery?= =?UTF-8?q?=E7=BB=84=E4=BB=B6=E7=9B=91=E5=90=AC=E7=BA=BF=E7=A8=8B=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E9=80=80=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/terminal/startup.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/apps/terminal/startup.py b/apps/terminal/startup.py index 7e454da83..a4c574ac2 100644 --- a/apps/terminal/startup.py +++ b/apps/terminal/startup.py @@ -3,7 +3,6 @@ import time import socket import threading from django.conf import settings -from django.db.utils import OperationalError from common.db.utils import close_old_connections from common.decorator import Singleton @@ -45,23 +44,23 @@ class BaseTerminal(object): def start_heartbeat(self): while True: - heartbeat_data = { - 'cpu_load': get_cpu_load(), - 'memory_used': get_memory_usage(), - 'disk_used': get_disk_usage(path=settings.BASE_DIR), - 'sessions': [], - } - status_serializer = StatusSerializer(data=heartbeat_data) - status_serializer.is_valid() - status_serializer.validated_data.pop('sessions', None) - terminal = self.get_or_register_terminal() - status_serializer.validated_data['terminal'] = terminal - try: + heartbeat_data = { + 'cpu_load': get_cpu_load(), + 'memory_used': get_memory_usage(), + 'disk_used': get_disk_usage(path=settings.BASE_DIR), + 'sessions': [], + } + status_serializer = StatusSerializer(data=heartbeat_data) + status_serializer.is_valid() + status_serializer.validated_data.pop('sessions', None) + terminal = self.get_or_register_terminal() + status_serializer.validated_data['terminal'] = terminal status_serializer.save() - time.sleep(self.interval) - except OperationalError: + except Exception: close_old_connections() + finally: + time.sleep(self.interval) def get_or_register_terminal(self): terminal = Terminal.objects.filter( From 90c48d303eb45fa2b2d1a5c162d4160e3f3277d9 Mon Sep 17 00:00:00 2001 From: halo Date: Thu, 17 Nov 2022 13:29:12 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20celery=E4=BB=BB=E5=8A=A1=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E5=AD=97=E6=AE=B5=E5=B7=AE8=E5=B0=8F=E6=97=B6?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jumpserver/settings/libs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/jumpserver/settings/libs.py b/apps/jumpserver/settings/libs.py index ccde975a5..28437fe05 100644 --- a/apps/jumpserver/settings/libs.py +++ b/apps/jumpserver/settings/libs.py @@ -144,6 +144,8 @@ else: 'port': CONFIG.REDIS_PORT, 'db': CONFIG.REDIS_DB_CELERY, } +CELERY_TIMEZONE = CONFIG.TIME_ZONE +CELERY_ENABLE_UTC = False CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' CELERY_RESULT_BACKEND = CELERY_BROKER_URL From a42641ca9a6377677dc2d3ca97b2fc3ed9c00029 Mon Sep 17 00:00:00 2001 From: ibuler Date: Thu, 17 Nov 2022 13:44:50 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20redis=20?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=AF=BC=E8=87=B4=E7=9A=84=20pub=20sub=20?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../signal_handlers/node_assets_mapping.py | 17 ++--- apps/common/utils/connection.py | 72 +++++++++++++------ apps/orgs/signal_handlers/common.py | 31 ++++---- apps/settings/signal_handlers.py | 14 ++-- 4 files changed, 70 insertions(+), 64 deletions(-) diff --git a/apps/assets/signal_handlers/node_assets_mapping.py b/apps/assets/signal_handlers/node_assets_mapping.py index 71d7c7eb3..b242f3be8 100644 --- a/apps/assets/signal_handlers/node_assets_mapping.py +++ b/apps/assets/signal_handlers/node_assets_mapping.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- # -import os -import threading from django.db.models.signals import ( m2m_changed, post_save, post_delete @@ -9,15 +7,15 @@ from django.db.models.signals import ( from django.dispatch import receiver from django.utils.functional import LazyObject -from common.signals import django_ready -from common.utils.connection import RedisPubSub -from common.utils import get_logger from assets.models import Asset, Node +from common.signals import django_ready +from common.utils import get_logger +from common.utils.connection import RedisPubSub from orgs.models import Organization - logger = get_logger(__file__) + # clear node assets mapping for memory # ------------------------------------ @@ -78,9 +76,4 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): Node.expire_node_all_asset_ids_mapping_from_memory(org_id) Node.expire_node_all_asset_ids_mapping_from_memory(root_org_id) - def keep_subscribe_node_assets_relation(): - node_assets_mapping_for_memory_pub_sub.subscribe(handle_node_relation_change) - - t = threading.Thread(target=keep_subscribe_node_assets_relation) - t.daemon = True - t.start() + node_assets_mapping_for_memory_pub_sub.subscribe(handle_node_relation_change) diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index 9d1e7fd9d..95e827299 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -1,8 +1,10 @@ import json import threading -import redis +import time +import redis from django.core.cache import cache +from redis.client import PubSub from common.db.utils import safe_db_connection from common.utils import get_logger @@ -16,15 +18,39 @@ def get_redis_client(db=0): return client -class Subscription: - def __init__(self, ch, sub, ): +class RedisPubSub: + def __init__(self, ch, db=10): self.ch = ch + self.db = db + self.redis = get_redis_client(db) + + def subscribe(self, _next, error=None, complete=None): + ps = self.redis.pubsub() + ps.subscribe(self.ch) + sub = Subscription(self, ps) + sub.keep_handle_msg(_next, error, complete) + return sub + + def resubscribe(self, _next, error=None, complete=None): + self.redis = get_redis_client(self.db) + self.subscribe(_next, error, complete) + + def publish(self, data): + data_json = json.dumps(data) + self.redis.publish(self.ch, data_json) + return True + + +class Subscription: + def __init__(self, pb: RedisPubSub, sub: PubSub): + self.pb = pb + self.ch = pb.ch self.sub = sub + self.unsubscribed = False def _handle_msg(self, _next, error, complete): """ handle arg is the pub published - :param _next: next msg handler :param error: error msg handler :param complete: complete msg handler @@ -53,9 +79,12 @@ class Subscription: error(msg, item) logger.error('Subscribe handler handle msg error: {}'.format(e)) except Exception as e: - # 正常的 websocket 断开时, redis 会断开连接,避免日志太多 - # logger.error('Consume msg error: {}'.format(e)) - pass + if self.unsubscribed: + logger.debug('Subscription unsubscribed') + else: + logger.error('Consume msg error: {}'.format(e)) + self.retry(_next, error, complete) + return try: complete() @@ -75,25 +104,22 @@ class Subscription: return t def unsubscribe(self): + self.unsubscribed = True try: self.sub.close() except Exception as e: logger.error('Unsubscribe msg error: {}'.format(e)) + def retry(self, _next, error, complete): + logger.info('Retry subscribe channel: {}'.format(self.ch)) + times = 0 -class RedisPubSub: - def __init__(self, ch, db=10): - self.ch = ch - self.redis = get_redis_client(db) - - def subscribe(self, _next, error=None, complete=None): - ps = self.redis.pubsub() - ps.subscribe(self.ch) - sub = Subscription(self.ch, ps) - sub.keep_handle_msg(_next, error, complete) - return sub - - def publish(self, data): - data_json = json.dumps(data) - self.redis.publish(self.ch, data_json) - return True + while True: + try: + self.unsubscribe() + self.pb.resubscribe(_next, error, complete) + break + except Exception as e: + logger.error('Retry #{} {} subscribe channel error: {}'.format(times, self.ch, e)) + times += 1 + time.sleep(times * 2) diff --git a/apps/orgs/signal_handlers/common.py b/apps/orgs/signal_handlers/common.py index 876613ba1..05e628883 100644 --- a/apps/orgs/signal_handlers/common.py +++ b/apps/orgs/signal_handlers/common.py @@ -1,29 +1,27 @@ # -*- coding: utf-8 -*- # -import threading from collections import defaultdict from functools import partial -from django.dispatch import receiver -from django.utils.functional import LazyObject from django.db.models.signals import m2m_changed from django.db.models.signals import post_save, pre_delete +from django.dispatch import receiver +from django.utils.functional import LazyObject -from orgs.utils import tmp_to_org -from orgs.models import Organization -from orgs.hands import set_current_org, Node, get_current_org -from perms.models import (AssetPermission, ApplicationPermission) -from users.models import UserGroup, User +from assets.models import CommandFilterRule from assets.models import SystemUser from common.const.signals import PRE_REMOVE, POST_REMOVE from common.decorator import on_transaction_commit from common.signals import django_ready from common.utils import get_logger from common.utils.connection import RedisPubSub -from assets.models import CommandFilterRule +from orgs.hands import set_current_org, Node, get_current_org +from orgs.models import Organization +from orgs.utils import tmp_to_org +from perms.models import (AssetPermission, ApplicationPermission) +from users.models import UserGroup, User from users.signals import post_user_leave_org - logger = get_logger(__file__) @@ -47,14 +45,9 @@ def expire_orgs_mapping_for_memory(org_id): def subscribe_orgs_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire orgs mapping from memory") - def keep_subscribe_org_mapping(): - orgs_mapping_for_memory_pub_sub.subscribe( - lambda org_id: Organization.expire_orgs_mapping() - ) - - t = threading.Thread(target=keep_subscribe_org_mapping) - t.daemon = True - t.start() + orgs_mapping_for_memory_pub_sub.subscribe( + lambda org_id: Organization.expire_orgs_mapping() + ) # 创建对应的root @@ -85,7 +78,7 @@ def on_org_delete(sender, instance, **kwargs): def _remove_users(model, users, org, user_field_name='users'): with tmp_to_org(org): if not isinstance(users, (tuple, list, set)): - users = (users, ) + users = (users,) user_field = getattr(model, user_field_name) m2m_model = user_field.through diff --git a/apps/settings/signal_handlers.py b/apps/settings/signal_handlers.py index 18449a771..c963488f1 100644 --- a/apps/settings/signal_handlers.py +++ b/apps/settings/signal_handlers.py @@ -1,19 +1,18 @@ # -*- coding: utf-8 -*- # import json -import threading from django.conf import LazySettings +from django.db.models.signals import post_save, pre_save from django.db.utils import ProgrammingError, OperationalError from django.dispatch import receiver -from django.db.models.signals import post_save, pre_save from django.utils.functional import LazyObject -from jumpserver.utils import current_request from common.decorator import on_transaction_commit +from common.signals import django_ready from common.utils import get_logger, ssh_key_gen from common.utils.connection import RedisPubSub -from common.signals import django_ready +from jumpserver.utils import current_request from .models import Setting logger = get_logger(__file__) @@ -81,12 +80,7 @@ def on_create_set_created_by(sender, instance=None, **kwargs): def subscribe_settings_change(sender, **kwargs): logger.debug("Start subscribe setting change") - def keep_subscribe_settings_change(): - setting_pub_sub.subscribe(lambda name: Setting.refresh_item(name)) - - t = threading.Thread(target=keep_subscribe_settings_change) - t.daemon = True - t.start() + setting_pub_sub.subscribe(lambda name: Setting.refresh_item(name)) @receiver(django_ready)