feat: 使用新的对称加密方式: aes

pull/4095/head
ibuler 2020-06-11 12:10:00 +08:00
parent 04eb670ada
commit 75be45ce43
4 changed files with 74 additions and 55 deletions

View File

@ -3,8 +3,9 @@
import json import json
from django.db import models from django.db import models
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import force_text
from ..utils import signer from ..utils import signer, aes_crypto
__all__ = [ __all__ = [
@ -114,11 +115,22 @@ class EncryptMixin:
def from_db_value(self, value, expression, connection, context): def from_db_value(self, value, expression, connection, context):
if value is None: if value is None:
return value return value
value = signer.unsign(value) value = force_text(value)
plain_value = ''
# 优先采用 aes 解密
try:
plain_value = aes_crypto.decrypt(value)
except (TypeError, ValueError):
pass
# 如果没有解开使用原来的signer解密
if not plain_value:
plain_value = signer.unsign(value) or ''
sp = super() sp = super()
if hasattr(sp, 'from_db_value'): if hasattr(sp, 'from_db_value'):
return sp.from_db_value(value, expression, connection, context) plain_value = sp.from_db_value(plain_value, expression, connection, context)
return value return plain_value
def get_prep_value(self, value): def get_prep_value(self, value):
if value is None: if value is None:
@ -126,7 +138,9 @@ class EncryptMixin:
sp = super() sp = super()
if hasattr(sp, 'get_prep_value'): if hasattr(sp, 'get_prep_value'):
value = sp.get_prep_value(value) value = sp.get_prep_value(value)
return signer.sign(value) value = force_text(value)
# 替换新的加密方式
return aes_crypto.encrypt(value)
class EncryptTextField(EncryptMixin, models.TextField): class EncryptTextField(EncryptMixin, models.TextField):

View File

@ -6,3 +6,4 @@ from .django import *
from .encode import * from .encode import *
from .http import * from .http import *
from .ipip import * from .ipip import *
from .crypto import *

View File

@ -0,0 +1,54 @@
import base64
from Crypto.Cipher import AES
from django.conf import settings
class AESCrypto:
"""
AES
除了MODE_SIV模式key长度为32, 48, or 64,
其余key长度为16, 24 or 32
详细见AES内部文档
CBC模式传入iv参数
本例使用常用的ECB模式
"""
def __init__(self, key):
if len(key) > 32:
key = key[:32]
self.key = self.to_16(key)
@staticmethod
def to_16(key):
"""
转为16倍数的bytes数据
:param key:
:return:
"""
key = bytes(key, encoding="utf8")
while len(key) % 16 != 0:
key += b'\0'
return key # 返回bytes
def aes(self):
return AES.new(self.key, AES.MODE_ECB) # 初始化加密器
def encrypt(self, text):
aes = self.aes()
return str(base64.encodebytes(aes.encrypt(self.to_16(text))),
encoding='utf8').replace('\n', '') # 加密
def decrypt(self, text):
aes = self.aes()
return str(aes.decrypt(base64.decodebytes(bytes(text, encoding='utf8'))).rstrip(b'\0').decode("utf8")) # 解密
def get_aes_crypto(key=None):
if key is None:
key = settings.SECRET_KEY
a = AESCrypto(key)
return a
aes_crypto = get_aes_crypto()

View File

@ -9,7 +9,6 @@ import time
import hashlib import hashlib
from io import StringIO from io import StringIO
from itertools import chain from itertools import chain
from Crypto.Cipher import AES
import paramiko import paramiko
import sshpubkeys import sshpubkeys
@ -227,52 +226,3 @@ def model_to_json(instance, sort_keys=True, indent=2, cls=None):
cls = DjangoJSONEncoder cls = DjangoJSONEncoder
return json.dumps(data, sort_keys=sort_keys, indent=indent, cls=cls) return json.dumps(data, sort_keys=sort_keys, indent=indent, cls=cls)
class AESCrypto:
"""
AES
除了MODE_SIV模式key长度为32, 48, or 64,
其余key长度为16, 24 or 32
详细见AES内部文档
CBC模式传入iv参数
本例使用常用的ECB模式
"""
def __init__(self, key):
if len(key) > 32:
key = key[:32]
self.key = self.to_16(key)
@staticmethod
def to_16(key):
"""
转为16倍数的bytes数据
:param key:
:return:
"""
key = bytes(key, encoding="utf8")
while len(key) % 16 != 0:
key += b'\0'
return key # 返回bytes
def aes(self):
return AES.new(self.key, AES.MODE_ECB) # 初始化加密器
def encrypt(self, text):
aes = self.aes()
return str(base64.encodebytes(aes.encrypt(self.to_16(text))),
encoding='utf8').replace('\n', '') # 加密
def decrypt(self, text):
aes = self.aes()
return str(aes.decrypt(base64.decodebytes(bytes(text, encoding='utf8'))).rstrip(b'\0').decode("utf8")) # 解密
def get_aes_crypto(key=None):
if key is None:
key = settings.SECRET_KEY
a = AESCrypto(key)
return a
aes = get_aes_crypto()