U 改进发布稳定性

pull/154/head
vapao 2020-07-17 20:02:57 +08:00
parent a046679ea7
commit a533683af9
1 changed files with 30 additions and 18 deletions

View File

@ -8,7 +8,6 @@ from apps.host.models import Host
from apps.notify.models import Notify from apps.notify.models import Notify
from concurrent import futures from concurrent import futures
import requests import requests
import socket
import subprocess import subprocess
import json import json
import uuid import uuid
@ -17,6 +16,10 @@ import os
REPOS_DIR = settings.REPOS_DIR REPOS_DIR = settings.REPOS_DIR
class SpugError(Exception):
pass
def deploy_dispatch(request, req, token): def deploy_dispatch(request, req, token):
rds = get_redis_connection() rds = get_redis_connection()
try: try:
@ -92,16 +95,21 @@ def _ext1_deploy(req, helper, env):
contain = ' '.join(f'{env.SPUG_VERSION}/{x}' for x in files) contain = ' '.join(f'{env.SPUG_VERSION}/{x}' for x in files)
helper.local(f'cd {REPOS_DIR} && tar zcf {env.SPUG_VERSION}.tar.gz {exclude} {contain}') helper.local(f'cd {REPOS_DIR} && tar zcf {env.SPUG_VERSION}.tar.gz {exclude} {contain}')
helper.send_step('local', 6, f'完成') helper.send_step('local', 6, f'完成')
threads, latest_exception = [], None
with futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() + 5)) as executor: with futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() + 5)) as executor:
threads = []
for h_id in json.loads(req.host_ids): for h_id in json.loads(req.host_ids):
env = AttrDict(env.items()) env = AttrDict(env.items())
threads.append(executor.submit(_deploy_ext1_host, helper, h_id, extend, env)) t = executor.submit(_deploy_ext1_host, helper, h_id, extend, env)
t.h_id = h_id
threads.append(t)
for t in futures.as_completed(threads): for t in futures.as_completed(threads):
exception = t.exception() exception = t.exception()
if exception: if exception:
helper.send_error(h_id, f'Exception: {exception}') latest_exception = exception
raise exception if not isinstance(exception, SpugError):
helper.send_error(t.h_id, f'Exception: {exception}', False)
if latest_exception:
raise latest_exception
def _ext2_deploy(req, helper, env): def _ext2_deploy(req, helper, env):
@ -118,15 +126,21 @@ def _ext2_deploy(req, helper, env):
step += 1 step += 1
helper.send_step('local', 100, '完成\r\n' if step == 2 else '\r\n') helper.send_step('local', 100, '完成\r\n' if step == 2 else '\r\n')
if host_actions: if host_actions:
threads, latest_exception = [], None
with futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() + 5)) as executor: with futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() + 5)) as executor:
threads = []
for h_id in json.loads(req.host_ids): for h_id in json.loads(req.host_ids):
threads.append(executor.submit(_deploy_ext2_host, helper, h_id, host_actions, env)) env = AttrDict(env.items())
t = executor.submit(_deploy_ext2_host, helper, h_id, host_actions, env)
t.h_id = h_id
threads.append(t)
for t in futures.as_completed(threads): for t in futures.as_completed(threads):
exception = t.exception() exception = t.exception()
if exception: if exception:
helper.send_error(h_id, f'Exception: {exception}') latest_exception = exception
raise exception if not isinstance(exception, SpugError):
helper.send_error(t.h_id, f'Exception: {exception}', False)
if latest_exception:
raise latest_exception
else: else:
helper.send_step('local', 100, f'\r\n{human_time()} ** 发布成功 **') helper.send_step('local', 100, f'\r\n{human_time()} ** 发布成功 **')
@ -354,10 +368,11 @@ class Helper:
def send_info(self, key, message): def send_info(self, key, message):
self._send({'key': key, 'status': 'info', 'data': message}) self._send({'key': key, 'status': 'info', 'data': message})
def send_error(self, key, message): def send_error(self, key, message, with_break=True):
message = '\r\n' + message message = '\r\n' + message
self._send({'key': key, 'status': 'error', 'data': message}) self._send({'key': key, 'status': 'error', 'data': message})
raise Exception(message) if with_break:
raise SpugError
def send_step(self, key, step, data): def send_step(self, key, step, data):
self._send({'key': key, 'step': step, 'data': data}) self._send({'key': key, 'step': step, 'data': data})
@ -378,10 +393,7 @@ class Helper:
def remote(self, key, ssh, command, env=None): def remote(self, key, ssh, command, env=None):
code = -1 code = -1
try:
for code, out in ssh.exec_command_with_stream(command, environment=env): for code, out in ssh.exec_command_with_stream(command, environment=env):
self.send_info(key, out) self.send_info(key, out)
if code != 0: if code != 0:
self.send_error(key, f'exit code: {code}') self.send_error(key, f'exit code: {code}')
except socket.timeout:
self.send_error(key, 'time out')