# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c)
# Released under the AGPL-3.0 License.
from paramiko.client import SSHClient, AutoAddPolicy
from paramiko.rsakey import RSAKey
from paramiko.ssh_exception import AuthenticationException
from io import StringIO
from uuid import uuid4
import time
import re

# Shell snippet with a single `%s` placeholder for a PID.
# It defines a recursive `kill_tree` function that walks the children reported
# by `pgrep -P` and kills each of them; the starting PID itself is killed only
# when the second argument is 1.  The top-level call passes the PID alone, so
# `and_self` defaults to 0 and only the descendants are killed.
# NOTE(review): the line breaks/indentation inside this literal were lost in
# the collapsed source and have been reconstructed from the shell syntax.
KILLER = '''
function kill_tree {
    local pid=$1
    local and_self=${2:-0}
    local children=$(pgrep -P $pid)
    for child in $children; do
        kill_tree $child 1
    done
    if [ $and_self -eq 1 ]; then
        kill $pid
    fi
}

kill_tree %s
'''


class SSH:
    """Helper around a paramiko client/channel that runs commands via a script file.

    Commands are written to a per-instance temporary script on the remote side
    (``self.exec_file``) and sourced from there; an ``echo <eof> $?`` line is
    appended so the exit status follows a fixed end-of-output marker.

    NOTE(review): this class is incomplete.  The source it was recovered from
    is truncated between ``__init__`` and ``_get_sftp``; ``_initial`` and
    ``put_file_by_fl`` are called below but their definitions are missing and
    must be restored before the class can be used.
    """

    def __init__(self, host, credential, environment=None):
        """Set up the instance state; no connection is made in the visible part.

        Args:
            host: not used in the recovered part of this method.
            credential: not used in the recovered part of this method.
            environment: stored on the instance as ``self.environment``.
        """
        self.client = None
        self.channel = None
        self.sftp = None
        # Unique remote path for the script that carries each command.
        self.exec_file = f'/tmp/spug.{uuid4().hex}'
        self.pid = None
        self.environment = environment
        # Marker echoed after every command (see `_handle_command`).
        self.eof = 'Spug EOF 2108111926'
        # NOTE(review): SOURCE IS TRUNCATED HERE.  The next statement survives
        # only as the unterminated fragment
        #     self.regex = re.compile(r'(?
        # It looks as if everything between a `<` and the next `>` was stripped
        # from the file (the holder on the second copyright line appears to
        # have been lost the same way) - confirm against the original file.
        # The pattern, the remainder of `__init__` (including whatever consumed
        # `host` and `credential`) and all following methods up to the loop
        # tail quoted below are missing and have NOT been reconstructed.

    # NOTE(review): orphaned tail of a polling loop, preserved verbatim from
    # the truncated source.  Presumably the end of the connection set-up
    # (`_initial`, called from `__enter__`) - confirm against the original:
    #
    #     ... = 100:
    #         self.client.close()
    #         raise Exception('Wait spug response timeout')
    #     else:
    #         counter += 1
    #         time.sleep(0.1)

    def _get_sftp(self):
        """Return the cached SFTP session, opening one on ``self.client`` if needed."""
        if self.sftp:
            return self.sftp
        self.sftp = self.client.open_sftp()
        return self.sftp

    def _make_env_command(self, environment):
        """Build a shell ``export`` line from a mapping of variables.

        Args:
            environment: mapping of name -> value; may be empty or None.

        Returns:
            ``''`` when ``environment`` is falsy, otherwise
            ``"export K1='v1' K2='v2'\\n"``.
        """
        if not environment:
            return ''
        str_envs = []
        for k, v in environment.items():
            # Hyphens are replaced with underscores in variable names.
            k = k.replace('-', '_')
            if isinstance(v, str):
                # Close the single-quoted string, emit a double-quoted
                # single quote, then reopen it: ' -> '"'"'
                v = v.replace("'", "'\"'\"'")
            str_envs.append(f"{k}='{v}'")
        str_envs = ' '.join(str_envs)
        return f'export {str_envs}\n'

    def _handle_command(self, command, environment):
        """Upload ``command`` as a script and return the line that sources it.

        The script consists of the optional ``export`` line, the command
        itself, and a trailing ``echo <eof> $?`` so the exit status of the
        command is printed right after the end marker.

        NOTE(review): relies on ``self.put_file_by_fl``, whose definition is
        in the truncated part of the source.

        Returns:
            ``". <exec_file>\\n"``.
        """
        new_command = self._make_env_command(environment)
        new_command += command
        new_command += f'\necho {self.eof} $?\n'
        self.put_file_by_fl(StringIO(new_command), self.exec_file)
        return f'. {self.exec_file}\n'

    def _decode(self, content):
        """Decode bytes with the default codec, falling back to GBK.

        The GBK fallback ignores undecodable bytes, so it never raises
        ``UnicodeDecodeError``.
        """
        try:
            content = content.decode()
        except UnicodeDecodeError:
            content = content.decode(encoding='GBK', errors='ignore')
        return content

    def __enter__(self):
        """Run ``self._initial()`` and return the instance.

        NOTE(review): ``_initial`` is defined in the truncated part of the
        source.
        """
        self._initial()
        return self

    def __exit__(self, *args, **kwargs):
        """Close the channel and the client, then drop the client reference."""
        self.channel.close()
        self.client.close()
        self.client = None