fix: 优化并发延迟执行

pull/9509/head
Eric 2023-02-10 18:01:31 +08:00
parent aa483a3c6d
commit 78f6f6cf7d
1 changed files with 56 additions and 90 deletions

View File

@ -5,10 +5,8 @@ import functools
import inspect import inspect
import threading import threading
import time import time
import uuid
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from django.core.cache import cache
from django.db import transaction from django.db import transaction
from .utils import logger from .utils import logger
@ -46,27 +44,6 @@ def key_by_org(*args, **kwargs):
return args[0].org_id return args[0].org_id
def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs):
from orgs.utils import set_current_org
try:
set_current_org(org)
uid = uuid.uuid4().__str__()
suffix_key_func = suffix_key if suffix_key else default_suffix_key
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
key = f'DELAY_RUN_{func_name}_{key_suffix}'
cache.set(key, uid, ttl)
st = (ttl - 2 > 1) and ttl - 2 or 2
time.sleep(st)
ret = cache.get(key, None)
if uid == ret:
func(*args, **kwargs)
except Exception as e:
logger.error('delay run error: %s' % e)
class LoopThread(threading.Thread): class LoopThread(threading.Thread):
def __init__(self, loop, *args, **kwargs): def __init__(self, loop, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -82,80 +59,61 @@ loop = asyncio.get_event_loop()
loop_thread = LoopThread(loop) loop_thread = LoopThread(loop)
loop_thread.daemon = True loop_thread.daemon = True
loop_thread.start() loop_thread.start()
executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer') executor = ThreadPoolExecutor(max_workers=10,
thread_name_prefix='debouncer')
_loop_debouncer_func_task_cache = {}
_loop_debouncer_func_args_cache = {}
def cancel_or_remove_debouncer_task(cache_key):
task = _loop_debouncer_func_task_cache.get(cache_key, None)
if not task:
return
if task.done():
del _loop_debouncer_func_task_cache[cache_key]
else:
task.cancel()
class Debouncer(object): class Debouncer(object):
def __init__(self, callback, check, delay, *args, **kwargs): def __init__(self, callback, check, delay, loop=None, executor=None):
self.callback = callback self.callback = callback
self.check = check self.check = check
self.delay = delay self.delay = delay
self.loop = loop
if not loop:
self.loop = asyncio.get_event_loop()
self.executor = executor
async def __call__(self, *args, **kwargs): async def __call__(self, *args, **kwargs):
await asyncio.sleep(self.delay) await asyncio.sleep(self.delay)
ok = await self._check(*args, **kwargs) ok = await self._check()
if ok: if ok:
await loop.run_in_executor(executor, self.callback, *args) await self.loop.run_in_executor(self.executor, self.callback, *args)
async def _check(self, *args, **kwargs): async def _check(self):
if asyncio.iscoroutinefunction(self.check): if asyncio.iscoroutinefunction(self.check):
return await self.check(*args, **kwargs) return await self.check()
return await loop.run_in_executor(executor, self.check) return await self.loop.run_in_executor(self.executor, self.check)
def _run_func_with_org(org, func, *args, **kwargs): def _run_func_with_org(key, org, func, *args, **kwargs):
from orgs.utils import set_current_org from orgs.utils import set_current_org
try: try:
set_current_org(org) set_current_org(org)
func(*args, **kwargs) func(*args, **kwargs)
except Exception as e: except Exception as e:
logger.error('delay run error: %s' % e) logger.error('delay run error: %s' % e)
_loop_debouncer_func_task_cache.pop(key, None)
_loop_debouncer_func_args_cache.pop(key, None)
def delay_run(ttl=5, key=None): def delay_run(ttl=5, key=None, merge_args=False):
""" """
延迟执行函数, ttl 秒内, 只执行最后一次 延迟执行函数, ttl 秒内, 只执行最后一次
:param ttl: :param ttl:
:param key: 是否合并参数, 一个 callback :param key: 是否合并参数, 一个 callback
:return: :param merge_args: 是否合并之前的参数, bool
"""
def inner(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
from orgs.utils import get_current_org
org = get_current_org()
suffix_key_func = key if key else default_suffix_key
uid = uuid.uuid4().__str__()
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
# 延迟两倍时间,防止缓存过期,导致校验失败
cache.set(cache_key, uid, ttl * 2)
def _check_func(key_id, key_value):
ret = cache.get(key_id, None)
return key_value == ret
check_func_partial = functools.partial(_check_func, cache_key, uid)
run_func_partial = functools.partial(_run_func_with_org, org, func)
asyncio.run_coroutine_threadsafe(
Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs),
loop=loop
)
return wrapper
return inner
def merge_delay_run(ttl, key=None):
"""
合并 func 参数延迟执行, ttl 秒内, 只执行最后一次
func 参数必须是 *args
:param ttl:
:param key: 是否合并参数, 一个 callback
:return: :return:
""" """
@ -164,39 +122,46 @@ def merge_delay_run(ttl, key=None):
if len(sigs.parameters) != 1: if len(sigs.parameters) != 1:
raise ValueError('func must have one arguments: %s' % func.__name__) raise ValueError('func must have one arguments: %s' % func.__name__)
param = list(sigs.parameters.values())[0] param = list(sigs.parameters.values())[0]
if not str(param).startswith('*'): if not str(param).startswith('*') or param.kind == param.VAR_KEYWORD:
raise ValueError('func args must be startswith *: %s' % func.__name__) raise ValueError('func args must be startswith *: %s and not have **kwargs ' % func.__name__)
suffix_key_func = key if key else default_suffix_key suffix_key_func = key if key else default_suffix_key
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args): def wrapper(*args):
key_suffix = suffix_key_func(*args) from orgs.utils import get_current_org
org = get_current_org()
func_name = f'{func.__module__}_{func.__name__}' func_name = f'{func.__module__}_{func.__name__}'
cache_key = f'DELAY_MERGE_RUN_{func_name}_{key_suffix}' key_suffix = suffix_key_func(*args)
values = cache.get(cache_key, []) cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
new_arg = args
if merge_args:
values = _loop_debouncer_func_args_cache.get(cache_key, [])
new_arg = [*values, *args] new_arg = [*values, *args]
cache.set(cache_key, new_arg, ttl) _loop_debouncer_func_args_cache[cache_key] = new_arg
return delay_run(ttl, suffix_key_func)(func)(*new_arg)
cancel_or_remove_debouncer_task(cache_key)
run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
_debouncer = Debouncer(run_func_partial, lambda: True, ttl,
loop=loop, executor=executor)
task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),
loop=loop)
_loop_debouncer_func_task_cache[cache_key] = task
return wrapper return wrapper
return inner return inner
merge_delay_run = functools.partial(delay_run, merge_args=True)
@delay_run(ttl=5) @delay_run(ttl=5)
def test_delay_run(username, year=2000): def test_delay_run(*username):
print("Hello, %s, now is %s" % (username, year)) print("Hello, %s, now is %s" % (username, time.time()))
@merge_delay_run(ttl=5, key=lambda *users: users[0][0]) @delay_run(ttl=5, key=lambda *users: users[0][0], merge_args=True)
def test_merge_delay_run(*users):
name = ','.join(users)
time.sleep(2)
print("Hello, %s, now is %s" % (name, time.time()))
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
def test_merge_delay_run(*users): def test_merge_delay_run(*users):
name = ','.join(users) name = ','.join(users)
time.sleep(2) time.sleep(2)
@ -210,6 +175,7 @@ def do_test():
# test_delay_run('test', year=i) # test_delay_run('test', year=i)
test_merge_delay_run('test %s' % i) test_merge_delay_run('test %s' % i)
test_merge_delay_run('best %s' % i) test_merge_delay_run('best %s' % i)
test_delay_run('test run %s' % i)
end = time.time() end = time.time()
using = end - s using = end - s