perf: 优化授权树的刷新,同步解决同步异步的问题

pull/12599/head
ibuler 2024-01-23 16:45:59 +08:00 committed by Bryan
parent f5802ace02
commit 0303408be8
9 changed files with 72 additions and 34 deletions

View File

@ -63,7 +63,7 @@ def create_accounts_activities(account, action='create'):
def on_account_create_by_template(sender, instance, created=False, **kwargs):
if not created or instance.source != 'template':
return
push_accounts_if_need(accounts=(instance,))
push_accounts_if_need.delay(accounts=(instance,))
create_accounts_activities(instance, action='create')

View File

@ -63,13 +63,13 @@ def on_asset_create(sender, instance=None, created=False, **kwargs):
return
logger.info("Asset create signal recv: {}".format(instance))
ensure_asset_has_node(assets=(instance,))
ensure_asset_has_node.delay(assets=(instance,))
# 获取资产硬件信息
auto_config = instance.auto_config
if auto_config.get('ping_enabled'):
logger.debug('Asset {} ping enabled, test connectivity'.format(instance.name))
test_assets_connectivity_handler(assets=(instance,))
test_assets_connectivity_handler.delay(assets=(instance,))
if auto_config.get('gather_facts_enabled'):
logger.debug('Asset {} gather facts enabled, gather facts'.format(instance.name))
gather_assets_facts_handler(assets=(instance,))

View File

@ -2,14 +2,16 @@
#
from operator import add, sub
from django.conf import settings
from django.db.models.signals import m2m_changed
from django.dispatch import receiver
from assets.models import Asset, Node
from common.const.signals import PRE_CLEAR, POST_ADD, PRE_REMOVE
from common.decorators import on_transaction_commit, merge_delay_run
from common.signals import django_ready
from common.utils import get_logger
from orgs.utils import tmp_to_org
from orgs.utils import tmp_to_org, tmp_to_root_org
from ..tasks import check_node_assets_amount_task
logger = get_logger(__file__)
@ -34,7 +36,7 @@ def on_node_asset_change(sender, action, instance, reverse, pk_set, **kwargs):
node_ids = [instance.id]
else:
node_ids = list(pk_set)
update_nodes_assets_amount(node_ids=node_ids)
update_nodes_assets_amount.delay(node_ids=node_ids)
@merge_delay_run(ttl=30)
@ -52,3 +54,18 @@ def update_nodes_assets_amount(node_ids=()):
node.assets_amount = node.get_assets_amount()
Node.objects.bulk_update(nodes, ['assets_amount'])
@receiver(django_ready)
def set_assets_size_to_setting(sender, **kwargs):
from assets.models import Asset
try:
with tmp_to_root_org():
amount = Asset.objects.order_by().count()
except:
amount = 0
if amount > 20000:
settings.ASSET_SIZE = 'large'
elif amount > 2000:
settings.ASSET_SIZE = 'medium'

View File

@ -44,18 +44,18 @@ def on_node_post_create(sender, instance, created, update_fields, **kwargs):
need_expire = False
if need_expire:
expire_node_assets_mapping(org_ids=(instance.org_id,))
expire_node_assets_mapping.delay(org_ids=(instance.org_id,))
@receiver(post_delete, sender=Node)
def on_node_post_delete(sender, instance, **kwargs):
expire_node_assets_mapping(org_ids=(instance.org_id,))
expire_node_assets_mapping.delay(org_ids=(instance.org_id,))
@receiver(m2m_changed, sender=Asset.nodes.through)
def on_node_asset_change(sender, instance, action='pre_remove', **kwargs):
if action.startswith('post'):
expire_node_assets_mapping(org_ids=(instance.org_id,))
expire_node_assets_mapping.delay(org_ids=(instance.org_id,))
@receiver(django_ready)

View File

@ -34,9 +34,9 @@ def update_user_last_used(users=()):
def after_authenticate_update_date(user, token=None):
update_user_last_used(users=(user.id,))
update_user_last_used.delay(users=(user.id,))
if token:
update_token_last_used(tokens=(token,))
update_token_last_used.delay(tokens=(token,))
class AccessTokenAuthentication(authentication.BaseAuthentication):

View File

@ -199,6 +199,32 @@ def merge_delay_run(ttl=5, key=None):
:return:
"""
def delay(func, *args, **kwargs):
from orgs.utils import get_current_org
suffix_key_func = key if key else default_suffix_key
org = get_current_org()
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
for k, v in kwargs.items():
if not isinstance(v, (tuple, list, set)):
raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v))
v = set(v)
if k not in cache_kwargs:
cache_kwargs[k] = v
else:
cache_kwargs[k] = cache_kwargs[k].union(v)
_loop_debouncer_func_args_cache[cache_key] = cache_kwargs
run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs)
def apply(func, sync=False, *args, **kwargs):
if sync:
return func(*args, **kwargs)
else:
return delay(func, *args, **kwargs)
def inner(func):
sigs = inspect.signature(func)
if len(sigs.parameters) != 1:
@ -206,27 +232,12 @@ def merge_delay_run(ttl=5, key=None):
param = list(sigs.parameters.values())[0]
if not isinstance(param.default, tuple):
raise ValueError('func default must be tuple: %s' % param.default)
suffix_key_func = key if key else default_suffix_key
func.delay = functools.partial(delay, func)
func.apply = functools.partial(apply, func)
@functools.wraps(func)
def wrapper(*args, **kwargs):
from orgs.utils import get_current_org
org = get_current_org()
func_name = f'{func.__module__}_{func.__name__}'
key_suffix = suffix_key_func(*args, **kwargs)
cache_key = f'MERGE_DELAY_RUN_{func_name}_{key_suffix}'
cache_kwargs = _loop_debouncer_func_args_cache.get(cache_key, {})
for k, v in kwargs.items():
if not isinstance(v, (tuple, list, set)):
raise ValueError('func kwargs value must be list or tuple: %s %s' % (func.__name__, v))
v = set(v)
if k not in cache_kwargs:
cache_kwargs[k] = v
else:
cache_kwargs[k] = cache_kwargs[k].union(v)
_loop_debouncer_func_args_cache[cache_key] = cache_kwargs
run_debouncer_func(cache_key, org, ttl, func, *args, **cache_kwargs)
return func(*args, **kwargs)
return wrapper

View File

@ -214,6 +214,9 @@ PERM_TREE_REGEN_INTERVAL = CONFIG.PERM_TREE_REGEN_INTERVAL
MAGNUS_ORACLE_PORTS = CONFIG.MAGNUS_ORACLE_PORTS
LIMIT_SUPER_PRIV = CONFIG.LIMIT_SUPER_PRIV
# Asset account may be too many
ASSET_SIZE = 'small'
# Chat AI
CHAT_AI_ENABLED = CONFIG.CHAT_AI_ENABLED
GPT_API_KEY = CONFIG.GPT_API_KEY

View File

@ -87,7 +87,7 @@ class OrgResourceStatisticsRefreshUtil:
if not cache_field_name:
return
org = getattr(instance, 'org', None)
cls.refresh_org_fields(((org, cache_field_name),))
cls.refresh_org_fields.delay(org_fields=((org, cache_field_name),))
@receiver(post_save)

View File

@ -72,7 +72,7 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin):
@timeit
def refresh_if_need(self, force=False):
built_just_now = cache.get(self.cache_key_time)
built_just_now = False if settings.ASSET_SIZE == 'small' else cache.get(self.cache_key_time)
if built_just_now:
logger.info('Refresh user perm tree just now, pass: {}'.format(built_just_now))
return
@ -80,12 +80,18 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin):
if not to_refresh_orgs:
logger.info('Not have to refresh orgs')
return
logger.info("Delay refresh user orgs: {} {}".format(self.user, [o.name for o in to_refresh_orgs]))
refresh_user_orgs_perm_tree(user_orgs=((self.user, tuple(to_refresh_orgs)),))
refresh_user_favorite_assets(users=(self.user,))
sync = True if settings.ASSET_SIZE == 'small' else False
refresh_user_orgs_perm_tree.apply(sync=sync, user_orgs=((self.user, tuple(to_refresh_orgs)),))
refresh_user_favorite_assets.apply(sync=sync, users=(self.user,))
@timeit
def refresh_tree_manual(self):
"""
用来手动 debug
:return:
"""
built_just_now = cache.get(self.cache_key_time)
if built_just_now:
logger.info('Refresh just now, pass: {}'.format(built_just_now))
@ -105,8 +111,9 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin):
return
self._clean_user_perm_tree_for_legacy_org()
ttl = settings.PERM_TREE_REGEN_INTERVAL
cache.set(self.cache_key_time, int(time.time()), ttl)
if settings.ASSET_SIZE != 'small':
ttl = settings.PERM_TREE_REGEN_INTERVAL
cache.set(self.cache_key_time, int(time.time()), ttl)
lock = UserGrantedTreeRebuildLock(self.user.id)
got = lock.acquire(blocking=False)