Browse Source

fix: 修复 close connection 的问题

pull/7289/head^2
ibuler 3 years ago committed by Jiangjie.Bai
parent
commit
c8a76c9afb
  1. 3
      apps/assets/signals_handler/node_assets_mapping.py
  2. 4
      apps/notifications/ws.py
  3. 4
      apps/settings/signals_handler.py

3
apps/assets/signals_handler/node_assets_mapping.py

@ -84,10 +84,10 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs):
subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() subscribe = node_assets_mapping_for_memory_pub_sub.subscribe()
msgs = subscribe.listen() msgs = subscribe.listen()
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
close_old_connections()
for message in msgs: for message in msgs:
if message["type"] != "message": if message["type"] != "message":
continue continue
close_old_connections()
org_id = message['data'].decode() org_id = message['data'].decode()
root_org_id = Organization.ROOT_ID root_org_id = Organization.ROOT_ID
Node.expire_node_all_asset_ids_mapping_from_memory(org_id) Node.expire_node_all_asset_ids_mapping_from_memory(org_id)
@ -96,6 +96,7 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs):
"Expire node assets id mapping from memory of org={}, pid={}" "Expire node assets id mapping from memory of org={}, pid={}"
"".format(str(org_id), os.getpid()) "".format(str(org_id), os.getpid())
) )
close_old_connections()
except Exception as e: except Exception as e:
logger.exception(f'subscribe_node_assets_mapping_expire: {e}') logger.exception(f'subscribe_node_assets_mapping_expire: {e}')
Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory() Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory()

4
apps/notifications/ws.py

@ -52,11 +52,10 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
try: try:
msgs = self.chan.listen() msgs = self.chan.listen()
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
close_old_connections()
for message in msgs: for message in msgs:
if message['type'] != 'message': if message['type'] != 'message':
continue continue
close_old_connections()
try: try:
msg = json.loads(message['data'].decode()) msg = json.loads(message['data'].decode())
except json.JSONDecoder as e: except json.JSONDecoder as e:
@ -70,6 +69,7 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
logger.debug('Message users: {}'.format(users)) logger.debug('Message users: {}'.format(users))
if user_id in users: if user_id in users:
self.send_unread_msg_count() self.send_unread_msg_count()
close_old_connections()
except ConnectionError: except ConnectionError:
logger.error('Redis chan closed') logger.error('Redis chan closed')
finally: finally:

4
apps/settings/signals_handler.py

@ -86,17 +86,17 @@ def subscribe_settings_change(sender, **kwargs):
sub = setting_pub_sub.subscribe() sub = setting_pub_sub.subscribe()
msgs = sub.listen() msgs = sub.listen()
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
close_old_connections()
for msg in msgs: for msg in msgs:
if msg["type"] != "message": if msg["type"] != "message":
continue continue
close_old_connections()
item = msg['data'].decode() item = msg['data'].decode()
logger.debug("Found setting change: {}".format(str(item))) logger.debug("Found setting change: {}".format(str(item)))
Setting.refresh_item(item) Setting.refresh_item(item)
close_old_connections()
except Exception as e: except Exception as e:
logger.exception(f'subscribe_settings_change: {e}') logger.exception(f'subscribe_settings_change: {e}')
Setting.refresh_all_settings() Setting.refresh_all_settings()
finally:
close_old_connections() close_old_connections()
t = threading.Thread(target=keep_subscribe_settings_change) t = threading.Thread(target=keep_subscribe_settings_change)

Loading…
Cancel
Save