update pipeline module

4.0
vapao 2023-03-03 09:12:22 +08:00
parent 903b104c60
commit 7f5c037c85
4 changed files with 93 additions and 10 deletions

View File

@ -1,14 +1,19 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.conf import settings
from apps.credential.models import Credential
from apps.host.models import Host
from libs.utils import AttrDict, human_seconds_time
from libs.utils import AttrDict, str_decode
from libs.gitlib import RemoteGit
from apps.pipeline.helper import Helper
from functools import partial, partialmethod
from apps.setting.utils import AppSetting
from functools import partial
from threading import Thread
from concurrent import futures
from uuid import uuid4
import subprocess
import tempfile
import time
import os
@ -20,6 +25,7 @@ class NodeExecutor:
self.nodes = {x.id: x for x in map(AttrDict, nodes)}
self.node = AttrDict(nodes[0])
self.helper = Helper.make(self.rds, self.rds_key)
self.max_workers = max(10, os.cpu_count() * 5)
def run(self, node=None, state=None):
print(node, state)
@ -41,6 +47,8 @@ class NodeExecutor:
self._do_build(node)
elif node.module == 'ssh_exec':
self._do_ssh_exec(node)
elif node.module == 'data_transfer':
self._do_data_transfer(node)
def _do_build(self, node, marker=None):
# if node.mode == 'branch':
@ -64,9 +72,8 @@ class NodeExecutor:
def _do_ssh_exec(self, node):
threads = []
max_workers = max(10, os.cpu_count() * 5)
self.helper.send_status(node.id, 'processing')
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for host in Host.objects.filter(id__in=node.targets):
t = executor.submit(self._ssh_exec, host, node)
threads.append(t)
@ -84,3 +91,76 @@ class NodeExecutor:
if is_success:
self.helper.send_success(key, '执行结束', start_time=timestamp)
return is_success
def _do_data_transfer(self, node):
self.helper.send_info(node.id, '开始执行\r\n', 'processing')
node.source = source = AttrDict(node.source)
node.destination = destination = AttrDict(node.destination)
host = Host.objects.get(pk=source.target)
with host.get_ssh() as ssh:
code, _ = ssh.exec_command_raw(f'[ -d {source.path} ]')
if code == 0:
local_dir = os.path.join(settings.TRANSFER_DIR, uuid4().hex)
os.makedirs(local_dir)
with tempfile.NamedTemporaryFile(mode='w') as fp:
fp.write(host.pkey or AppSetting.get('private_key'))
fp.flush()
target = f'{host.username}@{host.hostname}:{source.path}'
command = f'sshfs -o ro -o ssh_command="ssh -p {host.port} -i {fp.name}" {target} {local_dir}'
task = subprocess.run(command, shell=True, capture_output=True)
if task.returncode != 0:
os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}')
return self.helper.send_error(node.id, task.stderr.decode())
threads = []
with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for host in Host.objects.filter(id__in=destination.targets):
t = executor.submit(self._data_transfer_dir, node, host, local_dir)
threads.append(t)
results = [x.result() for x in futures.as_completed(threads)]
os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}')
state = 'success' if all(results) else 'error'
self.helper.send_status(node.id, state)
self.run(node, state)
else:
self._data_transfer_file()
def _data_transfer_dir(self, node, host, local_dir):
timestamp = time.time()
key = f'{node.id}.{host.id}'
remote_dir = node.destination.path
self.helper.send_info(key, '开始传输数据\r\n', 'processing')
with tempfile.NamedTemporaryFile(mode='w') as fp:
fp.write(host.pkey or AppSetting.get('private_key'))
fp.write('\n')
fp.flush()
options = '-avz --progress -h'
argument = f'{local_dir}/ {host.username}@{host.hostname}:{remote_dir}'
command = f'rsync {options} -e "ssh -p {host.port} -o StrictHostKeyChecking=no -i {fp.name}" {argument}'
task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
message = b''
while True:
output = task.stdout.read(1)
if not output:
break
if output in (b'\r', b'\n'):
message += b'\r\n' if output == b'\n' else b'\r'
message = str_decode(message)
if 'rsync: command not found' in message:
data = '\r\n\x1b[31m检测到该主机未安装rsync可通过批量执行/执行任务模块进行以下命令批量安装\x1b[0m'
data += '\r\nCentos/Redhat: yum install -y rsync'
data += '\r\nUbuntu/Debian: apt install -y rsync'
self.helper.send_error(key, data)
break
self.helper.send(key, message)
message = b''
else:
message += output
status = task.wait()
if status == 0:
self.helper.send_success(key, '传输完成', start_time=timestamp)
return status == 0
def _data_transfer_file(self):
pass

View File

@ -83,11 +83,14 @@ class DoView(View):
nodes, ids = json.loads(pipe.nodes), set()
for item in filter(lambda x: x['module'] == 'ssh_exec', nodes):
ids.update(item['targets'])
for item in filter(lambda x: x['module'] == 'data_transfer', nodes):
ids.update(item['destination']['targets'])
host_map = {x.id: f'{x.name}({x.hostname})' for x in Host.objects.filter(id__in=ids)}
for item in filter(lambda x: x['module'] == 'ssh_exec', nodes):
item['targets'] = [{'id': x, 'name': host_map[x]} for x in item['targets']]
item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['targets']]
for item in filter(lambda x: x['module'] == 'data_transfer', nodes):
item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['destination']['targets']]
rds = get_redis_connection()
executor = NodeExecutor(rds, history.deploy_key, json.loads(pipe.nodes))
Thread(target=executor.run).start()

View File

@ -129,8 +129,8 @@ function Body() {
<CodeOutlined className={css.icon} onClick={() => openTerminal()}/>
</Tooltip>
</div>
{S.node?.module === 'ssh_exec' && (
<Tabs items={(S.node?.targets ?? []).map(x => ({label: x.name, key: `${S.node.id}.${x.id}`}))}
{['ssh_exec', 'data_transfer'].includes(S.node?.module) && (
<Tabs items={(S.node?._targets ?? []).map(x => ({label: x.name, key: `${S.node.id}.${x.id}`}))}
tabBarStyle={{fontSize: 13}} onChange={v => S.node = Object.assign({}, S.node, {_id: v})}/>
)}
<div className={css.termContainer}>

View File

@ -12,8 +12,8 @@ import css from './sider.module.less';
function Sider() {
function handleClick(node) {
if (node.module === 'ssh_exec') {
node._id = `${node.id}.${node.targets[0].id}`
if (['ssh_exec', 'data_transfer'].includes(node.module)) {
node._id = `${node.id}.${node._targets[0].id}`
}
S.node = node
}