spug/spug_api/apps/exec/transfer.py

119 lines
5.0 KiB
Python

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.views.generic import View
from django.conf import settings
from django_redis import get_redis_connection
from apps.exec.models import Transfer
from apps.account.utils import has_host_perm
from apps.host.models import Host
from apps.setting.utils import AppSetting
from libs import json_response, JsonParser, Argument, auth
from concurrent import futures
import subprocess
import tempfile
import uuid
import json
import os
class TransferView(View):
    """Endpoints for distributing files to a set of hosts.

    ``get`` lists the requesting user's transfer records, ``post`` stages the
    source data (uploaded files, or an sshfs mount of a directory on another
    host) and records a ``Transfer``, and ``patch`` rsyncs the staged data to
    every target host, publishing progress on a Redis channel named after the
    transfer token.
    """

    @auth('exec.task.do')
    def get(self, request):
        """Return the transfer records created by the requesting user."""
        records = Transfer.objects.filter(user=request.user)
        return json_response([x.to_view() for x in records])

    @auth('exec.transfer.do')
    def post(self, request):
        """Stage the source data and create a ``Transfer`` record.

        The ``data`` form field carries JSON with ``dst_dir``, ``host_ids``
        and an optional ``host`` entry; ``host`` is itself a JSON string
        encoding ``[host_id, path]``. When ``host`` is present that remote
        path is mounted read-only via sshfs, otherwise the uploaded
        ``file0``, ``file1``, ... are written into the staging directory.

        Returns the transfer token on success, or an error response.
        """
        data = request.POST.get('data')
        form, error = JsonParser(
            Argument('host', required=False),
            Argument('dst_dir', help='请输入目标路径'),
            Argument('host_ids', type=list, filter=lambda x: len(x), help='请选择目标主机'),
        ).parse(data)
        if error is not None:
            return json_response(error=error)
        if not has_host_perm(request.user, form.host_ids):
            return json_response(error='无权访问主机,请联系管理员')

        # Validate the optional source host before touching the filesystem so
        # a rejected request does not leave a staging directory behind.
        host, path = None, None
        if form.host:
            try:
                host_id, path = json.loads(form.host)
            except (TypeError, ValueError):
                return json_response(error='参数错误')
            host = Host.objects.filter(pk=host_id).first()
            if host is None:
                return json_response(error='未找到指定的数据源主机')
            # The source host is read from, so it needs the same permission
            # check as the target hosts.
            if not has_host_perm(request.user, [host.id]):
                return json_response(error='无权访问主机,请联系管理员')

        token = uuid.uuid4().hex
        base_dir = os.path.join(settings.TRANSFER_DIR, token)
        os.makedirs(base_dir)

        if host is not None:
            with tempfile.NamedTemporaryFile(mode='w') as fp:
                fp.write(host.pkey or AppSetting.get('private_key'))
                fp.flush()
                target = f'{host.username}@{host.hostname}:{path}'
                # Argument list + no shell: `path` is request data and must
                # not be interpreted by a local shell.
                command = [
                    'sshfs', '-o', 'ro',
                    '-o', f'ssh_command=ssh -p {host.port} -i {fp.name}',
                    target, base_dir,
                ]
                mount = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            if mount.returncode != 0:
                # Best-effort removal of the (still empty) mount point.
                try:
                    os.rmdir(base_dir)
                except OSError:
                    pass
                return json_response(error=mount.stdout.decode(errors='replace'))
        else:
            index = 0
            while True:
                file = request.FILES.get(f'file{index}')
                if not file:
                    break
                # basename() keeps the write inside base_dir even if the
                # upload name carries directory components.
                filename = os.path.basename(file.name)
                with open(os.path.join(base_dir, filename), 'wb') as f:
                    for chunk in file.chunks():
                        f.write(chunk)
                index += 1

        Transfer.objects.create(
            user=request.user,
            digest=token,
            host_id=host.id if host is not None else None,
            src_dir=base_dir,
            dst_dir=form.dst_dir,
            host_ids=json.dumps(form.host_ids),
        )
        return json_response(token)

    @auth('exec.transfer.do')
    def patch(self, request):
        """Run the transfer identified by ``token`` against all target hosts.

        One ``_do_sync`` job is submitted per target host; a job that raises
        is reported on the Redis channel with status ``-1``. The staging
        directory is unmounted (when it is an sshfs mount) and removed
        afterwards, even if the fan-out itself fails.
        """
        form, error = JsonParser(
            Argument('token', help='参数错误')
        ).parse(request.body)
        if error is not None:
            return json_response(error=error)

        task = Transfer.objects.filter(digest=form.token).first()
        if task is None:
            return json_response(error='未找到指定的传输任务')
        # NOTE(review): the record is not checked against request.user here,
        # while get() only lists the user's own records -- confirm intended.

        rds = get_redis_connection()
        # os.cpu_count() may return None when the count is undetermined.
        max_workers = max(10, (os.cpu_count() or 1) * 5)
        try:
            with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                host_keys = {}
                for host in Host.objects.filter(id__in=json.loads(task.host_ids)):
                    job = executor.submit(_do_sync, rds, task, host)
                    host_keys[job] = host.id
                for job in futures.as_completed(host_keys):
                    exc = job.exception()
                    if exc:
                        rds.publish(task.digest, json.dumps(
                            {'key': host_keys[job], 'status': -1, 'data': f'Exception: {exc}'}
                        ))
        finally:
            # Only delete after a successful unmount, so `rm -rf` never
            # descends into a still-mounted source directory.
            unmounted = True
            if task.host_id:
                unmounted = subprocess.run(['umount', '-f', task.src_dir]).returncode == 0
            if unmounted:
                subprocess.run(['rm', '-rf', task.src_dir])
        return json_response(error=error)
def _do_sync(rds, task, host):
    """Rsync the staged source directory of ``task`` to ``host``.

    Every line rsync prints (stdout and stderr combined) is published as a
    JSON message on the Redis channel named by ``task.digest``, keyed by
    ``host.id``; a final message carries rsync's exit status.

    :param rds: Redis connection used for ``publish``.
    :param task: transfer record providing ``digest``, ``host_id``,
        ``src_dir`` and ``dst_dir``.
    :param host: target host providing ``id``, ``username``, ``hostname``,
        ``port`` and ``pkey``.
    """
    token = task.digest
    rds.publish(token, json.dumps({'key': host.id, 'data': '\r\n\x1b[36m### Executing ...\x1b[0m\r\n'}))
    # The private key must stay on disk for as long as rsync/ssh is running,
    # so the whole transfer happens inside this context.
    with tempfile.NamedTemporaryFile(mode='w') as fp:
        fp.write(host.pkey or AppSetting.get('private_key'))
        fp.flush()

        options = '-azv' if task.host_id else '-rzv'
        target = f'{host.username}@{host.hostname}:{task.dst_dir}'
        # Argument list + no shell: dst_dir is user-supplied and must not be
        # interpreted by a local shell.
        # NOTE(review): rsync may still hand the remote path to the remote
        # shell -- confirm whether dst_dir needs further validation.
        command = [
            'rsync', options, '-h',
            '-e', f'ssh -p {host.port} -i {fp.name}',
            f'{task.src_dir}/', target,
        ]
        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
            for raw in iter(proc.stdout.readline, b''):
                # Undecodable bytes are replaced instead of aborting the sync.
                message = raw.decode(errors='replace').rstrip('\r\n')
                rds.publish(token, json.dumps({'key': host.id, 'data': message + '\r\n'}))
            status = proc.wait()
        rds.publish(token, json.dumps({'key': host.id, 'status': status}))