From a42641ca9a6377677dc2d3ca97b2fc3ed9c00029 Mon Sep 17 00:00:00 2001 From: ibuler Date: Thu, 17 Nov 2022 13:44:50 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20redis=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E5=AF=BC=E8=87=B4=E7=9A=84=20pub=20sub=20=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../signal_handlers/node_assets_mapping.py | 17 ++--- apps/common/utils/connection.py | 72 +++++++++++++------ apps/orgs/signal_handlers/common.py | 31 ++++---- apps/settings/signal_handlers.py | 14 ++-- 4 files changed, 70 insertions(+), 64 deletions(-) diff --git a/apps/assets/signal_handlers/node_assets_mapping.py b/apps/assets/signal_handlers/node_assets_mapping.py index 71d7c7eb3..b242f3be8 100644 --- a/apps/assets/signal_handlers/node_assets_mapping.py +++ b/apps/assets/signal_handlers/node_assets_mapping.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- # -import os -import threading from django.db.models.signals import ( m2m_changed, post_save, post_delete @@ -9,15 +7,15 @@ from django.db.models.signals import ( from django.dispatch import receiver from django.utils.functional import LazyObject -from common.signals import django_ready -from common.utils.connection import RedisPubSub -from common.utils import get_logger from assets.models import Asset, Node +from common.signals import django_ready +from common.utils import get_logger +from common.utils.connection import RedisPubSub from orgs.models import Organization - logger = get_logger(__file__) + # clear node assets mapping for memory # ------------------------------------ @@ -78,9 +76,4 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): Node.expire_node_all_asset_ids_mapping_from_memory(org_id) Node.expire_node_all_asset_ids_mapping_from_memory(root_org_id) - def keep_subscribe_node_assets_relation(): - node_assets_mapping_for_memory_pub_sub.subscribe(handle_node_relation_change) - - t = threading.Thread(target=keep_subscribe_node_assets_relation) - t.daemon = True - t.start() + node_assets_mapping_for_memory_pub_sub.subscribe(handle_node_relation_change) diff --git a/apps/common/utils/connection.py b/apps/common/utils/connection.py index 9d1e7fd9d..95e827299 100644 --- a/apps/common/utils/connection.py +++ b/apps/common/utils/connection.py @@ -1,8 +1,10 @@ import json import threading -import redis +import time +import redis from django.core.cache import cache +from redis.client import PubSub from common.db.utils import safe_db_connection from common.utils import get_logger @@ -16,15 +18,39 @@ def get_redis_client(db=0): return client -class Subscription: - def __init__(self, ch, sub, ): +class RedisPubSub: + def __init__(self, ch, db=10): self.ch = ch + self.db = db + self.redis = get_redis_client(db) + + def subscribe(self, _next, error=None, complete=None): + ps = self.redis.pubsub() + ps.subscribe(self.ch) + sub = Subscription(self, ps) + sub.keep_handle_msg(_next, error, complete) + return sub + + def resubscribe(self, _next, error=None, complete=None): + self.redis = get_redis_client(self.db) + self.subscribe(_next, error, complete) + + def publish(self, data): + data_json = json.dumps(data) + self.redis.publish(self.ch, data_json) + return True + + +class Subscription: + def __init__(self, pb: RedisPubSub, sub: PubSub): + self.pb = pb + self.ch = pb.ch self.sub = sub + self.unsubscribed = False def _handle_msg(self, _next, error, complete): """ handle arg is the pub published - :param _next: next msg handler :param error: error msg handler :param complete: complete msg handler @@ -53,9 +79,12 @@ class Subscription: error(msg, item) logger.error('Subscribe handler handle msg error: {}'.format(e)) except Exception as e: - # 正常的 websocket 断开时, redis 会断开连接,避免日志太多 - # logger.error('Consume msg error: {}'.format(e)) - pass + if self.unsubscribed: + logger.debug('Subscription unsubscribed') + else: + logger.error('Consume msg error: {}'.format(e)) + self.retry(_next, error, complete) + return try: complete() @@ -75,25 +104,22 @@ class Subscription: return t def unsubscribe(self): + self.unsubscribed = True try: self.sub.close() except Exception as e: logger.error('Unsubscribe msg error: {}'.format(e)) + def retry(self, _next, error, complete): + logger.info('Retry subscribe channel: {}'.format(self.ch)) + times = 0 -class RedisPubSub: - def __init__(self, ch, db=10): - self.ch = ch - self.redis = get_redis_client(db) - - def subscribe(self, _next, error=None, complete=None): - ps = self.redis.pubsub() - ps.subscribe(self.ch) - sub = Subscription(self.ch, ps) - sub.keep_handle_msg(_next, error, complete) - return sub - - def publish(self, data): - data_json = json.dumps(data) - self.redis.publish(self.ch, data_json) - return True + while True: + try: + self.unsubscribe() + self.pb.resubscribe(_next, error, complete) + break + except Exception as e: + logger.error('Retry #{} {} subscribe channel error: {}'.format(times, self.ch, e)) + times += 1 + time.sleep(times * 2) diff --git a/apps/orgs/signal_handlers/common.py b/apps/orgs/signal_handlers/common.py index 876613ba1..05e628883 100644 --- a/apps/orgs/signal_handlers/common.py +++ b/apps/orgs/signal_handlers/common.py @@ -1,29 +1,27 @@ # -*- coding: utf-8 -*- # -import threading from collections import defaultdict from functools import partial -from django.dispatch import receiver -from django.utils.functional import LazyObject from django.db.models.signals import m2m_changed from django.db.models.signals import post_save, pre_delete +from django.dispatch import receiver +from django.utils.functional import LazyObject -from orgs.utils import tmp_to_org -from orgs.models import Organization -from orgs.hands import set_current_org, Node, get_current_org -from perms.models import (AssetPermission, ApplicationPermission) -from users.models import UserGroup, User +from assets.models import CommandFilterRule from assets.models import SystemUser from common.const.signals import PRE_REMOVE, POST_REMOVE from common.decorator import on_transaction_commit from common.signals import django_ready from common.utils import get_logger from common.utils.connection import RedisPubSub -from assets.models import CommandFilterRule +from orgs.hands import set_current_org, Node, get_current_org +from orgs.models import Organization +from orgs.utils import tmp_to_org +from perms.models import (AssetPermission, ApplicationPermission) +from users.models import UserGroup, User from users.signals import post_user_leave_org - logger = get_logger(__file__) @@ -47,14 +45,9 @@ def expire_orgs_mapping_for_memory(org_id): def subscribe_orgs_mapping_expire(sender, **kwargs): logger.debug("Start subscribe for expire orgs mapping from memory") - def keep_subscribe_org_mapping(): - orgs_mapping_for_memory_pub_sub.subscribe( - lambda org_id: Organization.expire_orgs_mapping() - ) - - t = threading.Thread(target=keep_subscribe_org_mapping) - t.daemon = True - t.start() + orgs_mapping_for_memory_pub_sub.subscribe( + lambda org_id: Organization.expire_orgs_mapping() + ) # 创建对应的root @@ -85,7 +78,7 @@ def on_org_delete(sender, instance, **kwargs): def _remove_users(model, users, org, user_field_name='users'): with tmp_to_org(org): if not isinstance(users, (tuple, list, set)): - users = (users, ) + users = (users,) user_field = getattr(model, user_field_name) m2m_model = user_field.through diff --git a/apps/settings/signal_handlers.py b/apps/settings/signal_handlers.py index 18449a771..c963488f1 100644 --- a/apps/settings/signal_handlers.py +++ b/apps/settings/signal_handlers.py @@ -1,19 +1,18 @@ # -*- coding: utf-8 -*- # import json -import threading from django.conf import LazySettings +from django.db.models.signals import post_save, pre_save from django.db.utils import ProgrammingError, OperationalError from django.dispatch import receiver -from django.db.models.signals import post_save, pre_save from django.utils.functional import LazyObject -from jumpserver.utils import current_request from common.decorator import on_transaction_commit +from common.signals import django_ready from common.utils import get_logger, ssh_key_gen from common.utils.connection import RedisPubSub -from common.signals import django_ready +from jumpserver.utils import current_request from .models import Setting logger = get_logger(__file__) @@ -81,12 +80,7 @@ def on_create_set_created_by(sender, instance=None, **kwargs): def subscribe_settings_change(sender, **kwargs): logger.debug("Start subscribe setting change") - def keep_subscribe_settings_change(): - setting_pub_sub.subscribe(lambda name: Setting.refresh_item(name)) - - t = threading.Thread(target=keep_subscribe_settings_change) - t.daemon = True - t.start() + setting_pub_sub.subscribe(lambda name: Setting.refresh_item(name)) @receiver(django_ready)