mirror of https://github.com/jumpserver/jumpserver
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
128 lines
4.1 KiB
128 lines
4.1 KiB
# -*- coding: utf-8 -*-
|
|
#
|
|
from django.db.models import Q
|
|
from django.shortcuts import get_object_or_404
|
|
from rest_framework.generics import ListAPIView
|
|
|
|
from assets.models import Asset
|
|
from common.utils import get_logger
|
|
from orgs.mixins import generics
|
|
from perms.filters import AssetPermissionFilter
|
|
from perms.models import AssetPermission
|
|
from perms.serializers import AssetPermissionSerializer
|
|
from users.filters import UserFilter
|
|
from users.models import User, UserGroup
|
|
from users.serializers import UserSerializer, UserGroupSerializer
|
|
|
|
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__file__)

# Names exported by a star-import of this module; the two Base* classes
# defined below are intentionally absent from this list.
__all__ = [
    'AssetPermUserListApi',
    'AssetPermUserPermissionsListApi',
    'AssetPermUserGroupListApi',
    'AssetPermUserGroupPermissionsListApi',
]
|
|
|
|
|
|
class BaseAssetPermUserOrUserGroupListApi(ListAPIView):
    """Base list view scoped to the asset identified by the ``pk`` URL kwarg.

    Provides :meth:`get_asset_related_perms`, which subclasses use to find
    the asset permissions that reference the asset directly or reference
    one of the nodes returned by ``asset.get_all_nodes(flat=True)``.
    """

    # Maps the 'GET' key to a permission codename.
    # NOTE(review): presumably read by the project's RBAC permission layer,
    # which is not visible in this file - confirm the expected key format.
    rbac_perms = {
        'GET': 'perms.view_assetpermission'
    }

    def get_object(self):
        """Return the ``Asset`` whose primary key equals the ``pk`` URL kwarg.

        Delegates to ``get_object_or_404``, so a missing asset results in an
        HTTP 404 rather than a ``DoesNotExist`` exception.
        """
        return get_object_or_404(Asset, pk=self.kwargs.get('pk'))

    def get_asset_related_perms(self):
        """Return the ``AssetPermission`` queryset related to the URL asset.

        A permission matches when the asset is in its ``assets`` relation or
        when any of the asset's nodes is in its ``nodes`` relation.
        """
        target = self.get_object()
        node_set = target.get_all_nodes(flat=True)
        bound_directly = Q(assets=target)
        bound_via_node = Q(nodes__in=node_set)
        return AssetPermission.objects.filter(bound_directly | bound_via_node)
|
|
|
|
|
|
class AssetPermUserListApi(BaseAssetPermUserOrUserGroupListApi):
    """List the users reachable through the permissions of the URL asset.

    A user is included when one of the asset-related permissions is linked
    to the user directly or to a group the user belongs to.
    """

    filterset_class = UserFilter
    search_fields = ('username', 'email', 'name', 'id', 'source', 'role')
    serializer_class = UserSerializer
    # Same mapping as the base class, restated explicitly on this view.
    rbac_perms = {
        'GET': 'perms.view_assetpermission'
    }

    def get_queryset(self):
        """Return the distinct users tied to the asset-related permissions."""
        related = self.get_asset_related_perms()
        granted_directly = Q(assetpermissions__in=related)
        granted_via_group = Q(groups__assetpermissions__in=related)
        matched = User.get_queryset().filter(granted_directly | granted_via_group)
        # The OR across two relations can yield the same user more than once.
        return matched.distinct()
|
|
|
|
|
|
class AssetPermUserGroupListApi(BaseAssetPermUserOrUserGroupListApi):
    """List the user groups linked to the permissions of the URL asset."""

    serializer_class = UserGroupSerializer
    # Empty placeholder queryset; the actual rows come from get_queryset().
    queryset = UserGroup.objects.none()

    def get_queryset(self):
        """Return the distinct user groups tied to the asset-related permissions."""
        related = self.get_asset_related_perms()
        matched = UserGroup.objects.filter(assetpermissions__in=related)
        return matched.distinct()
|
|
|
|
|
|
class BaseAssetRelatedPermissionListApi(generics.ListAPIView):
    """Base list view of ``AssetPermission`` rows related to the URL asset.

    The inherited filtering runs first; the result is then narrowed to the
    permissions that reference the asset directly or reference one of the
    nodes returned by ``asset.get_all_nodes(flat=True)``.
    """

    model = AssetPermission
    serializer_class = AssetPermissionSerializer
    filterset_class = AssetPermissionFilter
    search_fields = ('name',)
    # Maps the 'list' key to a permission codename.
    # NOTE(review): presumably read by the project's RBAC permission layer,
    # which is not visible in this file - confirm the expected key format.
    rbac_perms = {
        'list': 'perms.view_assetpermission'
    }

    def get_object(self):
        """Return the ``Asset`` whose primary key equals the ``pk`` URL kwarg.

        Delegates to ``get_object_or_404``, so a missing asset results in an
        HTTP 404 rather than a ``DoesNotExist`` exception.
        """
        return get_object_or_404(Asset, pk=self.kwargs.get('pk'))

    def filter_asset_related(self, queryset):
        """Narrow ``queryset`` to permissions covering the URL asset.

        A permission matches when the asset is in its ``assets`` relation or
        when any of the asset's nodes is in its ``nodes`` relation.
        """
        target = self.get_object()
        node_set = target.get_all_nodes(flat=True)
        bound_directly = Q(assets=target)
        bound_via_node = Q(nodes__in=node_set)
        return queryset.filter(bound_directly | bound_via_node)

    def filter_queryset(self, queryset):
        """Apply the inherited filters, then the asset restriction."""
        filtered = super().filter_queryset(queryset)
        return self.filter_asset_related(filtered)
|
|
|
|
|
|
class AssetPermUserPermissionsListApi(BaseAssetRelatedPermissionListApi):
    """List the asset-related permissions that apply to one user.

    The user is taken from the ``perm_user_id`` URL kwarg; a permission
    applies when it names the user or one of the user's groups.
    """

    def filter_queryset(self, queryset):
        """Apply the parent filtering, keep the user's rows, and de-duplicate."""
        filtered = super().filter_queryset(queryset)
        return self.filter_user_related(filtered).distinct()

    def filter_user_related(self, queryset):
        """Narrow ``queryset`` to permissions bound to the user or their groups."""
        target_user = self.get_perm_user()
        memberships = target_user.groups.all()
        named_directly = Q(users=target_user)
        named_via_group = Q(user_groups__in=memberships)
        return queryset.filter(named_directly | named_via_group)

    def get_perm_user(self):
        """Return the ``User`` whose primary key equals ``perm_user_id``.

        Delegates to ``get_object_or_404``, so a missing user results in an
        HTTP 404.
        """
        return get_object_or_404(User, pk=self.kwargs.get('perm_user_id'))
|
|
|
|
|
|
class AssetPermUserGroupPermissionsListApi(BaseAssetRelatedPermissionListApi):
    """List the asset-related permissions that apply to one user group.

    The group is taken from the ``perm_user_group_id`` URL kwarg.
    """

    def filter_queryset(self, queryset):
        """Apply the parent filtering, keep the group's rows, and de-duplicate."""
        filtered = super().filter_queryset(queryset)
        return self.filter_user_group_related(filtered).distinct()

    def filter_user_group_related(self, queryset):
        """Narrow ``queryset`` to permissions whose ``user_groups`` has the group."""
        target_group = self.get_perm_user_group()
        return queryset.filter(user_groups=target_group)

    def get_perm_user_group(self):
        """Return the ``UserGroup`` whose primary key equals ``perm_user_group_id``.

        Delegates to ``get_object_or_404``, so a missing group results in an
        HTTP 404.
        """
        group_pk = self.kwargs.get('perm_user_group_id')
        return get_object_or_404(UserGroup, pk=group_pk)
|