# -*- coding: utf-8 -*- # from operator import add, sub from assets.utils import is_asset_exists_in_node from django.db.models.signals import ( post_save, m2m_changed, pre_delete, post_delete ) from django.db.models import Q, F from django.dispatch import receiver from common.local import thread_local from common.exceptions import M2MReverseNotAllowed from common.const.signals import PRE_ADD, POST_ADD, POST_REMOVE, PRE_CLEAR from common.utils import get_logger from common.decorator import on_transaction_commit from .models import Asset, SystemUser, Node, compute_parent_key from users.models import User from .tasks import ( update_assets_hardware_info_util, test_asset_connectivity_util, push_system_user_to_assets_manual, push_system_user_to_assets, add_nodes_assets_to_system_users ) logger = get_logger(__file__) def update_asset_hardware_info_on_created(asset): logger.debug("Update asset `{}` hardware info".format(asset)) update_assets_hardware_info_util.delay([asset]) def test_asset_conn_on_created(asset): logger.debug("Test asset `{}` connectivity".format(asset)) test_asset_connectivity_util.delay([asset]) @receiver(post_save, sender=Asset) @on_transaction_commit def on_asset_created_or_update(sender, instance=None, created=False, **kwargs): """ 当资产创建时,更新硬件信息,更新可连接性 确保资产必须属于一个节点 """ if created: logger.info("Asset create signal recv: {}".format(instance)) # 获取资产硬件信息 update_asset_hardware_info_on_created(instance) test_asset_conn_on_created(instance) # 确保资产存在一个节点 has_node = instance.nodes.all().exists() if not has_node: instance.nodes.add(Node.org_root()) @receiver(post_save, sender=SystemUser, dispatch_uid="jms") def on_system_user_update(sender, instance=None, created=True, **kwargs): """ 当系统用户更新时,可能更新了秘钥,用户名等,这时要自动推送系统用户到资产上, 其实应该当 用户名,密码,秘钥 sudo等更新时再推送,这里偷个懒, 这里直接取了 instance.assets 因为nodes和系统用户发生变化时,会自动将nodes下的资产 关联到上面 """ if instance and not created: logger.info("System user update signal recv: {}".format(instance)) assets = instance.assets.all().valid() push_system_user_to_assets.delay(instance, assets) @receiver(m2m_changed, sender=SystemUser.assets.through) def on_system_user_assets_change(sender, instance=None, action='', model=None, pk_set=None, **kwargs): """ 当系统用户和资产关系发生变化时,应该重新推送系统用户到新添加的资产中 """ if action != POST_ADD: return logger.debug("System user assets change signal recv: {}".format(instance)) queryset = model.objects.filter(pk__in=pk_set) if model == Asset: system_users = [instance] assets = queryset else: system_users = queryset assets = [instance] for system_user in system_users: push_system_user_to_assets.delay(system_user, assets) @receiver(m2m_changed, sender=SystemUser.users.through) def on_system_user_users_change(sender, instance=None, action='', model=None, pk_set=None, **kwargs): """ 当系统用户和用户关系发生变化时,应该重新推送系统用户资产中 """ if action != POST_ADD: return if not instance.username_same_with_user: return logger.debug("System user users change signal recv: {}".format(instance)) queryset = model.objects.filter(pk__in=pk_set) if model == SystemUser: system_users = queryset else: system_users = [instance] for s in system_users: push_system_user_to_assets_manual.delay(s) @receiver(m2m_changed, sender=SystemUser.nodes.through) def on_system_user_nodes_change(sender, instance=None, action=None, model=None, pk_set=None, **kwargs): """ 当系统用户和节点关系发生变化时,应该将节点下资产关联到新的系统用户上 """ if action != POST_ADD: return logger.info("System user nodes update signal recv: {}".format(instance)) queryset = model.objects.filter(pk__in=pk_set) if model == Node: nodes_keys = queryset.values_list('key', flat=True) system_users = [instance] else: nodes_keys = [instance.key] system_users = queryset add_nodes_assets_to_system_users.delay(nodes_keys, system_users) @receiver(m2m_changed, sender=SystemUser.groups.through) def on_system_user_groups_change(instance, action, pk_set, reverse, **kwargs): """ 当系统用户和用户组关系发生变化时,应该将组下用户关联到新的系统用户上 """ if action != POST_ADD: return if reverse: raise M2MReverseNotAllowed logger.info("System user groups update signal recv: {}".format(instance)) users = User.objects.filter(groups__id__in=pk_set).distinct() instance.users.add(users) @receiver(m2m_changed, sender=Asset.nodes.through) def on_asset_nodes_add(instance, action, reverse, pk_set, **kwargs): """ 本操作共访问 4 次数据库 当资产的节点发生变化时,或者 当节点的资产关系发生变化时, 节点下新增的资产,添加到节点关联的系统用户中 """ if action != POST_ADD: return logger.debug("Assets node add signal recv: {}".format(action)) if reverse: nodes = [instance.key] asset_ids = pk_set else: nodes = Node.objects.filter(pk__in=pk_set).values_list('key', flat=True) asset_ids = [instance.id] # 节点资产发生变化时,将资产关联到节点及祖先节点关联的系统用户, 只关注新增的 nodes_ancestors_keys = set() for node in nodes: nodes_ancestors_keys.update(Node.get_node_ancestor_keys(node, with_self=True)) # 查询所有祖先节点关联的系统用户,都是要跟资产建立关系的 system_user_ids = SystemUser.objects.filter( nodes__key__in=nodes_ancestors_keys ).distinct().values_list('id', flat=True) # 查询所有已存在的关系 m2m_model = SystemUser.assets.through exist = set(m2m_model.objects.filter( systemuser_id__in=system_user_ids, asset_id__in=asset_ids ).values_list('systemuser_id', 'asset_id')) # TODO 优化 to_create = [] for system_user_id in system_user_ids: asset_ids_to_push = [] for asset_id in asset_ids: if (system_user_id, asset_id) in exist: continue asset_ids_to_push.append(asset_id) to_create.append(m2m_model( systemuser_id=system_user_id, asset_id=asset_id )) push_system_user_to_assets.delay(system_user_id, asset_ids_to_push) m2m_model.objects.bulk_create(to_create) def _update_node_assets_amount(node: Node, asset_pk_set: set, operator=add): """ 一个节点与多个资产关系变化时,更新计数 :param node: 节点实例 :param asset_pk_set: 资产的`id`集合, 内部不会修改该值 :param operator: 操作 * -> Node # -> Asset * [3] / \ * * [2] / \ * * [1] / / \ * [a] # # [b] """ # 获取节点[1]祖先节点的 `key` 含自己,也就是[1, 2, 3]节点的`key` ancestor_keys = node.get_ancestor_keys(with_self=True) ancestors = Node.objects.filter(key__in=ancestor_keys).order_by('-key') to_update = [] for ancestor in ancestors: # 迭代祖先节点的`key`,顺序是 [1] -> [2] -> [3] # 查询该节点及其后代节点是否包含要操作的资产,将包含的从要操作的 # 资产集合中去掉,他们是重复节点,无论增加或删除都不会影响节点的资产数量 asset_pk_set -= set(Asset.objects.filter( id__in=asset_pk_set ).filter( Q(nodes__key__istartswith=f'{ancestor.key}:') | Q(nodes__key=ancestor.key) ).distinct().values_list('id', flat=True)) if not asset_pk_set: # 要操作的资产集合为空,说明都是重复资产,不用改变节点资产数量 # 而且既然它包含了,它的祖先节点肯定也包含了,所以祖先节点都不用 # 处理了 break ancestor.assets_amount = operator(F('assets_amount'), len(asset_pk_set)) to_update.append(ancestor) Node.objects.bulk_update(to_update, fields=('assets_amount', 'parent_key')) def _remove_ancestor_keys(ancestor_key, tree_set): # 这里判断 `ancestor_key` 不能是空,防止数据错误导致的死循环 # 判断是否在集合里,来区分是否已被处理过 while ancestor_key and ancestor_key in tree_set: tree_set.remove(ancestor_key) ancestor_key = compute_parent_key(ancestor_key) def _update_nodes_asset_amount(node_keys, asset_pk, operator): """ 一个资产与多个节点关系变化时,更新计数 :param node_keys: 节点 id 的集合 :param asset_pk: 资产 id :param operator: 操作 """ # 所有相关节点的祖先节点,组成一棵局部树 ancestor_keys = set() for key in node_keys: ancestor_keys.update(Node.get_node_ancestor_keys(key)) # 相关节点可能是其他相关节点的祖先节点,如果是从相关节点里干掉 node_keys -= ancestor_keys to_update_keys = [] for key in node_keys: # 遍历相关节点,处理它及其祖先节点 # 查询该节点是否包含待处理资产 exists = is_asset_exists_in_node(asset_pk, key) parent_key = compute_parent_key(key) if exists: # 如果资产在该节点,那么他及其祖先节点都不用处理 _remove_ancestor_keys(parent_key, ancestor_keys) continue else: # 不存在,要更新本节点 to_update_keys.append(key) # 这里判断 `parent_key` 不能是空,防止数据错误导致的死循环 # 判断是否在集合里,来区分是否已被处理过 while parent_key and parent_key in ancestor_keys: exists = is_asset_exists_in_node(asset_pk, parent_key) if exists: _remove_ancestor_keys(parent_key, ancestor_keys) break else: to_update_keys.append(parent_key) ancestor_keys.remove(parent_key) parent_key = compute_parent_key(parent_key) Node.objects.filter(key__in=to_update_keys).update( assets_amount=operator(F('assets_amount'), 1) ) @receiver(m2m_changed, sender=Asset.nodes.through) def update_nodes_assets_amount(action, instance, reverse, pk_set, **kwargs): # 不允许 `pre_clear` ,因为该信号没有 `pk_set` # [官网](https://docs.djangoproject.com/en/3.1/ref/signals/#m2m-changed) refused = (PRE_CLEAR,) if action in refused: raise ValueError mapper = { PRE_ADD: add, POST_REMOVE: sub } if action not in mapper: return operator = mapper[action] if reverse: node: Node = instance asset_pk_set = set(pk_set) _update_node_assets_amount(node, asset_pk_set, operator) else: asset_pk = instance.id # 与资产直接关联的节点 node_keys = set(Node.objects.filter(id__in=pk_set).values_list('key', flat=True)) _update_nodes_asset_amount(node_keys, asset_pk, operator) ASSET_DELETE_SIGNAL_FOR_NODE_TREE_PARAMS = 'asset_delete_signal_for_node_tree_params' @receiver(pre_delete, sender=Asset) def on_asset_delete(instance: Asset, **kwargs): node_keys = Node.objects.filter( assets=instance ).distinct().values_list('key', flat=True) params = { 'node_keys': set(node_keys), 'asset_pk': instance.id, 'operator': sub } setattr(thread_local, ASSET_DELETE_SIGNAL_FOR_NODE_TREE_PARAMS, params) @receiver(post_delete, sender=Asset) def on_asset_post_delete(instance: Asset, **kwargs): params = getattr(thread_local, ASSET_DELETE_SIGNAL_FOR_NODE_TREE_PARAMS, None) if params and params.get('asset_pk') == instance.id: _update_nodes_asset_amount(**params)