django-vue-admin/dvadmin-backend/apps/op_drf/generics.py

466 lines
16 KiB
Python
Raw Normal View History

2021-02-24 13:37:46 +00:00
from django.core.exceptions import ValidationError
from django.db.models.query import QuerySet
from django.http import Http404
from django.shortcuts import get_object_or_404 as _get_object_or_404
from rest_framework.settings import api_settings
from . import mixins
from .pagination import Pagination, JsonPagination
from .response import SuccessResponse
from utils.jsonpath_util import get_jsonpath, filter_json, search_json
from utils.sort_util import sortList
from .views import CustomAPIView
def get_object_or_404(queryset, *filter_args, **filter_kwargs):
    """
    Look up a single object through Django's ``get_object_or_404`` shortcut.

    On top of the shortcut's own behaviour, a lookup that fails with
    ``TypeError``, ``ValueError`` or ``ValidationError`` (i.e. the filter
    arguments don't match the required types) is turned into ``Http404``.
    """
    malformed_lookup_errors = (TypeError, ValueError, ValidationError)
    try:
        match = _get_object_or_404(queryset, *filter_args, **filter_kwargs)
    except malformed_lookup_errors:
        raise Http404
    return match
class GenericAPIView(CustomAPIView):
    """
    Base class for all other generic views.

    Provides the queryset / serializer / filtering / pagination plumbing
    that the concrete views in this module combine with the mixin classes.
    """
    # You'll need to either set these attributes,
    # or override `get_queryset()`/`get_serializer_class()`.
    # If you are overriding a view method, it is important that you call
    # `get_queryset()` instead of accessing the `queryset` property directly,
    # as `queryset` will get evaluated only once, and those results are cached
    # for all subsequent requests.
    queryset = None
    serializer_class = None

    # If you want to use object lookups other than pk, set 'lookup_field'.
    # For more complex lookup requirements override `get_object()`.
    lookup_field = 'pk'
    lookup_url_kwarg = None

    # The filter backend classes to use for queryset filtering
    filter_backends = api_settings.DEFAULT_FILTER_BACKENDS

    # The style to use for queryset pagination.
    pagination_class = Pagination

    def get_queryset(self):
        """
        Get the list of items for this view.

        This must be an iterable, and may be a queryset.
        Defaults to using `self.queryset`.

        This method should always be used rather than accessing `self.queryset`
        directly, as `self.queryset` gets evaluated only once, and those results
        are cached for all subsequent requests.

        You may want to override this if you need to provide different
        querysets depending on the incoming request.
        (Eg. return a list of items that is specific to the user)

        :raises AssertionError: if `queryset` is not set on the view.
        """
        assert self.queryset is not None, (
            "'%s' should either include a `queryset` attribute, "
            "or override the `get_queryset()` method."
            % self.__class__.__name__
        )
        queryset = self.queryset
        if isinstance(queryset, QuerySet):
            # Ensure queryset is re-evaluated on each request.
            queryset = queryset.all()
        return queryset

    def get_object(self):
        """
        Returns the object the view is displaying.

        The filtered queryset is searched for `lookup_field` equal to the
        URL keyword argument named `lookup_url_kwarg` (or `lookup_field`
        when `lookup_url_kwarg` is unset).

        You may want to override this if you need to provide non-standard
        queryset lookups. Eg if objects are referenced using multiple
        keyword arguments in the url conf.

        :raises AssertionError: if the expected URL keyword argument is absent.
        :raises Http404: if no matching object is found.
        """
        queryset = self.filter_queryset(self.get_queryset())

        # Perform the lookup filtering.
        lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
        assert lookup_url_kwarg in self.kwargs, (
            'Expected view %s to be called with a URL keyword argument '
            'named "%s". Fix your URL conf, or set the `.lookup_field` '
            'attribute on the view correctly.' %
            (self.__class__.__name__, lookup_url_kwarg)
        )
        filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
        obj = get_object_or_404(queryset, **filter_kwargs)

        # May raise a permission denied
        self.check_object_permissions(self.request, obj)
        return obj

    def get_serializer(self, *args, **kwargs):
        """
        Return the serializer instance that should be used for validating and
        deserializing input, and for serializing output.

        The view's serializer context is supplied as the `context` keyword
        unless the caller passed an explicit `context` of its own.
        """
        serializer_class = self.get_serializer_class()
        # setdefault: do not silently discard a context supplied by the caller.
        kwargs.setdefault('context', self.get_serializer_context())
        return serializer_class(*args, **kwargs)

    def get_serializer_class(self):
        """
        Return the class to use for the serializer.
        Defaults to using `self.serializer_class`.

        You may want to override this if you need to provide different
        serializations depending on the incoming request.
        (Eg. admins get full serialization, others get basic serialization)

        :raises AssertionError: if `serializer_class` is not set on the view.
        """
        assert self.serializer_class is not None, (
            "'%s' should either include a `serializer_class` attribute, "
            "or override the `get_serializer_class()` method."
            % self.__class__.__name__
        )
        return self.serializer_class

    def get_serializer_context(self):
        """
        Extra context provided to the serializer class: the current request,
        the view's `format_kwarg` and the view itself.
        """
        return {
            'request': self.request,
            'format': self.format_kwarg,
            'view': self
        }

    def filter_queryset(self, queryset):
        """
        Given a queryset, filter it with whichever filter backend is in use.

        Every class in `filter_backends` is instantiated and applied in order.

        You are unlikely to want to override this method, although you may need
        to call it either from a list view, or from a custom `get_object`
        method if you want to apply the configured filtering backend to the
        default queryset.
        """
        for backend in list(self.filter_backends):
            queryset = backend().filter_queryset(self.request, queryset, self)
        return queryset

    @property
    def paginator(self):
        """
        The paginator instance associated with the view, or `None`.

        Created lazily from `pagination_class` on first access and then
        reused for the lifetime of this view instance.
        """
        if not hasattr(self, '_paginator'):
            if self.pagination_class is None:
                self._paginator = None
            else:
                self._paginator = self.pagination_class()
        return self._paginator

    def paginate_queryset(self, queryset):
        """
        Return a single page of results, or `None` if pagination is disabled.
        """
        if self.paginator is None:
            return None
        return self.paginator.paginate_queryset(queryset, self.request, view=self)

    def get_paginated_response(self, data):
        """
        Return a paginated style `Response` object for the given output data.

        :raises AssertionError: if pagination is disabled for this view.
        """
        assert self.paginator is not None
        return self.paginator.get_paginated_response(data)
# Concrete view classes that provide method handlers
# by composing the mixin classes with the base view.
class CreateAPIView(mixins.CreateModelMixin, GenericAPIView):
    """
    Create-only endpoint for a model instance.
    """

    def post(self, request, *args, **kwargs):
        """Delegate POST to the mixin's `create()` handler."""
        response = self.create(request, *args, **kwargs)
        return response
class ListAPIView(mixins.ListModelMixin, GenericAPIView):
    """
    Read-only endpoint that lists a queryset.
    """

    def get(self, request, *args, **kwargs):
        """Delegate GET to the mixin's `list()` handler."""
        response = self.list(request, *args, **kwargs)
        return response
class RetrieveAPIView(mixins.RetrieveModelMixin, GenericAPIView):
    """
    Read-only endpoint for a single model instance.
    """

    def get(self, request, *args, **kwargs):
        """Delegate GET to the mixin's `retrieve()` handler."""
        response = self.retrieve(request, *args, **kwargs)
        return response
class DestroyAPIView(mixins.DestroyModelMixin, GenericAPIView):
    """
    Delete-only endpoint for a single model instance.
    """

    def delete(self, request, *args, **kwargs):
        """Delegate DELETE to the mixin's `destroy()` handler."""
        response = self.destroy(request, *args, **kwargs)
        return response
class UpdateAPIView(mixins.UpdateModelMixin, GenericAPIView):
    """
    Update-only endpoint for a single model instance.
    """

    def put(self, request, *args, **kwargs):
        """Delegate PUT to the mixin's `update()` handler."""
        response = self.update(request, *args, **kwargs)
        return response

    def patch(self, request, *args, **kwargs):
        """Delegate PATCH to the mixin's `partial_update()` handler."""
        response = self.partial_update(request, *args, **kwargs)
        return response
class ListCreateAPIView(
    mixins.ListModelMixin,
    mixins.CreateModelMixin,
    GenericAPIView,
):
    """
    Endpoint that lists a queryset (GET) or creates a model instance (POST).
    """

    def get(self, request, *args, **kwargs):
        """Delegate GET to the mixin's `list()` handler."""
        response = self.list(request, *args, **kwargs)
        return response

    def post(self, request, *args, **kwargs):
        """Delegate POST to the mixin's `create()` handler."""
        response = self.create(request, *args, **kwargs)
        return response
class RetrieveUpdateAPIView(
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    GenericAPIView,
):
    """
    Endpoint that retrieves (GET) or updates (PUT / PATCH) a model instance.
    """

    def get(self, request, *args, **kwargs):
        """Delegate GET to the mixin's `retrieve()` handler."""
        response = self.retrieve(request, *args, **kwargs)
        return response

    def put(self, request, *args, **kwargs):
        """Delegate PUT to the mixin's `update()` handler."""
        response = self.update(request, *args, **kwargs)
        return response

    def patch(self, request, *args, **kwargs):
        """Delegate PATCH to the mixin's `partial_update()` handler."""
        response = self.partial_update(request, *args, **kwargs)
        return response
class RetrieveDestroyAPIView(
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    GenericAPIView,
):
    """
    Endpoint that retrieves (GET) or deletes (DELETE) a model instance.
    """

    def get(self, request, *args, **kwargs):
        """Delegate GET to the mixin's `retrieve()` handler."""
        response = self.retrieve(request, *args, **kwargs)
        return response

    def delete(self, request, *args, **kwargs):
        """Delegate DELETE to the mixin's `destroy()` handler."""
        response = self.destroy(request, *args, **kwargs)
        return response
class RetrieveUpdateDestroyAPIView(
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    GenericAPIView,
):
    """
    Endpoint that retrieves (GET), updates (PUT / PATCH) or deletes (DELETE)
    a model instance.
    """

    def get(self, request, *args, **kwargs):
        """Delegate GET to the mixin's `retrieve()` handler."""
        response = self.retrieve(request, *args, **kwargs)
        return response

    def put(self, request, *args, **kwargs):
        """Delegate PUT to the mixin's `update()` handler."""
        response = self.update(request, *args, **kwargs)
        return response

    def patch(self, request, *args, **kwargs):
        """Delegate PATCH to the mixin's `partial_update()` handler."""
        response = self.partial_update(request, *args, **kwargs)
        return response

    def delete(self, request, *args, **kwargs):
        """Delegate DELETE to the mixin's `destroy()` handler."""
        response = self.destroy(request, *args, **kwargs)
        return response
class JsonListView(GenericAPIView):
    """
    Generic view that filters, searches, orders and paginates an in-memory
    list of JSON-like dicts.
    """
    # Query parameter that carries the fuzzy-search term.
    search_param = api_settings.SEARCH_PARAM
    # Fields handed to `search_json`; searching is skipped when empty.
    search_fields = []
    ordering_param = api_settings.ORDERING_PARAM
    # Fields that may be ordered on; every field is orderable by default.
    ordering_fields = '__all__'
    pagination_class = JsonPagination
    queryset: list = []
    pagination_total = 0
    # Fields included in the response; '__all__' returns every field.
    # Example: response_fields = ['app_code', 'ip', 'hostname', 'host_type', 'host_env']
    response_fields = '__all__'
    # Default ordering. Only the `ordering` set on the view is supported;
    # an ordering passed dynamically through the request is not supported yet.
    ordering = None  # ascending
    # ordering = '-ip'  # descending
    # Class-level flag: set once the ordering has been applied.
    _has_ordering = False
    # Class-level cache of query parameter names that are not field filters.
    _exclude_params = []
    # Class-level cache of {field name: type name}, taken from the first element.
    _json_fields = None

    @property
    def paginator(self):
        """
        The paginator for this view instance, or `None` when
        `pagination_class` is `None`.

        Built lazily; `pagination_class` is called with the length of the
        current queryset and the request is attached to the instance.
        """
        if not hasattr(self, '_paginator'):
            if self.pagination_class is None:
                self._paginator = None
            else:
                self._paginator = self.pagination_class(len(self.get_queryset()))
                self._paginator.request = self.request
        return self._paginator

    def filter_queryset(self, queryset):
        """
        Filter `queryset` by the request's query parameters using a jsonpath
        expression (replaces the backend-based filtering of the base class).

        :param queryset: the list to filter
        :return: `queryset` unchanged when no filter expression is produced,
            otherwise the filtered result (an empty list when nothing matched)
        """
        if not self._exclude_params:
            # Build the list locally and publish it with one assignment so
            # the class-level cache is never observed half-filled.
            exclude_params = []
            paginator = self.paginator
            # Pagination may be disabled (`pagination_class = None`).
            if paginator is not None:
                exclude_params.append(paginator.page_size_query_param)
                exclude_params.append(paginator.page_query_param)
                exclude_params.extend(paginator.other_page_size_query_param)
                exclude_params.extend(paginator.other_page_query_param)
            exclude_params.append(self.ordering_param)
            exclude_params.append(self.search_param)
            self.__class__._exclude_params = exclude_params
        expr = get_jsonpath(self.request.query_params, self._exclude_params, self.json_fields)
        if not expr:
            return queryset
        # Filter the list that was passed in rather than re-reading
        # `get_queryset()`, so callers can chain filtering steps.
        filtered = filter_json(queryset, expr)
        if not filtered:
            filtered = []
        return filtered

    def get_queryset(self):
        """
        Return the list this view operates on.

        :return: `self.queryset`
        """
        return self.queryset

    @property
    def json_fields(self):
        """
        Mapping of field name to the value's type name, derived from the
        first element of the queryset and cached on the class.

        While the queryset is empty an empty list is returned and nothing is
        cached, so the fields are picked up once data becomes available.
        """
        if self._json_fields is None:
            queryset = self.get_queryset()
            if not queryset:
                # Do not cache: an empty schema would otherwise stick to the
                # class for the rest of the process.
                return []
            first: dict = queryset[0]
            self.__class__._json_fields = {
                key: value.__class__.__name__ for key, value in first.items()
            }
        return self._json_fields

    def queryset_ordering(self):
        """
        Apply the view-level `ordering` to the queryset via `sortList`.

        The ordering is applied only while the class-level `_has_ordering`
        flag is unset; the flag is set after the sort.
        NOTE(review): this presumably relies on `sortList` sorting in place
        and on `get_queryset()` returning the same list object on every
        request -- confirm for views that override `get_queryset()`.

        :return: None
        """
        if self._has_ordering:
            return
        field = self.ordering
        # A leading '-' marks descending order; strip it to get the field name.
        if field and self.ordering.startswith('-'):
            field = self.ordering[1:]
        if field and field in self.json_fields:
            all_fields_allowed = (
                isinstance(self.ordering_fields, str)
                and self.ordering_fields.lower() == '__all__'
            )
            if all_fields_allowed or field in self.ordering_fields:
                sortList(self.get_queryset(), self.ordering)
                self.__class__._has_ordering = True

    def queryset_search(self, queryset):
        """
        Apply the fuzzy search to `queryset`.

        The search runs only when `search_fields` is non-empty and the
        request carries a value for `search_param`.

        :param queryset: the list to search
        :return: the search result, or `queryset` unchanged
        """
        search_value = self.request.query_params.get(self.search_param, None)
        if self.search_fields and search_value:
            queryset = search_json(queryset, search_value, self.search_fields)
        return queryset

    def paginate_queryset(self, queryset):
        """
        Slice `queryset` in memory to the page requested by the client.

        :param queryset: the list to paginate
        :return: `queryset` unchanged when pagination is disabled, otherwise
            the elements of the requested page (an empty list when the slice
            selects nothing)
        """
        paginator: JsonPagination = self.paginator
        if paginator is None:
            return queryset
        request = self.request
        page_size = paginator.get_page_size(request)
        page_num = paginator.get_page_num(request)
        start_index = (page_num - 1) * page_size
        end_index = start_index + page_size
        page_expr = f"$[{start_index}:{end_index}]"
        page = filter_json(queryset, page_expr)
        # Same normalisation as `filter_queryset`: a falsy result becomes an
        # empty list so later steps can always iterate it.
        if not page:
            page = []
        return page

    def get_paginated_response(self, data):
        """
        Wrap `data` in the response format of this view.

        :param data: the page of results
        :return: a plain `SuccessResponse` when pagination is disabled,
            otherwise the paginator's response built from `data` and
            `pagination_total`
        """
        if self.paginator is None:
            return SuccessResponse(data)
        return self.paginator.get_paginated_response(data, self.pagination_total)

    def filter_response_fields(self, data):
        """
        Drop the fields that should not be returned.

        :param data: list of dicts
        :return: `data` unchanged when `response_fields` is '__all__',
            otherwise a new list of dicts restricted to `response_fields`
        """
        if isinstance(self.response_fields, str) and self.response_fields.lower() == '__all__':
            return data
        data = [{key: value for key, value in ele.items() if key in self.response_fields} for ele in data]
        return data

    def filter_json(self):
        """
        Run the whole list pipeline:

        (1) ordering: sort the queryset according to `ordering`
        (2) filtering: filter by the field values passed as query parameters
        (3) search: apply the fuzzy search term, if any
        (4) pagination: when a paginator exists, cut out the requested page
        (5) field filtering: drop the fields that should not be returned

        :return: the list of dicts to send back
        """
        self.queryset_ordering()
        queryset = self.get_queryset()
        queryset = self.filter_queryset(queryset)
        queryset = self.queryset_search(queryset)
        paginator = self.paginator
        # Pagination may be disabled; only record the count when it is not.
        if paginator is not None:
            paginator.data_count = len(queryset)
        queryset = self.paginate_queryset(queryset)
        data = self.filter_response_fields(queryset)
        return data

    def get(self, request, *args, **kwargs):
        """Handle GET: run the list pipeline and return the wrapped result."""
        data = self.filter_json()
        return self.get_paginated_response(data)