diff --git a/apps/common/decorators.py b/apps/common/decorators.py
index 7e2c41ec5..38ce820b0 100644
--- a/apps/common/decorators.py
+++ b/apps/common/decorators.py
@@ -5,10 +5,8 @@ import functools
 import inspect
 import threading
 import time
-import uuid
 from concurrent.futures import ThreadPoolExecutor
 
-from django.core.cache import cache
 from django.db import transaction
 
 from .utils import logger
@@ -46,27 +44,6 @@ def key_by_org(*args, **kwargs):
     return args[0].org_id
 
 
-def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs):
-    from orgs.utils import set_current_org
-
-    try:
-        set_current_org(org)
-        uid = uuid.uuid4().__str__()
-        suffix_key_func = suffix_key if suffix_key else default_suffix_key
-        func_name = f'{func.__module__}_{func.__name__}'
-        key_suffix = suffix_key_func(*args, **kwargs)
-        key = f'DELAY_RUN_{func_name}_{key_suffix}'
-        cache.set(key, uid, ttl)
-        st = (ttl - 2 > 1) and ttl - 2 or 2
-        time.sleep(st)
-        ret = cache.get(key, None)
-
-        if uid == ret:
-            func(*args, **kwargs)
-    except Exception as e:
-        logger.error('delay run error: %s' % e)
-
-
 class LoopThread(threading.Thread):
     def __init__(self, loop, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -82,80 +59,61 @@ loop = asyncio.get_event_loop()
 loop_thread = LoopThread(loop)
 loop_thread.daemon = True
 loop_thread.start()
-executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer')
+executor = ThreadPoolExecutor(max_workers=10,
+                              thread_name_prefix='debouncer')
+_loop_debouncer_func_task_cache = {}
+_loop_debouncer_func_args_cache = {}
+
+
+def cancel_or_remove_debouncer_task(cache_key):
+    task = _loop_debouncer_func_task_cache.get(cache_key, None)
+    if not task:
+        return
+    if task.done():
+        del _loop_debouncer_func_task_cache[cache_key]
+    else:
+        task.cancel()
 
 
 class Debouncer(object):
-    def __init__(self, callback, check, delay, *args, **kwargs):
+    def __init__(self, callback, check, delay, loop=None, executor=None):
         self.callback = callback
         self.check = check
         self.delay = delay
+        self.loop = loop
+        if not loop:
+            self.loop = asyncio.get_event_loop()
+        self.executor = executor
 
     async def __call__(self, *args, **kwargs):
         await asyncio.sleep(self.delay)
-        ok = await self._check(*args, **kwargs)
+        ok = await self._check()
         if ok:
-            await loop.run_in_executor(executor, self.callback, *args)
+            await self.loop.run_in_executor(self.executor, self.callback, *args)
 
-    async def _check(self, *args, **kwargs):
+    async def _check(self):
         if asyncio.iscoroutinefunction(self.check):
-            return await self.check(*args, **kwargs)
-        return await loop.run_in_executor(executor, self.check)
+            return await self.check()
+        return await self.loop.run_in_executor(self.executor, self.check)
 
 
-def _run_func_with_org(org, func, *args, **kwargs):
+def _run_func_with_org(key, org, func, *args, **kwargs):
     from orgs.utils import set_current_org
-
     try:
         set_current_org(org)
         func(*args, **kwargs)
     except Exception as e:
         logger.error('delay run error: %s' % e)
+    _loop_debouncer_func_task_cache.pop(key, None)
+    _loop_debouncer_func_args_cache.pop(key, None)
 
 
-def delay_run(ttl=5, key=None):
+def delay_run(ttl=5, key=None, merge_args=False):
     """
     延迟执行函数, 在 ttl 秒内, 只执行最后一次
     :param ttl:
     :param key: 是否合并参数, 一个 callback
-    :return:
-    """
-
-    def inner(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kwargs):
-            from orgs.utils import get_current_org
-            org = get_current_org()
-            suffix_key_func = key if key else default_suffix_key
-            uid = uuid.uuid4().__str__()
-            func_name = f'{func.__module__}_{func.__name__}'
-            key_suffix = suffix_key_func(*args, **kwargs)
-            cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
-            # 延迟两倍时间,防止缓存过期,导致校验失败
-            cache.set(cache_key, uid, ttl * 2)
-
-            def _check_func(key_id, key_value):
-                ret = cache.get(key_id, None)
-                return key_value == ret
-
-            check_func_partial = functools.partial(_check_func, cache_key, uid)
-            run_func_partial = functools.partial(_run_func_with_org, org, func)
-            asyncio.run_coroutine_threadsafe(
-                Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs),
-                loop=loop
-            )
-
-        return wrapper
-
-    return inner
-
-
-def merge_delay_run(ttl, key=None):
-    """
-    合并 func 参数,延迟执行, 在 ttl 秒内, 只执行最后一次
-    func 参数必须是 *args
-    :param ttl:
-    :param key: 是否合并参数, 一个 callback
+    :param merge_args: 是否合并之前的参数, bool
     :return:
     """
 
@@ -164,39 +122,46 @@ def merge_delay_run(ttl, key=None):
         if len(sigs.parameters) != 1:
             raise ValueError('func must have one arguments: %s' % func.__name__)
         param = list(sigs.parameters.values())[0]
-        if not str(param).startswith('*'):
-            raise ValueError('func args must be startswith *: %s' % func.__name__)
-
+        if not str(param).startswith('*') or param.kind == param.VAR_KEYWORD:
+            raise ValueError('func args must be startswith *: %s and not have **kwargs ' % func.__name__)
         suffix_key_func = key if key else default_suffix_key
 
         @functools.wraps(func)
         def wrapper(*args):
-            key_suffix = suffix_key_func(*args)
+            from orgs.utils import get_current_org
+            org = get_current_org()
             func_name = f'{func.__module__}_{func.__name__}'
-            cache_key = f'DELAY_MERGE_RUN_{func_name}_{key_suffix}'
-            values = cache.get(cache_key, [])
-            new_arg = [*values, *args]
-            cache.set(cache_key, new_arg, ttl)
-            return delay_run(ttl, suffix_key_func)(func)(*new_arg)
+            key_suffix = suffix_key_func(*args)
+            cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
+            new_arg = args
+            if merge_args:
+                values = _loop_debouncer_func_args_cache.get(cache_key, [])
+                new_arg = [*values, *args]
+                _loop_debouncer_func_args_cache[cache_key] = new_arg
+
+            cancel_or_remove_debouncer_task(cache_key)
+
+            run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
+            _debouncer = Debouncer(run_func_partial, lambda: True, ttl,
+                                   loop=loop, executor=executor)
+            task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),
+                                                    loop=loop)
+            _loop_debouncer_func_task_cache[cache_key] = task
 
         return wrapper
 
     return inner
 
 
+merge_delay_run = functools.partial(delay_run, merge_args=True)
+
+
 @delay_run(ttl=5)
-def test_delay_run(username, year=2000):
-    print("Hello, %s, now is %s" % (username, year))
+def test_delay_run(*username):
+    print("Hello, %s, now is %s" % (username, time.time()))
 
 
-@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
-def test_merge_delay_run(*users):
-    name = ','.join(users)
-    time.sleep(2)
-    print("Hello, %s, now is %s" % (name, time.time()))
-
-
-@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
+@delay_run(ttl=5, key=lambda *users: users[0][0], merge_args=True)
 def test_merge_delay_run(*users):
     name = ','.join(users)
     time.sleep(2)
@@ -210,6 +175,7 @@ def do_test():
         # test_delay_run('test', year=i)
         test_merge_delay_run('test %s' % i)
         test_merge_delay_run('best %s' % i)
+        test_delay_run('test run %s' % i)
 
     end = time.time()
     using = end - s