# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the MIT License. from django.views.generic import View from django.db.models import F from libs import json_response, JsonParser, Argument from apps.setting.utils import AppSetting from apps.host.models import Host from apps.app.models import Deploy from apps.schedule.models import Task from apps.monitor.models import Detection from libs.ssh import SSH, AuthenticationException from libs import human_datetime, AttrDict from openpyxl import load_workbook class HostView(View): def get(self, request): host_id = request.GET.get('id') if host_id: return json_response(Host.objects.get(pk=host_id)) hosts = Host.objects.filter(deleted_by_id__isnull=True) zones = [x['zone'] for x in hosts.order_by('zone').values('zone').distinct()] return json_response({'zones': zones, 'hosts': [x.to_dict() for x in hosts]}) def post(self, request): form, error = JsonParser( Argument('id', type=int, required=False), Argument('zone', help='请输入主机类型'), Argument('name', help='请输主机名称'), Argument('username', handler=str.strip, help='请输入登录用户名'), Argument('hostname', handler=str.strip, help='请输入主机名或IP'), Argument('port', type=int, help='请输入SSH端口'), Argument('desc', required=False), Argument('password', required=False), ).parse(request.body) if error is None: if valid_ssh(form.hostname, form.port, form.username, form.pop('password')) is False: return json_response('auth fail') if form.id: Host.objects.filter(pk=form.pop('id')).update(**form) elif Host.objects.filter(name=form.name, deleted_by_id__isnull=True).exists(): return json_response(error=f'已存在的主机名称【{form.name}】') else: Host.objects.create(created_by=request.user, **form) return json_response(error=error) def patch(self, request): form, error = JsonParser( Argument('id', type=int, required=False), Argument('zone', help='请输入主机类别') ).parse(request.body) if error is None: host = Host.objects.filter(pk=form.id).first() if not host: return json_response(error='未找到指定主机') count = Host.objects.filter(zone=host.zone, deleted_by_id__isnull=True).update(zone=form.zone) return json_response(count) return json_response(error=error) def delete(self, request): form, error = JsonParser( Argument('id', type=int, help='请指定操作对象') ).parse(request.GET) if error is None: deploy = Deploy.objects.filter(host_ids__regex=fr'\D{form.id}\D').annotate( app_name=F('app__name'), env_name=F('env__name') ).first() if deploy: return json_response(error=f'应用【{deploy.app_name}】在【{deploy.env_name}】的发布配置关联了该主机,请解除关联后再尝试删除该主机') task = Task.objects.filter(targets__regex=fr'\D{form.id}\D').first() if task: return json_response(error=f'任务计划中的任务【{task.name}】关联了该主机,请解除关联后再尝试删除该主机') detection = Detection.objects.filter(type__in=('3', '4'), addr=form.id).first() if detection: return json_response(error=f'监控中心的任务【{detection.name}】关联了该主机,请解除关联后再尝试删除该主机') Host.objects.filter(pk=form.id).update( deleted_at=human_datetime(), deleted_by=request.user, ) return json_response(error=error) def post_import(request): password = request.POST.get('password') file = request.FILES['file'] ws = load_workbook(file, read_only=True)['Sheet1'] summary = {'invalid': [], 'skip': [], 'fail': [], 'success': []} for i, row in enumerate(ws.rows): if i == 0: # 第1行是表头 略过 continue if not all([row[x].value for x in range(5)]): summary['invalid'].append(i) continue data = AttrDict( zone=row[0].value, name=row[1].value, hostname=row[2].value, port=row[3].value, username=row[4].value, password=row[5].value, desc=row[6].value ) if Host.objects.filter(hostname=data.hostname, port=data.port, username=data.username, deleted_by_id__isnull=True).exists(): summary['skip'].append(i) continue if valid_ssh(data.hostname, data.port, data.username, data.pop('password') or password) is False: summary['fail'].append(i) continue Host.objects.create(created_by=request.user, **data) summary['success'].append(i) return json_response(summary) def valid_ssh(hostname, port, username, password): try: private_key = AppSetting.get('private_key') public_key = AppSetting.get('public_key') except KeyError: private_key, public_key = SSH.generate_key() AppSetting.set('private_key', private_key, 'ssh private key') AppSetting.set('public_key', public_key, 'ssh public key') if password: cli = SSH(hostname, port, username, password=password) code, out = cli.exec_command('mkdir -p -m 700 ~/.ssh && \ echo %r >> ~/.ssh/authorized_keys && \ chmod 600 ~/.ssh/authorized_keys' % public_key) if code != 0: raise Exception(f'add public key error: {out!r}') else: cli = SSH(hostname, port, username, private_key) try: cli.ping() except AuthenticationException: return False return True