mirror of https://github.com/jumpserver/jumpserver
fix: 消息订阅redis连接未关闭
parent
54b2360843
commit
2869338e2c
|
@ -23,6 +23,7 @@ class RedisPubSub:
|
||||||
def __init__(self, ch, db=10):
|
def __init__(self, ch, db=10):
|
||||||
self.ch = ch
|
self.ch = ch
|
||||||
self.redis = get_redis_client(db)
|
self.redis = get_redis_client(db)
|
||||||
|
self.subscriber = None
|
||||||
|
|
||||||
def subscribe(self):
|
def subscribe(self):
|
||||||
ps = self.redis.pubsub()
|
ps = self.redis.pubsub()
|
||||||
|
@ -41,7 +42,9 @@ class RedisPubSub:
|
||||||
:param handle: lambda item: do_something
|
:param handle: lambda item: do_something
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
self.close_handle_msg()
|
||||||
sub = self.subscribe()
|
sub = self.subscribe()
|
||||||
|
self.subscriber = sub
|
||||||
msgs = sub.listen()
|
msgs = sub.listen()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -65,3 +68,8 @@ class RedisPubSub:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Redis observer close error: ", e)
|
logger.error("Redis observer close error: ", e)
|
||||||
|
|
||||||
|
def close_handle_msg(self):
|
||||||
|
if self.subscriber:
|
||||||
|
self.subscriber.close()
|
||||||
|
self.subscriber = None
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ from channels.generic.websocket import JsonWebsocketConsumer
|
||||||
from common.utils import get_logger
|
from common.utils import get_logger
|
||||||
from common.db.utils import safe_db_connection
|
from common.db.utils import safe_db_connection
|
||||||
from .site_msg import SiteMessageUtil
|
from .site_msg import SiteMessageUtil
|
||||||
from .signals_handler import new_site_msg_chan
|
from .signals_handler import NewSiteMsgSubPub
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -13,6 +13,10 @@ logger = get_logger(__name__)
|
||||||
class SiteMsgWebsocket(JsonWebsocketConsumer):
|
class SiteMsgWebsocket(JsonWebsocketConsumer):
|
||||||
refresh_every_seconds = 10
|
refresh_every_seconds = 10
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(SiteMsgWebsocket, self).__init__(*args, **kwargs)
|
||||||
|
self.subscriber = None
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
user = self.scope["user"]
|
user = self.scope["user"]
|
||||||
if user.is_authenticated:
|
if user.is_authenticated:
|
||||||
|
@ -23,6 +27,10 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
|
||||||
else:
|
else:
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
|
def disconnect(self, code):
|
||||||
|
if self.subscriber:
|
||||||
|
self.subscriber.close_handle_msg()
|
||||||
|
|
||||||
def receive(self, text_data=None, bytes_data=None, **kwargs):
|
def receive(self, text_data=None, bytes_data=None, **kwargs):
|
||||||
data = json.loads(text_data)
|
data = json.loads(text_data)
|
||||||
refresh_every_seconds = data.get('refresh_every_seconds')
|
refresh_every_seconds = data.get('refresh_every_seconds')
|
||||||
|
@ -56,4 +64,6 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
|
||||||
if user_id in users:
|
if user_id in users:
|
||||||
ws.send_unread_msg_count()
|
ws.send_unread_msg_count()
|
||||||
|
|
||||||
new_site_msg_chan.keep_handle_msg(handle_new_site_msg_recv)
|
subscriber = NewSiteMsgSubPub()
|
||||||
|
self.subscriber = subscriber
|
||||||
|
subscriber.keep_handle_msg(handle_new_site_msg_recv)
|
||||||
|
|
Loading…
Reference in New Issue