fix: 修复 redis 连接导致的 pub sub 错误

pull/9089/head
ibuler 2022-11-17 13:44:50 +08:00 committed by Jiangjie.Bai
parent 90c48d303e
commit a42641ca9a
4 changed files with 70 additions and 64 deletions

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
import os
import threading
from django.db.models.signals import ( from django.db.models.signals import (
m2m_changed, post_save, post_delete m2m_changed, post_save, post_delete
@ -9,15 +7,15 @@ from django.db.models.signals import (
from django.dispatch import receiver from django.dispatch import receiver
from django.utils.functional import LazyObject from django.utils.functional import LazyObject
from common.signals import django_ready
from common.utils.connection import RedisPubSub
from common.utils import get_logger
from assets.models import Asset, Node from assets.models import Asset, Node
from common.signals import django_ready
from common.utils import get_logger
from common.utils.connection import RedisPubSub
from orgs.models import Organization from orgs.models import Organization
logger = get_logger(__file__) logger = get_logger(__file__)
# clear node assets mapping for memory # clear node assets mapping for memory
# ------------------------------------ # ------------------------------------
@ -78,9 +76,4 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs):
Node.expire_node_all_asset_ids_mapping_from_memory(org_id) Node.expire_node_all_asset_ids_mapping_from_memory(org_id)
Node.expire_node_all_asset_ids_mapping_from_memory(root_org_id) Node.expire_node_all_asset_ids_mapping_from_memory(root_org_id)
def keep_subscribe_node_assets_relation(): node_assets_mapping_for_memory_pub_sub.subscribe(handle_node_relation_change)
node_assets_mapping_for_memory_pub_sub.subscribe(handle_node_relation_change)
t = threading.Thread(target=keep_subscribe_node_assets_relation)
t.daemon = True
t.start()

View File

@ -1,8 +1,10 @@
import json import json
import threading import threading
import redis import time
import redis
from django.core.cache import cache from django.core.cache import cache
from redis.client import PubSub
from common.db.utils import safe_db_connection from common.db.utils import safe_db_connection
from common.utils import get_logger from common.utils import get_logger
@ -16,15 +18,39 @@ def get_redis_client(db=0):
return client return client
class Subscription: class RedisPubSub:
def __init__(self, ch, sub, ): def __init__(self, ch, db=10):
self.ch = ch self.ch = ch
self.db = db
self.redis = get_redis_client(db)
def subscribe(self, _next, error=None, complete=None):
ps = self.redis.pubsub()
ps.subscribe(self.ch)
sub = Subscription(self, ps)
sub.keep_handle_msg(_next, error, complete)
return sub
def resubscribe(self, _next, error=None, complete=None):
self.redis = get_redis_client(self.db)
self.subscribe(_next, error, complete)
def publish(self, data):
data_json = json.dumps(data)
self.redis.publish(self.ch, data_json)
return True
class Subscription:
def __init__(self, pb: RedisPubSub, sub: PubSub):
self.pb = pb
self.ch = pb.ch
self.sub = sub self.sub = sub
self.unsubscribed = False
def _handle_msg(self, _next, error, complete): def _handle_msg(self, _next, error, complete):
""" """
handle arg is the pub published handle arg is the pub published
:param _next: next msg handler :param _next: next msg handler
:param error: error msg handler :param error: error msg handler
:param complete: complete msg handler :param complete: complete msg handler
@ -53,9 +79,12 @@ class Subscription:
error(msg, item) error(msg, item)
logger.error('Subscribe handler handle msg error: {}'.format(e)) logger.error('Subscribe handler handle msg error: {}'.format(e))
except Exception as e: except Exception as e:
# 正常的 websocket 断开时, redis 会断开连接,避免日志太多 if self.unsubscribed:
# logger.error('Consume msg error: {}'.format(e)) logger.debug('Subscription unsubscribed')
pass else:
logger.error('Consume msg error: {}'.format(e))
self.retry(_next, error, complete)
return
try: try:
complete() complete()
@ -75,25 +104,22 @@ class Subscription:
return t return t
def unsubscribe(self): def unsubscribe(self):
self.unsubscribed = True
try: try:
self.sub.close() self.sub.close()
except Exception as e: except Exception as e:
logger.error('Unsubscribe msg error: {}'.format(e)) logger.error('Unsubscribe msg error: {}'.format(e))
def retry(self, _next, error, complete):
logger.info('Retry subscribe channel: {}'.format(self.ch))
times = 0
class RedisPubSub: while True:
def __init__(self, ch, db=10): try:
self.ch = ch self.unsubscribe()
self.redis = get_redis_client(db) self.pb.resubscribe(_next, error, complete)
break
def subscribe(self, _next, error=None, complete=None): except Exception as e:
ps = self.redis.pubsub() logger.error('Retry #{} {} subscribe channel error: {}'.format(times, self.ch, e))
ps.subscribe(self.ch) times += 1
sub = Subscription(self.ch, ps) time.sleep(times * 2)
sub.keep_handle_msg(_next, error, complete)
return sub
def publish(self, data):
data_json = json.dumps(data)
self.redis.publish(self.ch, data_json)
return True

View File

@ -1,29 +1,27 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
import threading
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from django.dispatch import receiver
from django.utils.functional import LazyObject
from django.db.models.signals import m2m_changed from django.db.models.signals import m2m_changed
from django.db.models.signals import post_save, pre_delete from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from django.utils.functional import LazyObject
from orgs.utils import tmp_to_org from assets.models import CommandFilterRule
from orgs.models import Organization
from orgs.hands import set_current_org, Node, get_current_org
from perms.models import (AssetPermission, ApplicationPermission)
from users.models import UserGroup, User
from assets.models import SystemUser from assets.models import SystemUser
from common.const.signals import PRE_REMOVE, POST_REMOVE from common.const.signals import PRE_REMOVE, POST_REMOVE
from common.decorator import on_transaction_commit from common.decorator import on_transaction_commit
from common.signals import django_ready from common.signals import django_ready
from common.utils import get_logger from common.utils import get_logger
from common.utils.connection import RedisPubSub from common.utils.connection import RedisPubSub
from assets.models import CommandFilterRule from orgs.hands import set_current_org, Node, get_current_org
from orgs.models import Organization
from orgs.utils import tmp_to_org
from perms.models import (AssetPermission, ApplicationPermission)
from users.models import UserGroup, User
from users.signals import post_user_leave_org from users.signals import post_user_leave_org
logger = get_logger(__file__) logger = get_logger(__file__)
@ -47,14 +45,9 @@ def expire_orgs_mapping_for_memory(org_id):
def subscribe_orgs_mapping_expire(sender, **kwargs): def subscribe_orgs_mapping_expire(sender, **kwargs):
logger.debug("Start subscribe for expire orgs mapping from memory") logger.debug("Start subscribe for expire orgs mapping from memory")
def keep_subscribe_org_mapping(): orgs_mapping_for_memory_pub_sub.subscribe(
orgs_mapping_for_memory_pub_sub.subscribe( lambda org_id: Organization.expire_orgs_mapping()
lambda org_id: Organization.expire_orgs_mapping() )
)
t = threading.Thread(target=keep_subscribe_org_mapping)
t.daemon = True
t.start()
# 创建对应的root # 创建对应的root
@ -85,7 +78,7 @@ def on_org_delete(sender, instance, **kwargs):
def _remove_users(model, users, org, user_field_name='users'): def _remove_users(model, users, org, user_field_name='users'):
with tmp_to_org(org): with tmp_to_org(org):
if not isinstance(users, (tuple, list, set)): if not isinstance(users, (tuple, list, set)):
users = (users, ) users = (users,)
user_field = getattr(model, user_field_name) user_field = getattr(model, user_field_name)
m2m_model = user_field.through m2m_model = user_field.through

View File

@ -1,19 +1,18 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
import json import json
import threading
from django.conf import LazySettings from django.conf import LazySettings
from django.db.models.signals import post_save, pre_save
from django.db.utils import ProgrammingError, OperationalError from django.db.utils import ProgrammingError, OperationalError
from django.dispatch import receiver from django.dispatch import receiver
from django.db.models.signals import post_save, pre_save
from django.utils.functional import LazyObject from django.utils.functional import LazyObject
from jumpserver.utils import current_request
from common.decorator import on_transaction_commit from common.decorator import on_transaction_commit
from common.signals import django_ready
from common.utils import get_logger, ssh_key_gen from common.utils import get_logger, ssh_key_gen
from common.utils.connection import RedisPubSub from common.utils.connection import RedisPubSub
from common.signals import django_ready from jumpserver.utils import current_request
from .models import Setting from .models import Setting
logger = get_logger(__file__) logger = get_logger(__file__)
@ -81,12 +80,7 @@ def on_create_set_created_by(sender, instance=None, **kwargs):
def subscribe_settings_change(sender, **kwargs): def subscribe_settings_change(sender, **kwargs):
logger.debug("Start subscribe setting change") logger.debug("Start subscribe setting change")
def keep_subscribe_settings_change(): setting_pub_sub.subscribe(lambda name: Setting.refresh_item(name))
setting_pub_sub.subscribe(lambda name: Setting.refresh_item(name))
t = threading.Thread(target=keep_subscribe_settings_change)
t.daemon = True
t.start()
@receiver(django_ready) @receiver(django_ready)