perf: 使用 asyncio 延迟并发执行

pull/9494/head
Eric 2 years ago
parent 37a52c420f
commit 8b7bccc4ad

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
#
import asyncio
import functools
import inspect
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
@ -65,7 +67,50 @@ def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs):
logger.error('delay run error: %s' % e)
executor = ThreadPoolExecutor(10)
class LoopThread(threading.Thread):
def __init__(self, loop, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = loop
def run(self) -> None:
asyncio.set_event_loop(loop)
self.loop.run_forever()
print('loop stopped')
loop = asyncio.get_event_loop()
loop_thread = LoopThread(loop)
loop_thread.daemon = True
loop_thread.start()
executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer')
class Debouncer(object):
def __init__(self, callback, check, delay, *args, **kwargs):
self.callback = callback
self.check = check
self.delay = delay
async def __call__(self, *args, **kwargs):
await asyncio.sleep(self.delay)
ok = await self._check(*args, **kwargs)
if ok:
await loop.run_in_executor(executor, self.callback, *args)
async def _check(self, *args, **kwargs):
if asyncio.iscoroutinefunction(self.check):
return await self.check(*args, **kwargs)
return await loop.run_in_executor(executor, self.check)
def _run_func_with_org(org, func, *args, **kwargs):
from orgs.utils import set_current_org
try:
set_current_org(org)
func(*args, **kwargs)
except Exception as e:
logger.error('delay run error: %s' % e)
def delay_run(ttl=5, key=None):
@ -81,7 +126,23 @@ def delay_run(ttl=5, key=None):
def wrapper(*args, **kwargs):
from orgs.utils import get_current_org
org = get_current_org()
executor.submit(_run_func_if_is_last, ttl, key, org, func, *args, **kwargs)
suffix_key_func = key if key else default_suffix_key
uid = uuid.uuid4().__str__()
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
# 延迟两倍时间,防止缓存过期,导致校验失败
cache.set(cache_key, uid, ttl * 2)
def _check_func(key_id, key_value):
ret = cache.get(key_id, None)
return key_value == ret
check_func_partial = functools.partial(_check_func, cache_key, uid)
run_func_partial = functools.partial(_run_func_with_org, org, func)
asyncio.run_coroutine_threadsafe(
Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs),
loop=loop)
return wrapper

Loading…
Cancel
Save