diff --git a/spug_api/apps/pipeline/utils.py b/spug_api/apps/pipeline/utils.py
index 3663b50..d551aa1 100644
--- a/spug_api/apps/pipeline/utils.py
+++ b/spug_api/apps/pipeline/utils.py
@@ -1,14 +1,19 @@
 # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
 # Copyright: (c)
 # Released under the AGPL-3.0 License.
+from django.conf import settings
 from apps.credential.models import Credential
 from apps.host.models import Host
-from libs.utils import AttrDict, human_seconds_time
+from libs.utils import AttrDict, str_decode
 from libs.gitlib import RemoteGit
 from apps.pipeline.helper import Helper
-from functools import partial, partialmethod
+from apps.setting.utils import AppSetting
+from functools import partial
 from threading import Thread
 from concurrent import futures
+from uuid import uuid4
+import subprocess
+import tempfile
 import time
 import os
 
@@ -20,6 +25,7 @@ class NodeExecutor:
         self.nodes = {x.id: x for x in map(AttrDict, nodes)}
         self.node = AttrDict(nodes[0])
         self.helper = Helper.make(self.rds, self.rds_key)
+        self.max_workers = max(10, os.cpu_count() * 5)
 
     def run(self, node=None, state=None):
         print(node, state)
@@ -41,6 +47,8 @@
             self._do_build(node)
         elif node.module == 'ssh_exec':
             self._do_ssh_exec(node)
+        elif node.module == 'data_transfer':
+            self._do_data_transfer(node)
 
     def _do_build(self, node, marker=None):
         # if node.mode == 'branch':
@@ -64,9 +72,8 @@
 
     def _do_ssh_exec(self, node):
         threads = []
-        max_workers = max(10, os.cpu_count() * 5)
         self.helper.send_status(node.id, 'processing')
-        with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
             for host in Host.objects.filter(id__in=node.targets):
                 t = executor.submit(self._ssh_exec, host, node)
                 threads.append(t)
@@ -84,3 +91,76 @@ class NodeExecutor:
         if is_success:
             self.helper.send_success(key, '执行结束', start_time=timestamp)
         return is_success
+
+    def _do_data_transfer(self, node):
+        self.helper.send_info(node.id, '开始执行\r\n', 'processing')
+        node.source = source = AttrDict(node.source)
+        node.destination = destination = AttrDict(node.destination)
+        host = Host.objects.get(pk=source.target)
+        with host.get_ssh() as ssh:
+            code, _ = ssh.exec_command_raw(f'[ -d {source.path} ]')
+        if code == 0:
+            local_dir = os.path.join(settings.TRANSFER_DIR, uuid4().hex)
+            os.makedirs(local_dir)
+            with tempfile.NamedTemporaryFile(mode='w') as fp:
+                fp.write(host.pkey or AppSetting.get('private_key'))
+                fp.flush()
+                target = f'{host.username}@{host.hostname}:{source.path}'
+                command = f'sshfs -o ro -o ssh_command="ssh -p {host.port} -i {fp.name}" {target} {local_dir}'
+                task = subprocess.run(command, shell=True, capture_output=True)
+                if task.returncode != 0:
+                    os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}')
+                    return self.helper.send_error(node.id, task.stderr.decode())
+
+            threads = []
+            with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+                for host in Host.objects.filter(id__in=destination.targets):
+                    t = executor.submit(self._data_transfer_dir, node, host, local_dir)
+                    threads.append(t)
+                results = [x.result() for x in futures.as_completed(threads)]
+            os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}')
+            state = 'success' if all(results) else 'error'
+            self.helper.send_status(node.id, state)
+            self.run(node, state)
+        else:
+            self._data_transfer_file()
+
+    def _data_transfer_dir(self, node, host, local_dir):
+        timestamp = time.time()
+        key = f'{node.id}.{host.id}'
+        remote_dir = node.destination.path
+        self.helper.send_info(key, '开始传输数据\r\n', 'processing')
+        with tempfile.NamedTemporaryFile(mode='w') as fp:
+            fp.write(host.pkey or AppSetting.get('private_key'))
+            fp.write('\n')
+            fp.flush()
+
+            options = '-avz --progress -h'
+            argument = f'{local_dir}/ {host.username}@{host.hostname}:{remote_dir}'
+            command = f'rsync {options} -e "ssh -p {host.port} -o StrictHostKeyChecking=no -i {fp.name}" {argument}'
+            task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            message = b''
+            while True:
+                output = task.stdout.read(1)
+                if not output:
+                    break
+                if output in (b'\r', b'\n'):
+                    message += b'\r\n' if output == b'\n' else b'\r'
+                    message = str_decode(message)
+                    if 'rsync: command not found' in message:
+                        data = '\r\n\x1b[31m检测到该主机未安装rsync,可通过批量执行/执行任务模块进行以下命令批量安装\x1b[0m'
+                        data += '\r\nCentos/Redhat: yum install -y rsync'
+                        data += '\r\nUbuntu/Debian: apt install -y rsync'
+                        self.helper.send_error(key, data)
+                        break
+                    self.helper.send(key, message)
+                    message = b''
+                else:
+                    message += output
+            status = task.wait()
+            if status == 0:
+                self.helper.send_success(key, '传输完成', start_time=timestamp)
+            return status == 0
+
+    def _data_transfer_file(self):
+        pass
\ No newline at end of file
diff --git a/spug_api/apps/pipeline/views.py b/spug_api/apps/pipeline/views.py
index 8099aa6..7ce5eec 100644
--- a/spug_api/apps/pipeline/views.py
+++ b/spug_api/apps/pipeline/views.py
@@ -83,11 +86,14 @@ class DoView(View):
         nodes, ids = json.loads(pipe.nodes), set()
         for item in filter(lambda x: x['module'] == 'ssh_exec', nodes):
             ids.update(item['targets'])
+        for item in filter(lambda x: x['module'] == 'data_transfer', nodes):
+            ids.update(item['destination']['targets'])
         host_map = {x.id: f'{x.name}({x.hostname})' for x in Host.objects.filter(id__in=ids)}
         for item in filter(lambda x: x['module'] == 'ssh_exec', nodes):
-            item['targets'] = [{'id': x, 'name': host_map[x]} for x in item['targets']]
-
+            item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['targets']]
+        for item in filter(lambda x: x['module'] == 'data_transfer', nodes):
+            item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['destination']['targets']]
         rds = get_redis_connection()
         executor = NodeExecutor(rds, history.deploy_key, json.loads(pipe.nodes))
         Thread(target=executor.run).start()
diff --git a/spug_web/src/pages/pipeline/console/Body.js b/spug_web/src/pages/pipeline/console/Body.js
index 152c78f..0d7911e 100644
--- a/spug_web/src/pages/pipeline/console/Body.js
+++ b/spug_web/src/pages/pipeline/console/Body.js
@@ -129,8 +129,8 @@ function Body() {
          openTerminal()}/>
-      {S.node?.module === 'ssh_exec' && (
-        <Tabs items={S.node.targets.map(x => ({label: x.name, key: `${S.node.id}.${x.id}`}))}
+      {['ssh_exec', 'data_transfer'].includes(S.node?.module) && (
+        <Tabs items={S.node._targets.map(x => ({label: x.name, key: `${S.node.id}.${x.id}`}))}
              tabBarStyle={{fontSize: 13}}
              onChange={v => S.node = Object.assign({}, S.node, {_id: v})}/>
      )}
diff --git a/spug_web/src/pages/pipeline/console/Sider.js b/spug_web/src/pages/pipeline/console/Sider.js
index 29b9ad2..7f746ea 100644
--- a/spug_web/src/pages/pipeline/console/Sider.js
+++ b/spug_web/src/pages/pipeline/console/Sider.js
@@ -12,8 +12,8 @@ import css from './sider.module.less';
 
 function Sider() {
   function handleClick(node) {
-    if (node.module === 'ssh_exec') {
-      node._id = `${node.id}.${node.targets[0].id}`
+    if (['ssh_exec', 'data_transfer'].includes(node.module)) {
+      node._id = `${node.id}.${node._targets[0].id}`
     }
     S.node = node
   }