mirror of https://github.com/jumpserver/jumpserver
perf: 优化代码表达
parent
8772cd8c71
commit
51820f23bf
|
@ -10,7 +10,7 @@ class Device:
|
||||||
_driver = None
|
_driver = None
|
||||||
__device = None
|
__device = None
|
||||||
|
|
||||||
def open(self, driver_path="./libpiico_ccmu"):
|
def open(self, driver_path="./libpiico_ccmu.so"):
|
||||||
# load driver
|
# load driver
|
||||||
self.__load_driver(driver_path)
|
self.__load_driver(driver_path)
|
||||||
# open device
|
# open device
|
||||||
|
@ -20,14 +20,14 @@ class Device:
|
||||||
if self.__device is None:
|
if self.__device is None:
|
||||||
raise Exception("device not turned on")
|
raise Exception("device not turned on")
|
||||||
ret = self._driver.SDF_CloseDevice(self.__device)
|
ret = self._driver.SDF_CloseDevice(self.__device)
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise Exception("turn off device failed")
|
raise Exception("turn off device failed")
|
||||||
self.__device = None
|
self.__device = None
|
||||||
|
|
||||||
def new_session(self):
|
def new_session(self):
|
||||||
session = c_void_p()
|
session = c_void_p()
|
||||||
ret = self._driver.SDF_OpenSession(self.__device, pointer(session))
|
ret = self._driver.SDF_OpenSession(self.__device, pointer(session))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise Exception("create session failed")
|
raise Exception("create session failed")
|
||||||
return Session(self._driver, session)
|
return Session(self._driver, session)
|
||||||
|
|
||||||
|
@ -65,6 +65,6 @@ class Device:
|
||||||
def __open_device(self):
|
def __open_device(self):
|
||||||
device = c_void_p()
|
device = c_void_p()
|
||||||
ret = self._driver.SDF_OpenDevice(pointer(device))
|
ret = self._driver.SDF_OpenDevice(pointer(device))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("open piico device failed", ret)
|
raise PiicoError("open piico device failed", ret)
|
||||||
self.__device = device
|
self.__device = device
|
||||||
|
|
|
@ -17,7 +17,7 @@ class Session(SM2Mixin, SM3Mixin, SM4Mixin):
|
||||||
def generate_random(self, length=64):
|
def generate_random(self, length=64):
|
||||||
random_data = (c_ubyte * length)()
|
random_data = (c_ubyte * length)()
|
||||||
ret = self._driver.SDF_GenerateRandom(self._session, c_int(length), random_data)
|
ret = self._driver.SDF_GenerateRandom(self._session, c_int(length), random_data)
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("generate random error", ret)
|
raise PiicoError("generate random error", ret)
|
||||||
return bytes(random_data)
|
return bytes(random_data)
|
||||||
|
|
||||||
|
@ -26,11 +26,11 @@ class Session(SM2Mixin, SM3Mixin, SM4Mixin):
|
||||||
private_key = ECCrefPrivateKey()
|
private_key = ECCrefPrivateKey()
|
||||||
ret = self._driver.SDF_GenerateKeyPair_ECC(self._session, c_int(alg_id), c_int(256), pointer(public_key),
|
ret = self._driver.SDF_GenerateKeyPair_ECC(self._session, c_int(alg_id), c_int(256), pointer(public_key),
|
||||||
pointer(private_key))
|
pointer(private_key))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("generate ecc key pair failed", ret)
|
raise PiicoError("generate ecc key pair failed", ret)
|
||||||
return ECCKeyPair(public_key.encode(), private_key.encode())
|
return ECCKeyPair(public_key.encode(), private_key.encode())
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
ret = self._driver.SDF_CloseSession(self._session)
|
ret = self._driver.SDF_CloseSession(self._session)
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("close session failed", ret)
|
raise PiicoError("close session failed", ret)
|
||||||
|
|
|
@ -25,7 +25,7 @@ class SM2Mixin(BaseMixin):
|
||||||
ecc_data = new_ecc_cipher_cla(len(plain_text))()
|
ecc_data = new_ecc_cipher_cla(len(plain_text))()
|
||||||
ret = self._driver.SDF_ExternalEncrypt_ECC(self._session, c_int(alg_id), pointer(pk), plain_text,
|
ret = self._driver.SDF_ExternalEncrypt_ECC(self._session, c_int(alg_id), pointer(pk), plain_text,
|
||||||
c_int(len(plain_text)), pointer(ecc_data))
|
c_int(len(plain_text)), pointer(ecc_data))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise Exception("ecc encrypt failed", ret)
|
raise Exception("ecc encrypt failed", ret)
|
||||||
return ecc_data.encode()
|
return ecc_data.encode()
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ class SM2Mixin(BaseMixin):
|
||||||
ret = self._driver.SDF_ExternalDecrypt_ECC(self._session, c_int(alg_id), pointer(vk),
|
ret = self._driver.SDF_ExternalDecrypt_ECC(self._session, c_int(alg_id), pointer(vk),
|
||||||
pointer(ecc_data),
|
pointer(ecc_data),
|
||||||
temp_data, pointer(temp_data_length))
|
temp_data, pointer(temp_data_length))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise Exception("ecc decrypt failed", ret)
|
raise Exception("ecc decrypt failed", ret)
|
||||||
return bytes(temp_data[:temp_data_length.value])
|
return bytes(temp_data[:temp_data_length.value])
|
||||||
|
|
||||||
|
@ -68,20 +68,20 @@ class SM2Mixin(BaseMixin):
|
||||||
class SM3Mixin(BaseMixin):
|
class SM3Mixin(BaseMixin):
|
||||||
def hash_init(self, alg_id):
|
def hash_init(self, alg_id):
|
||||||
ret = self._driver.SDF_HashInit(self._session, c_int(alg_id), None, None, c_int(0))
|
ret = self._driver.SDF_HashInit(self._session, c_int(alg_id), None, None, c_int(0))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("hash init failed,alg id is {}".format(alg_id), ret)
|
raise PiicoError("hash init failed,alg id is {}".format(alg_id), ret)
|
||||||
|
|
||||||
def hash_update(self, data):
|
def hash_update(self, data):
|
||||||
data = (c_ubyte * len(data))(*data)
|
data = (c_ubyte * len(data))(*data)
|
||||||
ret = self._driver.SDF_HashUpdate(self._session, data, c_int(len(data)))
|
ret = self._driver.SDF_HashUpdate(self._session, data, c_int(len(data)))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("hash update failed", ret)
|
raise PiicoError("hash update failed", ret)
|
||||||
|
|
||||||
def hash_final(self):
|
def hash_final(self):
|
||||||
result_data = (c_ubyte * 32)()
|
result_data = (c_ubyte * 32)()
|
||||||
result_length = c_int()
|
result_length = c_int()
|
||||||
ret = self._driver.SDF_HashFinal(self._session, result_data, pointer(result_length))
|
ret = self._driver.SDF_HashFinal(self._session, result_data, pointer(result_length))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("hash final failed", ret)
|
raise PiicoError("hash final failed", ret)
|
||||||
return bytes(result_data[:result_length.value])
|
return bytes(result_data[:result_length.value])
|
||||||
|
|
||||||
|
@ -94,13 +94,13 @@ class SM4Mixin(BaseMixin):
|
||||||
|
|
||||||
key = c_void_p()
|
key = c_void_p()
|
||||||
ret = self._driver.SDF_ImportKey(self._session, key_val, c_int(len(key_val)), pointer(key))
|
ret = self._driver.SDF_ImportKey(self._session, key_val, c_int(len(key_val)), pointer(key))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("import key failed", ret)
|
raise PiicoError("import key failed", ret)
|
||||||
return key
|
return key
|
||||||
|
|
||||||
def destroy_cipher_key(self, key):
|
def destroy_cipher_key(self, key):
|
||||||
ret = self._driver.SDF_DestroyKey(self._session, key)
|
ret = self._driver.SDF_DestroyKey(self._session, key)
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise Exception("destroy key failed")
|
raise Exception("destroy key failed")
|
||||||
|
|
||||||
def encrypt(self, plain_text, key, alg, iv=None):
|
def encrypt(self, plain_text, key, alg, iv=None):
|
||||||
|
@ -119,11 +119,11 @@ class SM4Mixin(BaseMixin):
|
||||||
if encrypt:
|
if encrypt:
|
||||||
ret = self._driver.SDF_Encrypt(self._session, key, c_int(alg), iv, text, c_int(len(text)), temp_data,
|
ret = self._driver.SDF_Encrypt(self._session, key, c_int(alg), iv, text, c_int(len(text)), temp_data,
|
||||||
pointer(temp_data_length))
|
pointer(temp_data_length))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("encrypt failed", ret)
|
raise PiicoError("encrypt failed", ret)
|
||||||
else:
|
else:
|
||||||
ret = self._driver.SDF_Decrypt(self._session, key, c_int(alg), iv, text, c_int(len(text)), temp_data,
|
ret = self._driver.SDF_Decrypt(self._session, key, c_int(alg), iv, text, c_int(len(text)), temp_data,
|
||||||
pointer(temp_data_length))
|
pointer(temp_data_length))
|
||||||
if not ret == 0:
|
if ret != 0:
|
||||||
raise PiicoError("decrypt failed", ret)
|
raise PiicoError("decrypt failed", ret)
|
||||||
return temp_data[:temp_data_length.value]
|
return temp_data[:temp_data_length.value]
|
||||||
|
|
Loading…
Reference in New Issue