mirror of https://github.com/jumpserver/jumpserver
feat: 添加AES GCM模式为默认的加密方式
parent
e81762d692
commit
8227f44058
|
@ -5,7 +5,7 @@ from django.db import models
|
|||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.encoding import force_text
|
||||
|
||||
from ..utils import signer, aes_crypto
|
||||
from ..utils import signer, aes_crypto, aes_ecb_crypto
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
@ -117,9 +117,17 @@ class EncryptMixin:
|
|||
return signer.unsign(value) or ''
|
||||
|
||||
def decrypt_from_aes(self, value):
|
||||
"""
|
||||
先尝试使用GCM模式解密,如果解不开,再尝试使用原来的ECB模式解密
|
||||
"""
|
||||
try:
|
||||
return aes_crypto.decrypt(value)
|
||||
except (TypeError, ValueError):
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
return aes_ecb_crypto.decrypt(value)
|
||||
except (TypeError, ValueError, UnicodeDecodeError):
|
||||
pass
|
||||
|
||||
def from_db_value(self, value, expression, connection, context):
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import base64
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from Crypto.Random import get_random_bytes
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
|
@ -44,11 +46,69 @@ class AESCrypto:
|
|||
return str(aes.decrypt(base64.decodebytes(bytes(text, encoding='utf8'))).rstrip(b'\0').decode("utf8")) # 解密
|
||||
|
||||
|
||||
def get_aes_crypto(key=None):
|
||||
class AESCryptoGCM:
|
||||
"""
|
||||
使用AES GCM模式
|
||||
"""
|
||||
|
||||
def __init__(self, key):
|
||||
self.key = self.process_key(key)
|
||||
|
||||
@staticmethod
|
||||
def process_key(key):
|
||||
"""
|
||||
返回32 bytes 的key
|
||||
"""
|
||||
if not isinstance(key, bytes):
|
||||
key = bytes(key, encoding='utf-8')
|
||||
|
||||
if len(key) >= 32:
|
||||
return key[:32]
|
||||
|
||||
return pad(key, 32)
|
||||
|
||||
def encrypt(self, text):
|
||||
"""
|
||||
加密text,并将 header, nonce, tag (3*16 bytes, base64后变为 3*24 bytes)
|
||||
附在密文前。解密时要用到。
|
||||
"""
|
||||
header = get_random_bytes(16)
|
||||
cipher = AES.new(self.key, AES.MODE_GCM)
|
||||
cipher.update(header)
|
||||
ciphertext, tag = cipher.encrypt_and_digest(bytes(text, encoding='utf-8'))
|
||||
|
||||
result = []
|
||||
for byte_data in (header, cipher.nonce, tag, ciphertext):
|
||||
result.append(base64.b64encode(byte_data).decode('utf-8'))
|
||||
|
||||
return ''.join(result)
|
||||
|
||||
def decrypt(self, text):
|
||||
"""
|
||||
提取header, nonce, tag并解密text。
|
||||
"""
|
||||
metadata = text[:72]
|
||||
header = base64.b64decode(metadata[:24])
|
||||
nonce = base64.b64decode(metadata[24:48])
|
||||
tag = base64.b64decode(metadata[48:])
|
||||
ciphertext = base64.b64decode(text[72:])
|
||||
|
||||
cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce)
|
||||
|
||||
cipher.update(header)
|
||||
plain_text_bytes = cipher.decrypt_and_verify(ciphertext, tag)
|
||||
return plain_text_bytes.decode('utf-8')
|
||||
|
||||
|
||||
def get_aes_crypto(key=None, mode='GCM'):
|
||||
if key is None:
|
||||
key = settings.SECRET_KEY
|
||||
a = AESCrypto(key)
|
||||
if mode == 'ECB':
|
||||
a = AESCrypto(key)
|
||||
elif mode == 'GCM':
|
||||
a = AESCryptoGCM(key)
|
||||
return a
|
||||
|
||||
|
||||
aes_crypto = get_aes_crypto()
|
||||
aes_ecb_crypto = get_aes_crypto(mode='ECB')
|
||||
aes_crypto = get_aes_crypto(mode='GCM')
|
||||
|
|
Loading…
Reference in New Issue