常规发布增加健康检查

pull/418/head
YYY 2021-12-14 10:18:15 +08:00
parent 23ef656ac8
commit df3c135dfd
5 changed files with 264 additions and 31 deletions

View File

@ -46,12 +46,17 @@ class Deploy(models.Model, ModelMixin):
extend = models.CharField(max_length=2, choices=EXTENDS)
is_audit = models.BooleanField()
is_parallel = models.BooleanField(default=True)
parallel_num = models.IntegerField(default=1)
rst_notify = models.CharField(max_length=255, null=True)
created_at = models.CharField(max_length=20, default=human_datetime)
created_by = models.ForeignKey(User, models.PROTECT, related_name='+')
updated_at = models.CharField(max_length=20, null=True)
updated_by = models.ForeignKey(User, models.PROTECT, related_name='+', null=True)
@property
def health_check_obj(self):
return DeployHealthCheck.objects.filter(deploy=self).first()
@property
def extend_obj(self):
cls = DeployExtend1 if self.extend == '1' else DeployExtend2
@ -64,6 +69,9 @@ class Deploy(models.Model, ModelMixin):
deploy['host_ids'] = json.loads(self.host_ids)
deploy['rst_notify'] = json.loads(self.rst_notify)
deploy.update(self.extend_obj.to_dict())
health_check = self.health_check_obj
if health_check:
deploy.update(health_check.to_dict())
return deploy
def delete(self, using=None, keep_parents=False):
@ -122,3 +130,25 @@ class DeployExtend2(models.Model, ModelMixin):
class Meta:
db_table = 'deploy_extend2'
class DeployHealthCheck(models.Model, ModelMixin):
deploy = models.OneToOneField(Deploy, primary_key=True, on_delete=models.CASCADE)
is_health_check_enabled = models.BooleanField(default=False)
is_http_check = models.BooleanField(default=True)
check_port = models.IntegerField(default=8080)
check_path = models.CharField(null=True, max_length=255, default="/healthz")
check_retry = models.IntegerField(default=3)
check_interval = models.IntegerField(default=60)
check_timeout = models.IntegerField(default=30)
check_failed_action = models.IntegerField(default=0, help_text="0 终止发布 1 忽略继续")
def to_dict(self, *args, **kwargs):
tmp = super().to_dict(*args, **kwargs)
return tmp
def __repr__(self):
return '<DeployHealthCheck deploy_id=%r>' % self.deploy_id
class Meta:
db_table = 'deploy_health_check'

View File

@ -4,7 +4,7 @@
from django.views.generic import View
from django.db.models import F
from libs import JsonParser, Argument, json_response, auth
from apps.app.models import App, Deploy, DeployExtend1, DeployExtend2
from apps.app.models import App, Deploy, DeployExtend1, DeployExtend2, DeployHealthCheck
from apps.config.models import Config, ConfigHistory
from apps.app.utils import fetch_versions, remove_repo
from apps.setting.utils import AppSetting
@ -117,6 +117,7 @@ class DeployView(View):
Argument('rst_notify', type=dict, help='请选择发布结果通知方式'),
Argument('extend', filter=lambda x: x in dict(Deploy.EXTENDS), help='请选择发布类型'),
Argument('is_parallel', type=bool, default=True),
Argument('parallel_num', type=int, default=1),
Argument('is_audit', type=bool, default=False)
).parse(request.body)
if error is None:
@ -125,6 +126,18 @@ class DeployView(View):
return json_response(error='应用在该环境下已经存在发布配置')
form.host_ids = json.dumps(form.host_ids)
form.rst_notify = json.dumps(form.rst_notify)
health_check_form, error = JsonParser(
Argument('is_health_check_enabled', type=bool, default=False),
Argument('is_http_check', type=bool, default=False),
Argument('check_port', type=int, default=8080, help='健康检查端口'),
Argument('check_path', type=str, required=False, help='健康检查url'),
Argument('check_retry', type=int, default=3),
Argument('check_interval', type=int, default=60),
Argument('check_timeout', type=int, default=30),
Argument('check_failed_action', type=int, default=0)
).parse(request.body)
if error:
return json_response(error=error)
if form.extend == '1':
extend_form, error = JsonParser(
Argument('git_repo', handler=str.strip, help='请输入git仓库地址'),
@ -147,9 +160,11 @@ class DeployView(View):
remove_repo(form.id)
Deploy.objects.filter(pk=form.id).update(**form)
DeployExtend1.objects.filter(deploy_id=form.id).update(**extend_form)
DeployHealthCheck.objects.filter(deploy_id=form.id).update(**health_check_form)
else:
deploy = Deploy.objects.create(created_by=request.user, **form)
DeployExtend1.objects.create(deploy=deploy, **extend_form)
DeployHealthCheck.objects.create(deploy=deploy, **health_check_form)
elif form.extend == '2':
extend_form, error = JsonParser(
Argument('server_actions', type=list, help='请输入执行动作'),

View File

@ -5,6 +5,7 @@ from django_redis import get_redis_connection
from django.conf import settings
from django.db import close_old_connections
from libs.utils import AttrDict, human_time
from libs.healthcheck import HealthCheck
from apps.host.models import Host
from apps.config.utils import compose_configs
from apps.repository.models import Repository
@ -12,6 +13,7 @@ from apps.repository.utils import dispatch as build_repository
from apps.deploy.helper import Helper, SpugError
from concurrent import futures
from functools import partial
import queue
import json
import uuid
import os
@ -75,6 +77,7 @@ def _ext1_deploy(req, helper, env):
build_repository(rep, helper)
req.repository = rep
extend = req.deploy.extend_obj
hc_obj = req.deploy.health_check_obj
env.update(SPUG_DST_DIR=extend.dst_dir)
extras = json.loads(req.extra)
if extras[0] == 'repository':
@ -83,35 +86,31 @@ def _ext1_deploy(req, helper, env):
env.update(SPUG_GIT_BRANCH=extras[1], SPUG_GIT_COMMIT_ID=extras[2])
else:
env.update(SPUG_GIT_TAG=extras[1])
if req.deploy.is_parallel:
threads, latest_exception = [], None
max_workers = max(10, os.cpu_count() * 5)
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
for h_id in json.loads(req.host_ids):
new_env = AttrDict(env.items())
t = executor.submit(_deploy_ext1_host, req, helper, h_id, new_env)
t.h_id = h_id
threads.append(t)
for t in futures.as_completed(threads):
exception = t.exception()
if exception:
latest_exception = exception
if not isinstance(exception, SpugError):
helper.send_error(t.h_id, f'Exception: {exception}', False)
if latest_exception:
raise latest_exception
else:
host_ids = sorted(json.loads(req.host_ids), reverse=True)
while host_ids:
h_id = host_ids.pop()
threads, latest_exception = [], None
max_workers = max(10, os.cpu_count() * 5) if req.deploy.is_parallel else req.deploy.parallel_num
exception_queue = queue.Queue()
check_failed_action = 1 if not hc_obj else hc_obj.check_failed_action
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
for h_id in json.loads(req.host_ids):
new_env = AttrDict(env.items())
try:
_deploy_ext1_host(req, helper, h_id, new_env)
except Exception as e:
helper.send_error(h_id, f'Exception: {e}', False)
for h_id in host_ids:
helper.send_error(h_id, '终止发布', False)
raise e
t = executor.submit(_deploy_ext1_host, req, helper, h_id, new_env, exception_queue, check_failed_action)
t.h_id = h_id
threads.append(t)
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in reversed(threads):
t.cancel()
futures.wait(threads, return_when=futures.ALL_COMPLETED)
for t in threads:
if "finished returned NoneType" in str(t) or t.cancelled():
helper.send_error(t.h_id, '终止发布', False)
elif "finished raised SpugError" in str(t):
exception = t.exception()
latest_exception = exception
helper.send_error(t.h_id, f'Exception: {exception}', False)
if latest_exception:
raise latest_exception
def _ext2_deploy(req, helper, env):
@ -193,8 +192,12 @@ def _ext2_deploy(req, helper, env):
helper.send_step('local', 100, f'\r\n{human_time()} ** 发布成功 **')
def _deploy_ext1_host(req, helper, h_id, env):
def _deploy_ext1_host(req, helper, h_id, env, exception_queue, failed_action=1):
# 异常队列不为空 且动作是中断发布 0 中断所有 1 忽略继续
if not exception_queue.empty() and failed_action == 0:
return
extend = req.deploy.extend_obj
hc_obj = req.deploy.health_check_obj
helper.send_step(h_id, 1, f'\033[32m就绪√\033[0m\r\n{human_time()} 数据准备... ')
host = Host.objects.filter(pk=h_id).first()
if not host:
@ -241,7 +244,21 @@ def _deploy_ext1_host(req, helper, h_id, env):
command = f'cd {extend.dst_dir} && {extend.hook_post_host}'
helper.remote(host.id, ssh, command)
# healthcheck
if hc_obj:
if hc_obj.is_health_check_enabled:
helper.send_step(h_id, 4, f'{human_time()} 执行健康检查... \r\n')
hc = HealthCheck(host=host.hostname, host_id=h_id, helper=helper, **hc_obj.__dict__)
health_status = hc.is_health()
if not health_status and failed_action == 0:
exception_queue.put("quit")
helper.send_error(h_id, f'{human_time()} 健康检查失败... \r\n')
# raise Exception
else:
helper.send_step(h_id, 4, f'{human_time()} 健康检查通过... \r\n')
helper.send_step(h_id, 100, f'\r\n{human_time()} ** \033[32m发布成功\033[0m **')
return True
def _deploy_ext2_host(helper, h_id, actions, env, spug_version):

View File

@ -0,0 +1,84 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
import socket
import requests
from time import sleep
from datetime import datetime
class HealthCheck:
def __init__(self, host, host_id=None, helper=None, url=None, is_https=False, **kwargs):
self.host = host
self.port = kwargs['check_port']
self.retry = kwargs['check_retry']
self.path = kwargs['check_path']
self.timeout = kwargs['check_timeout']
self.interval = kwargs['check_interval']
self.is_http_check = kwargs['is_http_check']
if not url and self.is_http_check:
self.url = "{protocol}://{host}:{port}{path}".format(
protocol="https" if is_https else "http",
host=self.host,
port=self.port,
path=self.path if self.path.startswith('/') else '/'+self.path
)
else:
self.url = url
self.host_id = host_id
self.helper = helper
def notify(self, message):
if self.helper:
self.helper.send_step(self.host_id, 4, "{} {} \r\n".format(datetime.now().strftime('%H:%M:%S'), message))
else:
print(message)
def is_health(self) -> bool:
if self.is_http_check:
return self.health_check_with_http()
else:
return self.health_check_with_tcp()
def health_check_with_tcp(self) -> bool:
for i in range(self.retry):
self.notify("{}次TCP健康检查".format(i+1))
if self.is_tcp_can_connect():
return True
if i < self.retry-1:
sleep(self.interval)
return False
def health_check_with_http(self) -> bool:
for i in range(self.retry):
self.notify("{}次HTTP健康检查".format(i+1))
if self.is_http_status_in_2xx_3xx():
return True
if i < self.retry-1:
sleep(self.interval)
return False
def is_tcp_can_connect(self) -> bool:
s = socket.socket()
s.settimeout(self.timeout)
try:
s.connect((self.host, self.port))
self.notify("第TCP健康检查:{}:{} Connected".format(self.host, self.port))
s.close()
return True
except socket.error as e:
self.notify("TCP健康检查:{}:{} --- {}".format(self.host, self.port, e))
return False
def is_http_status_in_2xx_3xx(self) -> bool:
try:
resp = requests.get(self.url, timeout=self.timeout)
self.notify("HTTP健康检查{} status_code:{}".format(self.url, resp.status_code))
# 判断是否2xx 3xx
return 200 >= resp.status_code <= 399
except requests.exceptions.RequestException as e:
self.notify("HTTP健康检查{} --- {}".format(self.url, e))
return False

View File

@ -6,7 +6,7 @@
import React, { useEffect, useState } from 'react';
import { observer } from 'mobx-react';
import { Link } from 'react-router-dom';
import { Switch, Form, Input, Select, Button, Radio } from 'antd';
import { Switch, Form, Input, Select, Button, Radio, InputNumber } from 'antd';
import envStore from 'pages/config/environment/store';
import Selector from 'pages/host/Selector';
import store from './store';
@ -77,6 +77,93 @@ export default observer(function Ext1Setup1() {
<Radio.Button value={false}>串行</Radio.Button>
</Radio.Group>
</Form.Item>
{
!info['is_parallel']?(
<Form.Item label="串行并发">
<InputNumber min={1} max={info.host_ids.length}
defaultValue={1}
value={info.parallel_num}
onChange={(value) => info['parallel_num'] = value}
/>
</Form.Item>
): null
}
<Form.Item label="健康检查">
<Switch
disabled={store.isReadOnly}
checkedChildren="开启"
unCheckedChildren="关闭"
checked={info['is_health_check_enabled']}
onChange={v => info['is_health_check_enabled'] = v}/>
</Form.Item>
{
info['is_health_check_enabled']?(
<>
<Form.Item label="健康检查方式">
<Radio.Group
buttonStyle="solid"
defaultValue={false}
value={info.is_http_check}
onChange={(e) => info['is_http_check'] = e.target.value}>
<Radio.Button value={false}>TCP</Radio.Button>
<Radio.Button value={true}>HTTP</Radio.Button>
</Radio.Group>
</Form.Item>
<Form.Item label="健康检查端口">
<InputNumber placeholder="8080" style={{ width: 120}} value={info.check_port} onChange={(value) => info['check_port'] = value} />
</Form.Item>
{
info.is_http_check?(
<Form.Item label="健康检查URL">
<Input placeholder="/healthz" style={{ width: 500}} value={info.check_path} onChange={(e) => info['check_path'] = e.target.value} />
</Form.Item>
):null
}
<Form.Item label="健康检查重试次数">
<InputNumber
min={1}
max={100}
defaultValue={3}
value={info.check_retry}
addonAfter="次"
style={{ width: 120}}
onChange={(value) => info['check_retry'] = value}
/>
</Form.Item>
<Form.Item label="健康检查间隔时间">
<InputNumber
min={1}
defaultValue={60}
value={info.check_interval}
onChange={(value) => info['check_interval'] = value}
addonAfter="秒"
style={{ width: 120}}
/>
</Form.Item>
<Form.Item label="健康检查超时时间">
<InputNumber
min={1}
defaultValue={30}
value={info.check_timeout}
onChange={(value) => info['check_timeout'] = value}
addonAfter="秒"
style={{ width: 120}}
/>
</Form.Item>
<Form.Item label="健康检查失败">
<Select
defaultValue={0}
value={info.check_failed_action}
onChange={(e) => info['check_failed_action'] = e.target.value}
style={{width: 120}}
>
<Select.Option value={0}>终止发布</Select.Option>
<Select.Option value={1}>忽略继续</Select.Option>
</Select>
</Form.Item>
</>
):null
}
<Form.Item label="发布审核">
<Switch
disabled={store.isReadOnly}