op_drf 封装

pull/1/head
李强 2021-02-24 21:37:46 +08:00
parent 5516b600a5
commit 24ff29b98a
26 changed files with 2469 additions and 143 deletions

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/../dvadmin-backend/.idea/dvadmin-backend.iml" filepath="$PROJECT_DIR$/../dvadmin-backend/.idea/dvadmin-backend.iml" />
</modules>
</component>
</project>

View File

@ -46,6 +46,8 @@ INSTALLED_APPS = [
'captcha',
# 自定义app
'apps.permission',
'apps.op_drf',
'apps.system',
]
MIDDLEWARE = [

View File

@ -13,7 +13,6 @@ Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
import json
from captcha.conf import settings as ca_settings
from captcha.helpers import captcha_image_url, captcha_audio_url
@ -25,9 +24,9 @@ from django.urls import re_path, include
from django.views.static import serve
from rest_framework.views import APIView
from apps.permission.views import GetUserView, GetRouters
from apps.permission.views import GetUserProfileView, GetRouters
from apps.op_drf.response import SuccessResponse
from utils.login import LoginView, LogoutView
from utils.response import SuccessResponse
class CaptchaRefresh(APIView):
@ -47,10 +46,12 @@ class CaptchaRefresh(APIView):
urlpatterns = [
re_path('api-token-auth/', LoginView.as_view(), name='api_token_auth'),
re_path(r'^admin/', admin.site.urls),
re_path(r'^permission/', include('apps.permission.urls')),
re_path(r'^system/', include('apps.system.urls')),
re_path(r'media/(?P<path>.*)', serve, {"document_root": settings.MEDIA_ROOT}),
re_path(r'^login/$', LoginView.as_view()),
re_path(r'^logout/$', LogoutView.as_view()),
re_path(r'^getInfo/$', GetUserView.as_view()),
re_path(r'^getInfo/$', GetUserProfileView.as_view()),
re_path(r'^getRouters/$', GetRouters.as_view()),
url(r'^api-auth/', include('rest_framework.urls', namespace='rest_framework')),
url(r"captcha/refresh/$", CaptchaRefresh.as_view(), name="captcha-refresh"), # 刷新验证码

View File

View File

@ -0,0 +1,13 @@
from django.apps import AppConfig
import logging
logger = logging.getLogger(__name__)
class OpDrfConfig(AppConfig):
name = 'op_drf'
verbose_name = "OP DRF"
def ready(self):
logging.info("OP DRF框架检测完成:success")

View File

@ -2,7 +2,7 @@ from django.contrib.auth import get_user_model
from django.db import models
from django.db.models import SET_NULL
from .string_util import uuid_8, uuid_16, uuid_32, uuid_36
from utils.string_util import uuid_8, uuid_16, uuid_32, uuid_36
class IdField(models.CharField):
@ -70,7 +70,8 @@ class DescriptionField(models.TextField):
"""
def __init__(self, *args, **kwargs):
kwargs['default'] = kwargs.get('default', '')
if kwargs.get('null', True):
kwargs['default'] = kwargs.get('default', '')
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
kwargs['verbose_name'] = kwargs.get('verbose_name', '描述')
@ -78,136 +79,6 @@ class DescriptionField(models.TextField):
super().__init__(*args, **kwargs)
class TextField(models.TextField):
"""
xxx = TextField()
"""
def __init__(self, *args, **kwargs):
kwargs['default'] = kwargs.get('default', '')
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
super().__init__(*args, **kwargs)
class CharField(models.CharField):
"""
xxx = CharField()
"""
def __init__(self, *args, **kwargs):
kwargs['default'] = kwargs.get('default', '')
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
super().__init__(*args, **kwargs)
class IntegerField(models.IntegerField):
"""
xxx = IntegerField()
"""
def __init__(self, *args, **kwargs):
kwargs['default'] = kwargs.get('default', 0)
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
super().__init__(*args, **kwargs)
class BooleanField(models.BooleanField):
"""
xxx = BooleanField()
"""
def __init__(self, *args, **kwargs):
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
super().__init__(*args, **kwargs)
class DateField(models.DateField):
"""
xxx = DateField()
"""
def __init__(self, *args, **kwargs):
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(*args, **kwargs)
class DateTimeField(models.DateTimeField):
"""
xxx = DateTimeField()
"""
def __init__(self, *args, **kwargs):
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(*args, **kwargs)
class ForeignKey(models.ForeignKey):
"""
xxx = ForeignKey()
"""
def __init__(self, to=None, on_delete=None, related_name=None, related_query_name=None, limit_choices_to=None,
parent_link=False, to_field=None, db_constraint=False, **kwargs):
if on_delete is None:
on_delete = SET_NULL
if to_field is None:
to_field = 'id'
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(to, on_delete, related_name, related_query_name, limit_choices_to, parent_link, to_field,
db_constraint, **kwargs)
class OneToOneField(models.OneToOneField):
"""
xxx = OneToOneField()
"""
def __init__(self, *args, on_delete=None, to_field=None, db_constraint=False, **kwargs):
if on_delete is None:
on_delete = SET_NULL
if to_field is None:
to_field = 'id'
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
kwargs['editable'] = kwargs.get('default', None)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(*args, on_delete=on_delete, to_field=to_field, db_constraint=db_constraint, **kwargs)
class ManyToManyField(models.ManyToManyField):
"""
xxx = ManyToManyField()
"""
def __init__(self, *args, db_constraint=False, **kwargs):
kwargs['verbose_name'] = kwargs.get('verbose_name', '')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
super().__init__(*args, db_constraint=db_constraint, **kwargs)
class UserForeignKeyField(models.ForeignKey):
"""
user = UserForeignKeyField()
@ -224,8 +95,6 @@ class UserForeignKeyField(models.ForeignKey):
kwargs['verbose_name'] = kwargs.get('verbose_name', '关联的用户')
kwargs['help_text'] = kwargs.get('help_text', '') or kwargs.get('verbose_name', '关联的用户')
kwargs['editable'] = kwargs.get('default', False)
kwargs['blank'] = kwargs.get('blank', True)
kwargs['null'] = kwargs.get('null', True)
super().__init__(to, on_delete, related_name, related_query_name, limit_choices_to, parent_link, to_field,
db_constraint, **kwargs)
@ -258,7 +127,7 @@ class CreateDateTimeField(models.DateTimeField):
super().__init__(verbose_name, name, auto_now, auto_now_add, **kwargs)
class CreatorCharField(CharField):
class CreatorCharField(models.CharField):
"""
creator = CreatorCharField()
"""
@ -272,7 +141,7 @@ class CreatorCharField(CharField):
super().__init__(*args, **kwargs)
class ModifierCharField(CharField):
class ModifierCharField(models.CharField):
"""
modifier = ModifierCharField()
"""

View File

@ -0,0 +1,96 @@
"""
常用的过滤器以及DRF的过滤器
"""
import json
import logging
import operator
from functools import reduce
from django.utils import six
from mongoengine.queryset import visitor
from rest_framework.filters import BaseFilterBackend, SearchFilter, OrderingFilter
logger = logging.getLogger(__name__)
def get_as_kwargs(request):
params = request.GET.dict()
if 'as' not in params:
return {}
as_params = json.loads(params.get('as', '{}'))
return as_params
class MongoSearchFilter(SearchFilter):
"""
适配Mongo模型视图的Search过滤器
"""
def filter_queryset(self, request, queryset, view):
search_fields = getattr(view, 'search_fields', None)
search_terms = self.get_search_terms(request)
if not search_fields or not search_terms:
return queryset
orm_lookups = [
self.construct_search(six.text_type(search_field))
for search_field in search_fields
]
if not orm_lookups:
return queryset
conditions = []
for search_term in search_terms:
queries = [
visitor.Q(**{orm_lookup: search_term})
for orm_lookup in orm_lookups
]
conditions.append(reduce(operator.or_, queries))
queryset = queryset.filter(reduce(operator.and_, conditions))
return queryset
class MongoOrderingFilter(OrderingFilter):
"""
适配Mongo模型视图的Search过滤器
"""
def get_valid_fields(self, queryset, view, context={}):
valid_fields = getattr(view, 'ordering_fields', self.ordering_fields)
if valid_fields is None:
return self.get_default_valid_fields(queryset, view, context)
elif valid_fields == '__all__':
# View explicitly allows filtering on any model field
model = view.get_serializer().__class__.Meta.model
valid_fields = [
(field_name, getattr(field, 'verbose_name', field_name)) for field_name, field in model._fields.items()
]
else:
valid_fields = [
(item, item) if isinstance(item, six.string_types) else item
for item in valid_fields
]
return valid_fields
class AdvancedSearchFilter(BaseFilterBackend):
"""
高级搜索过滤器
"""
def filter_queryset(self, request, queryset, view):
as_kwargs = get_as_kwargs(request)
if as_kwargs:
queryset = queryset.filter(**as_kwargs)
return queryset
class MongoAdvancedSearchFilter(BaseFilterBackend):
"""
mongo高级搜索过滤器
"""
def filter_queryset(self, request, queryset, view):
as_kwargs = get_as_kwargs(request)
if as_kwargs:
queryset = queryset.filter(**as_kwargs)
return queryset

View File

@ -0,0 +1,465 @@
from django.core.exceptions import ValidationError
from django.db.models.query import QuerySet
from django.http import Http404
from django.shortcuts import get_object_or_404 as _get_object_or_404
from rest_framework.settings import api_settings
from . import mixins
from .pagination import Pagination, JsonPagination
from .response import SuccessResponse
from utils.jsonpath_util import get_jsonpath, filter_json, search_json
from utils.sort_util import sortList
from .views import CustomAPIView
def get_object_or_404(queryset, *filter_args, **filter_kwargs):
"""
Same as Django's standard shortcut, but make sure to also raise 404
if the filter_kwargs don't match the required types.
"""
try:
return _get_object_or_404(queryset, *filter_args, **filter_kwargs)
except (TypeError, ValueError, ValidationError):
raise Http404
class GenericAPIView(CustomAPIView):
"""
Base class for all other generic views.
"""
# You'll need to either set these attributes,
# or override `get_queryset()`/`get_serializer_class()`.
# If you are overriding a view method, it is important that you call
# `get_queryset()` instead of accessing the `queryset` property directly,
# as `queryset` will get evaluated only once, and those results are cached
# for all subsequent requests.
queryset = None
serializer_class = None
# If you want to use object lookups other than pk, set 'lookup_field'.
# For more complex lookup requirements override `get_object()`.
lookup_field = 'pk'
lookup_url_kwarg = None
# The filter backend classes to use for queryset filtering
filter_backends = api_settings.DEFAULT_FILTER_BACKENDS
# The style to use for queryset pagination.
pagination_class = Pagination
def get_queryset(self):
"""
Get the list of items for this view.
This must be an iterable, and may be a queryset.
Defaults to using `self.queryset`.
This method should always be used rather than accessing `self.queryset`
directly, as `self.queryset` gets evaluated only once, and those results
are cached for all subsequent requests.
You may want to override this if you need to provide different
querysets depending on the incoming request.
(Eg. return a list of items that is specific to the user)
"""
assert self.queryset is not None, (
"'%s' should either include a `queryset` attribute, "
"or override the `get_queryset()` method."
% self.__class__.__name__
)
queryset = self.queryset
if isinstance(queryset, QuerySet):
# Ensure queryset is re-evaluated on each request.
queryset = queryset.all()
return queryset
def get_object(self):
"""
Returns the object the view is displaying.
You may want to override this if you need to provide non-standard
queryset lookups. Eg if objects are referenced using multiple
keyword arguments in the url conf.
"""
queryset = self.filter_queryset(self.get_queryset())
# Perform the lookup filtering.
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg)
)
filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
obj = get_object_or_404(queryset, **filter_kwargs)
# May raise a permission denied
self.check_object_permissions(self.request, obj)
return obj
def get_serializer(self, *args, **kwargs):
"""
Return the serializer instance that should be used for validating and
deserializing input, and for serializing output.
"""
serializer_class = self.get_serializer_class()
kwargs['context'] = self.get_serializer_context()
return serializer_class(*args, **kwargs)
def get_serializer_class(self):
"""
Return the class to use for the serializer.
Defaults to using `self.serializer_class`.
You may want to override this if you need to provide different
serializations depending on the incoming request.
(Eg. admins get full serialization, others get basic serialization)
"""
assert self.serializer_class is not None, (
"'%s' should either include a `serializer_class` attribute, "
"or override the `get_serializer_class()` method."
% self.__class__.__name__
)
return self.serializer_class
def get_serializer_context(self):
"""
Extra context provided to the serializer class.
"""
return {
'request': self.request,
'format': self.format_kwarg,
'view': self
}
def filter_queryset(self, queryset):
"""
Given a queryset, filter it with whichever filter backend is in use.
You are unlikely to want to override this method, although you may need
to call it either from a list view, or from a custom `get_object`
method if you want to apply the configured filtering backend to the
default queryset.
"""
for backend in list(self.filter_backends):
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset
@property
def paginator(self):
"""
The paginator instance associated with the view, or `None`.
"""
if not hasattr(self, '_paginator'):
if self.pagination_class is None:
self._paginator = None
else:
self._paginator = self.pagination_class()
return self._paginator
def paginate_queryset(self, queryset):
"""
Return a single page of results, or `None` if pagination is disabled.
"""
if self.paginator is None:
return None
return self.paginator.paginate_queryset(queryset, self.request, view=self)
def get_paginated_response(self, data):
"""
Return a paginated style `Response` object for the given output data.
"""
assert self.paginator is not None
return self.paginator.get_paginated_response(data)
# Concrete view classes that provide method handlers
# by composing the mixin classes with the base view.
class CreateAPIView(mixins.CreateModelMixin,
GenericAPIView):
"""
Concrete view for creating a model instance.
"""
def post(self, request, *args, **kwargs):
return self.create(request, *args, **kwargs)
class ListAPIView(mixins.ListModelMixin,
GenericAPIView):
"""
Concrete view for listing a queryset.
"""
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
class RetrieveAPIView(mixins.RetrieveModelMixin,
GenericAPIView):
"""
Concrete view for retrieving a model instance.
"""
def get(self, request, *args, **kwargs):
return self.retrieve(request, *args, **kwargs)
class DestroyAPIView(mixins.DestroyModelMixin,
GenericAPIView):
"""
Concrete view for deleting a model instance.
"""
def delete(self, request, *args, **kwargs):
return self.destroy(request, *args, **kwargs)
class UpdateAPIView(mixins.UpdateModelMixin,
GenericAPIView):
"""
Concrete view for updating a model instance.
"""
def put(self, request, *args, **kwargs):
return self.update(request, *args, **kwargs)
def patch(self, request, *args, **kwargs):
return self.partial_update(request, *args, **kwargs)
class ListCreateAPIView(mixins.ListModelMixin,
mixins.CreateModelMixin,
GenericAPIView):
"""
Concrete view for listing a queryset or creating a model instance.
"""
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
return self.create(request, *args, **kwargs)
class RetrieveUpdateAPIView(mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
GenericAPIView):
"""
Concrete view for retrieving, updating a model instance.
"""
def get(self, request, *args, **kwargs):
return self.retrieve(request, *args, **kwargs)
def put(self, request, *args, **kwargs):
return self.update(request, *args, **kwargs)
def patch(self, request, *args, **kwargs):
return self.partial_update(request, *args, **kwargs)
class RetrieveDestroyAPIView(mixins.RetrieveModelMixin,
mixins.DestroyModelMixin,
GenericAPIView):
"""
Concrete view for retrieving or deleting a model instance.
"""
def get(self, request, *args, **kwargs):
return self.retrieve(request, *args, **kwargs)
def delete(self, request, *args, **kwargs):
return self.destroy(request, *args, **kwargs)
class RetrieveUpdateDestroyAPIView(mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
GenericAPIView):
"""
Concrete view for retrieving, updating or deleting a model instance.
"""
def get(self, request, *args, **kwargs):
return self.retrieve(request, *args, **kwargs)
def put(self, request, *args, **kwargs):
return self.update(request, *args, **kwargs)
def patch(self, request, *args, **kwargs):
return self.partial_update(request, *args, **kwargs)
def delete(self, request, *args, **kwargs):
return self.destroy(request, *args, **kwargs)
class JsonListView(GenericAPIView):
"""
JsonList的查询过滤搜索排序通用视图
"""
# 模糊搜索
search_param = api_settings.SEARCH_PARAM
search_fields = []
ordering_param = api_settings.ORDERING_PARAM
# 可排序属性,默认所有属性可排序
ordering_fields = '__all__'
pagination_class = JsonPagination
queryset: list = []
pagination_total = 0
# 返回的属性
response_fields = '__all__'
# response_fields = ['app_code', 'ip', 'hostname', 'host_type', 'host_env']
# 默认排序, 只支持view中设置ordering, 暂不支持排序参数由请求动态传入
ordering = None # 正序
# ordering = '-ip' # 倒序
_has_ordering = False
_exclude_params = []
# 属性
_json_fields = None
@property
def paginator(self):
if not hasattr(self, '_paginator'):
if self.pagination_class is None:
self._paginator = None
else:
self._paginator = self.pagination_class(len(self.get_queryset()))
self._paginator.request = self.request
return self._paginator
def filter_queryset(self, queryset):
"""
使用jsonpath实现, 重写对查询集的条件过滤逻辑
:param queryset:
:return:
"""
if not self._exclude_params:
self.__class__._exclude_params = []
self.__class__._exclude_params.append(self.paginator.page_size_query_param)
self.__class__._exclude_params.append(self.paginator.page_query_param)
self.__class__._exclude_params.extend(self.paginator.other_page_size_query_param)
self.__class__._exclude_params.extend(self.paginator.other_page_query_param)
self.__class__._exclude_params.append(self.ordering_param)
self.__class__._exclude_params.append(self.search_param)
expr = get_jsonpath(self.request.query_params, self._exclude_params, self.json_fields)
if not expr:
return queryset
_queryset = filter_json(self.get_queryset(), expr)
if not _queryset:
_queryset = []
return _queryset
def get_queryset(self):
"""
获取查询集/初始化查询集
:return:
"""
return self.queryset
@property
def json_fields(self):
if self._json_fields is None:
queryset = self.get_queryset()
if len(queryset):
ele: dict = queryset[0]
self.__class__._json_fields = {key: value.__class__.__name__ for (key, value) in ele.items()}
else:
self.__class__._json_fields = []
return self._json_fields
def queryset_ordering(self):
"""
查询集排序
:return:
"""
if self._has_ordering:
return
_ordering = self.ordering
if _ordering and self.ordering.startswith('-'):
_ordering = self.ordering[1:]
if _ordering and _ordering in self.json_fields:
if (isinstance(self.ordering_fields,
str) and self.ordering_fields.lower() == '__all__') or _ordering in self.ordering_fields:
sortList(self.get_queryset(), self.ordering)
self.__class__._has_ordering = True
def queryset_search(self, queryset):
search_value = self.request.query_params.get(self.search_param, None)
if self.search_fields and search_value:
queryset = search_json(queryset, search_value, self.search_fields)
return queryset
def paginate_queryset(self, queryset):
"""
重写内存分页逻辑
:param queryset:
:return:
"""
paginator: JsonPagination = self.paginator
if paginator is None:
return queryset
request = self.request
page_size = paginator.get_page_size(request)
page_num = paginator.get_page_num(request)
start_index = (page_num - 1) * page_size
end_index = start_index + page_size
page_expr = f"$[{start_index}:{end_index}]"
queryset = filter_json(queryset, page_expr)
return queryset
def get_paginated_response(self, data):
"""
重写分页的返回格式
:param data:
:return:
"""
if self.paginator is None:
return SuccessResponse(data)
return self.paginator.get_paginated_response(data, self.pagination_total)
def filter_response_fields(self, data):
"""
过滤掉不显示的字段
:param data:
:return:
"""
if isinstance(self.response_fields, str) and self.response_fields.lower() == '__all__':
return data
data = [{key: value for key, value in ele.items() if key in self.response_fields} for ele in data]
return data
def filter_json(self):
"""
过滤
(1)查询集排序, 根据ordering对查询集进行正序/倒序
(2)查询集过滤,根据请求参数传入的属性值
(3)分页, 如果存在分页器, 根据输入的分页参数分页
(4)属性字段过滤, 过滤掉不需要返回的属性值
:return:
"""
self.queryset_ordering()
queryset = self.get_queryset()
queryset = self.filter_queryset(queryset)
queryset = self.queryset_search(queryset)
self.paginator.data_count = len(queryset)
queryset = self.paginate_queryset(queryset)
data = self.filter_response_fields(queryset)
return data
def get(self, request, *args, **kwargs):
data = self.filter_json()
return self.get_paginated_response(data)

View File

@ -0,0 +1,10 @@
import logging
from django.core.cache import cache
class RedisHandler(logging.StreamHandler):
def emit(self, record):
msg = self.format(record)
print(msg)

View File

@ -0,0 +1,192 @@
import logging
from django.db.models import Model
from rest_framework.request import Request
from rest_framework.views import APIView
logger = logging.getLogger(__name__)
class ViewLogger(object):
"""
基于View视图的日志
"""
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__()
self.view = view
self.request = request
self.log_prefix: str = ''
def handle(self, request: Request, *args, **kwargs):
pass
class APIViewLogger(ViewLogger):
"""
(1)仅在op_drf.views.CustomAPIView的子类中生效
(2)使用: 请勿直接配置view_logger_classes = (APIViewLogger, ), 这样无效,
如有需求, 需要继承APIViewLogger重写其相应的方法
(3)重写handle()方法,所有请求均触发此方法
重写handle_get()方法,仅GET请求均触发此方法
重写handle_post()方法,仅POST请求均触发此方法
重写handle_put()方法,仅PUT请求均触发此方法
重写handle_delete()方法,仅DELETE请求均触发此方法
重写handle_xxx()方法,仅xxx请求均触发此方法
"""
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__()
self.view: APIView = view
self.request: Request = request
self.user = request.user
class ModelViewLogger(APIViewLogger):
"""
基础模型操作日志
(1)仅在op_drf.viewsets.GenericViewSet的子类中生效
(1)在CustomModelViewSet子类中配置: view_logger_classes = [ModelViewLogger, ]
(2)不要在op_drf中继续写具体的日志逻辑代码,
如有需求, 应该继承ModelViewLogger并且重写相应的方法, 例如CustomerModelViewLogger(涉及到其他模块时不要将代码放入op_drf)
"""
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__(view, request)
if hasattr(self.view.get_queryset(), 'model'):
self.model: Model = self.view.get_queryset().model
elif hasattr(self.view.get_serializer(), 'Meta') and hasattr(self.view.get_serializer().Meta, 'model'):
self.model: Model = self.view.get_serializer().Meta.model
class RelationshipViewLogger(APIViewLogger):
"""
关联关系模型操作日志
(1)在ModelRelationshipView子类中配置: view_logger_classes = [RelationshipViewLogger, ]
(2)不要在op_drf中继续写具体的日志逻辑代码,
如有需求, 应该继承RelationshipViewLogger并且重写相应的方法, 例如CustomerRelationshipViewLogger(涉及到其他模块时不要将代码放入op_drf)
"""
def __init__(self, view=None, request=None, instanceId=None, *args, **kwargs) -> None:
super().__init__(view, request)
self.instanceId: str = instanceId
def handle(self, request: Request, *args, **kwargs):
"""
每一次请求都会触发此方法
"""
pass
class CustomerRelationshipViewLogger(RelationshipViewLogger):
"""
(1)在ModelRelationshipView子类中配置: view_logger_classes = [CustomerRelationshipViewLogger, ]
"""
def __init__(self, view=None, request=None, instanceId=None, *args, **kwargs) -> None:
super().__init__(view, request, instanceId, *args, **kwargs)
self.log_prefix: str = 'RelationshipView日志系统:'
def handle_get(self, request: Request, *args, **kwargs):
"""
仅GET请求才会触发此方法
"""
pass
def handle_post(self, request: Request, *args, **kwargs):
"""
仅POST请求才会触发此方法
"""
operator = self.user.username
model_name = getattr(self.view.model, '_meta').verbose_name
to_field_name = self.view.to_field_name
to_model_name = getattr(self.view.relationship_model, '_meta').verbose_name
logger.info(
f'{self.log_prefix}用户[username={operator}]新增, {model_name}实例[{to_field_name}={self.instanceId}]与{to_model_name}的关联关系')
def handle_put(self, request: Request, *args, **kwargs):
"""
仅PUT请求才会触发此方法
"""
operator = self.user.username
model_name = getattr(self.view.model, '_meta').verbose_name
to_field_name = self.view.to_field_name
to_model_name = getattr(self.view.relationship_model, '_meta').verbose_name
logger.info(
f'{self.log_prefix}用户[username={operator}]重置, {model_name}实例[{to_field_name}={self.instanceId}]与{to_model_name}的关联关系')
def handle_delete(self, request: Request, *args, **kwargs):
"""
仅DELETE请求才会触发此方法
"""
operator = self.user.username
model_name = getattr(self.view.model, '_meta').verbose_name
to_field_name = self.view.to_field_name
to_model_name = getattr(self.view.relationship_model, '_meta').verbose_name
logger.info(
f'{self.log_prefix}用户[username={operator}]移除, {model_name}实例[{to_field_name}={self.instanceId}]与{to_model_name}的关联关系')
class CustomerModelViewLogger(ModelViewLogger):
def __init__(self, view=None, request=None, *args, **kwargs) -> None:
super().__init__(view, request, *args, **kwargs)
self.log_prefix: str = 'CustomModelViewSet日志系统:'
def handle(self, request: Request, *args, **kwargs):
pass
def handle_retrieve(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅retrieve(GET)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
logger.info(f'{self.log_prefix}用户[username={operator}]检索{model_name}:[{instance}]')
def handle_list(self, request: Request, *args, **kwargs):
"""
仅list(GET)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
logger.info(f'{self.log_prefix}用户[username={operator}]查询{model_name}')
def handle_create(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅create(POST)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
logger.info(f'{self.log_prefix}用户[username={operator}]创建{model_name}:[{instance}]')
def handle_update(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅update(PUT)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
logger.info(f'{self.log_prefix}用户[username={operator}]更新{model_name}:[{instance}]')
def handle_partial_update(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅partial_update(PATCH)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
logger.info(f'{self.log_prefix}用户[username={operator}]部分更新{model_name}:[{instance}]')
def handle_destroy(self, request: Request, instance: Model = None, *args, **kwargs):
"""
仅destroy(DELETE)请求才会触发此方法
"""
pass
operator = self.user.username
model_name = getattr(self.model, '_meta').verbose_name
logger.info(f'{self.log_prefix}用户[username={operator}]删除{model_name}:[{instance}]')

View File

@ -0,0 +1,102 @@
"""
django中间件
"""
import json
import datetime
from django.utils.deprecation import MiddlewareMixin
from mongoengine import DynamicDocument, StringField, IntField, DictField, DateTimeField
from rest_framework_mongoengine.serializers import DocumentSerializer
import logging
from utils.decorators import exceptionHandler
from utils.request_util import get_request_ip, get_request_data, get_request_path
from .viewsets import CustomMongoModelViewSet
from django.conf import settings
logger = logging.getLogger(__name__)
class ApiLog(DynamicDocument):
"""
API访问日志的Mongo模型
"""
request_ip = StringField(verbose_name="request_ip", help_text="请求IP")
request_username = StringField(verbose_name="request_username", help_text="请求username")
request_method = StringField(verbose_name="request_method", help_text="请求方法")
request_path = StringField(verbose_name="request_path", help_text="请求路径")
request_body = DictField(verbose_name="request_body", help_text="请求参数")
response_code = IntField(verbose_name="response_code", help_text="响应状态码")
response_reason = StringField(verbose_name="response_reason", help_text="响应简述")
access_time = DateTimeField(verbose_name="access_time", help_text="访问时间")
class ApiLogSerializer(DocumentSerializer):
"""
API访问日志的Mongo序列化器
"""
class Meta:
model = ApiLog
fields = '__all__'
class ApiLogModelViewSet(CustomMongoModelViewSet):
"""
API访问日志的CRUD视图
"""
queryset = ApiLog.objects.all()
serializer_class = ApiLogSerializer
search_fields = ('request_ip', 'request_username', 'request_method', 'response_reason', 'source_system')
ordering = '-access_time' # 默认排序
class ApiLoggingMiddleware(MiddlewareMixin):
"""
用于记录API访问日志中间件
"""
def __init__(self, get_response=None):
super().__init__(get_response)
self.enable = op_settings.get_api_log_setting().get('enable', False)
self.methods = op_settings.get_api_log_setting().get('methods', set())
@classmethod
@exceptionHandler()
def __handle_request(cls, request):
request.request_ip = get_request_ip(request)
request.request_data = get_request_data(request)
request.access_time = datetime.datetime.now()
@classmethod
@exceptionHandler(logger=logger)
def __handle_response(cls, request, response):
# request_data,request_ip由PermissionInterfaceMiddleware中间件中添加的属性
body = getattr(request, 'request_data', {})
# 请求含有password则用*替换掉(暂时先用于所有接口的password请求参数)
if isinstance(body, dict) and body.get('password', ''):
body['password'] = '*' * len(body['password'])
info = {
'request_ip': getattr(request, 'request_ip', 'unknown'),
'request_username': request.user.username,
'request_method': request.method,
'request_path': request.path,
'request_body': body,
'response_code': response.status_code,
'response_reason': response.reason_phrase,
'source_system': getattr(settings,'SOURCE_SYSTEM_NAME',None),
'access_time': request.access_time.strftime('%Y-%m-%d %H:%M:%S'),
}
log = ApiLog(**info)
log.save()
def process_request(self, request):
self.__handle_request(request)
def process_response(self, request, response):
"""
主要请求处理完之后记录
:param request:
:param response:
:return:
"""
if self.enable:
if self.methods == 'ALL' or request.method in self.methods:
self.__handle_response(request, response)
return response

View File

@ -0,0 +1,273 @@
from rest_framework import mixins
from rest_framework import serializers
from rest_framework import status
from rest_framework.relations import ManyRelatedField, RelatedField, PrimaryKeyRelatedField
from rest_framework.request import Request
from .response import SuccessResponse
class CreateModelMixin(mixins.CreateModelMixin):
"""
继承增强DRF的CreateModelMixin, 标准化其返回值
"""
create_serializer_class = None
def create(self, request: Request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
if hasattr(self, 'handle_logging'):
self.handle_logging(request, instance=serializer.instance, *args, **kwargs)
headers = self.get_success_headers(serializer.data)
return SuccessResponse(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def perform_create(self, serializer):
super().perform_create(serializer)
class ListModelMixin(mixins.ListModelMixin):
"""
继承增强DRF的CreateModelMixin, 标准化其返回值
"""
list_serializer_class = None
def list(self, request: Request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if hasattr(self, 'handle_logging'):
self.handle_logging(request, *args, **kwargs)
if page is not None:
if getattr(self, 'values_queryset', None):
return self.get_paginated_response(page)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
if getattr(self, 'values_queryset', None):
return SuccessResponse(page)
serializer = self.get_serializer(queryset, many=True)
return SuccessResponse(serializer.data)
class RetrieveModelMixin(mixins.RetrieveModelMixin):
"""
继承增强DRF的CreateModelMixin, 标准化其返回值
"""
retrieve_serializer_class = None
def retrieve(self, request: Request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
if hasattr(self, 'handle_logging'):
self.handle_logging(request, instance=instance, *args, **kwargs)
return SuccessResponse(serializer.data)
class UpdateModelMixin(mixins.UpdateModelMixin):
"""
继承增强DRF的CreateModelMixin, 标准化其返回值
"""
update_serializer_class = None
def update(self, request: Request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
if getattr(instance, '_prefetched_objects_cache', None):
instance._prefetched_objects_cache = {}
if hasattr(self, 'handle_logging'):
self.handle_logging(request, instance=instance, *args, **kwargs)
return SuccessResponse(serializer.data)
def partial_update(self, request, *args, **kwargs):
kwargs['partial'] = True
return self.update(request, *args, **kwargs)
class DestroyModelMixin(mixins.DestroyModelMixin):
"""
继承增强DRF的CreateModelMixin, 标准化其返回值
"""
destroy_serializer_class = None
def destroy(self, request: Request, *args, **kwargs):
instance = self.get_object()
self.perform_destroy(instance)
if hasattr(self, 'handle_logging'):
self.handle_logging(request, instance=instance, *args, **kwargs)
return SuccessResponse(status=status.HTTP_204_NO_CONTENT)
def perform_destroy(self, instance):
instance.delete()
class TableSerializerMixin:
table_option = None
extra_columns = []
FIELD_TYPE_MAP = {
'AutoField': {
'type': 'input',
'addDisabled': True,
},
'CharField': {
'type': 'input',
"maxlength": 255
},
'PasswordField': {
'type': 'input',
'maxlength': 255
},
'URLField': {
'type': 'input',
},
'UUIDField': {
'type': 'input',
'minlength': 32,
'maxlength': 32,
},
'UUID8Field': {
'type': 'input',
'minlength': 8,
'maxlength': 8,
},
'UUID16Field': {
'type': 'input',
'minlength': 16,
'maxlength': 16,
},
'UUID32Field': {
'type': 'input',
'minlength': 32,
'maxlength': 32,
},
'UUID36Field': {
'type': 'input',
'minlength': 36,
'maxlength': 36
},
'DateTimeField': {
'type': 'datetime',
'format': "yyyy-MM-dd hh:mm:ss",
'valueFormat': "yyyy-MM-dd hh:mm:ss",
},
'DateField': {
'type': 'date',
'format': "yyyy-MM-dd",
'valueFormat': "yyyy-MM-dd",
},
'TimeField': {
'type': 'time',
'format': "hh:mm:ss",
'valueFormat': "hh:mm:ss",
},
'BooleanField': {
'type': 'radio',
'dicData': [
{'value': False, 'label': ''},
{'value': True, 'label': ''},
]
},
'ManyRelatedField': {
# 'type': 'select',
'type': 'array',
# "multiple": True,
'required': False,
},
}
FIELD_TYPE_DEFAULT = {
'type': 'input',
}
def getTable(self, serializer: serializers.ModelSerializer = None):
if not serializer:
serializer = self.get_serializer()
serializer_class = serializer.__class__
model = serializer_class.Meta.model
title = model.__name__
if hasattr(model, 'Meta'):
if hasattr(model.Meta, 'verbose_name'):
title = model.Meta.verbose_name or ''
column = self.getColumn(serializer)
table = {
'title': title,
'page': True,
'align': 'center',
'menuAlign': 'center',
'columnBtn': True,
'menu': True,
'menuType': 'icon',
'addBtn': True,
'delBtn': True,
'editBtn': True,
'column': column
}
return table
def getColumn(self, serializer: serializers.ModelSerializer = None):
if not serializer:
serializer = self.get_serializer()
serializer_class = serializer.__class__
fields = serializer.get_fields()
show_fields = getattr(serializer_class.Meta, 'show_fields', set())
hide_fields = getattr(serializer_class.Meta, 'hide_fields', set())
search_fields = getattr(serializer_class.Meta, 'search_fields', set())
sortable_fields = getattr(serializer_class.Meta, 'sortable_fields', set())
column = []
for prop in fields:
field = fields[prop]
field_type = field.__class__.__name__
info = {
'prop': prop,
'label': field.label or prop,
'hide': hide_fields == '__all__' or prop in hide_fields,
'search': search_fields == '__all__' or prop in search_fields,
'sortable': sortable_fields == '__all__' or prop in sortable_fields,
'width': 'auto',
'align': 'left',
'overHidden': False,
}
type_info = self.FIELD_TYPE_MAP.get(field_type, self.FIELD_TYPE_DEFAULT)
info.update(type_info)
allow_null = getattr(field, 'allow_null', False)
allow_blank = getattr(field, 'allow_blank', False)
allow_empty = getattr(field, 'allow_empty', False)
read_only = getattr(field, 'read_only', False)
write_only = getattr(field, 'write_only', False)
if not any([allow_null, allow_blank, allow_empty]):
rules = [{
'required': True,
'message': f"""请输入{info['label']}""",
'trigger': "blur"
}]
info['rules'] = rules
if read_only:
info['editDisabled'] = True,
info['clearable'] = False
if not isinstance(field, (ManyRelatedField, RelatedField, PrimaryKeyRelatedField)):
# 防止序列化该字段的关系模型所有数据
choices = getattr(field, 'choices', None)
if choices:
dicData = list(map(lambda choice: {'value': choice[0], 'label': choice[1]}, choices.items()))
info['dicData'] = dicData
info['type'] = 'select'
column.append(info)
return column

View File

@ -0,0 +1,119 @@
from collections import OrderedDict
from rest_framework.pagination import PageNumberPagination, _positive_int
from rest_framework.utils.urls import replace_query_param
from .response import SuccessResponse
class Pagination(PageNumberPagination):
"""
标准分页器
"""
page_size_query_param = 'pageSize'
other_page_size_query_param = []
# other_page_size_query_param = ['pageSize', ]
page_query_param = "pageNum"
other_page_query_param = []
# other_page_query_param = ['currentPage', 'pageNum']
max_page_size = 1000
page_size = 10
def paginate_queryset(self, queryset, request, view=None):
return super().paginate_queryset(queryset, request, view)
def get_next_link(self):
return super().get_next_link()
def get_previous_link(self):
return super().get_previous_link()
def get_page_size(self, request):
"""
获取页大小
:param request:
:return:
"""
if self.other_page_size_query_param:
for param_name in self.other_page_size_query_param:
if param_name in request.query_params:
return _positive_int(
request.query_params[param_name],
strict=True,
cutoff=self.max_page_size
)
return super().get_page_size(request)
def get_page_num(self, request):
"""
获取页码
:param request:
:return:
"""
if self.other_page_query_param:
for param_name in self.other_page_query_param:
if param_name in request.query_params:
return _positive_int(request.query_params[param_name], strict=True)
page_num = request.query_params.get(self.page_query_param, 1)
return _positive_int(page_num, strict=True)
def get_paginated_response(self, data, search_fields=''):
return SuccessResponse(OrderedDict([
('count', self.page.paginator.count),
('next', self.get_next_link()),
('previous', self.get_previous_link()),
('results', data)
]))
class JsonPagination(Pagination):
"""
Json数据内存分页器
"""
def __init__(self, count=0) -> None:
super().__init__()
self._page_size = 0
self._page_num = 0
self._count = count
self.data_count = 0
def get_page_size(self, request):
_page_size = super().get_page_size(request)
self._page_size = _page_size
return _page_size
def get_page_num(self, request):
_page_num = super().get_page_num(request)
self._page_num = _page_num
return _page_num
def get_next_link(self):
if self._page_size * self._page_num >= self.data_count:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.page_query_param, self._page_num + 1)
url = replace_query_param(url, self.page_size_query_param, self._page_size)
return url
def get_previous_link(self, count=0):
if self._page_num <= 1:
return None
url = self.request.build_absolute_uri()
url = replace_query_param(url, self.page_query_param, self._page_num - 1)
url = replace_query_param(url, self.page_size_query_param, self._page_size)
return url
def paginate_queryset(self, queryset, request, view=None):
self.get_page_size(request)
self.get_page_num(request)
return super().paginate_queryset(queryset, request, view)
def get_paginated_response(self, data, search_fields=''):
return SuccessResponse(OrderedDict([
('count', self.data_count),
('next', self.get_next_link()),
('previous', self.get_previous_link()),
('results', data)
]))

View File

@ -0,0 +1,52 @@
"""
常用的Response以及Django的ResponseDRF的Response
"""
from django.http.response import DjangoJSONEncoder
from rest_framework.response import Response
class OpDRFJSONEncoder(DjangoJSONEncoder):
"""
重写DjangoJSONEncoder
(1)默认返回支持中文格式的json字符串
"""
def __init__(self, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False,
indent=None, separators=None, default=None):
super().__init__(skipkeys=skipkeys, ensure_ascii=False, check_circular=check_circular,
allow_nan=allow_nan, sort_keys=sort_keys, indent=indent, separators=separators,
default=default)
class SuccessResponse(Response):
"""
标准响应成功的返回, SuccessResponse(data)或者SuccessResponse(data=data)
(1)默认错误码返回200, 不支持指定其他返回码
"""
def __init__(self, data=None, msg='success', status=None, template_name=None, headers=None, exception=False,
content_type=None):
std_data = {
"code": 200,
"data": data,
"msg": msg,
"status": 'success'
}
super().__init__(std_data, status, template_name, headers, exception, content_type)
class ErrorResponse(Response):
"""
标准响应错误的返回,ErrorResponse(msg='xxx')
(1)默认错误码返回201, 也可以指定其他返回码:ErrorResponse(code=xxx)
"""
def __init__(self, data=None, msg='error', code=201, status=None, template_name=None, headers=None,
exception=False, content_type=None):
std_data = {
"code": code,
"data": data,
"msg": msg,
"status": 'error'
}
super().__init__(std_data, status, template_name, headers, exception, content_type)

View File

@ -0,0 +1,72 @@
"""
标准返回
"""
def funcSuccess(data=None, msg='success', **kwargs):
"""
普通函数成功返回格式
:param data:
:param msg:
:return:
"""
return {
"result": True,
"msg": msg,
"data": data,
}
def funcError(data=None, msg='error', **kwargs):
"""
普通函数失败返回格式
:param data:
:param msg:
:return:
"""
return {
"result": False,
"msg": msg,
"data": data,
}
def funcResult(result=True, data=None, msg='success', **kwargs):
"""
普通函数返回格式
:param result:
:param data:
:param msg:
:return:
"""
if result:
return funcSuccess(data=data, msg=msg)
return funcError(data=data, msg=msg)
def paginate(data=None, count=0, next=None, previous=None, msg='success', code=2000):
"""
标准分页返回, 分页错误时:应该直接使用pagination()即可,无需传入任何参数
:param data: 默认为[]
:param count: 默认为len(data); 总计值, 不是data的元素个数,相当于count(*);仅当不传入count时, count==len(data)
:param next: 默认为None, 下一页
:param previous: 默认为None, 上一页
:param msg: 默认为success
:param code: 默认为2000, 建议不传入参数,使用默认即可
:return:
"""
if not data:
data = []
if not count:
count = len(data)
return {
"code": code,
"data": {
"count": count,
"next": next,
"previous": previous,
"results": data
},
"msg": msg,
"status": "success"
}

View File

@ -0,0 +1,93 @@
from rest_framework.serializers import ModelSerializer
from rest_framework.fields import empty
from rest_framework.request import Request
class CustomModelSerializer(ModelSerializer):
"""
增强DRF的ModelSerializer,可自动更新模型的审计字段记录
(1)仅当op_drf.generics.GenericAPIView的子类里使用时有效
(2)非op_drf.generics.GenericAPIView里使用时, 与ModelSerializer作用一样,没人任何增强
(3)self.request能获取到rest_framework.request.Request对象
"""
# 修改人的审计字段名称, 默认modifier, 继承使用时可自定义覆盖
modifier_field_name = 'modifier'
# 创建人的审计字段名称, 默认creator, 继承使用时可自定义覆盖
creator_field_name = 'creator'
def __init__(self, instance=None, data=empty, request=None, **kwargs):
super().__init__(instance, data, **kwargs)
self.request: Request = request
def save(self, **kwargs):
return super().save(**kwargs)
def create(self, validated_data):
if self.request:
username = self.get_request_username()
if self.modifier_field_name in self.fields.fields:
validated_data[self.modifier_field_name] = username
if self.creator_field_name in self.fields.fields:
validated_data[self.creator_field_name] = username
return super().create(validated_data)
def update(self, instance, validated_data):
if self.request:
if hasattr(self.instance, self.modifier_field_name):
self.instance.modifier = self.get_request_username()
return super().update(instance, validated_data)
def get_request_username(self):
if getattr(self.request, 'user', None):
return getattr(self.request.user, 'username', None)
return None
@property
def fields(self):
fields = super().fields
if not hasattr(self, '_context'):
return fields
is_root = self.root == self
parent_is_list_root = self.parent == self.root and getattr(self.parent, 'many', False)
if not (is_root or parent_is_list_root):
return fields
try:
request = self.request or self.context['request']
except KeyError:
return fields
params = getattr(
request, 'query_params', getattr(request, 'GET', None)
)
if params is None:
pass
try:
filter_fields = params.get('_fields', None).split(',')
except AttributeError:
filter_fields = None
try:
omit_fields = params.get('_omit', None).split(',')
except AttributeError:
omit_fields = []
existing = set(fields.keys())
if filter_fields is None:
allowed = existing
else:
allowed = set(filter(None, filter_fields))
omitted = set(filter(None, omit_fields))
for field in existing:
if field not in allowed:
fields.pop(field, None)
if field in omitted:
fields.pop(field, None)
return fields

View File

View File

@ -0,0 +1,287 @@
import logging
import traceback
from types import FunctionType, MethodType
from rest_framework.exceptions import APIException as DRFAPIException
from rest_framework.request import Request
from rest_framework.views import APIView
from utils import exceptions
from utils.model_util import ModelRelateUtils
from .logging.view_logger import CustomerRelationshipViewLogger
from .response import SuccessResponse, ErrorResponse
from .serializers import CustomModelSerializer
logger = logging.getLogger(__name__)
def op_exception_handler(ex, context):
"""
统一异常拦截处理
目的:(1)取消所有的500异常响应,统一响应为标准错误返回
(2)准确显示错误信息
:param ex:
:param context:
:return:
"""
msg = ''
if isinstance(ex, DRFAPIException):
# set_rollback()
msg = ex.detail
elif isinstance(ex, exceptions.APIException):
msg = ex.message
elif isinstance(ex, Exception):
logger.error(traceback.format_exc())
msg = str(ex)
return ErrorResponse(msg=msg)
class CustomAPIView(APIView):
"""
继承增强DRF的APIView
"""
extra_permission_classes = ()
# 仅当GET方法时会触发该权限的校验
GET_permission_classes = ()
# 仅当POST方法时会触发该权限的校验
POST_permission_classes = ()
# 仅当DELETE方法时会触发该权限的校验
DELETE_permission_classes = ()
# 仅当PUT方法时会触发该权限的校验
PUT_permission_classes = ()
view_logger_classes = ()
def initial(self, request: Request, *args, **kwargs):
super().initial(request, *args, **kwargs)
self.check_extra_permissions(request)
self.check_method_extra_permissions(request)
def get_view_loggers(self, request: Request, *args, **kwargs):
logger_classes = self.view_logger_classes or []
if not logger_classes:
return []
view_loggers = [logger_class(view=self, request=request, *args, **kwargs) for logger_class in logger_classes]
return view_loggers
def handle_logging(self, request: Request, *args, **kwargs):
view_loggers = self.get_view_loggers(request, *args, **kwargs)
method = request.method.lower()
for view_logger in view_loggers:
view_logger.handle(request, *args, **kwargs)
logger_fun = getattr(view_logger, f'handle_{method}', None)
if logger_fun and isinstance(logger_fun, (FunctionType, MethodType)):
logger_fun(request, *args, **kwargs)
def get_extra_permissions(self):
return [permission() for permission in self.extra_permission_classes]
def check_extra_permissions(self, request: Request):
for permission in self.get_extra_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
def get_method_extra_permissions(self):
_name = self.request.method.upper()
method_extra_permission_classes = getattr(self, f"{_name}_permission_classes", None)
if not method_extra_permission_classes:
return []
return [permission() for permission in method_extra_permission_classes]
def check_method_extra_permissions(self, request):
for permission in self.get_method_extra_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
class BatchModelApIView(CustomAPIView):
"""
模型批量CRUD通用视图
"""
model = None
serializer_class = None
POST_serializer_class = None
PUT_serializer_class = None
field_name = 'instanceId'
instanceId_list_param_name = 'instanceIdList'
instance_info_param_name = 'info'
def get_serializer(self, *args, **kwargs):
if not self.request:
return None
serializer_class = getattr(self, f"{self.request.method}_serializer_class", None) or getattr(self,
'serializer_class')
serializer = serializer_class(*args, **kwargs)
if isinstance(serializer, CustomModelSerializer):
serializer.request = self.request
return serializer
def get(self, request: Request = None, *args, **kwargs):
data = self.get_serializer(self.model.objects.filter(**{f'{self.field_name}__in': request.data}),
many=True).data
return SuccessResponse(data=data)
def post(self, request: Request = None, *args, **kwargs):
data = []
for info in request.data:
serializer = self.get_serializer(data=info)
serializer.is_valid(raise_exception=True)
serializer.save()
data.append(serializer.data)
return SuccessResponse(data=data)
def put(self, request: Request = None, *args, **kwargs):
data = []
instanceId_list = request.data.get(self.instanceId_list_param_name, [])
info = request.data.get(self.instance_info_param_name, {})
for instanceId in instanceId_list:
serializer = self.get_serializer(
instance=self.model.objects.get(**{f'{self.field_name}': instanceId}),
data=info,
partial=True
)
serializer.is_valid(raise_exception=True)
serializer.save()
return SuccessResponse(data=instanceId_list)
def delete(self, request: Request = None, *args, **kwargs):
self.model.objects.filter(**{f'{self.field_name}__in': request.data}).delete()
return SuccessResponse(data=request.data)
class ModelRelationshipAPIView(CustomAPIView):
"""
模型关联关系通用CRUD视图
"""
model = None
through_model = None
relationship_model = None
relationship_serializer = None
field_name: str = None
from_field_name: str = 'instanceId'
to_field_name: str = None
relationship_field_values = ()
view_logger_classes = [CustomerRelationshipViewLogger, ]
def get_relationship_data(self, instanceId: str):
relationship_model_field_name = self.relationship_field_values[0]
params = {}
params[self.field_name] = instanceId
business_key_dict = self.through_model.objects.filter(**params).values(
*self.relationship_field_values).distinct()
business_key_list = [ele[relationship_model_field_name] for ele in business_key_dict]
params = {}
params[f"{self.to_field_name}__in"] = business_key_list
queryset = self.relationship_model.objects.filter(**params)
data = ModelRelateUtils.model_to_dict(queryset, self.relationship_serializer, default=[])
if 'creator' in self.relationship_field_values and 'ctime' in self.relationship_field_values:
for _index in range(len(data)):
ele = data[_index]
ele['relationship_creator'] = business_key_dict[_index]['creator']
ele['relationship_ctime'] = business_key_dict[_index]['ctime']
return data
def execute_method(self, execute: str, request: Request, instanceId: str, *args, **kwargs):
method = request.method.lower()
fun = None
if execute == 'before':
fun = getattr(self, f'before_{method}', None)
elif execute == 'handle':
fun = getattr(self, f'handle_{method}', None)
elif execute == 'after':
fun = getattr(self, f'after_{method}', None)
if fun and isinstance(fun, (FunctionType, MethodType)):
fun(request, instanceId, *args, **kwargs)
def do_request(self, request: Request, instanceId: str, *args, **kwargs):
self.execute_method('before', request, instanceId, *args, **kwargs)
self.execute_method('handle', request, instanceId, *args, **kwargs)
self.execute_method('after', request, instanceId, *args, **kwargs)
self.handle_logging(request, instanceId=instanceId, *args, **kwargs)
data = self.get_relationship_data(instanceId)
return SuccessResponse(data)
def get(self, request: Request, instanceId: str, *args, **kwargs):
return self.do_request(request, instanceId, *args, **kwargs)
def post(self, request: Request, instanceId: str, *args, **kwargs):
return self.do_request(request, instanceId, *args, **kwargs)
def put(self, request: Request, instanceId: str, *args, **kwargs):
return self.do_request(request, instanceId, *args, **kwargs)
def delete(self, request: Request, instanceId: str, *args, **kwargs):
return self.do_request(request, instanceId, *args, **kwargs)
class ModelRelationshipView(ModelRelationshipAPIView):
"""
模型关联关系通用CRUD视图
"""
def handle_get(self, request: Request, instanceId: str, *args, **kwargs):
data = self.get_relationship_data(instanceId)
return SuccessResponse(data)
def handle_post(self, request: Request, instanceId: str, *args, **kwargs):
relationship_model_field_name = self.relationship_field_values[0]
params = {}
params[f"{self.to_field_name}__in"] = request.data
queryset = self.relationship_model.objects.filter(**params)
exist_list = [getattr(ele, self.to_field_name) for ele in queryset]
bulk_info = []
for _id in exist_list:
info = {}
info[relationship_model_field_name] = _id
info[self.field_name] = instanceId
info['creator'] = request.user.username
bulk_info.append(self.through_model(**info))
self.through_model.objects.bulk_create(bulk_info)
data = self.get_relationship_data(instanceId)
return SuccessResponse(data)
def handle_put(self, request: Request, instanceId: str, *args, **kwargs):
relationship_model_field_name = self.relationship_field_values[0]
params1 = {}
params1[f"{self.field_name}"] = instanceId
params2 = {}
params2[f"{relationship_model_field_name}__in"] = request.data
relationships = self.through_model.objects.filter(**params1).exclude(**params2)
relationships.delete()
params = {}
params[f"{self.field_name}"] = instanceId
instanceId_dict = self.through_model.objects.filter(**params).values(*self.relationship_field_values).distinct()
instanceId_list = [ele.get(relationship_model_field_name) for ele in instanceId_dict]
create_list = list(set(request.data).difference(set(instanceId_list)))
for _id in create_list:
info = {}
info[relationship_model_field_name] = _id
info[self.field_name] = instanceId
info['creator'] = request.user.username
data = self.get_relationship_data(instanceId)
return SuccessResponse(data)
def handle_delete(self, request: Request, instanceId: str, *args, **kwargs):
relationship_model_field_name = self.relationship_field_values[0]
params = {}
params[f"{self.field_name}"] = instanceId
params[f"{relationship_model_field_name}__in"] = request.data
self.through_model.objects.filter(**params).delete()
data = self.get_relationship_data(instanceId)
return SuccessResponse(data)

View File

@ -0,0 +1,249 @@
from types import FunctionType, MethodType
# from rest_framework_mongoengine.generics import GenericAPIView as MongoGenericAPIView
from django.core.exceptions import ValidationError
from django.http.response import Http404
from django.shortcuts import get_object_or_404 as _get_object_or_404
from django_filters.rest_framework import DjangoFilterBackend
from mongoengine.queryset.base import BaseQuerySet
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.request import Request
from rest_framework.settings import api_settings
from rest_framework.viewsets import ViewSetMixin
from utils.exceptions import APIException
from . import mixins
from .filters import MongoSearchFilter, MongoOrderingFilter, AdvancedSearchFilter, MongoAdvancedSearchFilter
from .generics import GenericAPIView
from .logging.view_logger import CustomerModelViewLogger
from .pagination import Pagination
from .serializers import CustomModelSerializer
def get_object_or_404(queryset, *filter_args, **filter_kwargs):
try:
return _get_object_or_404(queryset, *filter_args, **filter_kwargs)
except (TypeError, ValueError, ValidationError, Http404):
raise APIException(message='该对象不存在或者无访问权限')
class GenericViewSet(ViewSetMixin, GenericAPIView):
extra_filter_backends = []
pagination_class = Pagination
filter_backends = [DjangoFilterBackend, OrderingFilter, SearchFilter, AdvancedSearchFilter]
view_logger_classes = (CustomerModelViewLogger,)
def handle_logging(self, request: Request, *args, **kwargs):
view_loggers = self.get_view_loggers(request, *args, **kwargs)
for view_logger in view_loggers:
handle_action = getattr(view_logger, f'handle_{self.action}', None)
if handle_action and isinstance(handle_action, (FunctionType, MethodType)):
handle_action(request, *args, **kwargs)
def get_serializer(self, *args, **kwargs):
serializer_class = self.get_serializer_class()
kwargs['context'] = self.get_serializer_context()
serializer = serializer_class(*args, **kwargs)
if isinstance(serializer, CustomModelSerializer):
serializer.request = self.request
return serializer
def filter_queryset(self, queryset):
for backend in set(set(self.filter_backends) | set(self.extra_filter_backends or [])):
queryset = backend().filter_queryset(self.request, queryset, self)
queryset = self.action_extra_filter_queryset(queryset)
return queryset
def action_extra_filter_queryset(self, queryset):
action__extra_filter_backends = getattr(self, f"{self.action}_extra_filter_backends", None)
if not action__extra_filter_backends:
return queryset
for backend in action__extra_filter_backends:
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset
def get_serializer_class(self):
action_serializer_name = f"{self.action}_serializer_class"
action_serializer_class = getattr(self, action_serializer_name, None)
if action_serializer_class:
return action_serializer_class
return super().get_serializer_class()
def reverse_action(self, url_name, *args, **kwargs):
return super().reverse_action(url_name, *args, **kwargs)
def get_action_extra_permissions(self):
"""
获取已配置的action权限校验,并且实例化其对象
:return:
"""
action_extra_permission_classes = getattr(self, f"{self.action}_extra_permission_classes", None)
if not action_extra_permission_classes:
return []
return [permission() for permission in action_extra_permission_classes]
def check_action_extra_permissions(self, request):
"""
逐个校验action权限校验
:param request:
:return:
"""
for permission in self.get_action_extra_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
def check_action_extra_object_permissions(self, request, obj):
"""
action方法的专属对象权限校验
:param request:
:param obj:
:return:
"""
for permission in self.get_action_extra_permissions():
if not permission.has_object_permission(request, self, obj):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
def initial(self, request, *args, **kwargs):
"""
重写initial方法
(1)新增action的权限校验
:param request:
:param args:
:param kwargs:
:return:
"""
super().initial(request, *args, **kwargs)
self.check_action_extra_permissions(request)
def get_object(self):
queryset = self.filter_queryset(self.get_queryset())
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg)
)
filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
obj = get_object_or_404(queryset, **filter_kwargs)
self.check_object_permissions(self.request, obj)
return obj
def check_object_permissions(self, request, obj):
"""
重新check_object_permissions
(1)新增action方法的专属对象权限检查入口
(2)先校验共同的object_permissions, 再校验action的object_permissions
:param request:
:param obj:
:return:
"""
super().check_object_permissions(request, obj)
self.check_action_extra_object_permissions(request, obj)
class MongoGenericAPIView(GenericAPIView):
""" Adaptation of DRF GenericAPIView """
lookup_field = 'id'
def get_queryset(self):
queryset = super(MongoGenericAPIView, self).get_queryset()
if isinstance(queryset, BaseQuerySet):
queryset = queryset.all()
return queryset
def get_object(self):
queryset = self.filter_queryset(self.get_queryset())
# Perform the lookup filtering.
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg)
)
filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
obj = get_object_or_404(queryset, **filter_kwargs)
self.check_object_permissions(self.request, obj)
return obj
class MongoGenericViewSet(ViewSetMixin, MongoGenericAPIView):
pagination_class = Pagination
pass
class ReadOnlyModelViewSet(mixins.RetrieveModelMixin,
mixins.ListModelMixin,
GenericViewSet):
pass
class ModelViewSet(mixins.CreateModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
GenericViewSet):
pass
class MongoModelViewSet(mixins.CreateModelMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
MongoGenericViewSet):
pass
class CustomModelViewSet(ModelViewSet, mixins.TableSerializerMixin):
"""
自定义的ModelViewSet:
(1)默认分页器就为统一分页器op_drf.pagination.Pagination
(1)默认使用统一标准返回格式
(1)默认支持高级搜索
(1)默认支持生成前端动态table的option
(1)ORM性能优化, 尽可能使用values_queryset形式
"""
values_queryset = None
ordering_fields = '__all__'
def get_queryset(self):
if getattr(self, 'values_queryset', None):
return self.values_queryset
return super().get_queryset()
class CustomMongoModelViewSet(MongoModelViewSet, mixins.TableSerializerMixin):
filter_backends = (MongoOrderingFilter, MongoSearchFilter, MongoAdvancedSearchFilter)
# filter_fields = '__all__' # 暂不支持__all__
filter_fields = ()
search_fields = ()
ordering_fields = '__all__'
view_logger_classes = (CustomerModelViewLogger,)
def get_queryset(self):
queryset = self.queryset
filtering_kwargs = {}
for param in self.request.query_params:
param = param.strip()
if param in ['page_size', 'page', 'search', 'ordering', 'as']: continue
if self.filter_fields == '__all__' or param in self.filter_fields:
# if param in self.filter_fields:
filtering_kwargs[param] = self.request.query_params[param]
queryset = queryset.filter(**filtering_kwargs)
ordering_params = self.request.query_params.get(api_settings.ORDERING_PARAM, None)
if ordering_params:
ordering_fields = [field.strip() for field in ordering_params.split(',')]
ordering_fields = filter(lambda field: self.ordering_fields == '__all__' or field in self.ordering_fields,
ordering_fields)
queryset = queryset.order_by(*ordering_fields)
return queryset
def filter_queryset(self, queryset):
return super().filter_queryset(queryset)

View File

@ -1,7 +1,7 @@
import logging
import traceback
from utils.response import ErrorResponse
from .response import ErrorResponse
logger = logging.getLogger(__name__)

View File

@ -0,0 +1,47 @@
import logging
from collections import Iterable
import jsonpath
logger = logging.getLogger(__name__)
def get_jsonpath(params: dict = None, exclude_params: list = None, type_params: dict = None):
if params is None:
params = {}
if exclude_params is None:
exclude_params = []
_filters = []
for param_name, param_value in params.items():
if param_name in exclude_params:
continue
if type_params is None or type_params.get(param_name, 'str') == 'str':
_filter = f"@.{param_name}=='{param_value}'"
else:
_filter = f"@.{param_name}=={param_value}"
_filters.append(_filter)
_path = " || ".join(_filters)
if not _path:
return ""
return f"[?({_path})]"
def filter_json(obj: list, expr: str, *args, **kwargs):
if not isinstance(obj, Iterable):
return []
if not expr.startswith('$'):
expr = f"${expr}"
logger.debug(f"expr={expr}, len={len(obj)}")
return jsonpath.jsonpath(obj, expr, *args)
def search_json(obj: list, search: str, search_fields: list):
queryset = []
search = search.lower()
for ele in obj:
for field_name in search_fields:
value = ele.get(field_name, None)
if value and search in str(value).lower():
queryset.append(ele)
break
return queryset

View File

@ -0,0 +1,169 @@
import json
from collections import Iterable
from django.apps import apps
from django.apps.config import AppConfig
from django.db.models.fields import Field
from rest_framework.renderers import JSONRenderer
def get_primary_field(model, many=False):
"""
获取模型的主键列对应的Field
:param model:
:param many:
:return:
"""
primary_field: Field = list(filter(lambda field: field.primary_key, model._meta.local_fields))
if many:
return primary_field
return primary_field[0]
def get_primary_key_name(model, many=False):
primary_field = get_primary_field(model=model, many=many)
if many:
return [field.name for field in primary_field]
return primary_field.name
def get_business_key_name(model):
"""
获取业务列名称
:param model:
:return:
"""
return getattr(model, 'business_field_name', get_primary_key_name(model, False))
def get_business_field(model):
"""
获取模型的业务列对应的Field
:param model:
:return:
"""
business_key_name = get_business_key_name(model)
business_field = list(filter(lambda field: field.name == business_key_name, model._meta.local_fields))
return business_field[0]
def get_model(app_label: str = None, model_name: str = None, model_label: str = None):
"""
根据AppModel名称获取model_class
使用:get_model(app_label='op_cmdb', model_name='Business')
或者:get_model(model_label='op_cmdb.Business')
:param app_label: settings中注册的app的名称, 例如:op_cmdb, admin
:param model_name: 某个app中模型的类名, :Business, host, dept(忽略大小写)
:param model_label: 例如: op_cmdb.Business
:return:
"""
if model_label:
app_label, model_name = model_label.split(".")
app_conf: AppConfig = apps.get_app_config(app_label)
return app_conf.get_model(model_name)
class ModelRelateUtils:
"""
封装ORM模型的映射操作,例如
"""
@classmethod
def model_to_dict(cls, models=None, serializer=None, default=None):
"""
ORM模型对象转化为字典
:param models: 模型对象
:param serializer: 模型的序列化器
:param default:
:return:
"""
if default is None:
default = {}
if not models or not serializer:
return default
is_iterable = isinstance(models, Iterable) and not isinstance(models, dict)
if is_iterable:
return [json.loads(JSONRenderer().render(serializer(model).data)) for model in models]
return json.loads(JSONRenderer().render(serializer(models).data))
@classmethod
def serializer_to_dict(cls, datas):
"""
ORM模型对象转化为字典
:param datas: 序列化器反序列化之后的data
:return:
"""
is_iterable = isinstance(datas, Iterable)
if is_iterable:
return [json.loads(JSONRenderer().render(data)) for data in datas]
return json.loads(JSONRenderer().render(datas))
@classmethod
def executeModelRelate(cls, model, related_name, fun_name, id_list):
"""
执行RelatedManager的add方法
:param model: Model
:param related_name: 映射名称
:param fun_name: 函数名称
:param id_list: 单个或者多个
:return:
"""
# 获取函数
related_manager = getattr(model, related_name, '')
if not related_manager:
return 0
return cls.executeRelatedManager(related_manager, fun_name, id_list)
@classmethod
def executeRelatedManager(cls, related_manager, fun_name, id_list):
"""
执行RelatedManager的add方法
:param related_manager: RelatedManager
:param fun_name: RelatedManager的函数名称
:param id_list: 单个或者多个
:return:
"""
# 获取函数
fun = getattr(related_manager, fun_name, '')
# 判断是一个函数
if not hasattr(fun, "__call__"):
return 0
# 判断参数是否一个集合
is_iterable = isinstance(id_list, Iterable) and type(id_list) != str
if is_iterable:
fun(*id_list)
return len(id_list)
else:
fun(id_list)
return 1
@classmethod
def executeRelatedManagerAddMethod(cls, related_manager, id_list):
"""
执行RelatedManager的add方法
:param related_manager: RelatedManager
:param id_list: 单个或者多个
:return:
"""
return cls.executeRelatedManager(related_manager, 'add', id_list)
@classmethod
def executeRelatedManagerSetMethod(cls, related_manager, id_list):
"""
执行RelatedManager的add方法
:param related_manager: RelatedManager
:param id_list: 单个或者多个
:return:
"""
return cls.executeRelatedManager(related_manager, 'set', id_list)
@classmethod
def executeRelatedManagerRemoveMethod(cls, related_manager, id_list):
"""
执行RelatedManager的remove方法
:param related_manager: RelatedManager
:param id_list: 单个或者多个
:return:
"""
return cls.executeRelatedManager(related_manager, 'remove', id_list)

View File

@ -0,0 +1,128 @@
"""
Request工具类
"""
import json
import logging
from django.contrib.auth.models import AbstractBaseUser
from rest_framework.authentication import BaseAuthentication
from rest_framework.settings import api_settings as drf_settings
from django.contrib.auth.models import AnonymousUser
from django.urls.resolvers import ResolverMatch
logger = logging.getLogger(__name__)
def get_request_user(request, authenticate=True):
"""
获取请求user
(1)如果request里的user没有认证,那么则手动认证一次
:param request:
:param authenticate:
:return:
"""
user: AbstractBaseUser = getattr(request, 'user', None)
if user and user.is_authenticated:
return user
authentication: BaseAuthentication = None
for authentication_class in drf_settings.DEFAULT_AUTHENTICATION_CLASSES:
try:
authentication = authentication_class()
user_auth_tuple = authentication.authenticate(request)
if user_auth_tuple is not None:
user, token = user_auth_tuple
if authenticate:
request.user = user
return user
except Exception:
pass
return user or AnonymousUser()
def get_request_ip(request):
"""
获取请求IP
:param request:
:return:
"""
ip = getattr(request, 'request_ip', None)
if ip:
return ip
ip = request.META.get('REMOTE_ADDR', '')
if not ip:
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[-1].strip()
else:
ip = 'unknown'
return ip
def get_request_data(request):
"""
获取请求参数
:param request:
:return:
"""
request_data = getattr(request, 'request_data', None)
if request_data:
return request_data
data: dict = {**request.GET.dict(), **request.POST.dict()}
if not data:
body = getattr(request, '_body', request.body)
if body:
data = json.loads(body)
if not isinstance(data, dict):
data = {'data': data}
return data
def get_request_path(request, *args, **kwargs):
"""
获取请求路径
:param request:
:param args:
:param kwargs:
:return:
"""
request_path = getattr(request, 'request_path', None)
if request_path:
return request_path
values = []
for arg in args:
if len(arg) == 0:
continue
if isinstance(arg, str):
values.append(arg)
elif isinstance(arg, (tuple, set, list)):
values.extend(arg)
elif isinstance(arg, dict):
values.extend(arg.values())
if len(values) == 0:
return request.path
path: str = request.path
for value in values:
path = path.replace('/' + value, '/' + '{id}')
return path
def get_request_canonical_path(request, *args, **kwargs):
"""
获取请求路径
:param request:
:param args:
:param kwargs:
:return:
"""
request_path = getattr(request, 'request_canonical_path', None)
if request_path:
return request_path
path: str = request.path
resolver_match: ResolverMatch = request.resolver_match
for value in resolver_match.args:
path = path.replace(f"/{value}", "/{id}")
for key, value in resolver_match.kwargs.items():
path = path.replace(f"/{value}", f"/{{{key}}}")
if key == 'pk':
pass
return path

View File

@ -0,0 +1,80 @@
"""
封装排序:
普通类型列表的排序
字典列表的排序
对象列表的排序
"""
import operator
from collections import Iterable
def sortSimpleTypeList(li, reverse=False):
"""
排序简单的类型
:param li:
:param reverse:
:return:
"""
li.sort()
if reverse:
li.reverse()
return li
def sortObjectList(obj_list, by_prop):
"""
排序列表:按照对象的某个属性排序
:param obj_list:
:param by_prop:
:return:
"""
reverse = False
if by_prop.startswith('-'):
reverse = True
by_prop = by_prop[1:]
fun = operator.attrgetter(by_prop)
obj_list.sort(key=fun)
if reverse:
obj_list.reverse()
return obj_list
def sortDictList(dict_list, by_key):
"""
排序字典列表:按照字典的某个key的value排序
:param dict_list:
:param by_key:
:return:
"""
reverse = False
if by_key.startswith('-'):
reverse = True
by_key = by_key[1:]
dict_list.sort(key=lambda ele: ele[by_key])
if reverse:
dict_list.reverse()
return dict_list
def sortList(li, by=''):
"""
排序集合: by加上前缀'-'表示逆序
:param li: 字典集合 or 对象集合
:param by: 通过哪一个属性, name 按照name排序; -name 按照name反向排序
:return: 原对象(排序后集合, 不是返回新集合)
"""
reverse = False
if not li or not isinstance(li, Iterable):
# 不是集合, 或空集合返回原内容
return li
if by.startswith('-'):
reverse = True
if isinstance(li[0], (int, float, str)):
return sortSimpleTypeList(li, reverse)
if not by:
# 非简单类型的list, 必须要传入by, 否则不排序
return li
if isinstance(li[0], dict):
# 如果第一个元素是字典类型,则默认所有元素都是字典类型
return sortDictList(li, by)
return sortObjectList(li, by)

View File

@ -1,6 +1,5 @@
"""
封装字符串相关函数:UUID字符串,字符串加密解密
@author: wanglei
"""
import uuid as UUID
import base64