spug/spug_api/apps/host/views.py

269 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.views.generic import View
from django.db.models import F
from django.http.response import HttpResponseBadRequest, HttpResponse
from libs import json_response, JsonParser, Argument, AttrDict, auth
from apps.setting.utils import AppSetting
from apps.account.utils import get_host_perms
from apps.host.models import Host, Group
from apps.host.utils import batch_sync_host, _sync_host_extend
from apps.app.models import Deploy
from apps.schedule.models import Task
from apps.monitor.models import Detection
from apps.exec.models import ExecTemplate
from libs.ssh import SSH, AuthenticationException
from paramiko.ssh_exception import BadAuthenticationType
from openpyxl import load_workbook, Workbook
from threading import Thread
import socket
import uuid
import json
class HostView(View):
def get(self, request):
hosts = Host.objects.select_related('hostextend')
if not request.user.is_supper:
hosts = hosts.filter(id__in=get_host_perms(request.user))
hosts = {x.id: x.to_view() for x in hosts}
for rel in Group.hosts.through.objects.filter(host_id__in=hosts.keys()):
hosts[rel.host_id]['group_ids'].append(rel.group_id)
return json_response(list(hosts.values()))
@auth('host.host.add|host.host.edit')
def post(self, request):
form, error = JsonParser(
Argument('id', type=int, required=False),
Argument('group_ids', type=list, filter=lambda x: len(x), help='请选择主机分组'),
Argument('name', help='请输主机名称'),
Argument('username', handler=str.strip, help='请输入登录用户名'),
Argument('hostname', handler=str.strip, help='请输入主机名或IP'),
Argument('port', type=int, help='请输入SSH端口'),
Argument('pkey', required=False),
Argument('desc', required=False),
Argument('password', required=False),
).parse(request.body)
if error is None:
if not _do_host_verify(form):
return json_response('auth fail')
group_ids = form.pop('group_ids')
other = Host.objects.filter(name=form.name).first()
if other and (not form.id or other.id != form.id):
return json_response(error=f'已存在的主机名称【{form.name}')
if form.id:
Host.objects.filter(pk=form.id).update(is_verified=True, **form)
host = Host.objects.get(pk=form.id)
else:
host = Host.objects.create(created_by=request.user, is_verified=True, **form)
host.groups.set(group_ids)
response = host.to_view()
response['group_ids'] = group_ids
return json_response(response)
return json_response(error=error)
@auth('host.host.add|host.host.edit')
def put(self, request):
form, error = JsonParser(
Argument('id', type=int, help='参数错误')
).parse(request.body)
if error is None:
host = Host.objects.get(pk=form.id)
with host.get_ssh() as ssh:
_sync_host_extend(host, ssh=ssh)
return json_response(error=error)
@auth('admin')
def patch(self, request):
form, error = JsonParser(
Argument('host_ids', type=list, filter=lambda x: len(x), help='请选择主机'),
Argument('s_group_id', type=int, help='参数错误'),
Argument('t_group_id', type=int, help='参数错误'),
Argument('is_copy', type=bool, help='参数错误'),
).parse(request.body)
if error is None:
if form.t_group_id == form.s_group_id:
return json_response(error='不能选择本分组的主机')
s_group = Group.objects.get(pk=form.s_group_id)
t_group = Group.objects.get(pk=form.t_group_id)
t_group.hosts.add(*form.host_ids)
if not form.is_copy:
s_group.hosts.remove(*form.host_ids)
return json_response(error=error)
@auth('host.host.del')
def delete(self, request):
form, error = JsonParser(
Argument('id', type=int, required=False),
Argument('group_id', type=int, required=False),
).parse(request.GET)
if error is None:
if form.id:
host_ids = [form.id]
elif form.group_id:
group = Group.objects.get(pk=form.group_id)
host_ids = [x.id for x in group.hosts.all()]
else:
return json_response(error='参数错误')
for host_id in host_ids:
regex = fr'[^0-9]{host_id}[^0-9]'
deploy = Deploy.objects.filter(host_ids__regex=regex) \
.annotate(app_name=F('app__name'), env_name=F('env__name')).first()
if deploy:
return json_response(
error=f'应用【{deploy.app_name}】在【{deploy.env_name}】的发布配置关联了该主机,请解除关联后再尝试删除该主机')
task = Task.objects.filter(targets__regex=regex).first()
if task:
return json_response(
error=f'任务计划中的任务【{task.name}】关联了该主机,请解除关联后再尝试删除该主机')
detection = Detection.objects.filter(type__in=('3', '4'), targets__regex=regex).first()
if detection:
return json_response(
error=f'监控中心的任务【{detection.name}】关联了该主机,请解除关联后再尝试删除该主机')
tpl = ExecTemplate.objects.filter(host_ids__regex=regex).first()
if tpl:
return json_response(error=f'执行模板【{tpl.name}】关联了该主机,请解除关联后再尝试删除该主机')
Host.objects.filter(id__in=host_ids).delete()
return json_response(error=error)
@auth('host.host.add')
def post_import(request):
group_id = request.POST.get('group_id')
file = request.FILES['file']
hosts = []
ws = load_workbook(file, read_only=True)['Sheet1']
summary = {'fail': 0, 'success': 0, 'invalid': [], 'skip': [], 'repeat': []}
for i, row in enumerate(ws.rows, start=1):
if i == 1: # 第1行是表头 略过
continue
if not all([row[x].value for x in range(4)]):
summary['invalid'].append(i)
summary['fail'] += 1
continue
data = AttrDict(
name=row[0].value,
hostname=row[1].value,
port=row[2].value,
username=row[3].value,
desc=row[5].value
)
if Host.objects.filter(hostname=data.hostname, port=data.port, username=data.username).exists():
summary['skip'].append(i)
summary['fail'] += 1
continue
if Host.objects.filter(name=data.name).exists():
summary['repeat'].append(i)
summary['fail'] += 1
continue
host = Host.objects.create(created_by=request.user, **data)
host.groups.add(group_id)
summary['success'] += 1
host.password = row[4].value
hosts.append(host)
token = uuid.uuid4().hex
if hosts:
Thread(target=batch_sync_host, args=(token, hosts)).start()
return json_response({'summary': summary, 'token': token, 'hosts': {x.id: {'name': x.name} for x in hosts}})
@auth('host.host.view')
def post_export(request):
hosts = Host.objects.select_related('hostextend')
if not request.user.is_supper:
hosts = hosts.filter(id__in=get_host_perms(request.user))
wb = Workbook()
ws = wb.active
ws.append(('主机名称', 'SSH地址', 'SSH端口', 'SSH用户', 'SSH密码', '备注信息', '实例ID', '操作系统', 'CPU核心数',
'内存GB', '磁盘GB', '内网IP'
'公网IP', '实例计费方式', '网络计费方式', '创建时间', '到期时间'))
for item in hosts:
data = [item.name, item.hostname, item.port, item.username, '', item.desc]
if hasattr(item, 'hostextend'):
data.extend([
item.hostextend.instance_id,
item.hostextend.os_name,
item.hostextend.cpu,
item.hostextend.memory,
','.join(str(x) for x in json.loads(item.hostextend.disk)),
','.join(json.loads(item.hostextend.private_ip_address)),
','.join(json.loads(item.hostextend.public_ip_address)),
item.hostextend.get_instance_charge_type_display(),
item.hostextend.get_internet_charge_type_display(),
item.hostextend.created_time,
item.hostextend.expired_time
])
ws.append(data)
response = HttpResponse(content_type='application/octet-stream')
wb.save(response)
return response
@auth('host.host.add')
def post_parse(request):
file = request.FILES['file']
if file:
data = file.read()
return json_response(data.decode())
else:
return HttpResponseBadRequest()
@auth('host.host.add')
def batch_valid(request):
form, error = JsonParser(
Argument('password', required=False),
Argument('range', filter=lambda x: x in ('1', '2'), help='参数错误')
).parse(request.body)
if error is None:
if form.range == '1': # all hosts
hosts = Host.objects.all()
else:
hosts = Host.objects.filter(is_verified=False).all()
token = uuid.uuid4().hex
Thread(target=batch_sync_host, args=(token, hosts, form.password)).start()
return json_response({'token': token, 'hosts': {x.id: {'name': x.name} for x in hosts}})
return json_response(error=error)
def _do_host_verify(form):
password = form.pop('password')
if form.pkey:
try:
with SSH(form.hostname, form.port, form.username, form.pkey) as ssh:
ssh.ping()
return True
except BadAuthenticationType:
raise Exception('该主机不支持密钥认证请参考官方文档错误代码E01')
except AuthenticationException:
raise Exception('上传的独立密钥认证失败,请检查该密钥是否能正常连接主机(推荐使用全局密钥)')
except socket.timeout:
raise Exception('连接主机超时,请检查网络')
private_key, public_key = AppSetting.get_ssh_key()
if password:
try:
with SSH(form.hostname, form.port, form.username, password=password) as ssh:
ssh.add_public_key(public_key)
except BadAuthenticationType:
raise Exception('该主机不支持密码认证请参考官方文档错误代码E00')
except AuthenticationException:
raise Exception('密码连接认证失败,请检查密码是否正确')
except socket.timeout:
raise Exception('连接主机超时,请检查网络')
try:
with SSH(form.hostname, form.port, form.username, private_key) as ssh:
ssh.ping()
except BadAuthenticationType:
raise Exception('该主机不支持密钥认证请参考官方文档错误代码E01')
except AuthenticationException:
if password:
raise Exception('密钥认证失败请参考官方文档错误代码E02')
return False
except socket.timeout:
raise Exception('连接主机超时,请检查网络')
return True