spug/spug_api/apps/pipeline/views.py

170 lines
7.1 KiB
Python

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.views.generic import View
from django.conf import settings
from django.http.response import HttpResponseBadRequest
from django_redis import get_redis_connection
from libs import JsonParser, Argument, json_response, auth
from libs.utils import AttrDict
from apps.pipeline.models import Pipeline, PipeHistory
from apps.pipeline.utils import NodeExecutor
from apps.host.models import Host
from threading import Thread
from pathlib import Path
from uuid import uuid4
import json
class PipeView(View):
    """CRUD endpoints for ``Pipeline`` records.

    Every handler parses its input with ``JsonParser`` and answers with
    ``json_response``; a parse failure is reported through the ``error``
    field of the response.
    """

    def get(self, request):
        """Return a single pipeline or the list of all pipelines.

        Query parameters:
            id: optional integer primary key. When it is present (and
                truthy) the matching pipeline's ``to_view()`` is returned,
                otherwise a list with ``to_list()`` of every pipeline.

        A JSON error is returned when the arguments cannot be parsed or the
        requested pipeline does not exist.
        """
        form, error = JsonParser(
            Argument('id', type=int, required=False)
        ).parse(request.GET)
        if error is not None:
            # A view must always produce a response, so report the parse
            # failure explicitly instead of falling off the end.
            return json_response(error=error)
        if form.id:
            pipe = Pipeline.objects.filter(pk=form.id).first()
            if not pipe:
                return json_response(error='未找到指定流程')
            return json_response(pipe.to_view())
        return json_response([x.to_list() for x in Pipeline.objects.all()])

    @auth('deploy.app.add|deploy.app.edit|config.app.add|config.app.edit')
    def post(self, request):
        """Create a pipeline, or update an existing one when ``id`` is given.

        Body fields:
            id: optional integer primary key of the pipeline to update.
            name: pipeline name (required).
            nodes: list of node definitions; serialised with ``json.dumps``
                before it is stored, defaulting to ``'[]'``.

        Returns the ``to_view()`` of the created/updated pipeline, or a JSON
        error when parsing fails or the pipeline to update does not exist.
        """
        form, error = JsonParser(
            Argument('id', type=int, required=False),
            Argument('name', help='请输入流程名称'),
            Argument('nodes', type=list, handler=json.dumps, default='[]')
        ).parse(request.body)
        if error is not None:
            return json_response(error=error)
        if form.id:
            Pipeline.objects.filter(pk=form.id).update(**form)
            # The update above silently matches nothing for an unknown id,
            # so look the row up defensively rather than with ``get()``.
            pipe = Pipeline.objects.filter(pk=form.id).first()
            if pipe is None:
                return json_response(error='未找到指定流程')
        else:
            pipe = Pipeline.objects.create(created_by=request.user, **form)
        return json_response(pipe.to_view())

    # NOTE(review): unlike ``post`` and ``delete`` this handler carries no
    # ``@auth`` decorator - confirm whether that is intentional.
    def patch(self, request):
        """Partially update the pipeline identified by ``id``.

        Body fields:
            id: integer primary key of the pipeline (required).
            name: optional new name.
            nodes: optional list of node definitions, stored as JSON text.

        The second argument to ``parse`` presumably drops fields that were
        not supplied so they are not overwritten - confirm against
        ``libs.JsonParser``.
        """
        form, error = JsonParser(
            Argument('id', type=int, help='请指定操作对象'),
            Argument('name', required=False),
            Argument('nodes', type=list, handler=json.dumps, required=False),
        ).parse(request.body, True)
        if error is None:
            Pipeline.objects.filter(pk=form.id).update(**form)
        return json_response(error=error)

    @auth('deploy.app.del|config.app.del')
    def delete(self, request):
        """Delete the pipeline whose primary key is the ``id`` query parameter."""
        form, error = JsonParser(
            Argument('id', type=int, help='请指定操作对象')
        ).parse(request.GET)
        if error is None:
            Pipeline.objects.filter(pk=form.id).delete()
        return json_response(error=error)
class DoView(View):
    """Endpoints that prepare and start the execution of a pipeline."""

    @auth('exec.task.do')
    def get(self, request):
        # Placeholder: nothing is implemented for GET; the method returns None.
        pass

    @auth('exec.task.do')
    def post(self, request):
        """Prepare a pipeline run and start it when no user input is needed.

        Body fields:
            id: integer primary key of the pipeline (required).

        The stored nodes are annotated with a ``_targets`` list (host id and
        display name) and scanned for parameters that must be supplied at
        run time. If any such parameter exists the response contains
        ``token``, ``nodes`` and ``dynamic_params`` and nothing is started;
        otherwise a ``PipeHistory`` row is created, the executor is started
        in a background thread and the response contains ``token`` and
        ``nodes``.
        """
        form, error = JsonParser(
            Argument('id', type=int, help='参数错误'),
        ).parse(request.body)
        if error is not None:
            return json_response(error=error)

        pipe = Pipeline.objects.filter(pk=form.id).first()
        if pipe is None:
            return json_response(error='未找到指定流程')

        nodes = json.loads(pipe.nodes)

        # Gather the ids with the same rule that is used for the lookup
        # below, so every referenced host (including the targets of
        # ``data_upload`` nodes) is present in ``host_map``.
        host_ids = set()
        for node in nodes:
            host_ids.update(self._node_targets(node) or ())
        host_map = {
            x.id: f'{x.name}({x.hostname})'
            for x in Host.objects.filter(id__in=host_ids)
        }

        dynamic_params = []
        for node in nodes:
            targets = self._node_targets(node)
            if targets is not None:
                node['_targets'] = [{'id': x, 'name': host_map[x]} for x in targets]
            dynamic_params.extend(self._node_dynamic_params(node))

        token = uuid4().hex
        if dynamic_params:
            # The client has to collect values first; the run is started
            # later through ``patch`` with the same token.
            response = AttrDict(token=token, nodes=nodes, dynamic_params=dynamic_params)
        else:
            # The executor receives a fresh copy of the stored nodes, i.e.
            # without the ``_targets`` annotations added for the response.
            self._start_run(pipe, token, json.loads(pipe.nodes), request.user)
            response = AttrDict(token=token, nodes=nodes)
        return json_response(response)

    @auth('exec.task.do')
    def patch(self, request):
        """Start a pipeline run with the parameters supplied by the user.

        Body fields:
            id: integer primary key of the pipeline (required).
            token: token identifying the run (required).
            params: dict of parameter values (required); list values are
                flattened into a comma separated string.

        The parameters are attached as ``dynamic_params`` to the first
        ``parameter`` node before the executor is started.
        """
        form, error = JsonParser(
            Argument('id', type=int, help='参数错误'),
            Argument('token', help='参数错误'),
            Argument('params', type=dict, help='参数错误'),
        ).parse(request.body)
        if error is not None:
            return json_response(error=error)

        # ``map(str, ...)`` keeps string lists unchanged and also accepts
        # lists whose items are not strings (e.g. numeric option values).
        params = {
            key: ','.join(map(str, value)) if isinstance(value, list) else value
            for key, value in form.params.items()
        }

        pipe = Pipeline.objects.filter(pk=form.id).first()
        if pipe is None:
            return json_response(error='未找到指定流程')

        nodes = json.loads(pipe.nodes)
        for node in nodes:
            if node['module'] == 'parameter':
                node['dynamic_params'] = params
                break

        self._start_run(pipe, form.token, nodes, request.user)
        return json_response()

    @staticmethod
    def _node_targets(node):
        """Return the host ids a node refers to, or None if it has no targets."""
        module = node['module']
        if module in ('ssh_exec', 'data_upload'):
            return node['targets']
        if module == 'data_transfer':
            return node['destination']['targets']
        return None

    @staticmethod
    def _node_dynamic_params(node):
        """Return the run-time parameter definitions contributed by one node."""
        module = node['module']
        if module == 'parameter':
            return node.get('dynamic_params') or []
        if module == 'build':
            if node.get('git_commit') == 'selective':
                # NOTE(review): the single hard-coded option looks like a
                # placeholder - confirm where the real commit list comes from.
                return [{
                    'variable': 'git_commit',
                    'name': 'Git提交',
                    'type': 'select',
                    'options': [{'value': 1, 'label': 1}],
                    'required': True,
                }]
            if node.get('git_tag') == 'selective':
                return [{'variable': 'tag_', 'name': 'Git标签', 'type': 'text', 'required': True}]
            return []
        if module == 'data_upload':
            spec = {'variable': node['id'], 'name': node['name'], 'type': 'upload', 'required': True}
            # Optional upload restrictions are forwarded only when set.
            if node.get('accept'):
                spec['accept'] = node['accept']
            if node.get('size'):
                spec['size'] = node['size']
            return [spec]
        return []

    @staticmethod
    def _start_run(pipe, token, nodes, user):
        """Record a new ``PipeHistory`` row and run the executor in a thread."""
        # Presumably ``pipehistory_set`` is ordered newest first, so that
        # ``first()`` yields the latest run - verify against the model Meta.
        latest_history = pipe.pipehistory_set.first()
        ordinal = latest_history.ordinal + 1 if latest_history else 1
        PipeHistory.objects.create(pipeline=pipe, ordinal=ordinal, created_by=user)
        rds = get_redis_connection()
        executor = NodeExecutor(rds, token, nodes)
        Thread(target=executor.run).start()
def handle_data_upload(request):
    """Store an uploaded file below ``settings.TRANSFER_DIR``.

    Reads ``token`` and ``id`` from ``request.POST`` and the ``file`` entry
    from ``request.FILES``; the file is written to
    ``TRANSFER_DIR/<token>/<id>/<file name>``, creating the directories as
    needed.

    Returns a 400 response when any of the three values is missing or when
    the resulting path would fall outside ``TRANSFER_DIR``; otherwise a JSON
    response carrying ``'ok'``.
    """
    token = request.POST.get('token')
    node_id = request.POST.get('id')
    file = request.FILES.get('file')
    if not all([token, node_id, file]):
        return HttpResponseBadRequest('参数错误')

    # ``token`` and ``id`` come straight from the client. A value containing
    # ``..`` segments, or an absolute path (which makes the ``/`` operator
    # discard everything to its left), must not be able to redirect the
    # write outside the transfer directory.
    base_dir = Path(settings.TRANSFER_DIR).resolve()
    file_path = (base_dir / token / node_id / file.name).resolve()
    if base_dir not in file_path.parents:
        return HttpResponseBadRequest('参数错误')

    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, 'wb') as f:
        # Write chunk by chunk so large uploads are not held in memory.
        for chunk in file.chunks():
            f.write(chunk)
    return json_response('ok')