mirror of https://github.com/jumpserver/jumpserver
fix: 修复数据库连接没有关闭问题 (#7227)
* fix: 修复数据库连接没有关闭的bug perf: websocket 断开也添加关闭数据库连接 * fix: 修复数据库连接没有关闭问题 Co-authored-by: ibuler <ibuler@qq.com>pull/7229/head
parent
6e5dcc738e
commit
d2df8acd84
|
@ -10,6 +10,7 @@ from django.dispatch import receiver
|
|||
from django.utils.functional import LazyObject
|
||||
|
||||
from common.signals import django_ready
|
||||
from common.db.utils import close_old_connections
|
||||
from common.utils.connection import RedisPubSub
|
||||
from common.utils import get_logger
|
||||
from assets.models import Asset, Node
|
||||
|
@ -77,11 +78,14 @@ def on_node_asset_change(sender, instance, **kwargs):
|
|||
def subscribe_node_assets_mapping_expire(sender, **kwargs):
|
||||
logger.debug("Start subscribe for expire node assets id mapping from memory")
|
||||
|
||||
def keep_subscribe():
|
||||
def keep_subscribe_node_assets_relation():
|
||||
while True:
|
||||
try:
|
||||
subscribe = node_assets_mapping_for_memory_pub_sub.subscribe()
|
||||
for message in subscribe.listen():
|
||||
msgs = subscribe.listen()
|
||||
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
|
||||
close_old_connections()
|
||||
for message in msgs:
|
||||
if message["type"] != "message":
|
||||
continue
|
||||
org_id = message['data'].decode()
|
||||
|
@ -95,7 +99,10 @@ def subscribe_node_assets_mapping_expire(sender, **kwargs):
|
|||
except Exception as e:
|
||||
logger.exception(f'subscribe_node_assets_mapping_expire: {e}')
|
||||
Node.expire_all_orgs_node_all_asset_ids_mapping_from_memory()
|
||||
finally:
|
||||
# 请求结束,关闭连接
|
||||
close_old_connections()
|
||||
|
||||
t = threading.Thread(target=keep_subscribe)
|
||||
t = threading.Thread(target=keep_subscribe_node_assets_relation)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from common.utils import get_logger
|
||||
from django.db import connections
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
|
@ -38,3 +39,8 @@ def get_objects(model, pks):
|
|||
not_found_pks = pks - exists_pks
|
||||
logger.error(f'DoesNotExist: <{model.__name__}: {not_found_pks}>')
|
||||
return objs
|
||||
|
||||
|
||||
def close_old_connections():
|
||||
for conn in connections.all():
|
||||
conn.close_if_unusable_or_obsolete()
|
||||
|
|
|
@ -3,6 +3,7 @@ import json
|
|||
from redis.exceptions import ConnectionError
|
||||
from channels.generic.websocket import JsonWebsocketConsumer
|
||||
|
||||
from common.db.utils import close_old_connections
|
||||
from common.utils import get_logger
|
||||
from .site_msg import SiteMessageUtil
|
||||
from .signals_handler import new_site_msg_chan
|
||||
|
@ -49,27 +50,40 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
|
|||
self.send_unread_msg_count()
|
||||
|
||||
try:
|
||||
for message in self.chan.listen():
|
||||
msgs = self.chan.listen()
|
||||
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
|
||||
close_old_connections()
|
||||
for message in msgs:
|
||||
if message['type'] != 'message':
|
||||
continue
|
||||
|
||||
try:
|
||||
msg = json.loads(message['data'].decode())
|
||||
logger.debug('New site msg recv, may be mine: {}'.format(msg))
|
||||
if not msg:
|
||||
continue
|
||||
users = msg.get('users', [])
|
||||
logger.debug('Message users: {}'.format(users))
|
||||
if user_id in users:
|
||||
self.send_unread_msg_count()
|
||||
except json.JSONDecoder as e:
|
||||
logger.debug('Decode json error: ', e)
|
||||
continue
|
||||
if not msg:
|
||||
continue
|
||||
|
||||
logger.debug('New site msg recv, may be mine: {}'.format(msg))
|
||||
users = msg.get('users', [])
|
||||
logger.debug('Message users: {}'.format(users))
|
||||
if user_id in users:
|
||||
self.send_unread_msg_count()
|
||||
except ConnectionError:
|
||||
logger.debug('Redis chan closed')
|
||||
logger.error('Redis chan closed')
|
||||
finally:
|
||||
logger.info('Notification ws thread end')
|
||||
close_old_connections()
|
||||
|
||||
def disconnect(self, close_code):
|
||||
if self.chan is not None:
|
||||
try:
|
||||
try:
|
||||
if self.chan is not None:
|
||||
self.chan.close()
|
||||
except:
|
||||
pass
|
||||
self.close()
|
||||
self.close()
|
||||
finally:
|
||||
close_old_connections()
|
||||
logger.info('Notification websocket disconnect')
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -137,9 +137,14 @@ def check_server_performance_period():
|
|||
|
||||
@shared_task(queue="ansible")
|
||||
def hello(name, callback=None):
|
||||
from users.models import User
|
||||
import time
|
||||
time.sleep(10)
|
||||
|
||||
count = User.objects.count()
|
||||
print("Hello {}".format(name))
|
||||
print("Count: ", count)
|
||||
time.sleep(1)
|
||||
return count
|
||||
|
||||
|
||||
@shared_task
|
||||
|
|
|
@ -2,12 +2,12 @@ import time
|
|||
import os
|
||||
import threading
|
||||
import json
|
||||
from channels.generic.websocket import JsonWebsocketConsumer
|
||||
|
||||
from common.utils import get_logger
|
||||
|
||||
from common.db.utils import close_old_connections
|
||||
from .celery.utils import get_celery_task_log_path
|
||||
from .ansible.utils import get_ansible_task_log_path
|
||||
from channels.generic.websocket import JsonWebsocketConsumer
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
@ -86,3 +86,4 @@ class TaskLogWebsocket(JsonWebsocketConsumer):
|
|||
def disconnect(self, close_code):
|
||||
self.disconnected = True
|
||||
self.close()
|
||||
close_old_connections()
|
||||
|
|
|
@ -6,6 +6,7 @@ from functools import partial
|
|||
|
||||
from django.dispatch import receiver
|
||||
from django.utils.functional import LazyObject
|
||||
from common.db.utils import close_old_connections
|
||||
from django.db.models.signals import m2m_changed
|
||||
from django.db.models.signals import post_save, post_delete, pre_delete
|
||||
|
||||
|
@ -45,11 +46,14 @@ def expire_orgs_mapping_for_memory(org_id):
|
|||
def subscribe_orgs_mapping_expire(sender, **kwargs):
|
||||
logger.debug("Start subscribe for expire orgs mapping from memory")
|
||||
|
||||
def keep_subscribe():
|
||||
def keep_subscribe_org_mapping():
|
||||
while True:
|
||||
try:
|
||||
subscribe = orgs_mapping_for_memory_pub_sub.subscribe()
|
||||
for message in subscribe.listen():
|
||||
msgs = subscribe.listen()
|
||||
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
|
||||
close_old_connections()
|
||||
for message in msgs:
|
||||
if message['type'] != 'message':
|
||||
continue
|
||||
if message['data'] == b'error':
|
||||
|
@ -59,8 +63,11 @@ def subscribe_orgs_mapping_expire(sender, **kwargs):
|
|||
except Exception as e:
|
||||
logger.exception(f'subscribe_orgs_mapping_expire: {e}')
|
||||
Organization.expire_orgs_mapping()
|
||||
finally:
|
||||
# 结束收关闭连接
|
||||
close_old_connections()
|
||||
|
||||
t = threading.Thread(target=keep_subscribe)
|
||||
t = threading.Thread(target=keep_subscribe_org_mapping)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
|
|
|
@ -6,12 +6,12 @@ import threading
|
|||
from django.dispatch import receiver
|
||||
from django.db.models.signals import post_save, pre_save
|
||||
from django.utils.functional import LazyObject
|
||||
from django.db import close_old_connections
|
||||
|
||||
from jumpserver.utils import current_request
|
||||
from common.decorator import on_transaction_commit
|
||||
from common.utils import get_logger, ssh_key_gen
|
||||
from common.utils.connection import RedisPubSub
|
||||
from common.db.utils import close_old_connections
|
||||
from common.signals import django_ready
|
||||
from .models import Setting
|
||||
|
||||
|
@ -80,12 +80,14 @@ def on_create_set_created_by(sender, instance=None, **kwargs):
|
|||
def subscribe_settings_change(sender, **kwargs):
|
||||
logger.debug("Start subscribe setting change")
|
||||
|
||||
def keep_subscribe():
|
||||
def keep_subscribe_settings_change():
|
||||
while True:
|
||||
try:
|
||||
sub = setting_pub_sub.subscribe()
|
||||
for msg in sub.listen():
|
||||
close_old_connections()
|
||||
msgs = sub.listen()
|
||||
# 开始之前关闭连接,因为server端可能关闭了连接,而 client 还在 CONN_MAX_AGE 中
|
||||
close_old_connections()
|
||||
for msg in msgs:
|
||||
if msg["type"] != "message":
|
||||
continue
|
||||
item = msg['data'].decode()
|
||||
|
@ -93,9 +95,10 @@ def subscribe_settings_change(sender, **kwargs):
|
|||
Setting.refresh_item(item)
|
||||
except Exception as e:
|
||||
logger.exception(f'subscribe_settings_change: {e}')
|
||||
close_old_connections()
|
||||
Setting.refresh_all_settings()
|
||||
finally:
|
||||
close_old_connections()
|
||||
|
||||
t = threading.Thread(target=keep_subscribe)
|
||||
t = threading.Thread(target=keep_subscribe_settings_change)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
|
|
@ -24,9 +24,9 @@ from copy import deepcopy
|
|||
|
||||
from common.const import LDAP_AD_ACCOUNT_DISABLE
|
||||
from common.utils import timeit, get_logger
|
||||
from common.db.utils import close_old_connections
|
||||
from users.utils import construct_user_email
|
||||
from users.models import User
|
||||
from orgs.models import Organization
|
||||
from authentication.backends.ldap import LDAPAuthorizationBackend, LDAPUser
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
@ -331,15 +331,17 @@ class LDAPSyncUtil(object):
|
|||
|
||||
def perform_sync(self):
|
||||
logger.info('Start perform sync ldap users from server to cache')
|
||||
self.pre_sync()
|
||||
try:
|
||||
self.pre_sync()
|
||||
self.sync()
|
||||
self.post_sync()
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
logger.error(error_msg)
|
||||
self.set_task_error_msg(error_msg)
|
||||
self.post_sync()
|
||||
logger.info('End perform sync ldap users from server to cache')
|
||||
finally:
|
||||
logger.info('End perform sync ldap users from server to cache')
|
||||
close_old_connections()
|
||||
|
||||
|
||||
class LDAPImportUtil(object):
|
||||
|
|
|
@ -3,8 +3,12 @@ import time
|
|||
import socket
|
||||
import threading
|
||||
from django.conf import settings
|
||||
from django.db.utils import OperationalError
|
||||
|
||||
from common.db.utils import close_old_connections
|
||||
from common.decorator import Singleton
|
||||
from common.utils import get_disk_usage, get_cpu_load, get_memory_usage, get_logger
|
||||
|
||||
from .serializers.terminal import TerminalRegistrationSerializer, StatusSerializer
|
||||
from .const import TerminalTypeChoices
|
||||
from .models.terminal import Terminal
|
||||
|
@ -52,9 +56,12 @@ class BaseTerminal(object):
|
|||
status_serializer.validated_data.pop('sessions', None)
|
||||
terminal = self.get_or_register_terminal()
|
||||
status_serializer.validated_data['terminal'] = terminal
|
||||
status_serializer.save()
|
||||
|
||||
time.sleep(self.interval)
|
||||
try:
|
||||
status_serializer.save()
|
||||
time.sleep(self.interval)
|
||||
except OperationalError:
|
||||
close_old_connections()
|
||||
|
||||
def get_or_register_terminal(self):
|
||||
terminal = Terminal.objects.filter(
|
||||
|
|
Loading…
Reference in New Issue