mirror of https://github.com/jumpserver/jumpserver
perf: 修改 signal handler
@ -2,9 +2,8 @@ from django.db.models.signals import pre_save, post_save
from django.dispatch import receiver
from assets.models import Asset
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.utils import get_logger
from .automations.push_account.manager import PushAccountManager
from .models import Account
logger = get_logger(__name__)
@ -90,6 +90,11 @@ class Migration(migrations.Migration):
field=models.CharField(db_index=True, max_length=1024, verbose_name='Address'),
@ -105,7 +105,7 @@ class Asset(NodesRelationMixin, AbsConnectivity, JMSOrgBaseModel):
Type = const.AllTypes
name = models.CharField(max_length=128, verbose_name=_('Name'))
address = models.CharField(max_length=128, verbose_name=_('IP'), db_index=True)
address = models.CharField(max_length=1024, verbose_name=_('Address'), db_index=True)
platform = models.ForeignKey(Platform, on_delete=models.PROTECT, verbose_name=_("Platform"), related_name='assets')
domain = models.ForeignKey("assets.Domain", null=True, blank=True, related_name='assets',
verbose_name=_("Domain"), on_delete=models.SET_NULL)
@ -1,11 +1,14 @@
# -*- coding: utf-8 -*-
from django.db.models.signals import post_save, m2m_changed, pre_delete, post_delete, pre_save
from django.db.models.signals import (
post_save, m2m_changed, pre_delete, post_delete, pre_save
from django.dispatch import receiver
from assets.models import Asset, Node, Cloud, Device, Host, Web, Database
from assets.tasks.ping import test_assets_connectivity_util
from common.const.signals import POST_ADD, POST_REMOVE, PRE_REMOVE
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.utils import get_logger
logger = get_logger(__file__)
@ -28,8 +31,7 @@ def on_asset_create(sender, instance=None, created=False, **kwargs):
logger.info("Asset create signal recv: {}".format(instance))
# 获取资产硬件信息
# update_assets_fact_util.delay([instance])
# test_asset_connectivity_util.delay([instance])
# 确保资产存在一个节点
has_node = instance.nodes.all().exists()
@ -60,34 +62,6 @@ def on_asset_nodes_add(instance, action, reverse, pk_set, **kwargs):
for node in nodes:
nodes_ancestors_keys.update(Node.get_node_ancestor_keys(node, with_self=True))
# 查询所有祖先节点关联的系统用户,都是要跟资产建立关系的
# system_user_ids = SystemUser.objects.filter(
# nodes__key__in=nodes_ancestors_keys
# ).distinct().values_list('id', flat=True)
# 查询所有已存在的关系
# m2m_model = SystemUser.assets.through
# exist = set(m2m_model.objects.filter(
# systemuser_id__in=system_user_ids, asset_id__in=asset_ids
# ).values_list('systemuser_id', 'asset_id'))
# TODO 优化
# to_create = []
# for system_user_id in system_user_ids:
# asset_ids_to_push = []
# for asset_id in asset_ids:
# if (system_user_id, asset_id) in exist:
# continue
# asset_ids_to_push.append(asset_id)
# to_create.append(m2m_model(
# systemuser_id=system_user_id,
# asset_id=asset_id,
# org_id=instance.org_id
# ))
# if asset_ids_to_push:
# push_system_user_to_assets.delay(system_user_id, asset_ids_to_push)
# m2m_model.objects.bulk_create(to_create)
RELATED_NODE_IDS = '_related_node_ids'
@ -1,23 +1,22 @@
# ~*~ coding: utf-8 ~*~
from celery import shared_task
from django.utils.translation import gettext_noop, gettext_lazy as _
from django.utils.translation import gettext_noop
from common.utils import get_logger
from assets.const import AutomationTypes, GATEWAY_NAME
from common.utils import get_logger
from orgs.utils import org_aware_func
from .common import automation_execute_start
logger = get_logger(__file__)
__all__ = [
def test_connectivity_util(assets, tp, task_name, local_port=None):
def _test_connectivity_util(assets, tp, task_name, local_port=None):
if not assets:
@ -30,8 +29,9 @@ def test_connectivity_util(assets, tp, task_name, local_port=None):
automation_execute_start(task_name, tp, child_snapshot)
def test_asset_connectivity_util(assets, task_name=None, local_port=None):
def test_assets_connectivity_task(assets, task_name=None, local_port=None):
from assets.models import PingAutomation
if task_name is None:
task_name = gettext_noop("Test assets connectivity ")
@ -39,26 +39,24 @@ def test_asset_connectivity_util(assets, task_name=None, local_port=None):
task_name = PingAutomation.generate_unique_name(task_name)
gateway_assets = assets.filter(platform__name=GATEWAY_NAME)
gateway_assets, AutomationTypes.ping_gateway, task_name, local_port
non_gateway_assets = assets.exclude(platform__name=GATEWAY_NAME)
test_connectivity_util(non_gateway_assets, AutomationTypes.ping, task_name)
_test_connectivity_util(non_gateway_assets, AutomationTypes.ping, task_name)
@shared_task(queue="ansible", verbose_name=_('Manually test the connectivity of a asset'))
def test_assets_connectivity_manual(asset_ids, local_port=None):
from assets.models import Asset
assets = Asset.objects.filter(id__in=asset_ids)
task_name = gettext_noop("Test assets connectivity ")
test_asset_connectivity_util(assets, task_name, local_port)
test_assets_connectivity_task.delay(assets, task_name, local_port)
@shared_task(queue="ansible", verbose_name=_('Manually test the connectivity of assets under a node'))
def test_node_assets_connectivity_manual(node_id, local_port=None):
from assets.models import Node
node = Node.objects.get(id=node_id)
task_name = gettext_noop("Test if the assets under the node are connectable ")
assets = node.get_all_assets()
test_asset_connectivity_util(assets, task_name, local_port)
test_assets_connectivity_task.delay(*assets, task_name, local_port)
@ -1,68 +0,0 @@
# -*- coding: utf-8 -*-
import functools
import threading
import time
import uuid
from django.core.cache import cache
from django.db import transaction
def on_transaction_commit(func):
如果不调用on_commit, 对象创建时添加多对多字段值失败
def inner(*args, **kwargs):
transaction.on_commit(lambda: func(*args, **kwargs))
return inner
class Singleton(object):
""" 单例类 """
def __init__(self, cls):
self._cls = cls
self._instance = {}
def __call__(self):
if self._cls not in self._instance:
self._instance[self._cls] = self._cls()
return self._instance[self._cls]
def _run_func_if_is_last(ttl, func, *args, **kwargs):
ix = uuid.uuid4().__str__()
key = f'DELAY_RUN_{func.__name__}'
cache.set(key, ix, ttl)
st = (ttl - 2 > 1) and ttl - 2 or 1
got = cache.get(key, None)
if ix == got:
func(*args, **kwargs)
def delay_run(ttl=5):
def inner(func):
def wrapper(*args, **kwargs):
t = threading.Thread(target=_run_func_if_is_last, args=(ttl, func, *args), kwargs=kwargs)
return wrapper
return inner
def run_it_many(username, year=2000):
print("Hello, %s, now is %s" % (username, year))
if __name__ == '__main__':
for i in range(20):
run_it_many('test', 2000)
@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
import functools
import inspect
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from django.core.cache import cache
from django.db import transaction
def on_transaction_commit(func):
如果不调用on_commit, 对象创建时添加多对多字段值失败
def inner(*args, **kwargs):
transaction.on_commit(lambda: func(*args, **kwargs))
return inner
class Singleton(object):
""" 单例类 """
def __init__(self, cls):
self._cls = cls
self._instance = {}
def __call__(self):
if self._cls not in self._instance:
self._instance[self._cls] = self._cls()
return self._instance[self._cls]
def _run_func_if_is_last(ttl, func, *args, **kwargs):
ix = uuid.uuid4().__str__()
key = f'DELAY_RUN_{func.__name__}'
cache.set(key, ix, ttl)
st = (ttl - 2 > 1) and ttl - 2 or 2
got = cache.get(key, None)
if ix == got:
func(*args, **kwargs)
executor = ThreadPoolExecutor(10)
def delay_run(ttl=5):
def inner(func):
def wrapper(*args, **kwargs):
executor.submit(_run_func_if_is_last, ttl, func, *args, **kwargs)
return wrapper
return inner
def _merge_run(ttl, func, *args, **kwargs):
if not args or not isinstance(args[0], (list, tuple)):
raise ValueError('args[0] must be list or tuple')
key = f'DELAY_MERGE_RUN_{func.__name__}'
ix = uuid.uuid4().__str__()
value = cache.get(key, [])
st = (ttl - 2 > 1) and ttl - 2 or 2
got = cache.get(key, None)
if ix == got:
func(*args, **kwargs)
def merge_delay_run(ttl):
合并 func 参数,延迟执行, 在 ttl 秒内, 只执行最后一次
func 参数必须是 *args
:param ttl:
def inner(func):
sigs = inspect.signature(func)
if len(sigs.parameters) != 1:
raise ValueError('func must have one arguments: %s' % func.__name__)
param = list(sigs.parameters.values())[0]
if not str(param).startswith('*'):
raise ValueError('func args must be startswith *: %s' % func.__name__)
def wrapper(*args):
key = f'DELAY_MERGE_RUN_{func.__name__}'
values = cache.get(key, [])
new_arg = [*values, *args]
cache.set(key, new_arg, ttl)
return delay_run(ttl)(func)(*new_arg)
return wrapper
return inner
def delay_run(ttl=5):
延迟执行函数, 在 ttl 秒内, 只执行最后一次
:param ttl:
def inner(func):
def wrapper(*args, **kwargs):
executor.submit(_run_func_if_is_last, ttl, func, *args, **kwargs)
return wrapper
return inner
def test_delay_run(username, year=2000):
print("Hello, %s, now is %s" % (username, year))
def test_merge_delay_run(*users):
name = ','.join(users)
print("Hello, %s, now is %s" % (name, time.time()))
@ -7,7 +7,7 @@ from django.db.models.signals import post_save
from django.dispatch import receiver
from django.utils.functional import LazyObject
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.utils import get_logger
from common.utils.connection import RedisPubSub
from notifications.backends import BACKEND
@ -10,7 +10,7 @@ from django.dispatch import receiver
from django.utils.functional import LazyObject
from common.const.signals import PRE_REMOVE, POST_REMOVE
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.signals import django_ready
from common.utils import get_logger
from common.utils.connection import RedisPubSub
@ -4,27 +4,25 @@ from collections import defaultdict
from django.conf import settings
from django.core.cache import cache
from users.models import User
from assets.models import Asset
from assets.utils import NodeAssetsUtil
from common.db.models import output_as_string
from common.decorators import on_transaction_commit
from common.utils import get_logger
from common.utils.common import lazyproperty, timeit
from orgs.models import Organization
from orgs.utils import (
from common.decorator import on_transaction_commit
from common.utils import get_logger
from common.utils.common import lazyproperty, timeit
from common.db.models import output_as_string
from perms.locks import UserGrantedTreeRebuildLock
from perms.models import (
from users.models import User
from .permission import AssetPermissionUtil
logger = get_logger(__name__)
@ -78,7 +76,7 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin):
end = time.time()
'Refresh user [{user}] org [{org}] perm tree, user {use_time:.2f}s'
''.format(user=self.user, org=org, use_time=end-start)
''.format(user=self.user, org=org, use_time=end - start)
def _clean_user_perm_tree_for_legacy_org(self):
@ -315,9 +313,9 @@ class UserPermTreeBuildUtil(object):
asset_node_pairs = Asset.nodes.through.objects \
.filter(asset_id__in=self.direct_asset_ids) \
).values_list('str_asset_id', 'str_node_id')
).values_list('str_asset_id', 'str_node_id')
asset_node_pairs = list(asset_node_pairs)
return asset_node_pairs
@ -8,7 +8,7 @@ from django.db.utils import ProgrammingError, OperationalError
from django.dispatch import receiver
from django.utils.functional import LazyObject
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.signals import django_ready
from common.utils import get_logger, ssh_key_gen
from common.utils.connection import RedisPubSub
@ -2,7 +2,7 @@ from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from assets.models import Asset
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.signals import django_ready
from common.utils import get_logger
from ..utils import db_port_manager
@ -2,7 +2,7 @@ from django.db.models.signals import post_save
from django.dispatch import receiver
from django.utils.functional import LazyObject
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.utils import get_logger
from common.utils.connection import RedisPubSub
from ..models import Task
@ -1,16 +1,16 @@
import os
import time
import socket
import threading
import time
from django.conf import settings
from common.db.utils import close_old_connections
from common.decorator import Singleton
from common.utils import get_disk_usage, get_cpu_load, get_memory_usage, get_logger
from .serializers.terminal import TerminalRegistrationSerializer, StatSerializer
from common.decorators import Singleton
from common.utils import get_disk_usage, get_cpu_load, get_memory_usage
from .const import TerminalType
from .models import Terminal
from .serializers.terminal import TerminalRegistrationSerializer, StatSerializer
__all__ = ['CoreTerminal', 'CeleryTerminal']
@ -4,7 +4,7 @@ from django.utils.translation import ugettext_lazy as _
from assets.const import DatabaseTypes
from assets.models import Database
from common.decorator import Singleton
from common.decorators import Singleton
from common.exceptions import JMSException
from common.utils import get_logger, get_object_or_none
from orgs.utils import tmp_to_root_org
@ -1,9 +1,8 @@
# -*- coding: utf-8 -*-
from django.dispatch import receiver
from django.db.models.signals import post_save, m2m_changed
from common.decorator import on_transaction_commit
from common.decorators import on_transaction_commit
from common.utils import get_logger
from tickets.models import Ticket
@ -1,20 +1,19 @@
# -*- coding: utf-8 -*-
from django.dispatch import receiver
from django_auth_ldap.backend import populate_user
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django_cas_ng.signals import cas_user_authenticated
from django.db.models.signals import post_save
from django.dispatch import receiver
from django_auth_ldap.backend import populate_user
from django_cas_ng.signals import cas_user_authenticated
from authentication.backends.oauth2.signals import oauth2_create_or_update_user
from authentication.backends.oidc.signals import openid_create_or_update_user
from authentication.backends.saml2.signals import saml2_create_or_update_user
from authentication.backends.oauth2.signals import oauth2_create_or_update_user
from common.decorators import on_transaction_commit
from common.utils import get_logger
from common.decorator import on_transaction_commit
from .signals import post_user_create
from .models import User, UserPasswordHistory
from .signals import post_user_create
logger = get_logger(__file__)
@ -47,9 +46,9 @@ def user_authenticated_handle(user, created, source, attrs=None, **kwargs):
@receiver(post_save, sender=User)
def save_passwd_change(sender, instance: User, **kwargs):
passwords = UserPasswordHistory.objects\
passwords = UserPasswordHistory.objects \
.filter(user=instance) \
.order_by('-date_created') \
.values_list('password', flat=True)
passwords = passwords[:int(settings.OLD_PASSWORD_HISTORY_LIMIT_COUNT)]
Reference in New Issue