jumpserver/apps/common/db/fields.py

246 lines
6.1 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
#
import json
2022-11-11 07:04:31 +00:00
2022-11-11 09:28:13 +00:00
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from django.utils.encoding import force_text
2022-11-11 09:28:13 +00:00
from django.utils.translation import ugettext_lazy as _
2022-11-11 07:04:31 +00:00
from common.utils import signer, crypto
# Names exported by a star-import of this module.
__all__ = [
    # JSON mixins
    "JsonMixin",
    "JsonDictMixin",
    "JsonListMixin",
    "JsonTypeMixin",
    # JSON-backed fields
    "JsonCharField",
    "JsonTextField",
    "JsonListCharField",
    "JsonListTextField",
    "JsonDictCharField",
    "JsonDictTextField",
    # Encrypted fields
    "EncryptCharField",
    "EncryptTextField",
    "EncryptMixin",
    "EncryptJsonDictTextField",
    "EncryptJsonDictCharField",
    # Miscellaneous
    "PortField",
    "BitChoices",
]
class JsonMixin:
    """Field mixin that persists a Python value as a JSON string.

    Values are serialised with ``json.dumps`` on the way to the database and
    parsed with ``json.loads`` on the way back. ``None`` is passed through
    untouched in both directions.
    """

    # Expected Python type of the decoded value; ``None`` means "any".
    tp = None

    @staticmethod
    def json_decode(data):
        """Parse *data* as JSON, returning ``None`` if it cannot be parsed."""
        try:
            decoded = json.loads(data)
        except (TypeError, json.JSONDecodeError):
            decoded = None
        return decoded

    @staticmethod
    def json_encode(data):
        """Serialise *data* to a JSON string."""
        return json.dumps(data)

    def from_db_value(self, value, expression, connection, context=None):
        """Decode the raw database value; ``None`` stays ``None``."""
        return None if value is None else self.json_decode(value)

    def to_python(self, value):
        """Convert *value* to its Python form.

        Only a ``str`` beginning with a double quote is treated as
        JSON-encoded and decoded; every other value (including ``None`` and
        any other string) is returned unchanged.
        """
        looks_encoded = isinstance(value, str) and value.startswith('"')
        if looks_encoded:
            return self.json_decode(value)
        return value

    def get_prep_value(self, value):
        """Encode *value* as JSON for storage; ``None`` stays ``None``."""
        if value is not None:
            value = self.json_encode(value)
        return value
class JsonTypeMixin(JsonMixin):
    """JSON mixin that guarantees values are instances of ``tp``.

    Whenever a value is not an instance of ``tp`` (for example ``None`` or a
    value of another type), a fresh empty ``tp()`` is substituted, both when
    reading and when writing.
    """

    tp = dict

    def from_db_value(self, value, expression, connection, context=None):
        """Decode the database value, falling back to an empty ``tp()``."""
        decoded = super().from_db_value(value, expression, connection, context)
        return decoded if isinstance(decoded, self.tp) else self.tp()

    def to_python(self, value):
        """Convert *value* to Python, falling back to an empty ``tp()``."""
        converted = super().to_python(value)
        return converted if isinstance(converted, self.tp) else self.tp()

    def get_prep_value(self, value):
        """JSON-encode *value*; anything that is not a ``tp`` is stored as an empty ``tp()``."""
        payload = value if isinstance(value, self.tp) else self.tp()
        return self.json_encode(payload)
class JsonDictMixin(JsonTypeMixin):
    """Typed JSON mixin whose values are always ``dict`` instances."""

    tp = dict
class JsonDictCharField(JsonDictMixin, models.CharField):
    """``CharField`` that stores a dict as a JSON string."""

    description = _("Marshal dict data to char field")
class JsonDictTextField(JsonDictMixin, models.TextField):
    """``TextField`` that stores a dict as a JSON string."""

    description = _("Marshal dict data to text field")
class JsonListMixin(JsonTypeMixin):
    """Typed JSON mixin whose values are always ``list`` instances."""

    tp = list
class JsonStrListMixin(JsonListMixin):
    """List-typed JSON mixin; currently adds nothing to ``JsonListMixin``.

    NOTE(review): the name suggests a list of strings, but no element type
    is enforced here -- confirm the intent with callers.
    """
class JsonListCharField(JsonListMixin, models.CharField):
    """``CharField`` that stores a list as a JSON string."""

    description = _("Marshal list data to char field")
class JsonListTextField(JsonListMixin, models.TextField):
    """``TextField`` that stores a list as a JSON string."""

    description = _("Marshal list data to text field")
class JsonCharField(JsonMixin, models.CharField):
    """``CharField`` that stores any JSON-serialisable value as a JSON string."""

    description = _("Marshal data to char field")
class JsonTextField(JsonMixin, models.TextField):
    """``TextField`` that stores any JSON-serialisable value as a JSON string."""

    description = _("Marshal data to text field")
class EncryptMixin:
    """
    Field mixin that encrypts values on write and decrypts them on read.

    EncryptMixin must be placed first in the list of base classes.
    """

    def decrypt_from_signer(self, value):
        """Recover *value* with ``signer.unsign``; return ``""`` if that yields nothing."""
        return signer.unsign(value) or ""

    def from_db_value(self, value, expression, connection, context=None):
        """Decrypt the stored value, then let the next base class decode it.

        Falsy values (``None``, empty string, ...) are returned unchanged.
        """
        if not value:
            return value

        cipher_text = force_text(value)
        # If crypto.decrypt produced nothing, fall back to the original
        # signer-based decryption.
        plain_value = crypto.decrypt(cipher_text) or self.decrypt_from_signer(
            cipher_text
        )

        # This mixin may be combined with a JSON mixin, so decrypt first and
        # only then hand the plain text on for JSON decoding.
        parent = super()
        if not hasattr(parent, "from_db_value"):
            return plain_value
        return parent.from_db_value(plain_value, expression, connection, context)

    def get_prep_value(self, value):
        """Prepare *value* for storage: delegate to the next base class, then encrypt.

        Falsy values (``None``, empty string, empty containers, ...) are
        returned unchanged and are not encrypted.
        """
        if not value:
            return value

        # Serialise (e.g. to JSON) first, then encrypt.
        parent = super()
        if hasattr(parent, "get_prep_value"):
            value = parent.get_prep_value(value)

        # Always write using the newer crypto-based encryption.
        return crypto.encrypt(force_text(value))
class EncryptTextField(EncryptMixin, models.TextField):
    """``TextField`` whose content is encrypted by ``EncryptMixin``."""

    description = _("Encrypt field using Secret Key")
class EncryptCharField(EncryptMixin, models.CharField):
    """``CharField`` whose content is encrypted by ``EncryptMixin``.

    The column is created with twice the requested ``max_length`` (and at
    least 256); ``deconstruct`` halves it again for values above 255.
    """

    @staticmethod
    def change_max_length(kwargs):
        """Rewrite ``kwargs["max_length"]`` in place to the enlarged column size.

        A missing ``max_length`` defaults to 1024, values below 129 are
        treated as 128, and the result is then doubled.
        """
        requested = kwargs.setdefault("max_length", 1024)
        effective = 128 if requested < 129 else requested
        kwargs["max_length"] = effective * 2

    def __init__(self, *args, **kwargs):
        """Enlarge ``max_length`` before delegating to the base field."""
        self.change_max_length(kwargs)
        super().__init__(*args, **kwargs)

    def deconstruct(self):
        """Return the deconstructed field with the ``max_length`` doubling undone.

        Values above 255 are halved (integer division) so that
        re-instantiating the field from these kwargs doubles them again;
        smaller values are left as they are.
        """
        name, path, args, kwargs = super().deconstruct()
        stored = kwargs.pop("max_length")
        kwargs["max_length"] = stored // 2 if stored > 255 else stored
        return name, path, args, kwargs
class EncryptJsonDictTextField(EncryptMixin, JsonDictTextField):
    """``JsonDictTextField`` combined with ``EncryptMixin``."""
class EncryptJsonDictCharField(EncryptMixin, JsonDictCharField):
    """``JsonDictCharField`` combined with ``EncryptMixin``."""
class PortField(models.IntegerField):
    """Integer field restricted to the range 0-65535.

    ``blank``, ``null`` and ``validators`` are always forced to the values
    below, overriding anything the caller passes for those options.
    """

    def __init__(self, *args, **kwargs):
        """Force the port-specific options, then build the integer field."""
        kwargs["blank"] = False
        kwargs["null"] = False
        kwargs["validators"] = [MinValueValidator(0), MaxValueValidator(65535)]
        super().__init__(*args, **kwargs)
2022-11-11 07:04:31 +00:00
class BitChoices(models.IntegerChoices):
    """Integer choices whose member values can be OR-ed together.

    Also provides helpers to render the members as a nested
    ``value``/``label``/``children`` structure.
    """

    @classmethod
    def branches(cls):
        """Return the nodes placed under the root; by default every member."""
        return list(cls)

    @classmethod
    def is_tree(cls):
        """Return whether ``tree()`` should produce output; ``False`` here."""
        return False

    @classmethod
    def tree(cls):
        """Return a one-element list holding the rendered root node.

        Returns an empty list when ``is_tree()`` is false.
        """
        if cls.is_tree():
            return [cls.render_node([_("All"), cls.branches()])]
        return []

    @classmethod
    def render_node(cls, node):
        """Render *node* as a dict.

        A ``BitChoices`` member becomes a leaf with ``value`` and ``label``.
        Anything else is unpacked as a ``(name, children)`` pair and rendered
        recursively, with *name* used for both ``value`` and ``label``.
        """
        if not isinstance(node, BitChoices):
            name, children = node
            return {
                "value": name,
                "label": name,
                "children": [cls.render_node(child) for child in children],
            }
        return {
            "value": node.name,
            "label": node.label,
        }

    @classmethod
    def all(cls):
        """Return the bitwise OR of every member's value (0 if there are none)."""
        mask = 0
        for member in cls:
            mask = mask | member.value
        return mask