From 184e8b31e6541c31ae0a71e39fcbdba5617bf52a Mon Sep 17 00:00:00 2001 From: xinwen Date: Wed, 10 Mar 2021 10:09:55 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=E4=B8=8B=20`orgid=5F?= =?UTF-8?q?nodekey=5Fassetsid=5Fmapping`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/models/node.py | 38 +++++++++++++++++++++++++++++-------- apps/common/utils/common.py | 10 ++++++---- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/apps/assets/models/node.py b/apps/assets/models/node.py index 9ff9c03bf..f30a60e73 100644 --- a/apps/assets/models/node.py +++ b/apps/assets/models/node.py @@ -40,7 +40,7 @@ def compute_parent_key(key): class NodeQuerySet(models.QuerySet): def delete(self): raise NotImplementedError -# + class FamilyMixin: __parents = None @@ -261,6 +261,12 @@ class NodeAllAssetsMappingMixin: # { org_id: { node_key: [ asset1_id, asset2_id ] } } orgid_nodekey_assetsid_mapping = defaultdict(dict) + locks_for_get_mapping_from_cache = defaultdict(threading.Lock) + + @classmethod + def get_lock(cls, org_id): + lock = cls.locks_for_get_mapping_from_cache[str(org_id)] + return lock @classmethod def get_node_all_asset_ids_mapping(cls, org_id): @@ -268,8 +274,22 @@ class NodeAllAssetsMappingMixin: if _mapping: return _mapping - _mapping = cls.get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(org_id) - cls.set_node_all_asset_ids_mapping_to_memory(org_id, mapping=_mapping) + logger.debug(f'Get node asset mapping from memory failed, acquire thread lock: ' + f'thread={threading.get_ident()} ' + f'org_id={org_id}') + with cls.get_lock(org_id): + logger.debug(f'Acquired thread lock ok. check if mapping is in memory now: ' + f'thread={threading.get_ident()} ' + f'org_id={org_id}') + _mapping = cls.get_node_all_asset_ids_mapping_from_memory(org_id) + if _mapping: + logger.debug(f'Mapping is already in memory now: ' + f'thread={threading.get_ident()} ' + f'org_id={org_id}') + return _mapping + + _mapping = cls.get_node_all_asset_ids_mapping_from_cache_or_generate_to_cache(org_id) + cls.set_node_all_asset_ids_mapping_to_memory(org_id, mapping=_mapping) return _mapping # from memory @@ -295,14 +315,10 @@ class NodeAllAssetsMappingMixin: return mapping lock_key = f'KEY_LOCK_GENERATE_ORG_{org_id}_NODE_ALL_ASSET_ids_MAPPING' - logger.info(f'Thread[{threading.get_ident()}] acquiring lock[{lock_key}] ...') with DistributedLock(lock_key): - logger.info(f'Thread[{threading.get_ident()}] acquire lock[{lock_key}] ok') # 这里使用无限期锁,原因是如果这里卡住了,就卡在数据库了,说明 # 数据库繁忙,所以不应该再有线程执行这个操作,使数据库忙上加忙 - # 这里最好先判断内存中有没有,防止同一进程的多个线程重复从 cache 中获取数据, - # 但逻辑过于繁琐,直接判断 cache 吧 _mapping = cls.get_node_all_asset_ids_mapping_from_cache(org_id) if _mapping: return _mapping @@ -315,6 +331,9 @@ class NodeAllAssetsMappingMixin: def get_node_all_asset_ids_mapping_from_cache(cls, org_id): cache_key = cls._get_cache_key_for_node_all_asset_ids_mapping(org_id) mapping = cache.get(cache_key) + logger.info(f'Get node asset mapping from cache {bool(mapping)}: ' + f'thread={threading.get_ident()} ' + f'org_id={org_id}') return mapping @classmethod @@ -335,6 +354,9 @@ class NodeAllAssetsMappingMixin: def generate_node_all_asset_ids_mapping(cls, org_id): from .asset import Asset + logger.info(f'Generate node asset mapping: ' + f'thread={threading.get_ident()} ' + f'org_id={org_id}') t1 = time.time() with tmp_to_org(org_id): node_ids_key = Node.objects.annotate( @@ -366,7 +388,7 @@ class NodeAllAssetsMappingMixin: mapping[ancestor_key].update(asset_ids) t3 = time.time() - logger.debug('t1-t2(DB Query): {} s, t3-t2(Generate mapping): {} s'.format(t2-t1, t3-t2)) + logger.info('t1-t2(DB Query): {} s, t3-t2(Generate mapping): {} s'.format(t2-t1, t3-t2)) return mapping diff --git a/apps/common/utils/common.py b/apps/common/utils/common.py index d1ecf8579..e24793b55 100644 --- a/apps/common/utils/common.py +++ b/apps/common/utils/common.py @@ -198,10 +198,12 @@ logger = get_logger(__name__) def timeit(func): def wrapper(*args, **kwargs): - if hasattr(func, '__name__'): - name = func.__name__ - else: - name = func + name = func + for attr in ('__qualname__', '__name__'): + if hasattr(func, attr): + name = getattr(func, attr) + break + logger.debug("Start call: {}".format(name)) now = time.time() result = func(*args, **kwargs)