You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
jumpserver/apps/common/decorators.py

217 lines
6.0 KiB

# -*- coding: utf-8 -*-
#
import asyncio
import functools
import inspect
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from django.core.cache import cache
from django.db import transaction
from .utils import logger
def on_transaction_commit(func):
"""
如果不调用on_commit, 对象创建时添加多对多字段值失败
"""
def inner(*args, **kwargs):
transaction.on_commit(lambda: func(*args, **kwargs))
return inner
class Singleton(object):
""" 单例类 """
def __init__(self, cls):
self._cls = cls
self._instance = {}
def __call__(self):
if self._cls not in self._instance:
self._instance[self._cls] = self._cls()
return self._instance[self._cls]
def default_suffix_key(*args, **kwargs):
return 'default'
def key_by_org(*args, **kwargs):
return args[0].org_id
def _run_func_if_is_last(ttl, suffix_key, org, func, *args, **kwargs):
from orgs.utils import set_current_org
try:
set_current_org(org)
uid = uuid.uuid4().__str__()
suffix_key_func = suffix_key if suffix_key else default_suffix_key
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
key = f'DELAY_RUN_{func_name}_{key_suffix}'
cache.set(key, uid, ttl)
st = (ttl - 2 > 1) and ttl - 2 or 2
time.sleep(st)
ret = cache.get(key, None)
if uid == ret:
func(*args, **kwargs)
except Exception as e:
logger.error('delay run error: %s' % e)
class LoopThread(threading.Thread):
def __init__(self, loop, *args, **kwargs):
super().__init__(*args, **kwargs)
self.loop = loop
def run(self) -> None:
asyncio.set_event_loop(loop)
self.loop.run_forever()
print('loop stopped')
loop = asyncio.get_event_loop()
loop_thread = LoopThread(loop)
loop_thread.daemon = True
loop_thread.start()
executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix='debouncer')
class Debouncer(object):
def __init__(self, callback, check, delay, *args, **kwargs):
self.callback = callback
self.check = check
self.delay = delay
async def __call__(self, *args, **kwargs):
await asyncio.sleep(self.delay)
ok = await self._check(*args, **kwargs)
if ok:
await loop.run_in_executor(executor, self.callback, *args)
async def _check(self, *args, **kwargs):
if asyncio.iscoroutinefunction(self.check):
return await self.check(*args, **kwargs)
return await loop.run_in_executor(executor, self.check)
def _run_func_with_org(org, func, *args, **kwargs):
from orgs.utils import set_current_org
try:
set_current_org(org)
func(*args, **kwargs)
except Exception as e:
logger.error('delay run error: %s' % e)
def delay_run(ttl=5, key=None):
"""
延迟执行函数, ttl 秒内, 只执行最后一次
:param ttl:
:param key: 是否合并参数, 一个 callback
:return:
"""
def inner(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
from orgs.utils import get_current_org
org = get_current_org()
suffix_key_func = key if key else default_suffix_key
uid = uuid.uuid4().__str__()
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
# 延迟两倍时间,防止缓存过期,导致校验失败
cache.set(cache_key, uid, ttl * 2)
def _check_func(key_id, key_value):
ret = cache.get(key_id, None)
return key_value == ret
check_func_partial = functools.partial(_check_func, cache_key, uid)
run_func_partial = functools.partial(_run_func_with_org, org, func)
asyncio.run_coroutine_threadsafe(
Debouncer(run_func_partial, check_func_partial, ttl)(*args, **kwargs),
loop=loop
)
return wrapper
return inner
def merge_delay_run(ttl, key=None):
"""
合并 func 参数延迟执行, ttl 秒内, 只执行最后一次
func 参数必须是 *args
:param ttl:
:param key: 是否合并参数, 一个 callback
:return:
"""
def inner(func):
sigs = inspect.signature(func)
if len(sigs.parameters) != 1:
raise ValueError('func must have one arguments: %s' % func.__name__)
param = list(sigs.parameters.values())[0]
if not str(param).startswith('*'):
raise ValueError('func args must be startswith *: %s' % func.__name__)
suffix_key_func = key if key else default_suffix_key
@functools.wraps(func)
def wrapper(*args):
key_suffix = suffix_key_func(*args)
func_name = f'{func.__module__}_{func.__name__}'
cache_key = f'DELAY_MERGE_RUN_{func_name}_{key_suffix}'
values = cache.get(cache_key, [])
new_arg = [*values, *args]
cache.set(cache_key, new_arg, ttl)
return delay_run(ttl, suffix_key_func)(func)(*new_arg)
return wrapper
return inner
@delay_run(ttl=5)
def test_delay_run(username, year=2000):
print("Hello, %s, now is %s" % (username, year))
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
def test_merge_delay_run(*users):
name = ','.join(users)
time.sleep(2)
print("Hello, %s, now is %s" % (name, time.time()))
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
def test_merge_delay_run(*users):
name = ','.join(users)
time.sleep(2)
print("Hello, %s, now is %s" % (name, time.time()))
def do_test():
s = time.time()
print("start : %s" % time.time())
for i in range(100):
# test_delay_run('test', year=i)
test_merge_delay_run('test %s' % i)
test_merge_delay_run('best %s' % i)
end = time.time()
using = end - s
print("end : %s, using: %s" % (end, using))