From 40f0c100f3cf5e6f24c9569c96c043ba3f22a1c2 Mon Sep 17 00:00:00 2001 From: vapao Date: Fri, 3 Mar 2023 15:40:29 +0800 Subject: [PATCH] update pipeline module --- spug_api/apps/pipeline/helper.py | 4 +- spug_api/apps/pipeline/utils.py | 99 ++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/spug_api/apps/pipeline/helper.py b/spug_api/apps/pipeline/helper.py index b40088e..047cac9 100644 --- a/spug_api/apps/pipeline/helper.py +++ b/spug_api/apps/pipeline/helper.py @@ -172,9 +172,7 @@ class Helper(KitMixin): self.send(key, message, status='success') def send_error(self, key, message, with_break=False): - message = self.term_message(message, 'error') - if not message.endswith('\r\n'): - message += '\r\n' + message = self.term_message(f'\r\n{message}', 'error') self.send(key, message, status='error') if with_break: raise SpugError diff --git a/spug_api/apps/pipeline/utils.py b/spug_api/apps/pipeline/utils.py index d551aa1..1cfbda7 100644 --- a/spug_api/apps/pipeline/utils.py +++ b/spug_api/apps/pipeline/utils.py @@ -99,31 +99,39 @@ class NodeExecutor: host = Host.objects.get(pk=source.target) with host.get_ssh() as ssh: code, _ = ssh.exec_command_raw(f'[ -d {source.path} ]') - if code == 0: - local_dir = os.path.join(settings.TRANSFER_DIR, uuid4().hex) - os.makedirs(local_dir) - with tempfile.NamedTemporaryFile(mode='w') as fp: - fp.write(host.pkey or AppSetting.get('private_key')) - fp.flush() - target = f'{host.username}@{host.hostname}:{source.path}' - command = f'sshfs -o ro -o ssh_command="ssh -p {host.port} -i {fp.name}" {target} {local_dir}' - task = subprocess.run(command, shell=True, capture_output=True) - if task.returncode != 0: - os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}') - return self.helper.send_error(node.id, task.stderr.decode()) + if code == 0: + local_dir = os.path.join(settings.TRANSFER_DIR, uuid4().hex) + os.makedirs(local_dir) + with tempfile.NamedTemporaryFile(mode='w') as fp: + fp.write(host.pkey or AppSetting.get('private_key')) + fp.flush() + target = f'{host.username}@{host.hostname}:{source.path}' + command = f'sshfs -o ro -o ssh_command="ssh -p {host.port} -i {fp.name}" {target} {local_dir}' + task = subprocess.run(command, shell=True, capture_output=True) + if task.returncode != 0: + os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}') + return self.helper.send_error(node.id, task.stderr.decode()) - threads = [] - with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: - for host in Host.objects.filter(id__in=destination.targets): - t = executor.submit(self._data_transfer_dir, node, host, local_dir) - threads.append(t) - results = [x.result() for x in futures.as_completed(threads)] - os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}') - state = 'success' if all(results) else 'error' - self.helper.send_status(node.id, state) - self.run(node, state) - else: - self._data_transfer_file() + threads = [] + with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for host in Host.objects.filter(id__in=destination.targets): + t = executor.submit(self._data_transfer_dir, node, host, local_dir) + threads.append(t) + results = [x.result() for x in futures.as_completed(threads)] + os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}') + state = 'success' if all(results) else 'error' + self.helper.send_status(node.id, state) + self.run(node, state) + else: + threads = [] + with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for dst_host in Host.objects.filter(id__in=destination.targets): + t = executor.submit(self._data_transfer_file, node, host, dst_host) + threads.append(t) + results = [x.result() for x in futures.as_completed(threads)] + state = 'success' if all(results) else 'error' + self.helper.send_status(node.id, state) + self.run(node, state) def _data_transfer_dir(self, node, host, local_dir): timestamp = time.time() @@ -157,10 +165,43 @@ class NodeExecutor: message = b'' else: message += output - status = task.wait() - if status == 0: + code = task.wait() + if code == 0: self.helper.send_success(key, '传输完成', start_time=timestamp) - return status == 0 + else: + self.helper.send_error(key, f'exit code: {code}') + return code == 0 - def _data_transfer_file(self): - pass \ No newline at end of file + def _data_transfer_file(self, node, src_host, dst_host): + timestamp = time.time() + key = f'{node.id}.{dst_host.id}' + src_file = node.source.path + remote_path = node.destination.path + self.helper.send_info(key, '开始传输数据\r\n', 'processing') + with tempfile.NamedTemporaryFile(mode='w') as fp: + fp.write(AppSetting.get('private_key')) + fp.write('\n') + fp.flush() + + arg_src = f'scp://{src_host.username}@{src_host.hostname}:{src_host.port}/{src_file}' + arg_dst = f'scp://{dst_host.username}@{dst_host.hostname}:{dst_host.port}/{remote_path}' + command = f'scp -3 -o StrictHostKeyChecking=no -i {fp.name} {arg_src} {arg_dst}' + task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + message = b'' + while True: + output = task.stdout.read(1) + if not output: + break + if output in (b'\r', b'\n'): + message += b'\r\n' if output == b'\n' else b'\r' + message = str_decode(message) + self.helper.send(key, message) + message = b'' + else: + message += output + code = task.wait() + if code == 0: + self.helper.send_success(key, '传输完成', start_time=timestamp) + else: + self.helper.send_error(key, f'exit code: {code}') + return code == 0