|
|
@ -1,7 +1,9 @@
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
#
|
|
|
|
#
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
import functools
|
|
|
|
import functools
|
|
|
|
import inspect
|
|
|
|
import inspect
|
|
|
|
|
|
|
|
import threading
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
import uuid
|
|
|
|
import uuid
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
@ -65,7 +67,50 @@ def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs):
|
|
|
|
logger.error('delay run error: %s' % e)
|
|
|
|
logger.error('delay run error: %s' % e)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
executor = ThreadPoolExecutor(20)
|
|
|
|
class LoopThread(threading.Thread):
|
|
|
|
|
|
|
|
def __init__(self, loop, *args, **kwargs):
|
|
|
|
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
self.loop = loop
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run(self) -> None:
|
|
|
|
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
|
|
|
|
self.loop.run_forever()
|
|
|
|
|
|
|
|
print('loop stopped')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
|
|
|
loop_thread = LoopThread(loop)
|
|
|
|
|
|
|
|
loop_thread.daemon = True
|
|
|
|
|
|
|
|
loop_thread.start()
|
|
|
|
|
|
|
|
executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Debouncer(object):
|
|
|
|
|
|
|
|
def __init__(self, callback, check, delay, *args, **kwargs):
|
|
|
|
|
|
|
|
self.callback = callback
|
|
|
|
|
|
|
|
self.check = check
|
|
|
|
|
|
|
|
self.delay = delay
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def __call__(self, *args, **kwargs):
|
|
|
|
|
|
|
|
await asyncio.sleep(self.delay)
|
|
|
|
|
|
|
|
ok = await self._check(*args, **kwargs)
|
|
|
|
|
|
|
|
if ok:
|
|
|
|
|
|
|
|
await loop.run_in_executor(executor, self.callback, *args)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _check(self, *args, **kwargs):
|
|
|
|
|
|
|
|
if asyncio.iscoroutinefunction(self.check):
|
|
|
|
|
|
|
|
return await self.check(*args, **kwargs)
|
|
|
|
|
|
|
|
return await loop.run_in_executor(executor, self.check)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run_func_with_org(org, func, *args, **kwargs):
|
|
|
|
|
|
|
|
from orgs.utils import set_current_org
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
set_current_org(org)
|
|
|
|
|
|
|
|
func(*args, **kwargs)
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
logger.error('delay run error: %s' % e)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def delay_run(ttl=5, key=None):
|
|
|
|
def delay_run(ttl=5, key=None):
|
|
|
@ -81,7 +126,24 @@ def delay_run(ttl=5, key=None):
|
|
|
|
def wrapper(*args, **kwargs):
|
|
|
|
def wrapper(*args, **kwargs):
|
|
|
|
from orgs.utils import get_current_org
|
|
|
|
from orgs.utils import get_current_org
|
|
|
|
org = get_current_org()
|
|
|
|
org = get_current_org()
|
|
|
|
executor.submit(_run_func_if_is_last, ttl, key, org, func, *args, **kwargs)
|
|
|
|
suffix_key_func = key if key else default_suffix_key
|
|
|
|
|
|
|
|
uid = uuid.uuid4().__str__()
|
|
|
|
|
|
|
|
func_name = f'{func.__module__}_{func.__name__}'
|
|
|
|
|
|
|
|
key_suffix = suffix_key_func(*args, **kwargs)
|
|
|
|
|
|
|
|
cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
|
|
|
|
|
|
|
|
# 延迟两倍时间,防止缓存过期,导致校验失败
|
|
|
|
|
|
|
|
cache.set(cache_key, uid, ttl * 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_func(key_id, key_value):
|
|
|
|
|
|
|
|
ret = cache.get(key_id, None)
|
|
|
|
|
|
|
|
return key_value == ret
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
check_func_partial = functools.partial(_check_func, cache_key, uid)
|
|
|
|
|
|
|
|
run_func_partial = functools.partial(_run_func_with_org, org, func)
|
|
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(
|
|
|
|
|
|
|
|
Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs),
|
|
|
|
|
|
|
|
loop=loop
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
|
@ -130,11 +192,25 @@ def test_delay_run(username, year=2000):
|
|
|
|
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
|
|
|
|
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
|
|
|
|
def test_merge_delay_run(*users):
|
|
|
|
def test_merge_delay_run(*users):
|
|
|
|
name = ','.join(users)
|
|
|
|
name = ','.join(users)
|
|
|
|
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
|
|
print("Hello, %s, now is %s" % (name, time.time()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
|
|
|
|
|
|
|
|
def test_merge_delay_run(*users):
|
|
|
|
|
|
|
|
name = ','.join(users)
|
|
|
|
|
|
|
|
time.sleep(2)
|
|
|
|
print("Hello, %s, now is %s" % (name, time.time()))
|
|
|
|
print("Hello, %s, now is %s" % (name, time.time()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def do_test():
|
|
|
|
def do_test():
|
|
|
|
for i in range(10):
|
|
|
|
s = time.time()
|
|
|
|
|
|
|
|
print("start : %s" % time.time())
|
|
|
|
|
|
|
|
for i in range(100):
|
|
|
|
# test_delay_run('test', year=i)
|
|
|
|
# test_delay_run('test', year=i)
|
|
|
|
test_merge_delay_run('test %s' % i)
|
|
|
|
test_merge_delay_run('test %s' % i)
|
|
|
|
test_merge_delay_run('best %s' % i)
|
|
|
|
test_merge_delay_run('best %s' % i)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
end = time.time()
|
|
|
|
|
|
|
|
using = end - s
|
|
|
|
|
|
|
|
print("end : %s, using: %s" % (end, using))
|
|
|
|