You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
jumpserver/apps/assets/signals_handler.py

239 lines
8.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
#
from collections import defaultdict
from django.db.models.signals import (
post_save, m2m_changed, post_delete
)
from django.db.models.aggregates import Count
from django.dispatch import receiver
from common.utils import get_logger
from common.decorator import on_transaction_commit
from orgs.utils import tmp_to_root_org
from .models import Asset, SystemUser, Node, AuthBook
from .utils import TreeService
from .tasks import (
update_assets_hardware_info_util,
test_asset_connectivity_util,
push_system_user_to_assets_manual,
push_system_user_to_assets,
add_nodes_assets_to_system_users
)
# Module-level logger shared by every signal handler in this file,
# obtained from the project's `get_logger` helper.
logger = get_logger(__file__)
def update_asset_hardware_info_on_created(asset):
    """
    Queue the hardware-info update task for a single asset.

    :param asset: the asset to refresh; handed to the task as a one-item list
    """
    message = "Update asset `{}` hardware info".format(asset)
    logger.debug(message)
    targets = [asset]
    update_assets_hardware_info_util.delay(targets)
def test_asset_conn_on_created(asset):
    """
    Queue the connectivity test task for a single asset.

    :param asset: the asset to probe; handed to the task as a one-item list
    """
    message = "Test asset `{}` connectivity".format(asset)
    logger.debug(message)
    targets = [asset]
    test_asset_connectivity_util.delay(targets)
@receiver(post_save, sender=Asset)
@on_transaction_commit
def on_asset_created_or_update(sender, instance=None, created=False, **kwargs):
    """
    Handle ``post_save`` for ``Asset``.

    When the asset was just created, queue the hardware-info update and the
    connectivity test. Afterwards make sure the asset belongs to at least
    one node, attaching it to ``Node.org_root()`` when it has none.

    NOTE(review): the node check is assumed to run on every save, not only
    on creation -- confirm against the upstream file's indentation.
    """
    if created:
        logger.info("Asset create signal recv: {}".format(instance))
        # Gather hardware facts and probe connectivity for the new asset.
        update_asset_hardware_info_on_created(instance)
        test_asset_conn_on_created(instance)

    # Every asset must be attached to some node.
    if instance.nodes.all().exists():
        return
    instance.nodes.add(Node.org_root())
@receiver(post_delete, sender=Asset)
def on_asset_delete(sender, instance=None, **kwargs):
    """
    Handle ``post_delete`` for ``Asset``.

    Nodes hold node/asset relations, so they are refreshed through
    ``Node.refresh_assets()`` once an asset is gone.
    """
    message = "Asset delete signal recv: {}".format(instance)
    logger.debug(message)
    Node.refresh_assets()
@receiver(post_save, sender=SystemUser, dispatch_uid="jms")
def on_system_user_update(sender, instance=None, created=True, **kwargs):
    """
    Handle ``post_save`` for ``SystemUser`` updates (not creations).

    An update may have changed the key, username and so on, so the system
    user is pushed to its assets again. Ideally the push would happen only
    when username / password / key / sudo actually change; this handler
    takes the shortcut of pushing on every update.

    ``instance.assets`` is used directly because assets under a node are
    associated with the system user automatically whenever the
    node <-> system-user relation changes.
    """
    if not instance or created:
        return
    logger.info("System user update signal recv: {}".format(instance))
    valid_assets = instance.assets.all().valid()
    push_system_user_to_assets.delay(instance, valid_assets)
@receiver(m2m_changed, sender=SystemUser.assets.through)
def on_system_user_assets_change(sender, instance=None, action='', model=None, pk_set=None, **kwargs):
    """
    Handle additions to the system-user <-> asset relation.

    When the relation gains entries, the affected system users are pushed
    to the newly related assets. Only ``post_add`` is handled.
    """
    if action != "post_add":
        return
    logger.debug("System user assets change signal recv: {}".format(instance))

    related = model.objects.filter(pk__in=pk_set)
    # `model` tells which side of the relation `pk_set` refers to;
    # `instance` is the object on the opposite side.
    if model == Asset:
        system_users, assets = [instance], related
    else:
        system_users, assets = related, [instance]

    for each_system_user in system_users:
        push_system_user_to_assets.delay(each_system_user, assets)
@receiver(m2m_changed, sender=SystemUser.users.through)
def on_system_user_users_change(sender, instance=None, action='', model=None, pk_set=None, **kwargs):
    """
    Handle additions to the system-user <-> user relation.

    When the relation gains entries, every affected system user whose
    ``username_same_with_user`` flag is set is pushed again through
    ``push_system_user_to_assets_manual``. Only ``post_add`` is handled.

    The flag is read from each resolved system user rather than from
    ``instance``: when ``model == SystemUser`` the signal was fired from the
    other side of the relation, so ``instance`` is not a system user there.
    """
    if action != "post_add":
        return

    if model == SystemUser:
        # Reverse direction: `pk_set` holds the system-user primary keys.
        candidates = model.objects.filter(pk__in=pk_set)
    else:
        # Forward direction: `instance` is the system user itself.
        candidates = [instance]

    system_users = [s for s in candidates if s.username_same_with_user]
    if not system_users:
        return

    logger.debug("System user users change signal recv: {}".format(instance))
    for system_user in system_users:
        push_system_user_to_assets_manual.delay(system_user)
@receiver(m2m_changed, sender=SystemUser.nodes.through)
def on_system_user_nodes_change(sender, instance=None, action=None, model=None, pk_set=None, **kwargs):
    """
    Handle additions to the system-user <-> node relation.

    When the relation gains entries, the assets under the affected nodes
    should be associated with the affected system users; the work is handed
    to the ``add_nodes_assets_to_system_users`` task. Only ``post_add`` is
    handled.
    """
    if action != "post_add":
        return
    logger.info("System user nodes update signal recv: {}".format(instance))

    related = model.objects.filter(pk__in=pk_set)
    if model == Node:
        # `pk_set` refers to nodes; `instance` is the system user.
        system_users = [instance]
        nodes_keys = related.values_list('key', flat=True)
    else:
        # `pk_set` refers to system users; `instance` is the node.
        system_users = related
        nodes_keys = [instance.key]

    add_nodes_assets_to_system_users.delay(nodes_keys, system_users)
@receiver(m2m_changed, sender=SystemUser.groups.through)
def on_system_user_groups_change(sender, instance=None, action=None, model=None,
                                 pk_set=None, reverse=False, **kwargs):
    """
    Handle additions to the system-user <-> user-group relation.

    When groups are added to a system user, the users of those groups are
    added to the system user as well. Only forward (non-reverse)
    ``post_add`` signals are handled.
    """
    if reverse or action != "post_add":
        return
    logger.info("System user groups update signal recv: {}".format(instance))

    added_groups = model.objects.filter(pk__in=pk_set)
    counted = added_groups.annotate(users_count=Count("users"))
    # Keep only groups that actually contain users.
    non_empty = counted.filter(users_count__gt=0)
    user_ids = non_empty.values_list('users', flat=True)
    instance.users.add(*user_ids)
@receiver(m2m_changed, sender=Asset.nodes.through)
def on_asset_nodes_change(sender, instance=None, action='', **kwargs):
    """
    Refresh nodes whenever the asset <-> node relation has changed.

    Every ``post_*`` action triggers ``Node.refresh_assets()`` twice: once
    directly and once inside the ``tmp_to_root_org()`` context.
    """
    if not action.startswith('post'):
        return
    message = "Asset nodes change signal recv: {}".format(instance)
    logger.debug(message)
    Node.refresh_assets()
    with tmp_to_root_org():
        Node.refresh_assets()
@receiver(m2m_changed, sender=Asset.nodes.through)
def on_asset_nodes_add(sender, instance=None, action='', model=None, pk_set=None, **kwargs):
    """
    Handle additions to the asset <-> node relation.

    Whether the change was made from the asset side or from the node side,
    the assets newly placed under a node are added to the system users
    related to the keys returned by ``TreeService.ancestors_ids`` for that
    node. Only ``post_add`` is handled.
    """
    if action != "post_add":
        return
    logger.debug("Assets node add signal recv: {}".format(action))

    if model == Node:
        # `pk_set` refers to nodes; `instance` is the asset.
        nodes = model.objects.filter(pk__in=pk_set).values_list('key', flat=True)
        assets = [instance.id]
    else:
        # `pk_set` refers to assets; `instance` is the node.
        nodes = [instance.key]
        assets = model.objects.filter(pk__in=pk_set).values_list('id', flat=True)

    # Associate the new assets with the system users of the node and of its
    # ancestor nodes; only additions are of interest here.
    # NOTE(review): presumably `ancestors_ids` includes the node's own key --
    # confirm against TreeService.
    nodes_ancestors_keys = set()
    node_tree = TreeService.new()
    for node in nodes:
        nodes_ancestors_keys.update(node_tree.ancestors_ids(nid=node))
    system_users = SystemUser.objects.filter(nodes__key__in=nodes_ancestors_keys)

    # Loop-invariant: build the set of candidate asset ids once.
    asset_ids = set(assets)

    # Keyed by system user so that a system user returned more than once by
    # the join above is still processed with a single `add()` call.
    system_users_assets = defaultdict(set)
    for system_user in system_users:
        assets_has_set = system_user.assets.all().filter(id__in=assets).values_list('id', flat=True)
        system_users_assets[system_user].update(asset_ids - set(assets_has_set))

    for system_user, _assets in system_users_assets.items():
        # Nothing missing for this system user: skip the no-op `add()`.
        if _assets:
            system_user.assets.add(*_assets)
@receiver(m2m_changed, sender=Asset.nodes.through)
def on_asset_nodes_remove(sender, instance=None, action='', model=None,
                          pk_set=None, **kwargs):
    """
    Watch removals from the asset <-> node relation to avoid orphan assets.

    Covers both an asset dropping nodes and a node dropping assets. Any
    affected asset left without a node is attached to ``Node.org_root()``.

    ``pre_clear`` only snapshots the current relations on ``instance``
    (``_nodes`` or ``_assets``) so that ``post_clear`` can still tell which
    objects were involved; ``post_remove`` relies on ``pk_set`` instead.
    """
    if action == "pre_clear":
        # Remember what is about to be cleared.
        if model == Node:
            instance._nodes = list(instance.nodes.all())
        else:
            instance._assets = list(instance.assets.all())
        return
    if action not in ("post_remove", "post_clear"):
        return

    logger.debug("Assets node remove signal recv: {}".format(action))

    if action == "post_remove":
        related = model.objects.filter(pk__in=pk_set)
    elif model == Node:
        related = instance._nodes
    else:
        related = instance._assets

    # When `model` is Node the changed object is the asset itself; otherwise
    # the related objects are the assets.
    assets = [instance] if model == Node else related

    if isinstance(assets, list):
        orphan_ids = [
            asset.id for asset in assets
            if asset.nodes.all().count() == 0
        ]
    else:
        with_counts = assets.annotate(nodes_count=Count('nodes'))
        orphan_ids = with_counts.filter(nodes_count=0).values_list('id', flat=True)

    Node.org_root().assets.add(*orphan_ids)
@receiver([post_save, post_delete], sender=Node)
def on_node_update_or_created(sender, **kwargs):
    """
    Refresh nodes after a ``Node`` is saved or deleted.

    ``Node.refresh_nodes()`` is called twice: once directly and once inside
    the ``tmp_to_root_org()`` context.
    """
    refresh = Node.refresh_nodes
    refresh()
    with tmp_to_root_org():
        refresh()