mirror of https://github.com/jumpserver/jumpserver
Merge pull request #9514 from jumpserver/pr@dev@fix_event_loop
fix: 修复默认 event loop 冲突问题pull/9515/head
commit
e88cb71d3c
|
@ -44,21 +44,25 @@ def key_by_org(*args, **kwargs):
|
||||||
return args[0].org_id
|
return args[0].org_id
|
||||||
|
|
||||||
|
|
||||||
class LoopThread(threading.Thread):
|
class EventLoopThread(threading.Thread):
|
||||||
def __init__(self, loop, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.loop = loop
|
self._loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(self._loop)
|
||||||
self.loop.run_forever()
|
try:
|
||||||
print('loop stopped')
|
self._loop.run_forever()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Event loop stopped with err: {} ".format(e))
|
||||||
|
|
||||||
|
def get_loop(self):
|
||||||
|
return self._loop
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
_loop_thread = EventLoopThread()
|
||||||
loop_thread = LoopThread(loop)
|
_loop_thread.setDaemon(True)
|
||||||
loop_thread.daemon = True
|
_loop_thread.start()
|
||||||
loop_thread.start()
|
|
||||||
executor = ThreadPoolExecutor(max_workers=10,
|
executor = ThreadPoolExecutor(max_workers=10,
|
||||||
thread_name_prefix='debouncer')
|
thread_name_prefix='debouncer')
|
||||||
_loop_debouncer_func_task_cache = {}
|
_loop_debouncer_func_task_cache = {}
|
||||||
|
@ -87,14 +91,15 @@ class Debouncer(object):
|
||||||
|
|
||||||
async def __call__(self, *args, **kwargs):
|
async def __call__(self, *args, **kwargs):
|
||||||
await asyncio.sleep(self.delay)
|
await asyncio.sleep(self.delay)
|
||||||
ok = await self._check()
|
ok = await self._run_sync_to_async(self.check)
|
||||||
if ok:
|
if ok:
|
||||||
await self.loop.run_in_executor(self.executor, self.callback, *args)
|
callback_func = functools.partial(self.callback, *args, **kwargs)
|
||||||
|
return await self._run_sync_to_async(callback_func)
|
||||||
|
|
||||||
async def _check(self):
|
async def _run_sync_to_async(self, func):
|
||||||
if asyncio.iscoroutinefunction(self.check):
|
if asyncio.iscoroutinefunction(func):
|
||||||
return await self.check()
|
return await func()
|
||||||
return await self.loop.run_in_executor(self.executor, self.check)
|
return await self.loop.run_in_executor(self.executor, func)
|
||||||
|
|
||||||
|
|
||||||
def _run_func_with_org(key, org, func, *args, **kwargs):
|
def _run_func_with_org(key, org, func, *args, **kwargs):
|
||||||
|
@ -142,6 +147,7 @@ def delay_run(ttl=5, key=None, merge_args=False):
|
||||||
cancel_or_remove_debouncer_task(cache_key)
|
cancel_or_remove_debouncer_task(cache_key)
|
||||||
|
|
||||||
run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
|
run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
|
||||||
|
loop = _loop_thread.get_loop()
|
||||||
_debouncer = Debouncer(run_func_partial, lambda: True, ttl,
|
_debouncer = Debouncer(run_func_partial, lambda: True, ttl,
|
||||||
loop=loop, executor=executor)
|
loop=loop, executor=executor)
|
||||||
task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),
|
task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),
|
||||||
|
|
Loading…
Reference in New Issue