From 9c5e4955f29a87bf275448792dea1facefb406e6 Mon Sep 17 00:00:00 2001 From: Eric Date: Sat, 11 Feb 2023 23:00:41 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=BB=98=E8=AE=A4=20e?= =?UTF-8?q?vent=20loop=20=E5=86=B2=E7=AA=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/decorators.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/apps/common/decorators.py b/apps/common/decorators.py index 38ce820b0..bdc541a9a 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -44,21 +44,25 @@ def key_by_org(*args, **kwargs): return args[0].org_id -class LoopThread(threading.Thread): - def __init__(self, loop, *args, **kwargs): +class EventLoopThread(threading.Thread): + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.loop = loop + self._loop = asyncio.new_event_loop() def run(self) -> None: - asyncio.set_event_loop(loop) - self.loop.run_forever() - print('loop stopped') + asyncio.set_event_loop(self._loop) + try: + self._loop.run_forever() + except Exception as e: + logger.error("Event loop stopped with err: {} ".format(e)) + + def get_loop(self): + return self._loop -loop = asyncio.get_event_loop() -loop_thread = LoopThread(loop) -loop_thread.daemon = True -loop_thread.start() +_loop_thread = EventLoopThread() +_loop_thread.setDaemon(True) +_loop_thread.start() executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix='debouncer') _loop_debouncer_func_task_cache = {} @@ -87,14 +91,15 @@ class Debouncer(object): async def __call__(self, *args, **kwargs): await asyncio.sleep(self.delay) - ok = await self._check() + ok = await self._run_sync_to_async(self.check) if ok: - await self.loop.run_in_executor(self.executor, self.callback, *args) + callback_func = functools.partial(self.callback, *args, **kwargs) + return await self._run_sync_to_async(callback_func) - async def _check(self): - if asyncio.iscoroutinefunction(self.check): - return await self.check() - return await self.loop.run_in_executor(self.executor, self.check) + async def _run_sync_to_async(self, func): + if asyncio.iscoroutinefunction(func): + return await func() + return await self.loop.run_in_executor(self.executor, func) def _run_func_with_org(key, org, func, *args, **kwargs): @@ -142,6 +147,7 @@ def delay_run(ttl=5, key=None, merge_args=False): cancel_or_remove_debouncer_task(cache_key) run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func) + loop = _loop_thread.get_loop() _debouncer = Debouncer(run_func_partial, lambda: True, ttl, loop=loop, executor=executor) task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),