diff --git a/apps/assets/signal_handlers/asset.py b/apps/assets/signal_handlers/asset.py index 94aa86ff2..88ebab11e 100644 --- a/apps/assets/signal_handlers/asset.py +++ b/apps/assets/signal_handlers/asset.py @@ -21,13 +21,13 @@ def on_node_pre_save(sender, instance: Node, **kwargs): @merge_delay_run(ttl=5, key=key_by_org) -def test_assets_connectivity_handler(*assets): +def test_assets_connectivity_handler(assets=()): task_name = gettext_noop("Test assets connectivity ") test_assets_connectivity_task.delay(assets, task_name) @merge_delay_run(ttl=5, key=key_by_org) -def gather_assets_facts_handler(*assets): +def gather_assets_facts_handler(assets=()): if not assets: logger.info("No assets to update hardware info") return @@ -36,7 +36,7 @@ def gather_assets_facts_handler(*assets): @merge_delay_run(ttl=5, key=key_by_org) -def ensure_asset_has_node(*assets): +def ensure_asset_has_node(assets=()): asset_ids = [asset.id for asset in assets] has_ids = Asset.nodes.through.objects \ .filter(asset_id__in=asset_ids) \ @@ -60,16 +60,16 @@ def on_asset_create(sender, instance=None, created=False, **kwargs): return logger.info("Asset create signal recv: {}".format(instance)) - ensure_asset_has_node(instance) + ensure_asset_has_node(assets=(instance,)) # 获取资产硬件信息 auto_info = instance.auto_info if auto_info.get('ping_enabled'): logger.debug('Asset {} ping enabled, test connectivity'.format(instance.name)) - test_assets_connectivity_handler(instance) + test_assets_connectivity_handler(assets=(instance,)) if auto_info.get('gather_facts_enabled'): logger.debug('Asset {} gather facts enabled, gather facts'.format(instance.name)) - gather_assets_facts_handler(instance) + gather_assets_facts_handler(assets=(instance,)) RELATED_NODE_IDS = '_related_node_ids' diff --git a/apps/assets/signal_handlers/node_assets_amount.py b/apps/assets/signal_handlers/node_assets_amount.py index 314171a71..cc1c50b84 100644 --- a/apps/assets/signal_handlers/node_assets_amount.py +++ b/apps/assets/signal_handlers/node_assets_amount.py @@ -34,11 +34,11 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs): node_ids = [instance.id] else: node_ids = pk_set - update_nodes_assets_amount(*node_ids) + update_nodes_assets_amount(node_ids=node_ids) @merge_delay_run(ttl=5) -def update_nodes_assets_amount(*node_ids): +def update_nodes_assets_amount(node_ids=()): nodes = list(Node.objects.filter(id__in=node_ids)) logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids))) diff --git a/apps/assets/signal_handlers/node_assets_mapping.py b/apps/assets/signal_handlers/node_assets_mapping.py index 54855a29a..b84474033 100644 --- a/apps/assets/signal_handlers/node_assets_mapping.py +++ b/apps/assets/signal_handlers/node_assets_mapping.py @@ -22,7 +22,7 @@ node_assets_mapping_pub_sub = lazy(lambda: RedisPubSub('fm.node_asset_mapping'), @merge_delay_run(ttl=5) -def expire_node_assets_mapping(*org_ids): +def expire_node_assets_mapping(org_ids=()): # 所有进程清除(自己的 memory 数据) root_org_id = Organization.ROOT_ID Node.expire_node_all_asset_ids_cache_mapping(root_org_id) @@ -43,18 +43,18 @@ def on_node_post_create(sender, instance, created, update_fields, **kwargs): need_expire = False if need_expire: - expire_node_assets_mapping(instance.org_id) + expire_node_assets_mapping(org_ids=(instance.org_id,)) @receiver(post_delete, sender=Node) def on_node_post_delete(sender, instance, **kwargs): - expire_node_assets_mapping(instance.org_id) + expire_node_assets_mapping(org_ids=(instance.org_id,)) @receiver(m2m_changed, sender=Asset.nodes.through) def on_node_asset_change(sender, instance, **kwargs): logger.debug("Recv asset nodes changed signal, expire memery node asset mapping") - expire_node_assets_mapping(instance.org_id) + expire_node_assets_mapping(org_ids=(instance.org_id,)) @receiver(django_ready) diff --git a/apps/common/decorators.py b/apps/common/decorators.py index bdc541a9a..4e31ac0e5 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -41,7 +41,10 @@ def default_suffix_key(*args, **kwargs): def key_by_org(*args, **kwargs): - return args[0].org_id + values = list(kwargs.values()) + if not values: + return 'default' + return values[0].org_id class EventLoopThread(threading.Thread): @@ -79,6 +82,15 @@ def cancel_or_remove_debouncer_task(cache_key): task.cancel() +def run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs): + cancel_or_remove_debouncer_task(cache_key) + run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func) + loop = _loop_thread.get_loop() + _debouncer = Debouncer(run_func_partial, lambda: True, ttl, loop=loop, executor=executor) + task = asyncio.run_coroutine_threadsafe(_debouncer(*args, **kwargs), loop=loop) + _loop_debouncer_func_task_cache[cache_key] = task + + class Debouncer(object): def __init__(self, callback, check, delay, loop=None, executor=None): self.callback = callback @@ -113,53 +125,68 @@ def _run_func_with_org(key, org, func, *args, **kwargs): _loop_debouncer_func_args_cache.pop(key, None) -def delay_run(ttl=5, key=None, merge_args=False): +def delay_run(ttl=5, key=None): """ 延迟执行函数, 在 ttl 秒内, 只执行最后一次 :param ttl: :param key: 是否合并参数, 一个 callback - :param merge_args: 是否合并之前的参数, bool :return: """ def inner(func): - sigs = inspect.signature(func) - if len(sigs.parameters) != 1: - raise ValueError('func must have one arguments: %s' % func.__name__) - param = list(sigs.parameters.values())[0] - if not str(param).startswith('*') or param.kind == param.VAR_KEYWORD: - raise ValueError('func args must be startswith *: %s and not have **kwargs ' % func.__name__) suffix_key_func = key if key else default_suffix_key @functools.wraps(func) - def wrapper(*args): + def wrapper(*args, **kwargs): from orgs.utils import get_current_org org = get_current_org() func_name = f'{func.__module__}_{func.__name__}' key_suffix = suffix_key_func(*args) cache_key = f'DELAY_RUN_{func_name}_{key_suffix}' - new_arg = args - if merge_args: - values = _loop_debouncer_func_args_cache.get(cache_key, []) - new_arg = [*values, *args] - _loop_debouncer_func_args_cache[cache_key] = new_arg - - cancel_or_remove_debouncer_task(cache_key) - - run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func) - loop = _loop_thread.get_loop() - _debouncer = Debouncer(run_func_partial, lambda: True, ttl, - loop=loop, executor=executor) - task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg), - loop=loop) - _loop_debouncer_func_task_cache[cache_key] = task + run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs) return wrapper return inner -merge_delay_run = functools.partial(delay_run, merge_args=True) +def merge_delay_run(ttl=5, key=None): + """ + 延迟执行函数, 在 ttl 秒内, 只执行最后一次, 并且合并参数 + :param ttl: + :param key: 是否合并参数, 一个 callback + :return: + """ + + def inner(func): + sigs = inspect.signature(func) + if len(sigs.parameters) != 1: + raise ValueError('func must have one arguments: %s' % func.__name__) + param = list(sigs.parameters.values())[0] + if not isinstance(param.default, tuple): + raise ValueError('func default must be tuple: %s' % param.default) + suffix_key_func = key if key else default_suffix_key + + @functools.wraps(func) + def wrapper(*args, **kwargs): + from orgs.utils import get_current_org + org = get_current_org() + func_name = f'{func.__module__}_{func.__name__}' + key_suffix = suffix_key_func(*args, **kwargs) + cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}' + cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {}) + for k, v in kwargs.items(): + if not isinstance(v, tuple): + raise ValueError('func kwargs value must be list or tuple: %s' % func.__name__) + if k not in cache_kwargs: + cache_kwargs[k] = v + else: + cache_kwargs[k] += v + run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs) + + return wrapper + + return inner @delay_run(ttl=5) @@ -167,8 +194,8 @@ def test_delay_run(*username): print("Hello, %s, now is %s" % (username, time.time())) -@delay_run(ttl=5, key=lambda *users: users[0][0], merge_args=True) -def test_merge_delay_run(*users): +@merge_delay_run(ttl=5, key=lambda *users: users[0][0]) +def test_merge_delay_run(users=()): name = ','.join(users) time.sleep(2) print("Hello, %s, now is %s" % (name, time.time())) diff --git a/apps/common/management/commands/expire_caches.py b/apps/common/management/commands/expire_caches.py index d83b995b0..38749ee3d 100644 --- a/apps/common/management/commands/expire_caches.py +++ b/apps/common/management/commands/expire_caches.py @@ -1,6 +1,6 @@ from django.core.management.base import BaseCommand -from assets.signal_handlers.node_assets_mapping import expire_node_assets_mapping +from assets.signal_handlers.node_assets_mapping import expire_node_assets_mapping as _expire_node_assets_mapping from orgs.caches import OrgResourceStatisticsCache from orgs.models import Organization @@ -10,7 +10,7 @@ def expire_node_assets_mapping(): org_ids = [*org_ids, '00000000-0000-0000-0000-000000000000'] for org_id in org_ids: - expire_node_assets_mapping(org_id) + _expire_node_assets_mapping(org_ids=(org_id,)) def expire_org_resource_statistics_cache(): diff --git a/apps/orgs/signal_handlers/cache.py b/apps/orgs/signal_handlers/cache.py index 3eb1bd7e0..48a67149d 100644 --- a/apps/orgs/signal_handlers/cache.py +++ b/apps/orgs/signal_handlers/cache.py @@ -76,7 +76,7 @@ model_cache_field_mapper = { class OrgResourceStatisticsRefreshUtil: @staticmethod @merge_delay_run(ttl=5) - def refresh_org_fields(*org_fields): + def refresh_org_fields(org_fields=()): for org, cache_field_name in org_fields: OrgResourceStatisticsCache(org).expire(*cache_field_name) OrgResourceStatisticsCache(Organization.root()).expire(*cache_field_name)