diff --git a/apps/assets/signals_handler/node_assets_mapping.py b/apps/assets/signals_handler/node_assets_mapping.py index 30eb9ed0b..e598c8698 100644 --- a/apps/assets/signals_handler/node_assets_mapping.py +++ b/apps/assets/signals_handler/node_assets_mapping.py @@ -84,10 +84,10 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): subscribe = node_assets_mapping_for_memory_pub_sub.subscribe() msgs = subscribe.listen() # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - close_old_connections() for message in msgs: if message["type"] != "message": continue + close_old_connections() org_id = message['data'].decode() root_org_id = Organization.ROOT_ID Node.expire_node_all_asset_ids_mapping_from_memory(org_id) @@ -96,6 +96,7 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs): "Expire node assets id mapping from memory of org={}, pid={}" "".format(str(org_id), os.getpid()) ) + close_old_connections() except Exception as e: logger.exception(f'subscribe_node_assets_mapping_expire: {e}') Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory() diff --git a/apps/notifications/ws.py b/apps/notifications/ws.py index 423639356..b5cd55fb1 100644 --- a/apps/notifications/ws.py +++ b/apps/notifications/ws.py @@ -52,11 +52,10 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): try: msgs = self.chan.listen() # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - close_old_connections() for message in msgs: if message['type'] != 'message': continue - + close_old_connections() try: msg = json.loads(message['data'].decode()) except json.JSONDecoder as e: @@ -70,6 +69,7 @@ class SiteMsgWebsocket(JsonWebsocketConsumer): logger.debug('Message users: {}'.format(users)) if user_id in users: self.send_unread_msg_count() + close_old_connections() except ConnectionError: logger.error('Redis chan closed') finally: diff --git a/apps/settings/signals_handler.py b/apps/settings/signals_handler.py index 483b26ee2..b247afd91 100644 --- a/apps/settings/signals_handler.py +++ b/apps/settings/signals_handler.py @@ -86,17 +86,17 @@ def subscribe_settings_change(sender, **kwargs): sub = setting_pub_sub.subscribe() msgs = sub.listen() # 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中 - close_old_connections() for msg in msgs: if msg["type"] != "message": continue + close_old_connections() item = msg['data'].decode() logger.debug("Found setting change: {}".format(str(item))) Setting.refresh_item(item) + close_old_connections() except Exception as e: logger.exception(f'subscribe_settings_change: {e}') Setting.refresh_all_settings() - finally: close_old_connections() t = threading.Thread(target=keep_subscribe_settings_change)