spug/spug_api/apps/host/views.py

182 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from django.views.generic import View
from django.db.models import F
from django.http.response import HttpResponseBadRequest
from libs import json_response, JsonParser, Argument
from apps.setting.utils import AppSetting
from apps.host.models import Host
from apps.app.models import Deploy
from apps.schedule.models import Task
from apps.monitor.models import Detection
from apps.account.models import Role
from libs.ssh import SSH, AuthenticationException
from paramiko.ssh_exception import BadAuthenticationType
from libs import human_datetime, AttrDict
from openpyxl import load_workbook
import socket
class HostView(View):
def get(self, request):
host_id = request.GET.get('id')
if host_id:
if not request.user.has_host_perm(host_id):
return json_response(error='无权访问该主机,请联系管理员')
return json_response(Host.objects.get(pk=host_id))
hosts = Host.objects.filter(deleted_by_id__isnull=True)
zones = [x['zone'] for x in hosts.order_by('zone').values('zone').distinct()]
perms = [x.id for x in hosts] if request.user.is_supper else request.user.host_perms
return json_response({'zones': zones, 'hosts': [x.to_dict() for x in hosts], 'perms': perms})
def post(self, request):
form, error = JsonParser(
Argument('id', type=int, required=False),
Argument('zone', help='请输入主机类型'),
Argument('name', help='请输主机名称'),
Argument('username', handler=str.strip, help='请输入登录用户名'),
Argument('hostname', handler=str.strip, help='请输入主机名或IP'),
Argument('port', type=int, help='请输入SSH端口'),
Argument('pkey', required=False),
Argument('desc', required=False),
Argument('password', required=False),
).parse(request.body)
if error is None:
if valid_ssh(form.hostname, form.port, form.username, password=form.pop('password'),
pkey=form.pkey) is False:
return json_response('auth fail')
if form.id:
Host.objects.filter(pk=form.pop('id')).update(**form)
elif Host.objects.filter(name=form.name, deleted_by_id__isnull=True).exists():
return json_response(error=f'已存在的主机名称【{form.name}')
else:
host = Host.objects.create(created_by=request.user, **form)
if request.user.role:
request.user.role.add_host_perm(host.id)
return json_response(error=error)
def patch(self, request):
form, error = JsonParser(
Argument('id', type=int, required=False),
Argument('zone', help='请输入主机类别')
).parse(request.body)
if error is None:
host = Host.objects.filter(pk=form.id).first()
if not host:
return json_response(error='未找到指定主机')
count = Host.objects.filter(zone=host.zone, deleted_by_id__isnull=True).update(zone=form.zone)
return json_response(count)
return json_response(error=error)
def delete(self, request):
form, error = JsonParser(
Argument('id', type=int, help='请指定操作对象')
).parse(request.GET)
if error is None:
deploy = Deploy.objects.filter(host_ids__regex=fr'[^0-9]{form.id}[^0-9]').annotate(
app_name=F('app__name'),
env_name=F('env__name')
).first()
if deploy:
return json_response(error=f'应用【{deploy.app_name}】在【{deploy.env_name}】的发布配置关联了该主机,请解除关联后再尝试删除该主机')
task = Task.objects.filter(targets__regex=fr'[^0-9]{form.id}[^0-9]').first()
if task:
return json_response(error=f'任务计划中的任务【{task.name}】关联了该主机,请解除关联后再尝试删除该主机')
detection = Detection.objects.filter(type__in=('3', '4'), addr=form.id).first()
if detection:
return json_response(error=f'监控中心的任务【{detection.name}】关联了该主机,请解除关联后再尝试删除该主机')
role = Role.objects.filter(host_perms__regex=fr'[^0-9]{form.id}[^0-9]').first()
if role:
return json_response(error=f'角色【{role.name}】的主机权限关联了该主机,请解除关联后再尝试删除该主机')
Host.objects.filter(pk=form.id).update(
deleted_at=human_datetime(),
deleted_by=request.user,
)
return json_response(error=error)
def post_import(request):
password = request.POST.get('password')
file = request.FILES['file']
ws = load_workbook(file, read_only=True)['Sheet1']
summary = {'invalid': [], 'skip': [], 'fail': [], 'network': [], 'repeat': [], 'success': [], 'error': []}
for i, row in enumerate(ws.rows):
if i == 0: # 第1行是表头 略过
continue
if not all([row[x].value for x in range(5)]):
summary['invalid'].append(i)
continue
data = AttrDict(
zone=row[0].value,
name=row[1].value,
hostname=row[2].value,
port=row[3].value,
username=row[4].value,
password=row[5].value,
desc=row[6].value
)
if Host.objects.filter(hostname=data.hostname, port=data.port, username=data.username,
deleted_by_id__isnull=True).exists():
summary['skip'].append(i)
continue
try:
if valid_ssh(data.hostname, data.port, data.username, data.pop('password') or password, None,
False) is False:
summary['fail'].append(i)
continue
except AuthenticationException:
summary['fail'].append(i)
continue
except socket.error:
summary['network'].append(i)
continue
except Exception:
summary['error'].append(i)
continue
if Host.objects.filter(name=data.name, deleted_by_id__isnull=True).exists():
summary['repeat'].append(i)
continue
host = Host.objects.create(created_by=request.user, **data)
if request.user.role:
request.user.role.add_host_perm(host.id)
summary['success'].append(i)
return json_response(summary)
def valid_ssh(hostname, port, username, password=None, pkey=None, with_expect=True):
try:
private_key = AppSetting.get('private_key')
public_key = AppSetting.get('public_key')
except KeyError:
private_key, public_key = SSH.generate_key()
AppSetting.set('private_key', private_key, 'ssh private key')
AppSetting.set('public_key', public_key, 'ssh public key')
if password:
_cli = SSH(hostname, port, username, password=str(password))
_cli.add_public_key(public_key)
if pkey:
private_key = pkey
try:
cli = SSH(hostname, port, username, private_key)
cli.ping()
except BadAuthenticationType:
if with_expect:
raise TypeError('该主机不支持密钥认证请参考官方文档错误代码E01')
return False
except AuthenticationException:
if password and with_expect:
raise ValueError('密钥认证失败请参考官方文档错误代码E02')
return False
return True
def post_parse(request):
file = request.FILES['file']
if file:
data = file.read()
return json_response(data.decode())
else:
return HttpResponseBadRequest()