perf: optimize delay run

pull/9530/head
ibuler 2 years ago
parent 48fe6b975b
commit 223b73c5c6

@@ -24,7 +24,6 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
     if action in refused:
         raise ValueError
-    logger.debug('Recv asset nodes change signal, recompute node assets amount')
     mapper = {PRE_ADD: add, POST_REMOVE: sub}
     if action not in mapper:
         return
@@ -40,6 +39,7 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
 @merge_delay_run(ttl=5)
 def update_nodes_assets_amount(node_ids=()):
     nodes = list(Node.objects.filter(id__in=node_ids))
+    logger.debug('Recv asset nodes change signal, recompute node assets amount')
     logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
     if len(node_ids) > 100:
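
For context, the mapper above pairs the m2m actions with arithmetic operators, so one code path both adds and subtracts asset counts. A minimal sketch, assuming Django's m2m_changed action strings for PRE_ADD/POST_REMOVE and operator.add/sub; apply_amount_change is a hypothetical helper, not part of the commit:

    from operator import add, sub

    # Django m2m_changed action strings; assumed to be what PRE_ADD/POST_REMOVE alias
    PRE_ADD, POST_REMOVE = 'pre_add', 'post_remove'
    mapper = {PRE_ADD: add, POST_REMOVE: sub}

    def apply_amount_change(amount, action, changed_count):
        op = mapper.get(action)
        if op is None:
            return amount          # other actions (pre_remove, post_add, ...) are ignored
        return op(amount, changed_count)

    assert apply_amount_change(10, PRE_ADD, 3) == 13
    assert apply_amount_change(10, POST_REMOVE, 4) == 6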

@@ -23,6 +23,7 @@ node_assets_mapping_pub_sub = lazy(lambda: RedisPubSub('fm.node_asset_mapping'),
 @merge_delay_run(ttl=5)
 def expire_node_assets_mapping(org_ids=()):
+    logger.debug("Recv asset nodes changed signal, expire memory node asset mapping")
     # All processes expire their own in-memory node-asset mapping
     root_org_id = Organization.ROOT_ID
     Node.expire_node_all_asset_ids_cache_mapping(root_org_id)
@@ -53,7 +54,6 @@ def on_node_post_delete(sender, instance, **kwargs):
 @receiver(m2m_changed, sender=Asset.nodes.through)
 def on_node_asset_change(sender, instance, **kwargs):
-    logger.debug("Recv asset nodes changed signal, expire memory node asset mapping")
     expire_node_assets_mapping(org_ids=(instance.org_id,))

@@ -175,13 +175,17 @@ def merge_delay_run(ttl=5, key=None):
         key_suffix = suffix_key_func(*args, **kwargs)
         cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
         cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
         for k, v in kwargs.items():
-            if not isinstance(v, tuple):
-                raise ValueError('func kwargs value must be list or tuple: %s' % func.__name__)
+            if not isinstance(v, (tuple, list, set)):
+                raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v))
             if k not in cache_kwargs:
                 cache_kwargs[k] = v
+            elif isinstance(v, set):
+                cache_kwargs[k] = cache_kwargs[k].union(v)
             else:
                 cache_kwargs[k] += v
+        _loop_debouncer_func_args_cache[cache_key] = cache_kwargs
         run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs)
     return wrapper
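
Taken on its own, the merging rule this hunk introduces can be exercised outside the debouncer. A minimal standalone sketch, with _loop_debouncer_func_args_cache inlined as a plain dict and the decorator/event-loop machinery omitted:

    _loop_debouncer_func_args_cache = {}

    def merge_kwargs(cache_key, func_name, **kwargs):
        # Tuples/lists for the same key concatenate; sets union; the merged
        # result is written back so the next call within the ttl window sees it.
        cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
        for k, v in kwargs.items():
            if not isinstance(v, (tuple, list, set)):
                raise ValueError('func kwargs value must be list or tuple: %s %s' % (func_name, v))
            if k not in cache_kwargs:
                cache_kwargs[k] = v
            elif isinstance(v, set):
                cache_kwargs[k] = cache_kwargs[k].union(v)
            else:
                cache_kwargs[k] += v   # callers should keep one container type per key
        _loop_debouncer_func_args_cache[cache_key] = cache_kwargs
        return cache_kwargs

    merge_kwargs('k1', 'update_nodes_assets_amount', node_ids=('a',))
    merged = merge_kwargs('k1', 'update_nodes_assets_amount', node_ids=('b', 'c'))
    assert merged == {'node_ids': ('a', 'b', 'c')}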
@@ -190,11 +194,11 @@ def merge_delay_run(ttl=5, key=None):
 @delay_run(ttl=5)
-def test_delay_run(*username):
+def test_delay_run(username):
     print("Hello, %s, now is %s" % (username, time.time()))

-@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
+@merge_delay_run(ttl=5, key=lambda users=(): users[0][0])
 def test_merge_delay_run(users=()):
     name = ','.join(users)
     time.sleep(2)
@@ -206,8 +210,8 @@ def do_test():
     print("start : %s" % time.time())
     for i in range(100):
         # test_delay_run('test', year=i)
-        test_merge_delay_run('test %s' % i)
-        test_merge_delay_run('best %s' % i)
+        test_merge_delay_run(users=['test %s' % i])
+        test_merge_delay_run(users=['best %s' % i])
         test_delay_run('test run %s' % i)
     end = time.time()
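
To observe the batching do_test drives without the project's event loop, here is a self-contained sketch in which threading.Timer stands in for run_debouncer_func; only the call pattern mirrors the commit, the internals are simplified:

    import threading
    import time

    _pending, _timers = {}, {}

    def merge_delay_run(ttl=5):
        def decorator(func):
            def wrapper(**kwargs):
                merged = _pending.setdefault(func, {})
                for k, v in kwargs.items():
                    merged[k] = tuple(merged.get(k, ())) + tuple(v)
                if func in _timers:           # debounce: restart the window on each call
                    _timers[func].cancel()
                timer = threading.Timer(ttl, lambda: func(**_pending.pop(func, {})))
                _timers[func] = timer
                timer.start()
            return wrapper
        return decorator

    @merge_delay_run(ttl=1)
    def test_merge_delay_run(users=()):
        print('Hello, %s, now is %s' % (','.join(users), time.time()))

    for i in range(10):
        test_merge_delay_run(users=['test %s' % i])
    time.sleep(2)   # a single line prints, with all ten names merged

Because arguments are merged by keyword name, the decorated function must now be called with keyword iterables (users=[...]), which is exactly what the do_test change above switches to.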

@@ -40,7 +40,7 @@ def main():
         default='all',
         help="resource to generate"
     )
-    parser.add_argument('-c', '--count', type=int, default=10000)
+    parser.add_argument('-c', '--count', type=int, default=1000)
     parser.add_argument('-b', '--batch_size', type=int, default=100)
     parser.add_argument('-o', '--org', type=str, default='')
     args = parser.parse_args()

@@ -44,5 +44,6 @@ class FakeDataGenerator:
             using = end - start
             from_size = created
             created += len(batch)
-            print('Generate %s: %s-%s [{}s]' % (self.resource, from_size, created, using))
+            print('Generate %s: %s-%s [%s]' % (self.resource, from_size, created, using))
         self.after_generate()
+        time.sleep(20)
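
The corrected print lives in a timed batch loop; a hedged reconstruction of that loop's shape (generate_in_batches and the bulk-create placeholder are assumptions, not the project's code):

    import time

    def generate_in_batches(items, batch_size=100, resource='assets'):
        created = 0
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            start = time.time()
            # a real generator would bulk-create the batch here
            using = time.time() - start
            from_size = created
            created += len(batch)
            print('Generate %s: %s-%s [%ss]' % (resource, from_size, created, round(using, 2)))

    generate_in_batches(list(range(250)))   # prints progress lines 0-100, 100-200, 200-250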
