fix: 特定key paramiko 测试可连接性失败

pull/12443/head
feng 2023-12-26 19:13:16 +08:00 committed by Bryan
parent af44ffab0a
commit a4d0e3fd17
1 changed files with 23 additions and 3 deletions

View File

@ -2,8 +2,11 @@ import re
import time
import paramiko
from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey
from sshtunnel import SSHTunnelForwarder
KEY_CLASSES = (RSAKey, DSSKey, ECDSAKey, Ed25519Key)
def common_argument_spec():
options = dict(
@ -39,21 +42,38 @@ class SSHClient:
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.connect_params = self.get_connect_params()
@staticmethod
def get_encrypt_cls(key_path):
for key_cls in KEY_CLASSES:
try:
key_cls.from_private_key_file(key_path)
return key_cls
except paramiko.SSHException:
continue
raise paramiko.SSHException('Invalid key file')
def get_pkey(self, key_path):
if not key_path:
return None
key_cls = self.get_encrypt_cls(key_path)
return key_cls.from_private_key_file(key_path)
def get_connect_params(self):
params = {
'allow_agent': False, 'look_for_keys': False,
'hostname': self.module.params['login_host'],
'port': self.module.params['login_port'],
'key_filename': self.module.params['login_private_key_path'] or None
# TODO: https://github.com/paramiko/paramiko/issues/2048
'pkey': self.get_pkey(self.module.params['login_private_key_path'])
}
if self.module.params['become']:
params['username'] = self.module.params['become_user']
params['password'] = self.module.params['become_password']
params['key_filename'] = self.module.params['become_private_key_path'] or None
params['pkey'] = self.get_pkey(self.module.params['become_private_key_path'])
else:
params['username'] = self.module.params['login_user']
params['password'] = self.module.params['login_password']
params['key_filename'] = self.module.params['login_private_key_path'] or None
params['pkey'] = self.get_pkey(self.module.params['login_private_key_path'])
return params
def _get_channel(self):