mirror of https://github.com/jumpserver/jumpserver
perf: 改造 merge_delay_func
parent
66b248db77
commit
9cd780eb06
|
@ -21,13 +21,13 @@ def on_node_pre_save(sender, instance: Node, **kwargs):
|
||||||
|
|
||||||
|
|
||||||
@merge_delay_run(ttl=5, key=key_by_org)
|
@merge_delay_run(ttl=5, key=key_by_org)
|
||||||
def test_assets_connectivity_handler(*assets):
|
def test_assets_connectivity_handler(assets=()):
|
||||||
task_name = gettext_noop("Test assets connectivity ")
|
task_name = gettext_noop("Test assets connectivity ")
|
||||||
test_assets_connectivity_task.delay(assets, task_name)
|
test_assets_connectivity_task.delay(assets, task_name)
|
||||||
|
|
||||||
|
|
||||||
@merge_delay_run(ttl=5, key=key_by_org)
|
@merge_delay_run(ttl=5, key=key_by_org)
|
||||||
def gather_assets_facts_handler(*assets):
|
def gather_assets_facts_handler(assets=()):
|
||||||
if not assets:
|
if not assets:
|
||||||
logger.info("No assets to update hardware info")
|
logger.info("No assets to update hardware info")
|
||||||
return
|
return
|
||||||
|
@ -36,7 +36,7 @@ def gather_assets_facts_handler(*assets):
|
||||||
|
|
||||||
|
|
||||||
@merge_delay_run(ttl=5, key=key_by_org)
|
@merge_delay_run(ttl=5, key=key_by_org)
|
||||||
def ensure_asset_has_node(*assets):
|
def ensure_asset_has_node(assets=()):
|
||||||
asset_ids = [asset.id for asset in assets]
|
asset_ids = [asset.id for asset in assets]
|
||||||
has_ids = Asset.nodes.through.objects \
|
has_ids = Asset.nodes.through.objects \
|
||||||
.filter(asset_id__in=asset_ids) \
|
.filter(asset_id__in=asset_ids) \
|
||||||
|
@ -60,16 +60,16 @@ def on_asset_create(sender, instance=None, created=False, **kwargs):
|
||||||
return
|
return
|
||||||
logger.info("Asset create signal recv: {}".format(instance))
|
logger.info("Asset create signal recv: {}".format(instance))
|
||||||
|
|
||||||
ensure_asset_has_node(instance)
|
ensure_asset_has_node(assets=(instance,))
|
||||||
|
|
||||||
# 获取资产硬件信息
|
# 获取资产硬件信息
|
||||||
auto_info = instance.auto_info
|
auto_info = instance.auto_info
|
||||||
if auto_info.get('ping_enabled'):
|
if auto_info.get('ping_enabled'):
|
||||||
logger.debug('Asset {} ping enabled, test connectivity'.format(instance.name))
|
logger.debug('Asset {} ping enabled, test connectivity'.format(instance.name))
|
||||||
test_assets_connectivity_handler(instance)
|
test_assets_connectivity_handler(assets=(instance,))
|
||||||
if auto_info.get('gather_facts_enabled'):
|
if auto_info.get('gather_facts_enabled'):
|
||||||
logger.debug('Asset {} gather facts enabled, gather facts'.format(instance.name))
|
logger.debug('Asset {} gather facts enabled, gather facts'.format(instance.name))
|
||||||
gather_assets_facts_handler(instance)
|
gather_assets_facts_handler(assets=(instance,))
|
||||||
|
|
||||||
|
|
||||||
RELATED_NODE_IDS = '_related_node_ids'
|
RELATED_NODE_IDS = '_related_node_ids'
|
||||||
|
|
|
@ -34,11 +34,11 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
|
||||||
node_ids = [instance.id]
|
node_ids = [instance.id]
|
||||||
else:
|
else:
|
||||||
node_ids = pk_set
|
node_ids = pk_set
|
||||||
update_nodes_assets_amount(*node_ids)
|
update_nodes_assets_amount(node_ids=node_ids)
|
||||||
|
|
||||||
|
|
||||||
@merge_delay_run(ttl=5)
|
@merge_delay_run(ttl=5)
|
||||||
def update_nodes_assets_amount(*node_ids):
|
def update_nodes_assets_amount(node_ids=()):
|
||||||
nodes = list(Node.objects.filter(id__in=node_ids))
|
nodes = list(Node.objects.filter(id__in=node_ids))
|
||||||
logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
|
logger.info('Update nodes assets amount: {} nodes'.format(len(node_ids)))
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ node_assets_mapping_pub_sub = lazy(lambda: RedisPubSub('fm.node_asset_mapping'),
|
||||||
|
|
||||||
|
|
||||||
@merge_delay_run(ttl=5)
|
@merge_delay_run(ttl=5)
|
||||||
def expire_node_assets_mapping(*org_ids):
|
def expire_node_assets_mapping(org_ids=()):
|
||||||
# 所有进程清除(自己的 memory 数据)
|
# 所有进程清除(自己的 memory 数据)
|
||||||
root_org_id = Organization.ROOT_ID
|
root_org_id = Organization.ROOT_ID
|
||||||
Node.expire_node_all_asset_ids_cache_mapping(root_org_id)
|
Node.expire_node_all_asset_ids_cache_mapping(root_org_id)
|
||||||
|
@ -43,18 +43,18 @@ def on_node_post_create(sender, instance, created, update_fields, **kwargs):
|
||||||
need_expire = False
|
need_expire = False
|
||||||
|
|
||||||
if need_expire:
|
if need_expire:
|
||||||
expire_node_assets_mapping(instance.org_id)
|
expire_node_assets_mapping(org_ids=(instance.org_id,))
|
||||||
|
|
||||||
|
|
||||||
@receiver(post_delete, sender=Node)
|
@receiver(post_delete, sender=Node)
|
||||||
def on_node_post_delete(sender, instance, **kwargs):
|
def on_node_post_delete(sender, instance, **kwargs):
|
||||||
expire_node_assets_mapping(instance.org_id)
|
expire_node_assets_mapping(org_ids=(instance.org_id,))
|
||||||
|
|
||||||
|
|
||||||
@receiver(m2m_changed, sender=Asset.nodes.through)
|
@receiver(m2m_changed, sender=Asset.nodes.through)
|
||||||
def on_node_asset_change(sender, instance, **kwargs):
|
def on_node_asset_change(sender, instance, **kwargs):
|
||||||
logger.debug("Recv asset nodes changed signal, expire memery node asset mapping")
|
logger.debug("Recv asset nodes changed signal, expire memery node asset mapping")
|
||||||
expire_node_assets_mapping(instance.org_id)
|
expire_node_assets_mapping(org_ids=(instance.org_id,))
|
||||||
|
|
||||||
|
|
||||||
@receiver(django_ready)
|
@receiver(django_ready)
|
||||||
|
|
|
@ -41,7 +41,10 @@ def default_suffix_key(*args, **kwargs):
|
||||||
|
|
||||||
|
|
||||||
def key_by_org(*args, **kwargs):
|
def key_by_org(*args, **kwargs):
|
||||||
return args[0].org_id
|
values = list(kwargs.values())
|
||||||
|
if not values:
|
||||||
|
return 'default'
|
||||||
|
return values[0].org_id
|
||||||
|
|
||||||
|
|
||||||
class EventLoopThread(threading.Thread):
|
class EventLoopThread(threading.Thread):
|
||||||
|
@ -79,6 +82,15 @@ def cancel_or_remove_debouncer_task(cache_key):
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
def run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs):
|
||||||
|
cancel_or_remove_debouncer_task(cache_key)
|
||||||
|
run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
|
||||||
|
loop = _loop_thread.get_loop()
|
||||||
|
_debouncer = Debouncer(run_func_partial, lambda: True, ttl, loop=loop, executor=executor)
|
||||||
|
task = asyncio.run_coroutine_threadsafe(_debouncer(*args, **kwargs), loop=loop)
|
||||||
|
_loop_debouncer_func_task_cache[cache_key] = task
|
||||||
|
|
||||||
|
|
||||||
class Debouncer(object):
|
class Debouncer(object):
|
||||||
def __init__(self, callback, check, delay, loop=None, executor=None):
|
def __init__(self, callback, check, delay, loop=None, executor=None):
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
|
@ -113,12 +125,36 @@ def _run_func_with_org(key, org, func, *args, **kwargs):
|
||||||
_loop_debouncer_func_args_cache.pop(key, None)
|
_loop_debouncer_func_args_cache.pop(key, None)
|
||||||
|
|
||||||
|
|
||||||
def delay_run(ttl=5, key=None, merge_args=False):
|
def delay_run(ttl=5, key=None):
|
||||||
"""
|
"""
|
||||||
延迟执行函数, 在 ttl 秒内, 只执行最后一次
|
延迟执行函数, 在 ttl 秒内, 只执行最后一次
|
||||||
:param ttl:
|
:param ttl:
|
||||||
:param key: 是否合并参数, 一个 callback
|
:param key: 是否合并参数, 一个 callback
|
||||||
:param merge_args: 是否合并之前的参数, bool
|
:return:
|
||||||
|
"""
|
||||||
|
|
||||||
|
def inner(func):
|
||||||
|
suffix_key_func = key if key else default_suffix_key
|
||||||
|
|
||||||
|
@functools.wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
from orgs.utils import get_current_org
|
||||||
|
org = get_current_org()
|
||||||
|
func_name = f'{func.__module__}_{func.__name__}'
|
||||||
|
key_suffix = suffix_key_func(*args)
|
||||||
|
cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
|
||||||
|
run_debouncer_func(cache_key, org, ttl, func, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
|
||||||
|
def merge_delay_run(ttl=5, key=None):
|
||||||
|
"""
|
||||||
|
延迟执行函数, 在 ttl 秒内, 只执行最后一次, 并且合并参数
|
||||||
|
:param ttl:
|
||||||
|
:param key: 是否合并参数, 一个 callback
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -127,48 +163,39 @@ def delay_run(ttl=5, key=None, merge_args=False):
|
||||||
if len(sigs.parameters) != 1:
|
if len(sigs.parameters) != 1:
|
||||||
raise ValueError('func must have one arguments: %s' % func.__name__)
|
raise ValueError('func must have one arguments: %s' % func.__name__)
|
||||||
param = list(sigs.parameters.values())[0]
|
param = list(sigs.parameters.values())[0]
|
||||||
if not str(param).startswith('*') or param.kind == param.VAR_KEYWORD:
|
if not isinstance(param.default, tuple):
|
||||||
raise ValueError('func args must be startswith *: %s and not have **kwargs ' % func.__name__)
|
raise ValueError('func default must be tuple: %s' % param.default)
|
||||||
suffix_key_func = key if key else default_suffix_key
|
suffix_key_func = key if key else default_suffix_key
|
||||||
|
|
||||||
@functools.wraps(func)
|
@functools.wraps(func)
|
||||||
def wrapper(*args):
|
def wrapper(*args, **kwargs):
|
||||||
from orgs.utils import get_current_org
|
from orgs.utils import get_current_org
|
||||||
org = get_current_org()
|
org = get_current_org()
|
||||||
func_name = f'{func.__module__}_{func.__name__}'
|
func_name = f'{func.__module__}_{func.__name__}'
|
||||||
key_suffix = suffix_key_func(*args)
|
key_suffix = suffix_key_func(*args, **kwargs)
|
||||||
cache_key = f'DELAY_RUN_{func_name}_{key_suffix}'
|
cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
|
||||||
new_arg = args
|
cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
|
||||||
if merge_args:
|
for k, v in kwargs.items():
|
||||||
values = _loop_debouncer_func_args_cache.get(cache_key, [])
|
if not isinstance(v, tuple):
|
||||||
new_arg = [*values, *args]
|
raise ValueError('func kwargs value must be list or tuple: %s' % func.__name__)
|
||||||
_loop_debouncer_func_args_cache[cache_key] = new_arg
|
if k not in cache_kwargs:
|
||||||
|
cache_kwargs[k] = v
|
||||||
cancel_or_remove_debouncer_task(cache_key)
|
else:
|
||||||
|
cache_kwargs[k] += v
|
||||||
run_func_partial = functools.partial(_run_func_with_org, cache_key, org, func)
|
run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs)
|
||||||
loop = _loop_thread.get_loop()
|
|
||||||
_debouncer = Debouncer(run_func_partial, lambda: True, ttl,
|
|
||||||
loop=loop, executor=executor)
|
|
||||||
task = asyncio.run_coroutine_threadsafe(_debouncer(*new_arg),
|
|
||||||
loop=loop)
|
|
||||||
_loop_debouncer_func_task_cache[cache_key] = task
|
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
merge_delay_run = functools.partial(delay_run, merge_args=True)
|
|
||||||
|
|
||||||
|
|
||||||
@delay_run(ttl=5)
|
@delay_run(ttl=5)
|
||||||
def test_delay_run(*username):
|
def test_delay_run(*username):
|
||||||
print("Hello, %s, now is %s" % (username, time.time()))
|
print("Hello, %s, now is %s" % (username, time.time()))
|
||||||
|
|
||||||
|
|
||||||
@delay_run(ttl=5, key=lambda *users: users[0][0], merge_args=True)
|
@merge_delay_run(ttl=5, key=lambda *users: users[0][0])
|
||||||
def test_merge_delay_run(*users):
|
def test_merge_delay_run(users=()):
|
||||||
name = ','.join(users)
|
name = ','.join(users)
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
print("Hello, %s, now is %s" % (name, time.time()))
|
print("Hello, %s, now is %s" % (name, time.time()))
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
from assets.signal_handlers.node_assets_mapping import expire_node_assets_mapping
|
from assets.signal_handlers.node_assets_mapping import expire_node_assets_mapping as _expire_node_assets_mapping
|
||||||
from orgs.caches import OrgResourceStatisticsCache
|
from orgs.caches import OrgResourceStatisticsCache
|
||||||
from orgs.models import Organization
|
from orgs.models import Organization
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ def expire_node_assets_mapping():
|
||||||
org_ids = [*org_ids, '00000000-0000-0000-0000-000000000000']
|
org_ids = [*org_ids, '00000000-0000-0000-0000-000000000000']
|
||||||
|
|
||||||
for org_id in org_ids:
|
for org_id in org_ids:
|
||||||
expire_node_assets_mapping(org_id)
|
_expire_node_assets_mapping(org_ids=(org_id,))
|
||||||
|
|
||||||
|
|
||||||
def expire_org_resource_statistics_cache():
|
def expire_org_resource_statistics_cache():
|
||||||
|
|
|
@ -76,7 +76,7 @@ model_cache_field_mapper = {
|
||||||
class OrgResourceStatisticsRefreshUtil:
|
class OrgResourceStatisticsRefreshUtil:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@merge_delay_run(ttl=5)
|
@merge_delay_run(ttl=5)
|
||||||
def refresh_org_fields(*org_fields):
|
def refresh_org_fields(org_fields=()):
|
||||||
for org, cache_field_name in org_fields:
|
for org, cache_field_name in org_fields:
|
||||||
OrgResourceStatisticsCache(org).expire(*cache_field_name)
|
OrgResourceStatisticsCache(org).expire(*cache_field_name)
|
||||||
OrgResourceStatisticsCache(Organization.root()).expire(*cache_field_name)
|
OrgResourceStatisticsCache(Organization.root()).expire(*cache_field_name)
|
||||||
|
|
Loading…
Reference in New Issue