diff --git a/apps/common/decorators.py b/apps/common/decorators.py index 04e27219b..8dc6ed4de 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- # +import asyncio import functools import inspect +import threading import time import uuid from concurrent.futures import ThreadPoolExecutor @@ -65,7 +67,50 @@ def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs): logger.error('delay run error: %s' % e) -executor = ThreadPoolExecutor(10) +class LoopThread(threading.Thread): + def __init__(self, loop, *args, **kwargs): + super().__init__(*args, **kwargs) + self.loop = loop + + def run(self) -> None: + asyncio.set_event_loop(loop) + self.loop.run_forever() + print('loop stopped') + + +loop = asyncio.get_event_loop() +loop_thread = LoopThread(loop) +loop_thread.daemon = True +loop_thread.start() +executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer') + + +class Debouncer(object): + def __init__(self, callback, check, delay, *args, **kwargs): + self.callback = callback + self.check = check + self.delay = delay + + async def __call__(self, *args, **kwargs): + await asyncio.sleep(self.delay) + ok = await self._check(*args, **kwargs) + if ok: + await loop.run_in_executor(executor, self.callback, *args) + + async def _check(self, *args, **kwargs): + if asyncio.iscoroutinefunction(self.check): + return await self.check(*args, **kwargs) + return await loop.run_in_executor(executor, self.check) + + +def _run_func_with_org(org, func, *args, **kwargs): + from orgs.utils import set_current_org + + try: + set_current_org(org) + func(*args, **kwargs) + except Exception as e: + logger.error('delay run error: %s' % e) def delay_run(ttl=5, key=None): @@ -81,7 +126,23 @@ def delay_run(ttl=5, key=None): def wrapper(*args, **kwargs): from orgs.utils import get_current_org org = get_current_org() - executor.submit(_run_func_if_is_last, ttl, key, org, func, *args, **kwargs) + suffix_key_func = key if key else default_suffix_key + uid = uuid.uuid4().__str__() + func_name = f'{func.__module__}_{func.__name__}' + key_suffix = suffix_key_func(*args, **kwargs) + cache_key = f'DELAY_RUN_{func_name}_{key_suffix}' + # 延迟两倍时间,防止缓存过期,导致校验失败 + cache.set(cache_key, uid, ttl * 2) + + def _check_func(key_id, key_value): + ret = cache.get(key_id, None) + return key_value == ret + + check_func_partial = functools.partial(_check_func, cache_key, uid) + run_func_partial = functools.partial(_run_func_with_org, org, func) + asyncio.run_coroutine_threadsafe( + Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs), + loop=loop) return wrapper