mirror of https://github.com/jumpserver/jumpserver
perf: 优化下 `orgid_nodekey_assetsid_mapping`
parent
615bcadf62
commit
184e8b31e6
|
@ -40,7 +40,7 @@ def compute_parent_key(key):
|
||||||
class NodeQuerySet(models.QuerySet):
|
class NodeQuerySet(models.QuerySet):
|
||||||
def delete(self):
|
def delete(self):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
#
|
|
||||||
|
|
||||||
class FamilyMixin:
|
class FamilyMixin:
|
||||||
__parents = None
|
__parents = None
|
||||||
|
@ -261,6 +261,12 @@ class NodeAllAssetsMappingMixin:
|
||||||
|
|
||||||
# { org_id: { node_key: [ asset1_id, asset2_id ] } }
|
# { org_id: { node_key: [ asset1_id, asset2_id ] } }
|
||||||
orgid_nodekey_assetsid_mapping = defaultdict(dict)
|
orgid_nodekey_assetsid_mapping = defaultdict(dict)
|
||||||
|
locks_for_get_mapping_from_cache = defaultdict(threading.Lock)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_lock(cls, org_id):
|
||||||
|
lock = cls.locks_for_get_mapping_from_cache[str(org_id)]
|
||||||
|
return lock
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_node_all_asset_ids_mapping(cls, org_id):
|
def get_node_all_asset_ids_mapping(cls, org_id):
|
||||||
|
@ -268,6 +274,20 @@ class NodeAllAssetsMappingMixin:
|
||||||
if _mapping:
|
if _mapping:
|
||||||
return _mapping
|
return _mapping
|
||||||
|
|
||||||
|
logger.debug(f'Get node asset mapping from memory failed, acquire thread lock: '
|
||||||
|
f'thread={threading.get_ident()} '
|
||||||
|
f'org_id={org_id}')
|
||||||
|
with cls.get_lock(org_id):
|
||||||
|
logger.debug(f'Acquired thread lock ok. check if mapping is in memory now: '
|
||||||
|
f'thread={threading.get_ident()} '
|
||||||
|
f'org_id={org_id}')
|
||||||
|
_mapping = cls.get_node_all_asset_ids_mapping_from_memory(org_id)
|
||||||
|
if _mapping:
|
||||||
|
logger.debug(f'Mapping is already in memory now: '
|
||||||
|
f'thread={threading.get_ident()} '
|
||||||
|
f'org_id={org_id}')
|
||||||
|
return _mapping
|
||||||
|
|
||||||
_mapping = cls.get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(org_id)
|
_mapping = cls.get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(org_id)
|
||||||
cls.set_node_all_asset_ids_mapping_to_memory(org_id, mapping=_mapping)
|
cls.set_node_all_asset_ids_mapping_to_memory(org_id, mapping=_mapping)
|
||||||
return _mapping
|
return _mapping
|
||||||
|
@ -295,14 +315,10 @@ class NodeAllAssetsMappingMixin:
|
||||||
return mapping
|
return mapping
|
||||||
|
|
||||||
lock_key = f'KEY_LOCK_GENERATE_ORG_{org_id}_NODE_ALL_ASSET_ids_MAPPING'
|
lock_key = f'KEY_LOCK_GENERATE_ORG_{org_id}_NODE_ALL_ASSET_ids_MAPPING'
|
||||||
logger.info(f'Thread[{threading.get_ident()}] acquiring lock[{lock_key}] ...')
|
|
||||||
with DistributedLock(lock_key):
|
with DistributedLock(lock_key):
|
||||||
logger.info(f'Thread[{threading.get_ident()}] acquire lock[{lock_key}] ok')
|
|
||||||
# 这里使用无限期锁,原因是如果这里卡住了,就卡在数据库了,说明
|
# 这里使用无限期锁,原因是如果这里卡住了,就卡在数据库了,说明
|
||||||
# 数据库繁忙,所以不应该再有线程执行这个操作,使数据库忙上加忙
|
# 数据库繁忙,所以不应该再有线程执行这个操作,使数据库忙上加忙
|
||||||
|
|
||||||
# 这里最好先判断内存中有没有,防止同一进程的多个线程重复从 cache 中获取数据,
|
|
||||||
# 但逻辑过于繁琐,直接判断 cache 吧
|
|
||||||
_mapping = cls.get_node_all_asset_ids_mapping_from_cache(org_id)
|
_mapping = cls.get_node_all_asset_ids_mapping_from_cache(org_id)
|
||||||
if _mapping:
|
if _mapping:
|
||||||
return _mapping
|
return _mapping
|
||||||
|
@ -315,6 +331,9 @@ class NodeAllAssetsMappingMixin:
|
||||||
def get_node_all_asset_ids_mapping_from_cache(cls, org_id):
|
def get_node_all_asset_ids_mapping_from_cache(cls, org_id):
|
||||||
cache_key = cls._get_cache_key_for_node_all_asset_ids_mapping(org_id)
|
cache_key = cls._get_cache_key_for_node_all_asset_ids_mapping(org_id)
|
||||||
mapping = cache.get(cache_key)
|
mapping = cache.get(cache_key)
|
||||||
|
logger.info(f'Get node asset mapping from cache {bool(mapping)}: '
|
||||||
|
f'thread={threading.get_ident()} '
|
||||||
|
f'org_id={org_id}')
|
||||||
return mapping
|
return mapping
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -335,6 +354,9 @@ class NodeAllAssetsMappingMixin:
|
||||||
def generate_node_all_asset_ids_mapping(cls, org_id):
|
def generate_node_all_asset_ids_mapping(cls, org_id):
|
||||||
from .asset import Asset
|
from .asset import Asset
|
||||||
|
|
||||||
|
logger.info(f'Generate node asset mapping: '
|
||||||
|
f'thread={threading.get_ident()} '
|
||||||
|
f'org_id={org_id}')
|
||||||
t1 = time.time()
|
t1 = time.time()
|
||||||
with tmp_to_org(org_id):
|
with tmp_to_org(org_id):
|
||||||
node_ids_key = Node.objects.annotate(
|
node_ids_key = Node.objects.annotate(
|
||||||
|
@ -366,7 +388,7 @@ class NodeAllAssetsMappingMixin:
|
||||||
mapping[ancestor_key].update(asset_ids)
|
mapping[ancestor_key].update(asset_ids)
|
||||||
|
|
||||||
t3 = time.time()
|
t3 = time.time()
|
||||||
logger.debug('t1-t2(DB Query): {} s, t3-t2(Generate mapping): {} s'.format(t2-t1, t3-t2))
|
logger.info('t1-t2(DB Query): {} s, t3-t2(Generate mapping): {} s'.format(t2-t1, t3-t2))
|
||||||
return mapping
|
return mapping
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -198,10 +198,12 @@ logger = get_logger(__name__)
|
||||||
|
|
||||||
def timeit(func):
|
def timeit(func):
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
if hasattr(func, '__name__'):
|
|
||||||
name = func.__name__
|
|
||||||
else:
|
|
||||||
name = func
|
name = func
|
||||||
|
for attr in ('__qualname__', '__name__'):
|
||||||
|
if hasattr(func, attr):
|
||||||
|
name = getattr(func, attr)
|
||||||
|
break
|
||||||
|
|
||||||
logger.debug("Start call: {}".format(name))
|
logger.debug("Start call: {}".format(name))
|
||||||
now = time.time()
|
now = time.time()
|
||||||
result = func(*args, **kwargs)
|
result = func(*args, **kwargs)
|
||||||
|
|
Loading…
Reference in New Issue