feat: support ed25519 key

pull/9123/head
Eric 2022-11-25 18:06:22 +08:00
parent 276f644794
commit 608e0c9f26
3 changed files with 89 additions and 53 deletions

View File

@ -1,23 +1,22 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
import io
import os import os
import sshpubkeys
from hashlib import md5 from hashlib import md5
from django.db import models import sshpubkeys
from django.conf import settings from django.conf import settings
from django.utils import timezone from django.db import models
from django.db.models import QuerySet from django.db.models import QuerySet
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from assets.const import Connectivity, SecretType
from common.db import fields
from common.utils import ( from common.utils import (
ssh_key_string_to_obj, ssh_key_gen, get_logger, ssh_key_string_to_obj, ssh_key_gen, get_logger,
random_string, ssh_pubkey_gen, lazyproperty random_string, lazyproperty, parse_ssh_public_key_str
) )
from common.db import fields
from orgs.mixins.models import JMSOrgBaseModel from orgs.mixins.models import JMSOrgBaseModel
from assets.const import Connectivity, SecretType
logger = get_logger(__file__) logger = get_logger(__file__)
@ -88,7 +87,7 @@ class BaseAccount(JMSOrgBaseModel):
@lazyproperty @lazyproperty
def public_key(self): def public_key(self):
if self.secret_type == SecretType.SSH_KEY: if self.secret_type == SecretType.SSH_KEY:
return ssh_pubkey_gen(private_key=self.private_key) return parse_ssh_public_key_str(self.private_key)
return None return None
@property @property
@ -97,7 +96,7 @@ class BaseAccount(JMSOrgBaseModel):
public_key = self.public_key public_key = self.public_key
elif self.private_key: elif self.private_key:
try: try:
public_key = ssh_pubkey_gen(private_key=self.private_key) public_key = parse_ssh_public_key_str(self.private_key)
except IOError as e: except IOError as e:
return str(e) return str(e)
else: else:
@ -129,12 +128,9 @@ class BaseAccount(JMSOrgBaseModel):
return key_path return key_path
def get_private_key(self): def get_private_key(self):
if not self.private_key_obj: if not self.private_key:
return None return None
string_io = io.StringIO() return self.private_key
self.private_key_obj.write_private_key(string_io)
private_key = string_io.getvalue()
return private_key
@property @property
def public_key_obj(self): def public_key_obj(self):

View File

@ -1,9 +1,7 @@
from io import StringIO
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers from rest_framework import serializers
from common.utils import ssh_private_key_gen, validate_ssh_private_key from common.utils import validate_ssh_private_key, parse_ssh_private_key_str
def validate_password_for_ansible(password): def validate_password_for_ansible(password):
@ -24,9 +22,4 @@ def validate_ssh_key(ssh_key, passphrase=None):
valid = validate_ssh_private_key(ssh_key, password=passphrase) valid = validate_ssh_private_key(ssh_key, password=passphrase)
if not valid: if not valid:
raise serializers.ValidationError(_("private key invalid or passphrase error")) raise serializers.ValidationError(_("private key invalid or passphrase error"))
return parse_ssh_private_key_str(ssh_key, passphrase)
ssh_key = ssh_private_key_gen(ssh_key, password=passphrase)
string_io = StringIO()
ssh_key.write_private_key(string_io)
ssh_key = string_io.getvalue()
return ssh_key

View File

@ -1,24 +1,23 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
import re
import json
from six import string_types
import base64 import base64
import os
import time
import hashlib import hashlib
import json
import os
import re
import time
from io import StringIO from io import StringIO
from itertools import chain
import paramiko import paramiko
import sshpubkeys import sshpubkeys
from cryptography.hazmat.primitives import serialization
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from itsdangerous import ( from itsdangerous import (
TimedJSONWebSignatureSerializer, JSONWebSignatureSerializer, TimedJSONWebSignatureSerializer, JSONWebSignatureSerializer,
BadSignature, SignatureExpired BadSignature, SignatureExpired
) )
from django.conf import settings from six import string_types
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models.fields.files import FileField
from .http import http_date from .http import http_date
@ -69,22 +68,19 @@ class Signer(metaclass=Singleton):
return None return None
_supported_paramiko_ssh_key_types = (paramiko.RSAKey, paramiko.DSSKey, paramiko.Ed25519Key)
def ssh_key_string_to_obj(text, password=None): def ssh_key_string_to_obj(text, password=None):
key = None key = None
try: for ssh_key_type in _supported_paramiko_ssh_key_types:
key = paramiko.RSAKey.from_private_key(StringIO(text), password=password) if not isinstance(ssh_key_type, paramiko.PKey):
except paramiko.SSHException: continue
pass try:
else: key = ssh_key_type.from_private_key(StringIO(text), password=password)
return key return key
except paramiko.SSHException:
try: pass
key = paramiko.DSSKey.from_private_key(StringIO(text), password=password)
except paramiko.SSHException:
pass
else:
return key
return key return key
@ -137,17 +133,68 @@ def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', h
def validate_ssh_private_key(text, password=None): def validate_ssh_private_key(text, password=None):
if isinstance(text, bytes): if isinstance(text, str):
try: try:
text = text.decode("utf-8") text = text.encode("utf-8")
except UnicodeDecodeError:
return False
if isinstance(password, str):
try:
password = password.encode("utf-8")
except UnicodeDecodeError: except UnicodeDecodeError:
return False return False
key = ssh_key_string_to_obj(text, password=password) key = parse_ssh_private_key_str(text, password=password)
if key is None: return bool(key)
return False
else:
return True def parse_ssh_private_key_str(text: bytes, password=None) -> str:
private_key = _parse_ssh_private_key(text, password=password)
if private_key is None:
return ""
private_key_bytes = private_key.private_bytes(serialization.Encoding.PEM,
serialization.PrivateFormat.OpenSSH,
serialization.NoEncryption())
return private_key_bytes.decode('utf-8')
def parse_ssh_public_key_str(text: bytes = "", password=None) -> str:
private_key = _parse_ssh_private_key(text, password=password)
if private_key is None:
return ""
public_key_bytes = private_key.public_key().public_bytes(serialization.Encoding.OpenSSH,
serialization.PublicFormat.OpenSSH)
return public_key_bytes.decode('utf-8')
def _parse_ssh_private_key(text, password=None):
"""
text: bytes
password: str
return:private key types:
ec.EllipticCurvePrivateKey,
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ed25519.Ed25519PrivateKey,
"""
if isinstance(text, str):
try:
text = text.encode("utf-8")
except UnicodeDecodeError:
return None
if password is not None:
if isinstance(password, str):
try:
password = password.encode("utf-8")
except UnicodeDecodeError:
return None
try:
private_key = serialization.load_ssh_private_key(text, password=password)
return private_key
except (ValueError, TypeError):
pass
return None
def validate_ssh_public_key(text): def validate_ssh_public_key(text):