diff --git a/apps/common/fields/model.py b/apps/common/fields/model.py index 945a98ea4..b4ac41b2f 100644 --- a/apps/common/fields/model.py +++ b/apps/common/fields/model.py @@ -5,7 +5,7 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from django.utils.encoding import force_text -from ..utils import signer, aes_crypto +from ..utils import signer, aes_crypto, aes_ecb_crypto __all__ = [ @@ -117,9 +117,17 @@ class EncryptMixin: return signer.unsign(value) or '' def decrypt_from_aes(self, value): + """ + 先尝试使用GCM模式解密,如果解不开,再尝试使用原来的ECB模式解密 + """ try: return aes_crypto.decrypt(value) - except (TypeError, ValueError): + except ValueError: + pass + + try: + return aes_ecb_crypto.decrypt(value) + except (TypeError, ValueError, UnicodeDecodeError): pass def from_db_value(self, value, expression, connection, context): diff --git a/apps/common/utils/crypto.py b/apps/common/utils/crypto.py index ea3590d6c..31991c93c 100644 --- a/apps/common/utils/crypto.py +++ b/apps/common/utils/crypto.py @@ -1,5 +1,7 @@ import base64 from Crypto.Cipher import AES +from Crypto.Util.Padding import pad +from Crypto.Random import get_random_bytes from django.conf import settings @@ -44,11 +46,69 @@ class AESCrypto: return str(aes.decrypt(base64.decodebytes(bytes(text, encoding='utf8'))).rstrip(b'\0').decode("utf8")) # 解密 -def get_aes_crypto(key=None): +class AESCryptoGCM: + """ + 使用AES GCM模式 + """ + + def __init__(self, key): + self.key = self.process_key(key) + + @staticmethod + def process_key(key): + """ + 返回32 bytes 的key + """ + if not isinstance(key, bytes): + key = bytes(key, encoding='utf-8') + + if len(key) >= 32: + return key[:32] + + return pad(key, 32) + + def encrypt(self, text): + """ + 加密text,并将 header, nonce, tag (3*16 bytes, base64后变为 3*24 bytes) + 附在密文前。解密时要用到。 + """ + header = get_random_bytes(16) + cipher = AES.new(self.key, AES.MODE_GCM) + cipher.update(header) + ciphertext, tag = cipher.encrypt_and_digest(bytes(text, encoding='utf-8')) + + result = [] + for byte_data in (header, cipher.nonce, tag, ciphertext): + result.append(base64.b64encode(byte_data).decode('utf-8')) + + return ''.join(result) + + def decrypt(self, text): + """ + 提取header, nonce, tag并解密text。 + """ + metadata = text[:72] + header = base64.b64decode(metadata[:24]) + nonce = base64.b64decode(metadata[24:48]) + tag = base64.b64decode(metadata[48:]) + ciphertext = base64.b64decode(text[72:]) + + cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce) + + cipher.update(header) + plain_text_bytes = cipher.decrypt_and_verify(ciphertext, tag) + return plain_text_bytes.decode('utf-8') + + +def get_aes_crypto(key=None, mode='GCM'): if key is None: key = settings.SECRET_KEY - a = AESCrypto(key) + if mode == 'ECB': + a = AESCrypto(key) + elif mode == 'GCM': + a = AESCryptoGCM(key) return a -aes_crypto = get_aes_crypto() +aes_ecb_crypto = get_aes_crypto(mode='ECB') +aes_crypto = get_aes_crypto(mode='GCM')