fix(filter): orm lookups adjusted for CharFilter
parent
645b2ab37b
commit
1dd13c9073
|
@ -2,4 +2,5 @@
|
|||
/backend/.idea
|
||||
.idea
|
||||
|
||||
.history/
|
||||
.history/
|
||||
.vscode/
|
|
@ -164,14 +164,15 @@ class UserViewSet(CustomModelViewSet):
|
|||
serializer_class = UserSerializer
|
||||
create_serializer_class = UserCreateSerializer
|
||||
update_serializer_class = UserUpdateSerializer
|
||||
filter_fields = ["name", "username", "gender", "is_active", "dept", "user_type"]
|
||||
# filter_fields = {
|
||||
# 'name': ['icontains'],
|
||||
# 'username': ['icontains'],
|
||||
# 'gender': ['icontains'],
|
||||
# 'is_active': ['icontains'],
|
||||
# 'dept': ['exact'],
|
||||
# }
|
||||
# filter_fields = ["name", "username", "gender", "is_active", "dept", "user_type"]
|
||||
filter_fields = {
|
||||
"name": ["icontains"],
|
||||
"username": ["exact"],
|
||||
"gender": ["icontains"],
|
||||
"is_active": ["icontains"],
|
||||
"dept": ["exact"],
|
||||
"user_type": ["exact"],
|
||||
}
|
||||
search_fields = ["username", "name", "gender", "dept__name", "role__name"]
|
||||
# 导出
|
||||
export_field_label = [
|
||||
|
|
|
@ -32,13 +32,13 @@ def get_dept(dept_id: int, dept_all_list=None, dept_list=None):
|
|||
:return:
|
||||
"""
|
||||
if not dept_all_list:
|
||||
dept_all_list = Dept.objects.all().values('id', 'parent')
|
||||
dept_all_list = Dept.objects.all().values("id", "parent")
|
||||
if dept_list is None:
|
||||
dept_list = [dept_id]
|
||||
for ele in dept_all_list:
|
||||
if ele.get('parent') == dept_id:
|
||||
dept_list.append(ele.get('id'))
|
||||
get_dept(ele.get('id'), dept_all_list, dept_list)
|
||||
if ele.get("parent") == dept_id:
|
||||
dept_list.append(ele.get("id"))
|
||||
get_dept(ele.get("id"), dept_all_list, dept_list)
|
||||
return list(set(dept_list))
|
||||
|
||||
|
||||
|
@ -54,20 +54,26 @@ class DataLevelPermissionsFilter(BaseFilterBackend):
|
|||
4. 只为仅本人数据权限时只返回过滤本人数据,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)
|
||||
5. 自定数据权限 获取部门,根据部门过滤
|
||||
"""
|
||||
|
||||
def filter_queryset(self, request, queryset, view):
|
||||
"""
|
||||
接口白名单是否认证数据权限
|
||||
"""
|
||||
api = request.path # 当前请求接口
|
||||
method = request.method # 当前请求方法
|
||||
methodList = ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
|
||||
methodList = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
|
||||
method = methodList.index(method)
|
||||
# ***接口白名单***
|
||||
api_white_list = ApiWhiteList.objects.filter(enable_datasource=False).values(permission__api=F('url'),
|
||||
permission__method=F('method'))
|
||||
api_white_list = ApiWhiteList.objects.filter(enable_datasource=False).values(
|
||||
permission__api=F("url"), permission__method=F("method")
|
||||
)
|
||||
api_white_list = [
|
||||
str(item.get('permission__api').replace('{id}', '.*?')) + ":" + str(item.get('permission__method')) for item
|
||||
in api_white_list if item.get('permission__api')]
|
||||
str(item.get("permission__api").replace("{id}", ".*?"))
|
||||
+ ":"
|
||||
+ str(item.get("permission__method"))
|
||||
for item in api_white_list
|
||||
if item.get("permission__api")
|
||||
]
|
||||
for item in api_white_list:
|
||||
new_api = api + ":" + str(method)
|
||||
matchObj = re.match(item, new_api, re.M | re.I)
|
||||
|
@ -81,16 +87,16 @@ class DataLevelPermissionsFilter(BaseFilterBackend):
|
|||
"""
|
||||
if request.user.is_superuser == 0:
|
||||
# 0. 获取用户的部门id,没有部门则返回空
|
||||
user_dept_id = getattr(request.user, 'dept_id', None)
|
||||
user_dept_id = getattr(request.user, "dept_id", None)
|
||||
if not user_dept_id:
|
||||
return queryset.none()
|
||||
|
||||
# 1. 判断过滤的数据是否有创建人所在部门 "dept_belong_id" 字段
|
||||
if not getattr(queryset.model, 'dept_belong_id', None):
|
||||
if not getattr(queryset.model, "dept_belong_id", None):
|
||||
return queryset
|
||||
|
||||
# 2. 如果用户没有关联角色则返回本部门数据
|
||||
if not hasattr(request.user, 'role'):
|
||||
if not hasattr(request.user, "role"):
|
||||
return queryset.filter(dept_belong_id=user_dept_id)
|
||||
|
||||
# 3. 根据所有角色 获取所有权限范围
|
||||
|
@ -99,28 +105,38 @@ class DataLevelPermissionsFilter(BaseFilterBackend):
|
|||
# (2, "本部门数据权限"),
|
||||
# (3, "全部数据权限"),
|
||||
# (4, "自定数据权限")
|
||||
role_list = request.user.role.filter(status=1).values('admin', 'data_range')
|
||||
dataScope_list = [] # 权限范围列表
|
||||
role_list = request.user.role.filter(status=1).values("admin", "data_range")
|
||||
dataScope_list = [] # 权限范围列表
|
||||
for ele in role_list:
|
||||
# 判断用户是否为超级管理员角色/如果拥有[全部数据权限]则返回所有数据
|
||||
if 3 == ele.get('data_range') or ele.get('admin') == True:
|
||||
if 3 == ele.get("data_range") or ele.get("admin") == True:
|
||||
return queryset
|
||||
dataScope_list.append(ele.get('data_range'))
|
||||
dataScope_list.append(ele.get("data_range"))
|
||||
dataScope_list = list(set(dataScope_list))
|
||||
|
||||
# 4. 只为仅本人数据权限时只返回过滤本人数据,并且部门为自己本部门(考虑到用户会变部门,只能看当前用户所在的部门数据)
|
||||
if 0 in dataScope_list:
|
||||
return queryset.filter(creator=request.user, dept_belong_id=user_dept_id)
|
||||
return queryset.filter(
|
||||
creator=request.user, dept_belong_id=user_dept_id
|
||||
)
|
||||
|
||||
# 5. 自定数据权限 获取部门,根据部门过滤
|
||||
dept_list = []
|
||||
for ele in dataScope_list:
|
||||
if ele == 4:
|
||||
dept_list.extend(request.user.role.filter(status=1).values_list('dept__id', flat=True))
|
||||
dept_list.extend(
|
||||
request.user.role.filter(status=1).values_list(
|
||||
"dept__id", flat=True
|
||||
)
|
||||
)
|
||||
elif ele == 2:
|
||||
dept_list.append(user_dept_id)
|
||||
elif ele == 1:
|
||||
dept_list.extend(get_dept(user_dept_id, ))
|
||||
dept_list.extend(
|
||||
get_dept(
|
||||
user_dept_id,
|
||||
)
|
||||
)
|
||||
return queryset.filter(dept_belong_id__in=list(set(dept_list)))
|
||||
else:
|
||||
return queryset
|
||||
|
@ -128,11 +144,11 @@ class DataLevelPermissionsFilter(BaseFilterBackend):
|
|||
|
||||
class CustomDjangoFilterBackend(DjangoFilterBackend):
|
||||
lookup_prefixes = {
|
||||
'^': 'istartswith',
|
||||
'=': 'iexact',
|
||||
'@': 'search',
|
||||
'$': 'iregex',
|
||||
'~': 'icontains'
|
||||
"^": "istartswith",
|
||||
"=": "iexact",
|
||||
"@": "search",
|
||||
"$": "iregex",
|
||||
"~": "icontains",
|
||||
}
|
||||
|
||||
def construct_search(self, field_name):
|
||||
|
@ -140,7 +156,7 @@ class CustomDjangoFilterBackend(DjangoFilterBackend):
|
|||
if lookup:
|
||||
field_name = field_name[1:]
|
||||
else:
|
||||
lookup = 'icontains'
|
||||
lookup = "icontains"
|
||||
return LOOKUP_SEP.join([field_name, lookup])
|
||||
|
||||
def find_filter_lookups(self, orm_lookups, search_term_key):
|
||||
|
@ -153,39 +169,43 @@ class CustomDjangoFilterBackend(DjangoFilterBackend):
|
|||
"""
|
||||
Return the `FilterSet` class used to filter the queryset.
|
||||
"""
|
||||
filterset_class = getattr(view, 'filterset_class', None)
|
||||
filterset_fields = getattr(view, 'filterset_fields', None)
|
||||
filterset_class = getattr(view, "filterset_class", None)
|
||||
filterset_fields = getattr(view, "filterset_fields", None)
|
||||
|
||||
# TODO: remove assertion in 2.1
|
||||
if filterset_class is None and hasattr(view, 'filter_class'):
|
||||
if filterset_class is None and hasattr(view, "filter_class"):
|
||||
utils.deprecate(
|
||||
"`%s.filter_class` attribute should be renamed `filterset_class`."
|
||||
% view.__class__.__name__)
|
||||
filterset_class = getattr(view, 'filter_class', None)
|
||||
% view.__class__.__name__
|
||||
)
|
||||
filterset_class = getattr(view, "filter_class", None)
|
||||
|
||||
# TODO: remove assertion in 2.1
|
||||
if filterset_fields is None and hasattr(view, 'filter_fields'):
|
||||
if filterset_fields is None and hasattr(view, "filter_fields"):
|
||||
utils.deprecate(
|
||||
"`%s.filter_fields` attribute should be renamed `filterset_fields`."
|
||||
% view.__class__.__name__)
|
||||
filterset_fields = getattr(view, 'filter_fields', None)
|
||||
% view.__class__.__name__
|
||||
)
|
||||
filterset_fields = getattr(view, "filter_fields", None)
|
||||
|
||||
if filterset_class:
|
||||
filterset_model = filterset_class._meta.model
|
||||
|
||||
# FilterSets do not need to specify a Meta class
|
||||
if filterset_model and queryset is not None:
|
||||
assert issubclass(queryset.model, filterset_model), \
|
||||
'FilterSet model %s does not match queryset model %s' % \
|
||||
(filterset_model, queryset.model)
|
||||
assert issubclass(
|
||||
queryset.model, filterset_model
|
||||
), "FilterSet model %s does not match queryset model %s" % (
|
||||
filterset_model,
|
||||
queryset.model,
|
||||
)
|
||||
|
||||
return filterset_class
|
||||
|
||||
if filterset_fields and queryset is not None:
|
||||
MetaBase = getattr(self.filterset_base, 'Meta', object)
|
||||
MetaBase = getattr(self.filterset_base, "Meta", object)
|
||||
|
||||
class AutoFilterSet(self.filterset_base):
|
||||
|
||||
@classmethod
|
||||
def get_filters(cls):
|
||||
"""
|
||||
|
@ -206,6 +226,7 @@ class CustomDjangoFilterBackend(DjangoFilterBackend):
|
|||
field = get_model_field(cls._meta.model, field_name)
|
||||
from django.db import models
|
||||
from timezone_field import TimeZoneField
|
||||
|
||||
# 不进行 过滤的model 类
|
||||
if isinstance(field, (models.JSONField, TimeZoneField)):
|
||||
continue
|
||||
|
@ -222,16 +243,20 @@ class CustomDjangoFilterBackend(DjangoFilterBackend):
|
|||
continue
|
||||
|
||||
if field is not None:
|
||||
filters[filter_name] = cls.filter_for_field(field, field_name, lookup_expr)
|
||||
filters[filter_name] = cls.filter_for_field(
|
||||
field, field_name, lookup_expr
|
||||
)
|
||||
|
||||
# Allow Meta.fields to contain declared filters *only* when a list/tuple
|
||||
if isinstance(cls._meta.fields, (list, tuple)):
|
||||
undefined = [f for f in undefined if f not in cls.declared_filters]
|
||||
undefined = [
|
||||
f for f in undefined if f not in cls.declared_filters
|
||||
]
|
||||
|
||||
if undefined:
|
||||
raise TypeError(
|
||||
"'Meta.fields' must not contain non-model field names: %s"
|
||||
% ', '.join(undefined)
|
||||
% ", ".join(undefined)
|
||||
)
|
||||
|
||||
# Add in declared filters. This is necessary since we don't enforce adding
|
||||
|
@ -251,14 +276,21 @@ class CustomDjangoFilterBackend(DjangoFilterBackend):
|
|||
filterset = self.get_filterset(request, queryset, view)
|
||||
if filterset is None:
|
||||
return queryset
|
||||
if filterset.__class__.__name__ == 'AutoFilterSet':
|
||||
if filterset.__class__.__name__ == "AutoFilterSet":
|
||||
queryset = filterset.queryset
|
||||
orm_lookups = []
|
||||
for search_field in filterset.filters:
|
||||
if isinstance(filterset.filters[search_field], CharFilter):
|
||||
orm_lookups.append(self.construct_search(six.text_type(search_field)))
|
||||
orm_lookups.append(
|
||||
self.construct_search(six.text_type(search_field))
|
||||
)
|
||||
else:
|
||||
orm_lookups.append(search_field)
|
||||
orm_lookups = (
|
||||
orm_lookups
|
||||
if isinstance(filterset.__class__._meta.fields, (list, tuple))
|
||||
else filterset.filters.keys()
|
||||
)
|
||||
conditions = []
|
||||
queries = []
|
||||
for search_term_key in filterset.data.keys():
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue