From df3c135dfdb2098a2904a990fbb4720883f6cb05 Mon Sep 17 00:00:00 2001 From: YYY Date: Tue, 14 Dec 2021 10:18:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B8=B8=E8=A7=84=E5=8F=91=E5=B8=83=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spug_api/apps/app/models.py | 30 +++++++ spug_api/apps/app/views.py | 17 +++- spug_api/apps/deploy/utils.py | 75 ++++++++++------- spug_api/libs/healthcheck.py | 84 +++++++++++++++++++ spug_web/src/pages/deploy/app/Ext1Setup1.js | 89 ++++++++++++++++++++- 5 files changed, 264 insertions(+), 31 deletions(-) create mode 100644 spug_api/libs/healthcheck.py diff --git a/spug_api/apps/app/models.py b/spug_api/apps/app/models.py index 1ad2901..a59c7b9 100644 --- a/spug_api/apps/app/models.py +++ b/spug_api/apps/app/models.py @@ -46,12 +46,17 @@ class Deploy(models.Model, ModelMixin): extend = models.CharField(max_length=2, choices=EXTENDS) is_audit = models.BooleanField() is_parallel = models.BooleanField(default=True) + parallel_num = models.IntegerField(default=1) rst_notify = models.CharField(max_length=255, null=True) created_at = models.CharField(max_length=20, default=human_datetime) created_by = models.ForeignKey(User, models.PROTECT, related_name='+') updated_at = models.CharField(max_length=20, null=True) updated_by = models.ForeignKey(User, models.PROTECT, related_name='+', null=True) + @property + def health_check_obj(self): + return DeployHealthCheck.objects.filter(deploy=self).first() + @property def extend_obj(self): cls = DeployExtend1 if self.extend == '1' else DeployExtend2 @@ -64,6 +69,9 @@ class Deploy(models.Model, ModelMixin): deploy['host_ids'] = json.loads(self.host_ids) deploy['rst_notify'] = json.loads(self.rst_notify) deploy.update(self.extend_obj.to_dict()) + health_check = self.health_check_obj + if health_check: + deploy.update(health_check.to_dict()) return deploy def delete(self, using=None, keep_parents=False): @@ -122,3 +130,25 @@ class DeployExtend2(models.Model, ModelMixin): class Meta: db_table = 'deploy_extend2' + + +class DeployHealthCheck(models.Model, ModelMixin): + deploy = models.OneToOneField(Deploy, primary_key=True, on_delete=models.CASCADE) + is_health_check_enabled = models.BooleanField(default=False) + is_http_check = models.BooleanField(default=True) + check_port = models.IntegerField(default=8080) + check_path = models.CharField(null=True, max_length=255, default="/healthz") + check_retry = models.IntegerField(default=3) + check_interval = models.IntegerField(default=60) + check_timeout = models.IntegerField(default=30) + check_failed_action = models.IntegerField(default=0, help_text="0 终止发布 1 忽略继续") + + def to_dict(self, *args, **kwargs): + tmp = super().to_dict(*args, **kwargs) + return tmp + + def __repr__(self): + return '' % self.deploy_id + + class Meta: + db_table = 'deploy_health_check' \ No newline at end of file diff --git a/spug_api/apps/app/views.py b/spug_api/apps/app/views.py index 45aebdd..58099ed 100644 --- a/spug_api/apps/app/views.py +++ b/spug_api/apps/app/views.py @@ -4,7 +4,7 @@ from django.views.generic import View from django.db.models import F from libs import JsonParser, Argument, json_response, auth -from apps.app.models import App, Deploy, DeployExtend1, DeployExtend2 +from apps.app.models import App, Deploy, DeployExtend1, DeployExtend2, DeployHealthCheck from apps.config.models import Config, ConfigHistory from apps.app.utils import fetch_versions, remove_repo from apps.setting.utils import AppSetting @@ -117,6 +117,7 @@ class DeployView(View): Argument('rst_notify', type=dict, help='请选择发布结果通知方式'), Argument('extend', filter=lambda x: x in dict(Deploy.EXTENDS), help='请选择发布类型'), Argument('is_parallel', type=bool, default=True), + Argument('parallel_num', type=int, default=1), Argument('is_audit', type=bool, default=False) ).parse(request.body) if error is None: @@ -125,6 +126,18 @@ class DeployView(View): return json_response(error='应用在该环境下已经存在发布配置') form.host_ids = json.dumps(form.host_ids) form.rst_notify = json.dumps(form.rst_notify) + health_check_form, error = JsonParser( + Argument('is_health_check_enabled', type=bool, default=False), + Argument('is_http_check', type=bool, default=False), + Argument('check_port', type=int, default=8080, help='健康检查端口'), + Argument('check_path', type=str, required=False, help='健康检查url'), + Argument('check_retry', type=int, default=3), + Argument('check_interval', type=int, default=60), + Argument('check_timeout', type=int, default=30), + Argument('check_failed_action', type=int, default=0) + ).parse(request.body) + if error: + return json_response(error=error) if form.extend == '1': extend_form, error = JsonParser( Argument('git_repo', handler=str.strip, help='请输入git仓库地址'), @@ -147,9 +160,11 @@ class DeployView(View): remove_repo(form.id) Deploy.objects.filter(pk=form.id).update(**form) DeployExtend1.objects.filter(deploy_id=form.id).update(**extend_form) + DeployHealthCheck.objects.filter(deploy_id=form.id).update(**health_check_form) else: deploy = Deploy.objects.create(created_by=request.user, **form) DeployExtend1.objects.create(deploy=deploy, **extend_form) + DeployHealthCheck.objects.create(deploy=deploy, **health_check_form) elif form.extend == '2': extend_form, error = JsonParser( Argument('server_actions', type=list, help='请输入执行动作'), diff --git a/spug_api/apps/deploy/utils.py b/spug_api/apps/deploy/utils.py index e3afca6..0b5f579 100644 --- a/spug_api/apps/deploy/utils.py +++ b/spug_api/apps/deploy/utils.py @@ -5,6 +5,7 @@ from django_redis import get_redis_connection from django.conf import settings from django.db import close_old_connections from libs.utils import AttrDict, human_time +from libs.healthcheck import HealthCheck from apps.host.models import Host from apps.config.utils import compose_configs from apps.repository.models import Repository @@ -12,6 +13,7 @@ from apps.repository.utils import dispatch as build_repository from apps.deploy.helper import Helper, SpugError from concurrent import futures from functools import partial +import queue import json import uuid import os @@ -75,6 +77,7 @@ def _ext1_deploy(req, helper, env): build_repository(rep, helper) req.repository = rep extend = req.deploy.extend_obj + hc_obj = req.deploy.health_check_obj env.update(SPUG_DST_DIR=extend.dst_dir) extras = json.loads(req.extra) if extras[0] == 'repository': @@ -83,35 +86,31 @@ def _ext1_deploy(req, helper, env): env.update(SPUG_GIT_BRANCH=extras[1], SPUG_GIT_COMMIT_ID=extras[2]) else: env.update(SPUG_GIT_TAG=extras[1]) - if req.deploy.is_parallel: - threads, latest_exception = [], None - max_workers = max(10, os.cpu_count() * 5) - with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - for h_id in json.loads(req.host_ids): - new_env = AttrDict(env.items()) - t = executor.submit(_deploy_ext1_host, req, helper, h_id, new_env) - t.h_id = h_id - threads.append(t) - for t in futures.as_completed(threads): - exception = t.exception() - if exception: - latest_exception = exception - if not isinstance(exception, SpugError): - helper.send_error(t.h_id, f'Exception: {exception}', False) - if latest_exception: - raise latest_exception - else: - host_ids = sorted(json.loads(req.host_ids), reverse=True) - while host_ids: - h_id = host_ids.pop() + threads, latest_exception = [], None + max_workers = max(10, os.cpu_count() * 5) if req.deploy.is_parallel else req.deploy.parallel_num + exception_queue = queue.Queue() + check_failed_action = 1 if not hc_obj else hc_obj.check_failed_action + + with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + for h_id in json.loads(req.host_ids): new_env = AttrDict(env.items()) - try: - _deploy_ext1_host(req, helper, h_id, new_env) - except Exception as e: - helper.send_error(h_id, f'Exception: {e}', False) - for h_id in host_ids: - helper.send_error(h_id, '终止发布', False) - raise e + t = executor.submit(_deploy_ext1_host, req, helper, h_id, new_env, exception_queue, check_failed_action) + t.h_id = h_id + threads.append(t) + futures.wait(threads, return_when=futures.FIRST_EXCEPTION) + for t in reversed(threads): + t.cancel() + futures.wait(threads, return_when=futures.ALL_COMPLETED) + for t in threads: + if "finished returned NoneType" in str(t) or t.cancelled(): + helper.send_error(t.h_id, '终止发布', False) + elif "finished raised SpugError" in str(t): + exception = t.exception() + latest_exception = exception + helper.send_error(t.h_id, f'Exception: {exception}', False) + if latest_exception: + raise latest_exception + def _ext2_deploy(req, helper, env): @@ -193,8 +192,12 @@ def _ext2_deploy(req, helper, env): helper.send_step('local', 100, f'\r\n{human_time()} ** 发布成功 **') -def _deploy_ext1_host(req, helper, h_id, env): +def _deploy_ext1_host(req, helper, h_id, env, exception_queue, failed_action=1): + # 异常队列不为空 且动作是中断发布 0 中断所有 1 忽略继续 + if not exception_queue.empty() and failed_action == 0: + return extend = req.deploy.extend_obj + hc_obj = req.deploy.health_check_obj helper.send_step(h_id, 1, f'\033[32m就绪√\033[0m\r\n{human_time()} 数据准备... ') host = Host.objects.filter(pk=h_id).first() if not host: @@ -241,7 +244,21 @@ def _deploy_ext1_host(req, helper, h_id, env): command = f'cd {extend.dst_dir} && {extend.hook_post_host}' helper.remote(host.id, ssh, command) + # healthcheck + if hc_obj: + if hc_obj.is_health_check_enabled: + helper.send_step(h_id, 4, f'{human_time()} 执行健康检查... \r\n') + hc = HealthCheck(host=host.hostname, host_id=h_id, helper=helper, **hc_obj.__dict__) + health_status = hc.is_health() + if not health_status and failed_action == 0: + exception_queue.put("quit") + helper.send_error(h_id, f'{human_time()} 健康检查失败... \r\n') + # raise Exception + else: + helper.send_step(h_id, 4, f'{human_time()} 健康检查通过... \r\n') + helper.send_step(h_id, 100, f'\r\n{human_time()} ** \033[32m发布成功\033[0m **') + return True def _deploy_ext2_host(helper, h_id, actions, env, spug_version): diff --git a/spug_api/libs/healthcheck.py b/spug_api/libs/healthcheck.py new file mode 100644 index 0000000..90c37cf --- /dev/null +++ b/spug_api/libs/healthcheck.py @@ -0,0 +1,84 @@ +# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug +# Copyright: (c) +# Released under the AGPL-3.0 License. +import socket +import requests +from time import sleep +from datetime import datetime + + +class HealthCheck: + + def __init__(self, host, host_id=None, helper=None, url=None, is_https=False, **kwargs): + self.host = host + self.port = kwargs['check_port'] + self.retry = kwargs['check_retry'] + self.path = kwargs['check_path'] + self.timeout = kwargs['check_timeout'] + self.interval = kwargs['check_interval'] + self.is_http_check = kwargs['is_http_check'] + if not url and self.is_http_check: + self.url = "{protocol}://{host}:{port}{path}".format( + protocol="https" if is_https else "http", + host=self.host, + port=self.port, + path=self.path if self.path.startswith('/') else '/'+self.path + ) + else: + self.url = url + self.host_id = host_id + self.helper = helper + + def notify(self, message): + if self.helper: + self.helper.send_step(self.host_id, 4, "{} {} \r\n".format(datetime.now().strftime('%H:%M:%S'), message)) + else: + print(message) + + def is_health(self) -> bool: + if self.is_http_check: + return self.health_check_with_http() + else: + return self.health_check_with_tcp() + + def health_check_with_tcp(self) -> bool: + for i in range(self.retry): + self.notify("第{}次TCP健康检查".format(i+1)) + if self.is_tcp_can_connect(): + return True + if i < self.retry-1: + sleep(self.interval) + return False + + def health_check_with_http(self) -> bool: + for i in range(self.retry): + self.notify("第{}次HTTP健康检查".format(i+1)) + if self.is_http_status_in_2xx_3xx(): + return True + if i < self.retry-1: + sleep(self.interval) + return False + + def is_tcp_can_connect(self) -> bool: + s = socket.socket() + s.settimeout(self.timeout) + try: + s.connect((self.host, self.port)) + self.notify("第TCP健康检查:{}:{} Connected".format(self.host, self.port)) + s.close() + return True + except socket.error as e: + self.notify("TCP健康检查:{}:{} --- {}".format(self.host, self.port, e)) + return False + + def is_http_status_in_2xx_3xx(self) -> bool: + try: + resp = requests.get(self.url, timeout=self.timeout) + self.notify("HTTP健康检查:{} status_code:{}".format(self.url, resp.status_code)) + # 判断是否2xx 3xx + return 200 >= resp.status_code <= 399 + except requests.exceptions.RequestException as e: + self.notify("HTTP健康检查:{} --- {}".format(self.url, e)) + return False + + diff --git a/spug_web/src/pages/deploy/app/Ext1Setup1.js b/spug_web/src/pages/deploy/app/Ext1Setup1.js index 4ff434c..6c84e18 100644 --- a/spug_web/src/pages/deploy/app/Ext1Setup1.js +++ b/spug_web/src/pages/deploy/app/Ext1Setup1.js @@ -6,7 +6,7 @@ import React, { useEffect, useState } from 'react'; import { observer } from 'mobx-react'; import { Link } from 'react-router-dom'; -import { Switch, Form, Input, Select, Button, Radio } from 'antd'; +import { Switch, Form, Input, Select, Button, Radio, InputNumber } from 'antd'; import envStore from 'pages/config/environment/store'; import Selector from 'pages/host/Selector'; import store from './store'; @@ -77,6 +77,93 @@ export default observer(function Ext1Setup1() { 串行 + { + !info['is_parallel']?( + + info['parallel_num'] = value} + /> + + ): null + } + + info['is_health_check_enabled'] = v}/> + + { + info['is_health_check_enabled']?( + <> + + info['is_http_check'] = e.target.value}> + TCP + HTTP + + + + info['check_port'] = value} /> + + { + info.is_http_check?( + + info['check_path'] = e.target.value} /> + + ):null + } + + info['check_retry'] = value} + /> + + + info['check_interval'] = value} + addonAfter="秒" + style={{ width: 120}} + /> + + + info['check_timeout'] = value} + addonAfter="秒" + style={{ width: 120}} + /> + + + + + + ):null + }