From 6809eb95cdd8ca83da5c0e7e6cd886b477e57a63 Mon Sep 17 00:00:00 2001 From: vapao Date: Sun, 12 Mar 2023 01:08:34 +0800 Subject: [PATCH] update pipeline module --- spug_api/apps/pipeline/urls.py | 3 +- spug_api/apps/pipeline/utils.py | 24 +++++++- spug_api/apps/pipeline/views.py | 45 ++++++++++---- spug_web/src/pages/pipeline/console/Ask.js | 38 ++++++------ spug_web/src/pages/pipeline/console/Body.js | 8 +-- spug_web/src/pages/pipeline/console/Sider.js | 2 +- spug_web/src/pages/pipeline/console/index.js | 13 ++-- .../src/pages/pipeline/modules/DataUpload.js | 60 +++++++++++++++++++ .../src/pages/pipeline/modules/Parameter.js | 20 +------ spug_web/src/pages/pipeline/modules/index.js | 3 + spug_web/src/pages/pipeline/store.js | 3 +- 11 files changed, 155 insertions(+), 64 deletions(-) create mode 100644 spug_web/src/pages/pipeline/modules/DataUpload.js diff --git a/spug_api/apps/pipeline/urls.py b/spug_api/apps/pipeline/urls.py index 48ab43b..c18c82b 100644 --- a/spug_api/apps/pipeline/urls.py +++ b/spug_api/apps/pipeline/urls.py @@ -3,9 +3,10 @@ # Released under the AGPL-3.0 License. from django.urls import path -from apps.pipeline.views import PipeView, DoView +from apps.pipeline.views import PipeView, DoView, handle_data_upload urlpatterns = [ path('', PipeView.as_view()), + path('upload/', handle_data_upload), path('do/', DoView.as_view()), ] diff --git a/spug_api/apps/pipeline/utils.py b/spug_api/apps/pipeline/utils.py index 16cb598..85d6a91 100644 --- a/spug_api/apps/pipeline/utils.py +++ b/spug_api/apps/pipeline/utils.py @@ -12,9 +12,11 @@ from apps.setting.utils import AppSetting from functools import partial from threading import Thread from concurrent import futures +from pathlib import Path from uuid import uuid4 import subprocess import tempfile +import shutil import time import os @@ -50,6 +52,8 @@ class NodeExecutor: self._do_ssh_exec(node) elif node.module == 'data_transfer': self._do_data_transfer(node) + elif node.module == 'data_upload': + self._do_data_upload(node) elif node.module == 'parameter': self._do_parameter(node) @@ -149,7 +153,7 @@ class NodeExecutor: threads = [] with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: for host in Host.objects.filter(id__in=destination.targets): - t = executor.submit(self._data_transfer, node, host, local_path) + t = executor.submit(self._data_transfer, node, host, local_path, destination.path) threads.append(t) results = [x.result() for x in futures.as_completed(threads)] os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}') @@ -157,10 +161,10 @@ class NodeExecutor: self.helper.send_status(node.id, state) self.run(node, state) - def _data_transfer(self, node, host, local_path): + def _data_transfer(self, node, host, local_path, remote_path): + # TODO:支持--delete参数,页面上添加是否删除选项 timestamp = time.time() key = f'{node.id}.{host.id}' - remote_path = node.destination.path self.helper.send_info(key, '开始传输数据\r\n', 'processing') with tempfile.NamedTemporaryFile(mode='w') as fp: fp.write(host.pkey or AppSetting.get('private_key')) @@ -178,3 +182,17 @@ class NodeExecutor: if is_success: self.helper.send_success(key, '传输完成', start_time=timestamp) return is_success + + def _do_data_upload(self, node): + self.helper.send_info(node.id, '开始执行\r\n', 'processing') + local_path = Path(settings.TRANSFER_DIR) / self.token / str(node.id) + threads = [] + with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + for host in Host.objects.filter(id__in=node.targets): + t = executor.submit(self._data_transfer, node, host, f'{local_path}{os.sep}', node.path) + threads.append(t) + results = [x.result() for x in futures.as_completed(threads)] + shutil.rmtree(local_path) + state = 'success' if all(results) else 'error' + self.helper.send_status(node.id, state) + self.run(node, state) diff --git a/spug_api/apps/pipeline/views.py b/spug_api/apps/pipeline/views.py index 2b4b319..e4089fe 100644 --- a/spug_api/apps/pipeline/views.py +++ b/spug_api/apps/pipeline/views.py @@ -2,6 +2,8 @@ # Copyright: (c) # Released under the AGPL-3.0 License. from django.views.generic import View +from django.conf import settings +from django.http.response import HttpResponseBadRequest from django_redis import get_redis_connection from libs import JsonParser, Argument, json_response, auth from libs.utils import AttrDict @@ -9,6 +11,7 @@ from apps.pipeline.models import Pipeline, PipeHistory from apps.pipeline.utils import NodeExecutor from apps.host.models import Host from threading import Thread +from pathlib import Path from uuid import uuid4 import json @@ -83,17 +86,23 @@ class DoView(View): for item in filter(lambda x: x['module'] == 'data_transfer', nodes): ids.update(item['destination']['targets']) - dynamic_params = None + dynamic_params = [] host_map = {x.id: f'{x.name}({x.hostname})' for x in Host.objects.filter(id__in=ids)} for item in nodes: - if item['module'] == 'ssh_exec': + if item['module'] in ('ssh_exec', 'data_upload'): item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['targets']] elif item['module'] == 'data_transfer': item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['destination']['targets']] - elif item['module'] == 'parameter': - dynamic_params = item.get('dynamic_params') - if not dynamic_params: + if item['module'] == 'parameter': + dynamic_params = item.get('dynamic_params') + elif item['module'] == 'data_upload': + dynamic_params.append({'id': item['id'], 'name': item['name'], 'type': 'upload'}) + + token = uuid4().hex + if dynamic_params: + response = AttrDict(token=token, nodes=nodes, dynamic_params=dynamic_params) + else: latest_history = pipe.pipehistory_set.first() ordinal = latest_history.ordinal + 1 if latest_history else 1 history = PipeHistory.objects.create(pipeline=pipe, ordinal=ordinal, created_by=request.user) @@ -101,9 +110,7 @@ class DoView(View): rds = get_redis_connection() executor = NodeExecutor(rds, history.deploy_key, json.loads(pipe.nodes)) Thread(target=executor.run).start() - response = AttrDict(token=history.id, nodes=nodes) - else: - response = AttrDict(nodes=nodes, dynamic_params=dynamic_params) + response = AttrDict(token=token, nodes=nodes) return json_response(response) return json_response(error=error) @@ -111,6 +118,7 @@ class DoView(View): def patch(self, request): form, error = JsonParser( Argument('id', type=int, help='参数错误'), + Argument('token', help='参数错误'), Argument('params', type=dict, help='参数错误'), Argument('cols', type=int, required=False), Argument('rows', type=int, required=False) @@ -131,9 +139,22 @@ class DoView(View): PipeHistory.objects.create(pipeline=pipe, ordinal=ordinal, created_by=request.user) rds = get_redis_connection() - token = uuid4().hex - executor = NodeExecutor(rds, token, nodes) + executor = NodeExecutor(rds, form.token, nodes) Thread(target=executor.run).start() - response = AttrDict(token=token) - return json_response(response) + return json_response() return json_response(error=error) + + +def handle_data_upload(request): + token = request.POST.get('token') + node_id = request.POST.get('id') + file = request.FILES.get('file') + if not all([token, node_id, file]): + return HttpResponseBadRequest('参数错误') + + file_path = Path(settings.TRANSFER_DIR) / token / node_id / file.name + file_path.parent.mkdir(parents=True, exist_ok=True) + with open(file_path, 'wb') as f: + for chunk in file.chunks(): + f.write(chunk) + return json_response('ok') diff --git a/spug_web/src/pages/pipeline/console/Ask.js b/spug_web/src/pages/pipeline/console/Ask.js index 1251573..976a322 100644 --- a/spug_web/src/pages/pipeline/console/Ask.js +++ b/spug_web/src/pages/pipeline/console/Ask.js @@ -3,12 +3,12 @@ * Copyright (c) * Released under the AGPL-3.0 License. */ -import React, {useState} from 'react'; +import React, { useState } from 'react'; import { observer } from 'mobx-react'; -import { Button, Form } from 'antd'; -import { ArrowRightOutlined } from '@ant-design/icons'; +import { Button, Form, Upload } from 'antd'; +import { ArrowRightOutlined, UploadOutlined } from '@ant-design/icons'; import Parameter from '../modules/Parameter'; -import { http } from 'libs'; +import { http, X_TOKEN } from 'libs'; import S from './store'; function Ask(props) { @@ -18,27 +18,31 @@ function Ask(props) { function handleOk() { const params = form.getFieldsValue(); setLoading(true) - http.patch('/api/pipeline/do/', {id: 1, params}) + http.patch('/api/pipeline/do/', {id: 1, token: S.token, params}) .then(res => { - S.token = res.token S.dynamicParams = null }) .finally(() => setLoading(false)) } return ( - + ) : ( + + + + ))} + + + + ) } diff --git a/spug_web/src/pages/pipeline/console/Body.js b/spug_web/src/pages/pipeline/console/Body.js index 0f9e319..e3caf51 100644 --- a/spug_web/src/pages/pipeline/console/Body.js +++ b/spug_web/src/pages/pipeline/console/Body.js @@ -121,17 +121,17 @@ function Body() { ) : wsState === '1' ? ( ) : ( - + )} - {['build', 'ssh_exec', 'data_transfer'].includes(S.node.module) && S.outputs[S.nodeID]?.status === 'processing' ? ( + {['build', 'ssh_exec', 'data_transfer', 'data_upload'].includes(S.node.module) && S.outputs[S.nodeID]?.status === 'processing' ? ( ) : ( )} - {['build', 'ssh_exec', 'data_transfer'].includes(S.node.module) ? ( + {['build', 'ssh_exec', 'data_transfer', 'data_upload'].includes(S.node.module) ? ( openTerminal()}/> @@ -139,7 +139,7 @@ function Body() { )} - {['ssh_exec', 'data_transfer'].includes(S.node?.module) && ( + {['ssh_exec', 'data_transfer', 'data_upload'].includes(S.node?.module) && ( ({label: x.name, key: x.id}))} className={css.tabs} activeKey={S.node._host_id} onChange={handleTabChange}/> )} diff --git a/spug_web/src/pages/pipeline/console/Sider.js b/spug_web/src/pages/pipeline/console/Sider.js index ba7400d..cb17a5a 100644 --- a/spug_web/src/pages/pipeline/console/Sider.js +++ b/spug_web/src/pages/pipeline/console/Sider.js @@ -14,7 +14,7 @@ import css from './sider.module.less'; function Sider() { function handleClick(node) { node = lds.cloneDeep(node) - if (['ssh_exec', 'data_transfer'].includes(node.module)) { + if (['ssh_exec', 'data_transfer', 'data_upload'].includes(node.module)) { node._host_id = node._targets[0].id node._id = `${node.id}.${node._host_id}` } else if (node.module === 'build') { diff --git a/spug_web/src/pages/pipeline/console/index.js b/spug_web/src/pages/pipeline/console/index.js index 5ba3f01..647df08 100644 --- a/spug_web/src/pages/pipeline/console/index.js +++ b/spug_web/src/pages/pipeline/console/index.js @@ -15,27 +15,28 @@ import css from './index.module.less'; function Index() { function handleClose() { S.open = false - S.token = false + S.dynamicParams = null } return ( - - {S.token ? ( + {S.dynamicParams ? ( + + ) : ( - ) : null} + )} ) } diff --git a/spug_web/src/pages/pipeline/modules/DataUpload.js b/spug_web/src/pages/pipeline/modules/DataUpload.js new file mode 100644 index 0000000..8da7056 --- /dev/null +++ b/spug_web/src/pages/pipeline/modules/DataUpload.js @@ -0,0 +1,60 @@ +/** + * Copyright (c) OpenSpug Organization. https://github.com/openspug/spug + * Copyright (c) + * Released under the AGPL-3.0 License. + */ +import React, { useEffect } from 'react'; +import { Form, Input, Radio, InputNumber, message } from 'antd'; +import HostSelector from 'pages/host/Selector'; + +function DataUpload(props) { + const [form] = Form.useForm() + + useEffect(() => { + props.setHandler(() => handleSave) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + useEffect(() => { + form.resetFields() + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [props.node]) + + function handleSave() { + const data = form.getFieldsValue() + if (!data.name) return message.error('请输入节点名称') + if (!data.condition) return message.error('请选择节点的执行条件') + if (!data.targets || data.targets.length === 0) return message.error('请选择上传目标主机') + if (!data.path) return message.error('请输入上传路径') + return data + } + + return ( +
+ + + + + + 上游执行成功时 + 上游执行失败时 + 总是执行 + + + + + + + + + + + + + + +
+ ) +} + +export default DataUpload \ No newline at end of file diff --git a/spug_web/src/pages/pipeline/modules/Parameter.js b/spug_web/src/pages/pipeline/modules/Parameter.js index fe0b8a0..43dfdc7 100644 --- a/spug_web/src/pages/pipeline/modules/Parameter.js +++ b/spug_web/src/pages/pipeline/modules/Parameter.js @@ -5,20 +5,8 @@ */ import React, { useEffect, useState } from 'react'; import { observer, useLocalStore } from 'mobx-react'; -import { - Form, - Input, - Button, - Modal, - Switch, - Select, - Popconfirm, - Tooltip, - DatePicker, - Upload, - message -} from 'antd'; -import { EditOutlined, DeleteOutlined, PlusOutlined, QuestionCircleOutlined, UploadOutlined } from '@ant-design/icons'; +import { Form, Input, Button, Modal, Switch, Select, Popconfirm, Tooltip, DatePicker, message } from 'antd'; +import { EditOutlined, DeleteOutlined, PlusOutlined, QuestionCircleOutlined } from '@ant-design/icons'; import { clsNames } from 'libs'; import css from './index.module.less'; import lds from "lodash"; @@ -280,10 +268,6 @@ Parameter.Component = function (props) { ))} ) - case 'upload': - return ( - - ) default: return } diff --git a/spug_web/src/pages/pipeline/modules/index.js b/spug_web/src/pages/pipeline/modules/index.js index 8db6227..cba19a4 100644 --- a/spug_web/src/pages/pipeline/modules/index.js +++ b/spug_web/src/pages/pipeline/modules/index.js @@ -7,6 +7,7 @@ import React from 'react'; import SSHExec from './SSHExec'; import Build from './Build'; import Parameter from './Parameter'; +import DataUpload from './DataUpload'; import DataTransfer from './DataTransfer'; function ModuleConfig(props) { @@ -19,6 +20,8 @@ function ModuleConfig(props) { return case 'parameter': return + case 'data_upload': + return default: return
hello
} diff --git a/spug_web/src/pages/pipeline/store.js b/spug_web/src/pages/pipeline/store.js index 22d021b..d96f5e4 100644 --- a/spug_web/src/pages/pipeline/store.js +++ b/spug_web/src/pages/pipeline/store.js @@ -49,12 +49,11 @@ class Store { return http.post('/api/pipeline/do/', {id: 1}) .then(res => { S.open = true + S.token = res.token S.nodes = res.nodes S.node = res.nodes[0] if (res.dynamic_params) { S.dynamicParams = res.dynamic_params - } else { - S.token = res.token } }) }