fix: 修复 redis 连接过多的问题

pull/7579/head
ibuler 3 years ago committed by Jiangjie.Bai
parent b3397c6aeb
commit 55fae1667d

@ -1,4 +1,5 @@
import json import json
import threading
import redis import redis
from django.conf import settings from django.conf import settings
@ -19,49 +20,83 @@ def get_redis_client(db):
return rc return rc
class RedisPubSub: class Subscription:
def __init__(self, ch, db=10): def __init__(self, ch, sub, ):
self.ch = ch self.ch = ch
self.redis = get_redis_client(db) self.sub = sub
def subscribe(self):
ps = self.redis.pubsub()
ps.subscribe(self.ch)
return ps
def publish(self, data): def _handle_msg(self, _next, error, complete):
data_json = json.dumps(data)
self.redis.publish(self.ch, data_json)
return True
def keep_handle_msg(self, handle):
""" """
handle arg is the pub published handle arg is the pub published
:param handle: lambda item: do_something :param _next: next msg handler
:param error: error msg handler
:param complete: complete msg handler
:return: :return:
""" """
sub = self.subscribe() msgs = self.sub.listen()
msgs = sub.listen()
if error is None:
error = lambda m, i: None
if complete is None:
complete = lambda: None
try: try:
for msg in msgs: for msg in msgs:
if msg["type"] != "message": if msg["type"] != "message":
continue continue
item = None
try: try:
item_json = msg['data'].decode() item_json = msg['data'].decode()
item = json.loads(item_json) item = json.loads(item_json)
with safe_db_connection(): with safe_db_connection():
handle(item) _next(item)
except Exception as e: except Exception as e:
error(msg, item)
logger.error('Subscribe handler handle msg error: ', e) logger.error('Subscribe handler handle msg error: ', e)
except Exception as e: except Exception as e:
logger.error('Consume msg error: ', e) logger.error('Consume msg error: ', e)
try: try:
sub.close() complete()
except Exception as e:
logger.error('Complete subscribe error: {}'.format(e))
pass
try:
self.unsubscribe()
except Exception as e:
logger.error("Redis observer close error: {}".format(e))
def keep_handle_msg(self, _next, error, complete):
t = threading.Thread(target=self._handle_msg, args=(_next, error, complete))
t.daemon = True
t.start()
return t
def unsubscribe(self):
try:
self.sub.close()
except Exception as e: except Exception as e:
logger.error("Redis observer close error: ", e) logger.error('Unsubscribe msg error: {}'.format(e))
class RedisPubSub:
def __init__(self, ch, db=10):
self.ch = ch
self.redis = get_redis_client(db)
def subscribe(self, _next, error=None, complete=None):
ps = self.redis.pubsub()
sub = Subscription(self.ch, ps)
sub.keep_handle_msg(_next, error, complete)
return sub
def publish(self, data):
data_json = json.dumps(data)
self.redis.publish(self.ch, data_json)
return True

@ -12,14 +12,13 @@ logger = get_logger(__name__)
class SiteMsgWebsocket(JsonWebsocketConsumer): class SiteMsgWebsocket(JsonWebsocketConsumer):
refresh_every_seconds = 10 refresh_every_seconds = 10
sub = None
def connect(self): def connect(self):
user = self.scope["user"] user = self.scope["user"]
if user.is_authenticated: if user.is_authenticated:
self.accept() self.accept()
self.sub = self.watch_recv_new_site_msg()
thread = threading.Thread(target=self.watch_recv_new_site_msg)
thread.start()
else: else:
self.close() self.close()
@ -56,4 +55,9 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
if user_id in users: if user_id in users:
ws.send_unread_msg_count() ws.send_unread_msg_count()
new_site_msg_chan.keep_handle_msg(handle_new_site_msg_recv) return new_site_msg_chan.subscribe(handle_new_site_msg_recv)
def disconnect(self, code):
if self.sub:
self.sub.unsubscribe()

Loading…
Cancel
Save