diff --git a/apps/common/decorators.py b/apps/common/decorators.py index 7e2c41ec5..38ce820b0 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -5,10 +5,8 @@ import functools import inspect import threading import time -import uuid from concurrent.futures import ThreadPoolExecutor -from django.core.cache import cache from django.db import transaction from .utils import logger @@ -46,27 +44,6 @@ def key_by_org(*args, **kwargs): return args[0].org_id -def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs): - from orgs.utils import set_current_org - - try: - set_current_org(org) - uid = uuid.uuid4().__str__() - suffix_key_func = suffix_key if suffix_key else default_suffix_key - func_name = f'{func.__module__}_{func.__name__}' - key_suffix = suffix_key_func(*args, **kwargs) - key = f'DELAY_RUN_{func_name}_{key_suffix}' - cache.set(key, uid, ttl) - st = (ttl - 2 > 1) and ttl - 2 or 2 - time.sleep(st) - ret = cache.get(key, None) - - if uid == ret: - func(*args, **kwargs) - except Exception as e: - logger.error('delay run error: %s' % e) - - class LoopThread(threading.Thread): def __init__(self, loop, *args, **kwargs): super().__init__(*args, **kwargs) @@ -82,80 +59,61 @@ loop = asyncio.get_event_loop() loop_thread = LoopThread(loop) loop_thread.daemon = True loop_thread.start() -executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer') +executor = ThreadPoolExecutor(max_workers=10, + thread_name_prefix='debouncer') +_loop_debouncer_func_task_cache = {} +_loop_debouncer_func_args_cache = {} + + +def cancel_or_remove_debouncer_task(cache_key): + task = _loop_debouncer_func_task_cache.get(cache_key, None) + if not task: + return + if task.done(): + del _loop_debouncer_func_task_cache[cache_key] + else: + task.cancel() class Debouncer(object): - def __init__(self, callback, check, delay, *args, **kwargs): + def __init__(self, callback, check, delay, loop=None, executor=None): self.callback = callback self.check = check self.delay = delay + self.loop = loop + if not loop: + self.loop = asyncio.get_event_loop() + self.executor = executor async def __call__(self, *args, **kwargs): await asyncio.sleep(self.delay) - ok = await self._check(*args, **kwargs) + ok = await self._check() if ok: - await loop.run_in_executor(executor, self.callback, *args) + await self.loop.run_in_executor(self.executor, self.callback, *args) - async def _check(self, *args, **kwargs): + async def _check(self): if asyncio.iscoroutinefunction(self.check): - return await self.check(*args, **kwargs) - return await loop.run_in_executor(executor, self.check) + return await self.check() + return await self.loop.run_in_executor(self.executor, self.check) -def _run_func_with_org(org, func, *args, **kwargs): +def _run_func_with_org(key, org, func, *args, **kwargs): from orgs.utils import set_current_org - try: set_current_org(org) func(*args, **kwargs) except Exception as e: logger.error('delay run error: %s' % e) + _loop_debouncer_func_task_cache.pop(key, None) + _loop_debouncer_func_args_cache.pop(key, None) -def delay_run(ttl=5, key=None): +def delay_run(ttl=5, key=None, merge_args=False): """ 延迟执行函数, 在 ttl 秒内, 只执行最后一次 :param ttl: :param key: 是否合并参数, 一个 callback - :return: - """ - - def inner(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - from orgs.utils import get_current_org - org = get_current_org() - suffix_key_func = key if key else default_suffix_key - uid = uuid.uuid4().__str__() - func_name = f'{func.__module__}_{func.__name__}' - key_suffix = suffix_key_func(*args, **kwargs) - cache_key = f'DELAY_RUN_{func_name}_{key_suffix}' - # 延迟两倍时间,防止缓存过期,导致校验失败 - cache.set(cache_key, uid, ttl * 2) - - def _check_func(key_id, key_value): - ret = cache.get(key_id, None) - return key_value == ret - - check_func_partial = functools.partial(_check_func, cache_key, uid) - run_func_partial = functools.partial(_run_func_with_org, org, func) - asyncio.run_coroutine_threadsafe( - Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs), - loop=loop - ) - - return wrapper - - return inner - - -def merge_delay_run(ttl, key=None): - """ - 合并 func 参数,延迟执行, 在 ttl 秒内, 只执行最后一次 - func 参数必须是 *args - :param ttl: - :param key: 是否合并参数, 一个 callback + :param merge_args: 是否合并之前的参数, bool :return: """ @@ -164,39 +122,46 @@ def merge_delay_run(ttl, key=None): if len(sigs.parameters) != 1: raise ValueError('func must have one arguments: %s' % func.__name__) param = list(sigs.parameters.values())[0] - if not str(param).startswith('*'): - raise ValueError('func args must be startswith *: %s' % func.__name__) - + if not str(param).startswith('*') or param.kind == param.VAR_KEYWORD: + raise ValueError('func args must be startswith *: %s and not have **kwargs ' % func.__name__) suffix_key_func = key if key else default_suffix_key @functools.wraps(func) def wrapper(*args): - key_suffix = suffix_key_func(*args) + from orgs.utils import get_current_org + org = get_current_org() func_name = f'{func.__module__}_{func.__name__}' - cache_key = f'DELAY_MERGE_RUN_{func_name}_{key_suffix}' - values = cache.get(cache_key, []) - new_arg = [*values, *args] - cache.set(cache_key, new_arg, ttl) - return delay_run(ttl, suffix_key_func)(func)(*new_arg) + key_suffix = suffix_key_func(*args) + cache_key = f'DELAY_RUN_{func_name}_{key_suffix}' + new_arg = args + if merge_args: + values = _loop_debouncer_func_args_cache.get(cache_key, []) + new_arg = [*values, *args] + _loop_debouncer_func_args_cache[cache_key] = new_arg + + cancel_or_remove_debouncer_task(cache_key) + + run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func) + _debouncer = Debouncer(run_func_partial, lambda: True, ttl, + loop=loop, executor=executor) + task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg), + loop=loop) + _loop_debouncer_func_task_cache[cache_key] = task return wrapper return inner +merge_delay_run = functools.partial(delay_run, merge_args=True) + + @delay_run(ttl=5) -def test_delay_run(username, year=2000): - print("Hello, %s, now is %s" % (username, year)) +def test_delay_run(*username): + print("Hello, %s, now is %s" % (username, time.time())) -@merge_delay_run(ttl=5, key=lambda *users: users[0][0]) -def test_merge_delay_run(*users): - name = ','.join(users) - time.sleep(2) - print("Hello, %s, now is %s" % (name, time.time())) - - -@merge_delay_run(ttl=5, key=lambda *users: users[0][0]) +@delay_run(ttl=5, key=lambda *users: users[0][0], merge_args=True) def test_merge_delay_run(*users): name = ','.join(users) time.sleep(2) @@ -210,6 +175,7 @@ def do_test(): # test_delay_run('test', year=i) test_merge_delay_run('test %s' % i) test_merge_delay_run('best %s' % i) + test_delay_run('test run %s' % i) end = time.time() using = end - s