From 223b73c5c6e89bfaf28c0b260e6bf7e4c3b8c2d9 Mon Sep 17 00:00:00 2001
From: ibuler
Date: Mon, 13 Feb 2023 20:04:17 +0800
Subject: [PATCH] perf: optimize delay run
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../assets/signal_handlers/node_assets_amount.py |  2 +-
 .../signal_handlers/node_assets_mapping.py       |  2 +-
 apps/common/decorators.py                        | 16 ++++++++++------
 utils/generate_fake_data/generate.py             |  2 +-
 utils/generate_fake_data/resources/base.py       |  3 ++-
 5 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/apps/assets/signal_handlers/node_assets_amount.py b/apps/assets/signal_handlers/node_assets_amount.py
index cc1c50b84..361f3a176 100644
--- a/apps/assets/signal_handlers/node_assets_amount.py
+++ b/apps/assets/signal_handlers/node_assets_amount.py
@@ -24,7 +24,6 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
     if action in refused:
         raise ValueError
 
-    logger.debug('Recv asset nodes change signal, recompute node assets amount')
     mapper = {PRE_ADD: add, POST_REMOVE: sub}
     if action not in mapper:
         return
@@ -40,6 +39,7 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
 @merge_delay_run(ttl=5)
 def update_nodes_assets_amount(node_ids=()):
     nodes = list(Node.objects.filter(id__in=node_ids))
+    logger.debug('Recv asset nodes change signal, recompute node assets amount')
     logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
 
     if len(node_ids) > 100:
diff --git a/apps/assets/signal_handlers/node_assets_mapping.py b/apps/assets/signal_handlers/node_assets_mapping.py
index b84474033..34b707dab 100644
--- a/apps/assets/signal_handlers/node_assets_mapping.py
+++ b/apps/assets/signal_handlers/node_assets_mapping.py
@@ -23,6 +23,7 @@ node_assets_mapping_pub_sub = lazy(lambda: RedisPubSub('fm.node_asset_mapping'),
 
 @merge_delay_run(ttl=5)
 def expire_node_assets_mapping(org_ids=()):
+    logger.debug("Recv asset nodes changed signal, expire memory node asset mapping")
     # All processes clear their own in-memory data
     root_org_id = Organization.ROOT_ID
     Node.expire_node_all_asset_ids_cache_mapping(root_org_id)
@@ -53,7 +54,6 @@ def on_node_post_delete(sender, instance, **kwargs):
 
 @receiver(m2m_changed, sender=Asset.nodes.through)
 def on_node_asset_change(sender, instance, **kwargs):
-    logger.debug("Recv asset nodes changed signal, expire memery node asset mapping")
     expire_node_assets_mapping(org_ids=(instance.org_id,))
 
 
diff --git a/apps/common/decorators.py b/apps/common/decorators.py
index 4e31ac0e5..97c715d44 100644
--- a/apps/common/decorators.py
+++ b/apps/common/decorators.py
@@ -175,13 +175,17 @@ def merge_delay_run(ttl=5, key=None):
             key_suffix = suffix_key_func(*args, **kwargs)
             cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
             cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
+
             for k, v in kwargs.items():
-                if not isinstance(v, tuple):
-                    raise ValueError('func kwargs value must be list or tuple: %s' % func.__name__)
+                if not isinstance(v, (tuple, list, set)):
+                    raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v))
                 if k not in cache_kwargs:
                     cache_kwargs[k] = v
+                elif isinstance(v, set):
+                    cache_kwargs[k] = cache_kwargs[k].union(v)
                 else:
                     cache_kwargs[k] += v
+
             _loop_debouncer_func_args_cache[cache_key] = cache_kwargs
             run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs)
         return wrapper
@@ -190,11 +194,11 @@ def merge_delay_run(ttl=5, key=None):
 
 
 @delay_run(ttl=5)
-def test_delay_run(*username):
+def test_delay_run(username):
     print("Hello, %s, now is %s" % (username, time.time()))
 
 
-@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
+@merge_delay_run(ttl=5, key=lambda users=(): users[0][0])
 def test_merge_delay_run(users=()):
     name = ','.join(users)
     time.sleep(2)
@@ -206,8 +210,8 @@ def do_test():
     print("start : %s" % time.time())
     for i in range(100):
         # test_delay_run('test', year=i)
-        test_merge_delay_run('test %s' % i)
-        test_merge_delay_run('best %s' % i)
+        test_merge_delay_run(users=['test %s' % i])
+        test_merge_delay_run(users=['best %s' % i])
         test_delay_run('test run %s' % i)
 
     end = time.time()
diff --git a/utils/generate_fake_data/generate.py b/utils/generate_fake_data/generate.py
index 394e27113..0931cca8f 100644
--- a/utils/generate_fake_data/generate.py
+++ b/utils/generate_fake_data/generate.py
@@ -40,7 +40,7 @@ def main():
         default='all', help="resource to generate"
     )
 
-    parser.add_argument('-c', '--count', type=int, default=10000)
+    parser.add_argument('-c', '--count', type=int, default=1000)
     parser.add_argument('-b', '--batch_size', type=int, default=100)
     parser.add_argument('-o', '--org', type=str, default='')
     args = parser.parse_args()
diff --git a/utils/generate_fake_data/resources/base.py b/utils/generate_fake_data/resources/base.py
index 828789dd7..39942d5f1 100644
--- a/utils/generate_fake_data/resources/base.py
+++ b/utils/generate_fake_data/resources/base.py
@@ -44,5 +44,6 @@ class FakeDataGenerator:
             using = end - start
             from_size = created
             created += len(batch)
-            print('Generate %s: %s-%s [{}s]' % (self.resource, from_size, created, using))
+            print('Generate %s: %s-%s [%s]' % (self.resource, from_size, created, using))
         self.after_generate()
+        time.sleep(20)
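
Note on the merge_delay_run change above: the decorator now accepts list, tuple, or set values for the delayed call's kwargs and merges them across calls before the debounced run fires (lists/tuples are concatenated, sets are unioned). The snippet below is a minimal, standalone sketch of just that merging rule; the helper name merge_kwargs is hypothetical, and the real decorator additionally persists the merged kwargs in its cache and schedules the actual run through run_debouncer_func.

    def merge_kwargs(cached, incoming):
        # Mirrors the validation/merge loop added in apps/common/decorators.py:
        # values must be list/tuple/set; sets are unioned, lists/tuples concatenated.
        for k, v in incoming.items():
            if not isinstance(v, (tuple, list, set)):
                raise ValueError('func kwargs value must be list or tuple: %s' % (v,))
            if k not in cached:
                cached[k] = v
            elif isinstance(v, set):
                cached[k] = cached[k].union(v)
            else:
                cached[k] += v
        return cached

    # Two delayed calls inside the ttl window collapse into one merged call.
    cache = {}
    merge_kwargs(cache, {'org_ids': ['org-1']})
    merge_kwargs(cache, {'org_ids': ['org-2']})
    print(cache)  # {'org_ids': ['org-1', 'org-2']}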