diff --git a/apps/assets/signal_handlers/asset.py b/apps/assets/signal_handlers/asset.py
index 94aa86ff2..88ebab11e 100644
--- a/apps/assets/signal_handlers/asset.py
+++ b/apps/assets/signal_handlers/asset.py
@@ -21,13 +21,13 @@ def on_node_pre_save(sender, instance: Node, **kwargs):
 
 
 @merge_delay_run(ttl=5, key=key_by_org)
-def test_assets_connectivity_handler(*assets):
+def test_assets_connectivity_handler(assets=()):
     task_name = gettext_noop("Test assets connectivity ")
     test_assets_connectivity_task.delay(assets, task_name)
 
 
 @merge_delay_run(ttl=5, key=key_by_org)
-def gather_assets_facts_handler(*assets):
+def gather_assets_facts_handler(assets=()):
     if not assets:
         logger.info("No assets to update hardware info")
         return
@@ -36,7 +36,7 @@ def gather_assets_facts_handler(*assets):
 
 
 @merge_delay_run(ttl=5, key=key_by_org)
-def ensure_asset_has_node(*assets):
+def ensure_asset_has_node(assets=()):
     asset_ids = [asset.id for asset in assets]
     has_ids = Asset.nodes.through.objects \
         .filter(asset_id__in=asset_ids) \
@@ -60,16 +60,16 @@ def on_asset_create(sender, instance=None, created=False, **kwargs):
         return
     logger.info("Asset create signal recv: {}".format(instance))
 
-    ensure_asset_has_node(instance)
+    ensure_asset_has_node(assets=(instance,))
 
     # 获取资产硬件信息
     auto_info = instance.auto_info
     if auto_info.get('ping_enabled'):
         logger.debug('Asset {} ping enabled, test connectivity'.format(instance.name))
-        test_assets_connectivity_handler(instance)
+        test_assets_connectivity_handler(assets=(instance,))
     if auto_info.get('gather_facts_enabled'):
         logger.debug('Asset {} gather facts enabled, gather facts'.format(instance.name))
-        gather_assets_facts_handler(instance)
+        gather_assets_facts_handler(assets=(instance,))
 
 
 RELATED_NODE_IDS = '_related_node_ids'
diff --git a/apps/assets/signal_handlers/node_assets_amount.py b/apps/assets/signal_handlers/node_assets_amount.py
index 314171a71..cc1c50b84 100644
--- a/apps/assets/signal_handlers/node_assets_amount.py
+++ b/apps/assets/signal_handlers/node_assets_amount.py
@@ -34,11 +34,11 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
             node_ids = [instance.id]
         else:
             node_ids = pk_set
-        update_nodes_assets_amount(*node_ids)
+        update_nodes_assets_amount(node_ids=node_ids)
 
 
 @merge_delay_run(ttl=5)
-def update_nodes_assets_amount(*node_ids):
+def update_nodes_assets_amount(node_ids=()):
     nodes = list(Node.objects.filter(id__in=node_ids))
     logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
 
diff --git a/apps/assets/signal_handlers/node_assets_mapping.py b/apps/assets/signal_handlers/node_assets_mapping.py
index 54855a29a..b84474033 100644
--- a/apps/assets/signal_handlers/node_assets_mapping.py
+++ b/apps/assets/signal_handlers/node_assets_mapping.py
@@ -22,7 +22,7 @@ node_assets_mapping_pub_sub = lazy(lambda: RedisPubSub('fm.node_asset_mapping'),
 
 
 @merge_delay_run(ttl=5)
-def expire_node_assets_mapping(*org_ids):
+def expire_node_assets_mapping(org_ids=()):
     # 所有进程清除(自己的 memory 数据)
     root_org_id = Organization.ROOT_ID
     Node.expire_node_all_asset_ids_cache_mapping(root_org_id)
@@ -43,18 +43,18 @@ def on_node_post_create(sender, instance, created, update_fields, **kwargs):
         need_expire = False
 
     if need_expire:
-        expire_node_assets_mapping(instance.org_id)
+        expire_node_assets_mapping(org_ids=(instance.org_id,))
 
 
 @receiver(post_delete, sender=Node)
 def on_node_post_delete(sender, instance, **kwargs):
-    expire_node_assets_mapping(instance.org_id)
+    expire_node_assets_mapping(org_ids=(instance.org_id,))
 
 
 @receiver(m2m_changed, sender=Asset.nodes.through)
 def on_node_asset_change(sender, instance, **kwargs):
     logger.debug("Recv asset nodes changed signal, expire memery node asset mapping")
-    expire_node_assets_mapping(instance.org_id)
+    expire_node_assets_mapping(org_ids=(instance.org_id,))
 
 
 @receiver(django_ready)
diff --git a/apps/common/decorators.py b/apps/common/decorators.py
index bdc541a9a..4e31ac0e5 100644
--- a/apps/common/decorators.py
+++ b/apps/common/decorators.py
@@ -41,7 +41,10 @@ def default_suffix_key(*args, **kwargs):
 
 
 def key_by_org(*args, **kwargs):
-    return args[0].org_id
+    values = list(kwargs.values())
+    if not values:
+        return 'default'
+    return values[0].org_id
 
 
 class EventLoopThread(threading.Thread):
@@ -79,6 +82,15 @@ def cancel_or_remove_debouncer_task(cache_key):
         task.cancel()
 
 
+def run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs):
+    cancel_or_remove_debouncer_task(cache_key)
+    run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
+    loop = _loop_thread.get_loop()
+    _debouncer = Debouncer(run_func_partial, lambda: True, ttl, loop=loop, executor=executor)
+    task = asyncio.run_coroutine_threadsafe(_debouncer(*args, **kwargs), loop=loop)
+    _loop_debouncer_func_task_cache[cache_key] = task
+
+
 class Debouncer(object):
     def __init__(self, callback, check, delay, loop=None, executor=None):
         self.callback = callback
@@ -113,12 +125,36 @@ def _run_func_with_org(key, org, func, *args, **kwargs):
     _loop_debouncer_func_args_cache.pop(key, None)
 
 
-def delay_run(ttl=5, key=None, merge_args=False):
+def delay_run(ttl=5, key=None):
     """
     延迟执行函数, 在 ttl 秒内, 只执行最后一次
     :param ttl:
     :param key: 是否合并参数, 一个 callback
-    :param merge_args: 是否合并之前的参数, bool
+    :return:
+    """
+
+    def inner(func):
+        suffix_key_func = key if key else default_suffix_key
+
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            from orgs.utils import get_current_org
+            org = get_current_org()
+            func_name = f'{func.__module__}_{func.__name__}'
+            key_suffix = suffix_key_func(*args)
+            cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
+            run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs)
+
+        return wrapper
+
+    return inner
+
+
+def merge_delay_run(ttl=5, key=None):
+    """
+    延迟执行函数, 在 ttl 秒内, 只执行最后一次, 并且合并参数
+    :param ttl:
+    :param key: 是否合并参数, 一个 callback
     :return:
     """
 
@@ -127,48 +163,39 @@ def delay_run(ttl=5, key=None, merge_args=False):
         if len(sigs.parameters) != 1:
             raise ValueError('func must have one arguments: %s' % func.__name__)
         param = list(sigs.parameters.values())[0]
-        if not str(param).startswith('*') or param.kind == param.VAR_KEYWORD:
-            raise ValueError('func args must be startswith *: %s and not have **kwargs ' % func.__name__)
+        if not isinstance(param.default, tuple):
+            raise ValueError('func default must be tuple: %s' % param.default)
         suffix_key_func = key if key else default_suffix_key
 
         @functools.wraps(func)
-        def wrapper(*args):
+        def wrapper(*args, **kwargs):
             from orgs.utils import get_current_org
             org = get_current_org()
             func_name = f'{func.__module__}_{func.__name__}'
-            key_suffix = suffix_key_func(*args)
-            cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
-            new_arg = args
-            if merge_args:
-                values = _loop_debouncer_func_args_cache.get(cache_key, [])
-                new_arg = [*values, *args]
-                _loop_debouncer_func_args_cache[cache_key] = new_arg
-
-            cancel_or_remove_debouncer_task(cache_key)
-
-            run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
-            loop = _loop_thread.get_loop()
-            _debouncer = Debouncer(run_func_partial, lambda: True, ttl,
-                                   loop=loop, executor=executor)
-            task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),
-                                                    loop=loop)
-            _loop_debouncer_func_task_cache[cache_key] = task
+            key_suffix = suffix_key_func(*args, **kwargs)
+            cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
+            cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
+            for k, v in kwargs.items():
+                if not isinstance(v, tuple):
+                    raise ValueError('func kwargs value must be list or tuple: %s' % func.__name__)
+                if k not in cache_kwargs:
+                    cache_kwargs[k] = v
+                else:
+                    cache_kwargs[k] += v
+            run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs)
 
         return wrapper
 
     return inner
 
 
-merge_delay_run = functools.partial(delay_run, merge_args=True)
-
-
 @delay_run(ttl=5)
 def test_delay_run(*username):
     print("Hello, %s, now is %s" % (username, time.time()))
 
 
-@delay_run(ttl=5, key=lambda *users: users[0][0], merge_args=True)
-def test_merge_delay_run(*users):
+@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
+def test_merge_delay_run(users=()):
     name = ','.join(users)
     time.sleep(2)
     print("Hello, %s, now is %s" % (name, time.time()))
diff --git a/apps/common/management/commands/expire_caches.py b/apps/common/management/commands/expire_caches.py
index d83b995b0..38749ee3d 100644
--- a/apps/common/management/commands/expire_caches.py
+++ b/apps/common/management/commands/expire_caches.py
@@ -1,6 +1,6 @@
 from django.core.management.base import BaseCommand
 
-from assets.signal_handlers.node_assets_mapping import expire_node_assets_mapping
+from assets.signal_handlers.node_assets_mapping import expire_node_assets_mapping as _expire_node_assets_mapping
 from orgs.caches import OrgResourceStatisticsCache
 from orgs.models import Organization
 
@@ -10,7 +10,7 @@ def expire_node_assets_mapping():
     org_ids = [*org_ids, '00000000-0000-0000-0000-000000000000']
 
     for org_id in org_ids:
-        expire_node_assets_mapping(org_id)
+        _expire_node_assets_mapping(org_ids=(org_id,))
 
 
 def expire_org_resource_statistics_cache():
diff --git a/apps/orgs/signal_handlers/cache.py b/apps/orgs/signal_handlers/cache.py
index 3eb1bd7e0..48a67149d 100644
--- a/apps/orgs/signal_handlers/cache.py
+++ b/apps/orgs/signal_handlers/cache.py
@@ -76,7 +76,7 @@ model_cache_field_mapper = {
 class OrgResourceStatisticsRefreshUtil:
     @staticmethod
     @merge_delay_run(ttl=5)
-    def refresh_org_fields(*org_fields):
+    def refresh_org_fields(org_fields=()):
         for org, cache_field_name in org_fields:
             OrgResourceStatisticsCache(org).expire(*cache_field_name)
             OrgResourceStatisticsCache(Organization.root()).expire(*cache_field_name)