mirror of https://github.com/jumpserver/jumpserver
perf: 修改 signals
parent
37a52c420f
commit
6a0fbc6ac2
|
@ -10,6 +10,7 @@ from common.const.signals import PRE_ADD, POST_REMOVE, PRE_CLEAR
|
||||||
from common.decorators import on_transaction_commit, merge_delay_run
|
from common.decorators import on_transaction_commit, merge_delay_run
|
||||||
from common.utils import get_logger
|
from common.utils import get_logger
|
||||||
from orgs.utils import tmp_to_org
|
from orgs.utils import tmp_to_org
|
||||||
|
from ..tasks import check_node_assets_amount_task
|
||||||
|
|
||||||
logger = get_logger(__file__)
|
logger = get_logger(__file__)
|
||||||
|
|
||||||
|
@ -33,13 +34,18 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
|
||||||
node_ids = [instance.id]
|
node_ids = [instance.id]
|
||||||
else:
|
else:
|
||||||
node_ids = pk_set
|
node_ids = pk_set
|
||||||
update_node_assets_amount(*node_ids)
|
update_nodes_assets_amount(*node_ids)
|
||||||
|
|
||||||
|
|
||||||
@merge_delay_run(ttl=5)
|
@merge_delay_run(ttl=5)
|
||||||
def update_node_assets_amount(*node_ids):
|
def update_nodes_assets_amount(*node_ids):
|
||||||
nodes = list(Node.objects.filter(id__in=node_ids))
|
nodes = list(Node.objects.filter(id__in=node_ids))
|
||||||
logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
|
logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
|
||||||
|
|
||||||
|
if len(node_ids) > 100:
|
||||||
|
check_node_assets_amount_task.delay()
|
||||||
|
return
|
||||||
|
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
node.assets_amount = node.get_assets_amount()
|
node.assets_amount = node.get_assets_amount()
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
# ~*~ coding: utf-8 ~*~
|
# ~*~ coding: utf-8 ~*~
|
||||||
#
|
#
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from common.db.models import output_as_string
|
||||||
|
from common.struct import Stack
|
||||||
from common.utils import get_logger, dict_get_any, is_uuid, get_object_or_none, timeit
|
from common.utils import get_logger, dict_get_any, is_uuid, get_object_or_none, timeit
|
||||||
from common.utils.http import is_true
|
from common.utils.http import is_true
|
||||||
from common.struct import Stack
|
|
||||||
from common.db.models import output_as_string
|
|
||||||
from orgs.utils import ensure_in_real_or_default_org, current_org
|
from orgs.utils import ensure_in_real_or_default_org, current_org
|
||||||
|
|
||||||
from ..locks import NodeTreeUpdateLock
|
from ..locks import NodeTreeUpdateLock
|
||||||
from ..models import Node, Asset
|
from ..models import Node, Asset
|
||||||
|
|
||||||
|
@ -25,11 +25,11 @@ def check_node_assets_amount():
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
nodeid_nodekey_mapper[node.id] = node.key
|
nodeid_nodekey_mapper[node.id] = node.key
|
||||||
|
|
||||||
for nodeid, assetid in nodeid_assetid_pairs:
|
for node_id, asset_id in nodeid_assetid_pairs:
|
||||||
if nodeid not in nodeid_nodekey_mapper:
|
if node_id not in nodeid_nodekey_mapper:
|
||||||
continue
|
continue
|
||||||
nodekey = nodeid_nodekey_mapper[nodeid]
|
node_key = nodeid_nodekey_mapper[node_id]
|
||||||
nodekey_assetids_mapper[nodekey].add(assetid)
|
nodekey_assetids_mapper[node_key].add(asset_id)
|
||||||
|
|
||||||
util = NodeAssetsUtil(nodes, nodekey_assetids_mapper)
|
util = NodeAssetsUtil(nodes, nodekey_assetids_mapper)
|
||||||
util.generate()
|
util.generate()
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
#
|
#
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from celery import shared_task
|
||||||
from django.apps import apps
|
from django.apps import apps
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth import BACKEND_SESSION_KEY
|
from django.contrib.auth import BACKEND_SESSION_KEY
|
||||||
|
@ -26,6 +27,7 @@ from common.signals import django_ready
|
||||||
from common.utils import get_request_ip, get_logger, get_syslogger
|
from common.utils import get_request_ip, get_logger, get_syslogger
|
||||||
from common.utils.encode import data_to_json
|
from common.utils.encode import data_to_json
|
||||||
from jumpserver.utils import current_request
|
from jumpserver.utils import current_request
|
||||||
|
from orgs.utils import org_aware_func
|
||||||
from terminal.models import Session, Command
|
from terminal.models import Session, Command
|
||||||
from terminal.serializers import SessionSerializer, SessionCommandSerializer
|
from terminal.serializers import SessionSerializer, SessionCommandSerializer
|
||||||
from users.models import User
|
from users.models import User
|
||||||
|
@ -69,24 +71,18 @@ M2M_ACTION = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@receiver(m2m_changed)
|
@shared_task(verbose_name=_("Create m2m operate log"))
|
||||||
def on_m2m_changed(sender, action, instance, reverse, model, pk_set, **kwargs):
|
@org_aware_func('instance')
|
||||||
if action not in M2M_ACTION:
|
def create_m2m_operate_log(instance, action, model, pk_set):
|
||||||
return
|
|
||||||
if not instance:
|
|
||||||
return
|
|
||||||
return
|
|
||||||
|
|
||||||
resource_type = instance._meta.verbose_name
|
|
||||||
current_instance = model_to_dict(instance, include_model_fields=False)
|
current_instance = model_to_dict(instance, include_model_fields=False)
|
||||||
|
resource_type = instance._meta.verbose_name
|
||||||
|
field_name = str(model._meta.verbose_name)
|
||||||
|
action = M2M_ACTION[action]
|
||||||
instance_id = current_instance.get('id')
|
instance_id = current_instance.get('id')
|
||||||
log_id, before_instance = get_instance_dict_from_cache(instance_id)
|
log_id, before_instance = get_instance_dict_from_cache(instance_id)
|
||||||
|
|
||||||
field_name = str(model._meta.verbose_name)
|
|
||||||
objs = model.objects.filter(pk__in=pk_set)
|
objs = model.objects.filter(pk__in=pk_set)
|
||||||
objs_display = [str(o) for o in objs]
|
objs_display = [str(o) for o in objs]
|
||||||
action = M2M_ACTION[action]
|
|
||||||
changed_field = current_instance.get(field_name, [])
|
changed_field = current_instance.get(field_name, [])
|
||||||
|
|
||||||
after, before, before_value = None, None, None
|
after, before, before_value = None, None, None
|
||||||
|
@ -107,10 +103,20 @@ def on_m2m_changed(sender, action, instance, reverse, model, pk_set, **kwargs):
|
||||||
|
|
||||||
create_or_update_operate_log(
|
create_or_update_operate_log(
|
||||||
ActionChoices.update, resource_type,
|
ActionChoices.update, resource_type,
|
||||||
resource=instance, log_id=log_id, before=before, after=after
|
resource=instance, log_id=log_id,
|
||||||
|
before=before, after=after
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@receiver(m2m_changed)
|
||||||
|
def on_m2m_changed(sender, action, instance, model, pk_set, **kwargs):
|
||||||
|
if action not in M2M_ACTION:
|
||||||
|
return
|
||||||
|
if not instance:
|
||||||
|
return
|
||||||
|
create_m2m_operate_log.delay(instance, action, model, pk_set)
|
||||||
|
|
||||||
|
|
||||||
def signal_of_operate_log_whether_continue(sender, instance, created, update_fields=None):
|
def signal_of_operate_log_whether_continue(sender, instance, created, update_fields=None):
|
||||||
condition = True
|
condition = True
|
||||||
if not instance:
|
if not instance:
|
||||||
|
@ -129,16 +135,21 @@ def signal_of_operate_log_whether_continue(sender, instance, created, update_fie
|
||||||
return condition
|
return condition
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(verbose_name=_("Create operate log"))
|
||||||
|
@org_aware_func('instance')
|
||||||
|
def create_operate_log(instance, created, update_fields=None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@receiver(pre_save)
|
@receiver(pre_save)
|
||||||
def on_object_pre_create_or_update(sender, instance=None, raw=False, using=None, update_fields=None, **kwargs):
|
def on_object_pre_create_or_update(sender, instance=None, update_fields=None, **kwargs):
|
||||||
ok = signal_of_operate_log_whether_continue(
|
ok = signal_of_operate_log_whether_continue(
|
||||||
sender, instance, False, update_fields
|
sender, instance, False, update_fields
|
||||||
)
|
)
|
||||||
if not ok:
|
if not ok:
|
||||||
return
|
return
|
||||||
|
|
||||||
# users.PrivateToken Model 没有 id 有 pk字段
|
instance_id = getattr(instance, 'pk', None)
|
||||||
instance_id = getattr(instance, 'id', getattr(instance, 'pk', None))
|
|
||||||
instance_before_data = {'id': instance_id}
|
instance_before_data = {'id': instance_id}
|
||||||
raw_instance = type(instance).objects.filter(pk=instance_id).first()
|
raw_instance = type(instance).objects.filter(pk=instance_id).first()
|
||||||
|
|
||||||
|
@ -152,7 +163,6 @@ def on_object_pre_create_or_update(sender, instance=None, raw=False, using=None,
|
||||||
|
|
||||||
@receiver(post_save)
|
@receiver(post_save)
|
||||||
def on_object_created_or_update(sender, instance=None, created=False, update_fields=None, **kwargs):
|
def on_object_created_or_update(sender, instance=None, created=False, update_fields=None, **kwargs):
|
||||||
return
|
|
||||||
ok = signal_of_operate_log_whether_continue(
|
ok = signal_of_operate_log_whether_continue(
|
||||||
sender, instance, created, update_fields
|
sender, instance, created, update_fields
|
||||||
)
|
)
|
||||||
|
|
|
@ -65,7 +65,7 @@ def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs):
|
||||||
logger.error('delay run error: %s' % e)
|
logger.error('delay run error: %s' % e)
|
||||||
|
|
||||||
|
|
||||||
executor = ThreadPoolExecutor(10)
|
executor = ThreadPoolExecutor(20)
|
||||||
|
|
||||||
|
|
||||||
def delay_run(ttl=5, key=None):
|
def delay_run(ttl=5, key=None):
|
||||||
|
|
Loading…
Reference in New Issue