update pipeline module

4.0
vapao 2023-03-12 01:08:34 +08:00
parent 5428b5c365
commit 6809eb95cd
11 changed files with 155 additions and 64 deletions

View File

@ -3,9 +3,10 @@
# Released under the AGPL-3.0 License.
from django.urls import path
from apps.pipeline.views import PipeView, DoView
from apps.pipeline.views import PipeView, DoView, handle_data_upload
urlpatterns = [
path('', PipeView.as_view()),
path('upload/', handle_data_upload),
path('do/', DoView.as_view()),
]

View File

@ -12,9 +12,11 @@ from apps.setting.utils import AppSetting
from functools import partial
from threading import Thread
from concurrent import futures
from pathlib import Path
from uuid import uuid4
import subprocess
import tempfile
import shutil
import time
import os
@ -50,6 +52,8 @@ class NodeExecutor:
self._do_ssh_exec(node)
elif node.module == 'data_transfer':
self._do_data_transfer(node)
elif node.module == 'data_upload':
self._do_data_upload(node)
elif node.module == 'parameter':
self._do_parameter(node)
@ -149,7 +153,7 @@ class NodeExecutor:
threads = []
with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for host in Host.objects.filter(id__in=destination.targets):
t = executor.submit(self._data_transfer, node, host, local_path)
t = executor.submit(self._data_transfer, node, host, local_path, destination.path)
threads.append(t)
results = [x.result() for x in futures.as_completed(threads)]
os.system(f'umount -f {local_dir} &> /dev/null ; rm -rf {local_dir}')
@ -157,10 +161,10 @@ class NodeExecutor:
self.helper.send_status(node.id, state)
self.run(node, state)
def _data_transfer(self, node, host, local_path):
def _data_transfer(self, node, host, local_path, remote_path):
# TODO支持--delete参数页面上添加是否删除选项
timestamp = time.time()
key = f'{node.id}.{host.id}'
remote_path = node.destination.path
self.helper.send_info(key, '开始传输数据\r\n', 'processing')
with tempfile.NamedTemporaryFile(mode='w') as fp:
fp.write(host.pkey or AppSetting.get('private_key'))
@ -178,3 +182,17 @@ class NodeExecutor:
if is_success:
self.helper.send_success(key, '传输完成', start_time=timestamp)
return is_success
def _do_data_upload(self, node):
self.helper.send_info(node.id, '开始执行\r\n', 'processing')
local_path = Path(settings.TRANSFER_DIR) / self.token / str(node.id)
threads = []
with futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
for host in Host.objects.filter(id__in=node.targets):
t = executor.submit(self._data_transfer, node, host, f'{local_path}{os.sep}', node.path)
threads.append(t)
results = [x.result() for x in futures.as_completed(threads)]
shutil.rmtree(local_path)
state = 'success' if all(results) else 'error'
self.helper.send_status(node.id, state)
self.run(node, state)

View File

@ -2,6 +2,8 @@
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.views.generic import View
from django.conf import settings
from django.http.response import HttpResponseBadRequest
from django_redis import get_redis_connection
from libs import JsonParser, Argument, json_response, auth
from libs.utils import AttrDict
@ -9,6 +11,7 @@ from apps.pipeline.models import Pipeline, PipeHistory
from apps.pipeline.utils import NodeExecutor
from apps.host.models import Host
from threading import Thread
from pathlib import Path
from uuid import uuid4
import json
@ -83,17 +86,23 @@ class DoView(View):
for item in filter(lambda x: x['module'] == 'data_transfer', nodes):
ids.update(item['destination']['targets'])
dynamic_params = None
dynamic_params = []
host_map = {x.id: f'{x.name}({x.hostname})' for x in Host.objects.filter(id__in=ids)}
for item in nodes:
if item['module'] == 'ssh_exec':
if item['module'] in ('ssh_exec', 'data_upload'):
item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['targets']]
elif item['module'] == 'data_transfer':
item['_targets'] = [{'id': x, 'name': host_map[x]} for x in item['destination']['targets']]
elif item['module'] == 'parameter':
dynamic_params = item.get('dynamic_params')
if not dynamic_params:
if item['module'] == 'parameter':
dynamic_params = item.get('dynamic_params')
elif item['module'] == 'data_upload':
dynamic_params.append({'id': item['id'], 'name': item['name'], 'type': 'upload'})
token = uuid4().hex
if dynamic_params:
response = AttrDict(token=token, nodes=nodes, dynamic_params=dynamic_params)
else:
latest_history = pipe.pipehistory_set.first()
ordinal = latest_history.ordinal + 1 if latest_history else 1
history = PipeHistory.objects.create(pipeline=pipe, ordinal=ordinal, created_by=request.user)
@ -101,9 +110,7 @@ class DoView(View):
rds = get_redis_connection()
executor = NodeExecutor(rds, history.deploy_key, json.loads(pipe.nodes))
Thread(target=executor.run).start()
response = AttrDict(token=history.id, nodes=nodes)
else:
response = AttrDict(nodes=nodes, dynamic_params=dynamic_params)
response = AttrDict(token=token, nodes=nodes)
return json_response(response)
return json_response(error=error)
@ -111,6 +118,7 @@ class DoView(View):
def patch(self, request):
form, error = JsonParser(
Argument('id', type=int, help='参数错误'),
Argument('token', help='参数错误'),
Argument('params', type=dict, help='参数错误'),
Argument('cols', type=int, required=False),
Argument('rows', type=int, required=False)
@ -131,9 +139,22 @@ class DoView(View):
PipeHistory.objects.create(pipeline=pipe, ordinal=ordinal, created_by=request.user)
rds = get_redis_connection()
token = uuid4().hex
executor = NodeExecutor(rds, token, nodes)
executor = NodeExecutor(rds, form.token, nodes)
Thread(target=executor.run).start()
response = AttrDict(token=token)
return json_response(response)
return json_response()
return json_response(error=error)
def handle_data_upload(request):
token = request.POST.get('token')
node_id = request.POST.get('id')
file = request.FILES.get('file')
if not all([token, node_id, file]):
return HttpResponseBadRequest('参数错误')
file_path = Path(settings.TRANSFER_DIR) / token / node_id / file.name
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'wb') as f:
for chunk in file.chunks():
f.write(chunk)
return json_response('ok')

View File

@ -3,12 +3,12 @@
* Copyright (c) <spug.dev@gmail.com>
* Released under the AGPL-3.0 License.
*/
import React, {useState} from 'react';
import React, { useState } from 'react';
import { observer } from 'mobx-react';
import { Button, Form } from 'antd';
import { ArrowRightOutlined } from '@ant-design/icons';
import { Button, Form, Upload } from 'antd';
import { ArrowRightOutlined, UploadOutlined } from '@ant-design/icons';
import Parameter from '../modules/Parameter';
import { http } from 'libs';
import { http, X_TOKEN } from 'libs';
import S from './store';
function Ask(props) {
@ -18,27 +18,31 @@ function Ask(props) {
function handleOk() {
const params = form.getFieldsValue();
setLoading(true)
http.patch('/api/pipeline/do/', {id: 1, params})
http.patch('/api/pipeline/do/', {id: 1, token: S.token, params})
.then(res => {
S.token = res.token
S.dynamicParams = null
})
.finally(() => setLoading(false))
}
return (
<div hidden={S.token}>
<Form form={form} labelCol={{span: 6}} wrapperCol={{span: 16}}>
{(S.dynamicParams ?? []).map((item, idx) => (
<Form.Item key={idx} required={item.required} name={item.variable} label={item.name} tooltip={item.help}>
<Parameter.Component data={item}/>
</Form.Item>
))}
<Form.Item wrapperCol={{offset: 6, span: 16}}>
<Button icon={<ArrowRightOutlined/>} loading={loading} type="primary" onClick={handleOk}>下一步</Button>
<Form form={form} labelCol={{span: 6}} wrapperCol={{span: 16}}>
{(S.dynamicParams ?? []).map((item, idx) => item.type === 'upload' ? (
<Form.Item key={idx} required label={item.name}>
<Upload multiple action="/api/pipeline/upload/" headers={{'X-Token': X_TOKEN}}
data={{token: S.token, id: item.id}}>
<Button icon={<UploadOutlined/>}>点击上传</Button>
</Upload>
</Form.Item>
</Form>
</div>
) : (
<Form.Item key={idx} required={item.required} name={item.variable} label={item.name} tooltip={item.help}>
<Parameter.Component data={item}/>
</Form.Item>
))}
<Form.Item wrapperCol={{offset: 6, span: 16}}>
<Button icon={<ArrowRightOutlined/>} loading={loading} type="primary" onClick={handleOk}>下一步</Button>
</Form.Item>
</Form>
)
}

View File

@ -121,17 +121,17 @@ function Body() {
) : wsState === '1' ? (
<Badge status="success" text="Websocket 已连接"/>
) : (
<Badge status="processing" text="Websocket 连接已关闭"/>
<Badge status="error" text="Websocket 连接已关闭"/>
)}
{['build', 'ssh_exec', 'data_transfer'].includes(S.node.module) && S.outputs[S.nodeID]?.status === 'processing' ? (
{['build', 'ssh_exec', 'data_transfer', 'data_upload'].includes(S.node.module) && S.outputs[S.nodeID]?.status === 'processing' ? (
<Popconfirm title="确定要终止执行?" onConfirm={handleTerminate}>
<StopOutlined className={css.icon} style={{color: '#faad14'}}/>
</Popconfirm>
) : (
<StopOutlined className={css.icon} style={{color: '#dfdfdf'}}/>
)}
{['build', 'ssh_exec', 'data_transfer'].includes(S.node.module) ? (
{['build', 'ssh_exec', 'data_transfer', 'data_upload'].includes(S.node.module) ? (
<Tooltip title="打开web终端">
<CodeOutlined className={css.icon} onClick={() => openTerminal()}/>
</Tooltip>
@ -139,7 +139,7 @@ function Body() {
<CodeOutlined className={css.icon} style={{color: '#dfdfdf'}}/>
)}
</div>
{['ssh_exec', 'data_transfer'].includes(S.node?.module) && (
{['ssh_exec', 'data_transfer', 'data_upload'].includes(S.node?.module) && (
<Tabs items={(S.node?._targets ?? []).map(x => ({label: x.name, key: x.id}))}
className={css.tabs} activeKey={S.node._host_id} onChange={handleTabChange}/>
)}

View File

@ -14,7 +14,7 @@ import css from './sider.module.less';
function Sider() {
function handleClick(node) {
node = lds.cloneDeep(node)
if (['ssh_exec', 'data_transfer'].includes(node.module)) {
if (['ssh_exec', 'data_transfer', 'data_upload'].includes(node.module)) {
node._host_id = node._targets[0].id
node._id = `${node.id}.${node._host_id}`
} else if (node.module === 'build') {

View File

@ -15,27 +15,28 @@ import css from './index.module.less';
function Index() {
function handleClose() {
S.open = false
S.token = false
S.dynamicParams = null
}
return (
<Modal
open={S.open}
width={S.token ? '80%' : '540px'}
title={S.token ? '执行控制台' : '执行参数设置'}
width={S.dynamicParams ? '540px' : '80%'}
title={S.dynamicParams ? '执行参数设置' : '执行控制台'}
footer={null}
destroyOnClose
maskClosable={false}
wrapClassName={css.fade}
afterClose={S.initial}
onCancel={handleClose}>
<Ask/>
{S.token ? (
{S.dynamicParams ? (
<Ask/>
) : (
<Row>
<Sider/>
<Body/>
</Row>
) : null}
)}
</Modal>
)
}

View File

@ -0,0 +1,60 @@
/**
* Copyright (c) OpenSpug Organization. https://github.com/openspug/spug
* Copyright (c) <spug.dev@gmail.com>
* Released under the AGPL-3.0 License.
*/
import React, { useEffect } from 'react';
import { Form, Input, Radio, InputNumber, message } from 'antd';
import HostSelector from 'pages/host/Selector';
function DataUpload(props) {
const [form] = Form.useForm()
useEffect(() => {
props.setHandler(() => handleSave)
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
useEffect(() => {
form.resetFields()
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [props.node])
function handleSave() {
const data = form.getFieldsValue()
if (!data.name) return message.error('请输入节点名称')
if (!data.condition) return message.error('请选择节点的执行条件')
if (!data.targets || data.targets.length === 0) return message.error('请选择上传目标主机')
if (!data.path) return message.error('请输入上传路径')
return data
}
return (
<Form layout="vertical" form={form} initialValues={props.node}>
<Form.Item required name="name" label="节点名称">
<Input placeholder="请输入节点名称"/>
</Form.Item>
<Form.Item required name="condition" label="执行条件" tooltip="当该节点为流程的起始节点时(无上游节点),该条件将会被忽略。">
<Radio.Group>
<Radio.Button value="success">上游执行成功时</Radio.Button>
<Radio.Button value="error">上游执行失败时</Radio.Button>
<Radio.Button value="always">总是执行</Radio.Button>
</Radio.Group>
</Form.Item>
<Form.Item required name="targets" label="选择主机" tooltip="数据将会上传到选择的主机上">
<HostSelector type="button"/>
</Form.Item>
<Form.Item required name="path" label="上传路径(目录)" tooltip="文件将会上传到该目录下,同名文件将会被覆盖">
<Input placeholder="请输入上传路径"/>
</Form.Item>
<Form.Item name="accept" label="文件类型限制" tooltip="限制上传的文件类型,">
<Input placeholder="请输入接受上传的文件类型"/>
</Form.Item>
<Form.Item name="size" label="文件大小限制" tooltip="限制上传的文件大小单位为MB">
<InputNumber addonAfter="MB" placeholder="请输入文件大小限制"/>
</Form.Item>
</Form>
)
}
export default DataUpload

View File

@ -5,20 +5,8 @@
*/
import React, { useEffect, useState } from 'react';
import { observer, useLocalStore } from 'mobx-react';
import {
Form,
Input,
Button,
Modal,
Switch,
Select,
Popconfirm,
Tooltip,
DatePicker,
Upload,
message
} from 'antd';
import { EditOutlined, DeleteOutlined, PlusOutlined, QuestionCircleOutlined, UploadOutlined } from '@ant-design/icons';
import { Form, Input, Button, Modal, Switch, Select, Popconfirm, Tooltip, DatePicker, message } from 'antd';
import { EditOutlined, DeleteOutlined, PlusOutlined, QuestionCircleOutlined } from '@ant-design/icons';
import { clsNames } from 'libs';
import css from './index.module.less';
import lds from "lodash";
@ -280,10 +268,6 @@ Parameter.Component = function (props) {
))}
</Select>
)
case 'upload':
return (
<Upload><Button icon={<UploadOutlined/>}>点击上传</Button></Upload>
)
default:
return <Input defaultValue={data.default} placeholder="请输入" {...props}/>
}

View File

@ -7,6 +7,7 @@ import React from 'react';
import SSHExec from './SSHExec';
import Build from './Build';
import Parameter from './Parameter';
import DataUpload from './DataUpload';
import DataTransfer from './DataTransfer';
function ModuleConfig(props) {
@ -19,6 +20,8 @@ function ModuleConfig(props) {
return <DataTransfer {...props}/>
case 'parameter':
return <Parameter {...props}/>
case 'data_upload':
return <DataUpload {...props}/>
default:
return <div>hello</div>
}

View File

@ -49,12 +49,11 @@ class Store {
return http.post('/api/pipeline/do/', {id: 1})
.then(res => {
S.open = true
S.token = res.token
S.nodes = res.nodes
S.node = res.nodes[0]
if (res.dynamic_params) {
S.dynamicParams = res.dynamic_params
} else {
S.token = res.token
}
})
}