mirror of https://github.com/jumpserver/jumpserver
perf: 修改一些拼写
parent
a70f85e346
commit
fbea1f3480
|
@ -7,6 +7,7 @@ import os
|
|||
import re
|
||||
import time
|
||||
from io import StringIO
|
||||
import logging
|
||||
|
||||
import paramiko
|
||||
import sshpubkeys
|
||||
|
@ -74,13 +75,13 @@ _supported_paramiko_ssh_key_types = (paramiko.RSAKey, paramiko.DSSKey, paramiko.
|
|||
def ssh_key_string_to_obj(text, password=None):
|
||||
key = None
|
||||
for ssh_key_type in _supported_paramiko_ssh_key_types:
|
||||
if not isinstance(ssh_key_type, paramiko.PKey):
|
||||
continue
|
||||
try:
|
||||
key = ssh_key_type.from_private_key(StringIO(text), password=password)
|
||||
return key
|
||||
except paramiko.SSHException:
|
||||
pass
|
||||
if key is None:
|
||||
raise ValueError('Invalid private key')
|
||||
return key
|
||||
|
||||
|
||||
|
@ -152,9 +153,11 @@ def parse_ssh_private_key_str(text: bytes, password=None) -> str:
|
|||
private_key = _parse_ssh_private_key(text, password=password)
|
||||
if private_key is None:
|
||||
return ""
|
||||
private_key_bytes = private_key.private_bytes(serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.OpenSSH,
|
||||
serialization.NoEncryption())
|
||||
private_key_bytes = private_key.private_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PrivateFormat.OpenSSH,
|
||||
serialization.NoEncryption()
|
||||
)
|
||||
return private_key_bytes.decode('utf-8')
|
||||
|
||||
|
||||
|
@ -193,6 +196,7 @@ def _parse_ssh_private_key(text, password=None):
|
|||
private_key = serialization.load_ssh_private_key(text, password=password)
|
||||
return private_key
|
||||
except (ValueError, TypeError):
|
||||
logging.error("Invalid private key")
|
||||
pass
|
||||
return None
|
||||
|
||||
|
|
Loading…
Reference in New Issue