mirror of https://github.com/jumpserver/jumpserver
189 lines
5.3 KiB
Python
189 lines
5.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
|
|
import copy
|
|
from rest_framework import serializers
|
|
|
|
|
|
__all__ = [
|
|
'DynamicMappingField', 'ReadableHiddenField',
|
|
'CustomMetaDictField', 'IgnoreSensitiveInfoReadOnlyJSONField',
|
|
]
|
|
|
|
|
|
#
|
|
# DynamicMappingField
|
|
# -------------------
|
|
|
|
|
|
class DynamicMappingField(serializers.Field):
|
|
"""
|
|
一个可以根据用户行为而动态改变的字段
|
|
|
|
For example, Define attribute `mapping_rules`
|
|
|
|
field_name = meta
|
|
|
|
mapping_rules = {
|
|
'default': serializers.JSONField(),
|
|
'type': {
|
|
'apply_asset': {
|
|
'default': serializers.CharField(label='default'),
|
|
'get': ApplyAssetSerializer,
|
|
'post': ApproveAssetSerializer,
|
|
},
|
|
'apply_application': ApplyApplicationSerializer,
|
|
'login_confirm': LoginConfirmSerializer,
|
|
'login_times': LoginTimesSerializer
|
|
},
|
|
'category': {
|
|
'apply': ApplySerializer,
|
|
'login': LoginSerializer
|
|
}
|
|
}
|
|
"""
|
|
|
|
def __init__(self, mapping_rules, *args, **kwargs):
|
|
assert isinstance(mapping_rules, dict), (
|
|
'`mapping_rule` argument expect type `dict`, gut get `{}`'
|
|
''.format(type(mapping_rules))
|
|
)
|
|
|
|
self.__mapping_rules = mapping_rules
|
|
super().__init__(*args, **kwargs)
|
|
|
|
@property
|
|
def mapping_rules(self):
|
|
return copy.deepcopy(self.__mapping_rules)
|
|
|
|
def to_internal_value(self, data):
|
|
""" 实际是一个虚拟字段所以不返回任何值 """
|
|
pass
|
|
|
|
def to_representation(self, value):
|
|
""" 实际是一个虚拟字段所以不返回任何值 """
|
|
pass
|
|
|
|
|
|
# A Ignore read-only fields for sensitive information
|
|
# ----------------------------------------------------------
|
|
|
|
|
|
class IgnoreSensitiveInfoReadOnlyJSONField(serializers.JSONField):
|
|
""" A ignore read-only fields for sensitive information """
|
|
|
|
def __init__(self, **kwargs):
|
|
kwargs['read_only'] = True
|
|
super().__init__(**kwargs)
|
|
|
|
def to_representation(self, value):
|
|
sensitive_ignored_value = {}
|
|
sensitive_names = ['password']
|
|
for field_name, field_value in value.items():
|
|
for sensitive_name in sensitive_names:
|
|
if sensitive_name in field_name.lower():
|
|
continue
|
|
sensitive_ignored_value[field_name] = field_value
|
|
return super().to_representation(sensitive_ignored_value)
|
|
|
|
|
|
#
|
|
# ReadableHiddenField
|
|
# -------------------
|
|
|
|
|
|
class ReadableHiddenField(serializers.HiddenField):
|
|
""" 可读的 HiddenField """
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.write_only = False
|
|
|
|
def to_representation(self, value):
|
|
if hasattr(value, 'id'):
|
|
return getattr(value, 'id')
|
|
return value
|
|
|
|
#
|
|
# OtherField
|
|
# ----------
|
|
|
|
|
|
# TODO: DELETE 替换完成后删除
|
|
class CustomMetaDictField(serializers.DictField):
|
|
"""
|
|
In use:
|
|
RemoteApp params field
|
|
CommandStorage meta field
|
|
ReplayStorage meta field
|
|
"""
|
|
type_fields_map = {}
|
|
default_type = None
|
|
convert_key_remove_type_prefix = False
|
|
convert_key_to_upper = False
|
|
|
|
def filter_attribute(self, attribute, instance):
|
|
fields = self.type_fields_map.get(instance.type, [])
|
|
for field in fields:
|
|
if field.get('write_only', False):
|
|
attribute.pop(field['name'], None)
|
|
return attribute
|
|
|
|
def get_attribute(self, instance):
|
|
"""
|
|
序列化时调用
|
|
"""
|
|
attribute = super().get_attribute(instance)
|
|
attribute = self.filter_attribute(attribute, instance)
|
|
return attribute
|
|
|
|
def convert_value_key_remove_type_prefix(self, dictionary, value):
|
|
if not self.convert_key_remove_type_prefix:
|
|
return value
|
|
tp = dictionary.get('type')
|
|
prefix = '{}_'.format(tp)
|
|
convert_value = {}
|
|
for k, v in value.items():
|
|
if k.lower().startswith(prefix):
|
|
k = k.lower().split(prefix, 1)[1]
|
|
convert_value[k] = v
|
|
return convert_value
|
|
|
|
def convert_value_key_to_upper(self, value):
|
|
if not self.convert_key_to_upper:
|
|
return value
|
|
convert_value = {k.upper(): v for k, v in value.items()}
|
|
return convert_value
|
|
|
|
def convert_value_key(self, dictionary, value):
|
|
value = self.convert_value_key_remove_type_prefix(dictionary, value)
|
|
value = self.convert_value_key_to_upper(value)
|
|
return value
|
|
|
|
def filter_value_key(self, dictionary, value):
|
|
tp = dictionary.get('type')
|
|
fields = self.type_fields_map.get(tp, [])
|
|
fields_names = [field['name'] for field in fields]
|
|
filter_value = {k: v for k, v in value.items() if k in fields_names}
|
|
return filter_value
|
|
|
|
@staticmethod
|
|
def strip_value(value):
|
|
new_value = {}
|
|
for k, v in value.items():
|
|
if isinstance(v, str):
|
|
v = v.strip()
|
|
new_value[k] = v
|
|
return new_value
|
|
|
|
def get_value(self, dictionary):
|
|
"""
|
|
反序列化时调用
|
|
"""
|
|
value = super().get_value(dictionary)
|
|
value = self.convert_value_key(dictionary, value)
|
|
value = self.filter_value_key(dictionary, value)
|
|
value = self.strip_value(value)
|
|
return value
|
|
|
|
|