mirror of https://github.com/jumpserver/jumpserver
perf: decrypt secret logic
parent
fdd7d9b6b1
commit
034ee65157
|
@ -143,11 +143,13 @@ class EncryptMixin:
|
||||||
if value is None:
|
if value is None:
|
||||||
return value
|
return value
|
||||||
|
|
||||||
plain_value = Encryptor(value).decrypt()
|
encryptor = Encryptor(value)
|
||||||
|
plain_value = encryptor.decrypt()
|
||||||
|
|
||||||
# 如果解密失败,则使用原来的值
|
# 如果解密失败,并且可能不是加密数据,则使用原始值
|
||||||
if not plain_value:
|
if not plain_value and not encryptor.is_encrypted_data():
|
||||||
plain_value = value
|
plain_value = value
|
||||||
|
|
||||||
# 可能和Json mix,所以要先解密,再json
|
# 可能和Json mix,所以要先解密,再json
|
||||||
sp = super()
|
sp = super()
|
||||||
if hasattr(sp, "from_db_value"):
|
if hasattr(sp, "from_db_value"):
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
import base64
|
||||||
|
|
||||||
from django.db import connections, transaction, connection
|
from django.db import connections, transaction, connection
|
||||||
from django.utils.encoding import force_str
|
from django.utils.encoding import force_str
|
||||||
|
@ -102,6 +103,54 @@ class Encryptor:
|
||||||
def __init__(self, value):
|
def __init__(self, value):
|
||||||
self.value = force_str(value)
|
self.value = force_str(value)
|
||||||
|
|
||||||
|
def is_encrypted_data(self):
|
||||||
|
"""
|
||||||
|
检测数据是否为加密格式
|
||||||
|
返回 True 表示是加密数据,False 表示是原始数据
|
||||||
|
"""
|
||||||
|
if not self.value:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 检测 base64 编码格式 (crypto.encrypt 的输出)
|
||||||
|
try:
|
||||||
|
# 尝试不同的 base64 解码方式
|
||||||
|
# 1. 标准 base64
|
||||||
|
try:
|
||||||
|
base64.b64decode(self.value)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 2. URL-safe base64
|
||||||
|
try:
|
||||||
|
# 添加必要的填充
|
||||||
|
missing_padding = len(self.value) % 4
|
||||||
|
if missing_padding:
|
||||||
|
padded_value = self.value + '=' * (4 - missing_padding)
|
||||||
|
else:
|
||||||
|
padded_value = self.value
|
||||||
|
base64.urlsafe_b64decode(padded_value)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 检测 AES GCM 格式 (固定72字符metadata)
|
||||||
|
if len(self.value) > 72:
|
||||||
|
try:
|
||||||
|
# 前72字符应该是3个24字符的base64编码
|
||||||
|
metadata = self.value[:72]
|
||||||
|
for i in range(0, 72, 24):
|
||||||
|
part = metadata[i:i+24]
|
||||||
|
base64.b64decode(part)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def decrypt(self):
|
def decrypt(self):
|
||||||
plain_value = crypto.decrypt(self.value)
|
plain_value = crypto.decrypt(self.value)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue