mirror of https://github.com/jumpserver/jumpserver
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
142 lines
5.2 KiB
142 lines
5.2 KiB
12 months ago
|
from django.shortcuts import get_object_or_404
|
||
|
from rest_framework.decorators import action
|
||
|
from rest_framework.response import Response
|
||
|
|
||
|
from common.api.generic import JMSModelViewSet
|
||
|
from common.utils import is_true
|
||
|
from orgs.mixins.api import OrgBulkModelViewSet
|
||
|
from orgs.mixins.models import OrgModelMixin
|
||
|
from orgs.utils import current_org
|
||
|
from rbac.models import ContentType
|
||
|
from rbac.serializers import ContentTypeSerializer
|
||
|
from . import serializers
|
||
|
from .models import Label, LabeledResource
|
||
|
|
||
|
# Names exported by a star-import of this module. Only LabelViewSet is listed,
# so the other viewsets visible in this file must be imported explicitly by name.
# NOTE(review): confirm whether omitting the other viewsets is intentional.
__all__ = ['LabelViewSet']
|
||
|
|
||
|
|
||
|
class ContentTypeViewSet(JMSModelViewSet):
    """Read-only API over the content types whose models can carry labels.

    Only ``GET``/``HEAD``/``OPTIONS`` are accepted. The listing is restricted
    to content types whose model declares a forward field named ``labels``;
    the ``resources`` detail action lists the objects of one such type.
    """
    serializer_class = ContentTypeSerializer
    http_method_names = ['get', 'head', 'options']
    rbac_perms = {
        'default': 'labels.view_contenttype',
        'resources': 'labels.view_contenttype',
    }
    # NOTE(review): presumably disables the default page size for the listing;
    # confirm against JMSModelViewSet. ``resources`` overrides it per request.
    page_default_limit = None
    # Class-level cache of labelable content type ids, filled lazily by
    # get_can_labeled_content_type_ids().
    can_labeled_content_type = []
    model = ContentType

    @classmethod
    def get_can_labeled_content_type_ids(cls):
        """Return the ids of every content type whose model can be labeled.

        The result is computed once and cached on the class. The list is
        built locally and published with a single assignment, so a concurrent
        caller can never observe a partially filled cache or append
        duplicates to it.

        Returns:
            list: ids of the labelable ``ContentType`` rows.
        """
        if cls.can_labeled_content_type:
            return cls.can_labeled_content_type

        labelable_ids = []
        for ct in ContentType.objects.all():
            model_cls = ct.model_class()
            # model_class() is falsy when no model class is available for
            # the content type; such rows cannot be labeled.
            if not model_cls:
                continue
            # Skip models that have entries in ``_meta.parents`` (i.e. that
            # inherit from another model).
            if model_cls._meta.parents:
                continue
            if 'labels' in model_cls._meta._forward_fields_map:
                labelable_ids.append(ct.id)

        cls.can_labeled_content_type = labelable_ids
        return labelable_ids

    def get_queryset(self):
        """Return the content types that are allowed to carry labels."""
        ids = self.get_can_labeled_content_type_ids()
        queryset = ContentType.objects.filter(id__in=ids)
        return queryset

    @action(methods=['GET'], detail=True, serializer_class=serializers.ContentTypeResourceSerializer)
    def resources(self, request, *args, **kwargs):
        """List the objects of the selected content type.

        Models that subclass ``OrgModelMixin`` are restricted to the current
        organization; other models are listed in full. An optional ``search``
        query parameter is passed to ``content_type.filter_queryset``.

        Returns:
            The paginated response built by
            ``get_paginated_response_from_queryset``.
        """
        # Use a page size of 100 for this action instead of the class default.
        self.page_default_limit = 100
        content_type = self.get_object()
        model = content_type.model_class()

        if issubclass(model, OrgModelMixin):
            queryset = model.objects.filter(org_id=current_org.id)
        else:
            queryset = model.objects.all()

        keyword = request.query_params.get('search')
        if keyword:
            queryset = content_type.filter_queryset(queryset, keyword)
        return self.get_paginated_response_from_queryset(queryset)
|
||
|
|
||
|
|
||
|
class LabelContentTypeResourceViewSet(JMSModelViewSet):
    """Resources of one content type, seen relative to one label.

    The label primary key and the content type id are read from the URL
    kwargs ``label`` and ``res_type``. ``GET`` lists the resources bound (or
    not bound) to the label; ``PUT`` replaces the set of bound resources.
    """
    serializer_class = serializers.ContentTypeResourceSerializer
    rbac_perms = {
        'default': 'labels.view_labeledresource',
        'update': 'labels.change_labeledresource',
    }
    ordering_fields = ('res_type', 'date_created')

    def get_queryset(self):
        """Return the resources of the content type filtered by binding state.

        Query parameters:
            bound: evaluated with ``is_true`` (defaults to ``'1'``). When
                true, only resources bound to the label are returned;
                otherwise only resources NOT bound to it.
            search: optional keyword passed to
                ``content_type.filter_queryset``.

        Raises:
            Http404: if the label or the content type does not exist.
        """
        label_pk = self.kwargs.get('label')
        res_type = self.kwargs.get('res_type')
        label = get_object_or_404(Label, pk=label_pk)
        content_type = get_object_or_404(ContentType, id=res_type)
        bound = self.request.query_params.get('bound', '1')

        # Materialize the bound resource ids once, de-duplicated.
        res_ids = LabeledResource.objects \
            .filter(res_type=content_type, label=label) \
            .values_list('res_id', flat=True)
        res_ids = list(set(res_ids))

        model = content_type.model_class()
        if is_true(bound):
            queryset = model.objects.filter(id__in=res_ids)
        else:
            queryset = model.objects.exclude(id__in=res_ids)

        keyword = self.request.query_params.get('search')
        if keyword:
            queryset = content_type.filter_queryset(queryset, keyword)
        return queryset

    def put(self, request, *args, **kwargs):
        """Replace the resources of this content type bound to the label.

        The request body provides ``res_ids``. Bindings whose resource id is
        not in that list are deleted, and a binding is created for every
        listed id (existing ones are skipped via ``ignore_conflicts``).

        Both steps run in one database transaction, so a failure while
        creating the new bindings cannot leave the old ones already deleted.

        Returns:
            Response: ``{"total": <number of ids submitted>}``.

        Raises:
            Http404: if the label or the content type does not exist.
        """
        # Imported locally so this block does not depend on a file-level import.
        from django.db import transaction

        label_pk = self.kwargs.get('label')
        res_type = self.kwargs.get('res_type')
        content_type = get_object_or_404(ContentType, id=res_type)
        label = get_object_or_404(Label, pk=label_pk)
        res_ids = request.data.get('res_ids', [])

        org_id = current_org.id
        resources = [
            LabeledResource(res_type=content_type, res_id=res_id, label=label, org_id=org_id)
            for res_id in res_ids
        ]

        with transaction.atomic():
            LabeledResource.objects \
                .filter(res_type=content_type, label=label) \
                .exclude(res_id__in=res_ids).delete()
            LabeledResource.objects.bulk_create(resources, ignore_conflicts=True)
        return Response({"total": len(res_ids)})
|
||
|
|
||
|
|
||
|
class LabelViewSet(OrgBulkModelViewSet):
    """Viewset for ``Label`` objects, plus a ``keys`` listing of label names."""
    model = Label
    filterset_fields = ("name", "value")
    search_fields = filterset_fields
    serializer_classes = {
        'default': serializers.LabelSerializer,
        'resource_types': ContentTypeSerializer,
    }
    rbac_perms = {
        'resource_types': 'labels.view_label',
        'keys': 'labels.view_label',
    }

    @action(methods=['GET'], detail=False)
    def keys(self, request, *args, **kwargs):
        """Return the distinct label names.

        When the ``search`` query parameter is present, only names containing
        it (case-insensitive) are returned.
        """
        labels = Label.objects.all()
        search = request.query_params.get('search')
        if search:
            labels = labels.filter(name__icontains=search)
        return Response(labels.values_list('name', flat=True).distinct())
|
||
|
|
||
|
|
||
|
class LabeledResourceViewSet(OrgBulkModelViewSet):
    """Viewset for ``LabeledResource`` rows (label-to-resource bindings)."""
    model = LabeledResource
    serializer_classes = {'default': serializers.LabeledResourceSerializer}
    # The same field list is used for exact-match filtering and for search.
    filterset_fields = (
        "label__name",
        "label__value",
        "res_type",
        "res_id",
        "label",
    )
    search_fields = filterset_fields
    ordering_fields = ('res_type', 'date_created')
|