From a533683af9b1239db1686772968dabfbc25ffb21 Mon Sep 17 00:00:00 2001 From: vapao Date: Fri, 17 Jul 2020 20:02:57 +0800 Subject: [PATCH] =?UTF-8?q?U=20=E6=94=B9=E8=BF=9B=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E7=A8=B3=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spug_api/apps/deploy/utils.py | 48 ++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/spug_api/apps/deploy/utils.py b/spug_api/apps/deploy/utils.py index 57926da..f68f018 100644 --- a/spug_api/apps/deploy/utils.py +++ b/spug_api/apps/deploy/utils.py @@ -8,7 +8,6 @@ from apps.host.models import Host from apps.notify.models import Notify from concurrent import futures import requests -import socket import subprocess import json import uuid @@ -17,6 +16,10 @@ import os REPOS_DIR = settings.REPOS_DIR +class SpugError(Exception): + pass + + def deploy_dispatch(request, req, token): rds = get_redis_connection() try: @@ -92,16 +95,21 @@ def _ext1_deploy(req, helper, env): contain = ' '.join(f'{env.SPUG_VERSION}/{x}' for x in files) helper.local(f'cd {REPOS_DIR} && tar zcf {env.SPUG_VERSION}.tar.gz {exclude} {contain}') helper.send_step('local', 6, f'完成') + threads, latest_exception = [], None with futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() + 5)) as executor: - threads = [] for h_id in json.loads(req.host_ids): env = AttrDict(env.items()) - threads.append(executor.submit(_deploy_ext1_host, helper, h_id, extend, env)) + t = executor.submit(_deploy_ext1_host, helper, h_id, extend, env) + t.h_id = h_id + threads.append(t) for t in futures.as_completed(threads): exception = t.exception() if exception: - helper.send_error(h_id, f'Exception: {exception}') - raise exception + latest_exception = exception + if not isinstance(exception, SpugError): + helper.send_error(t.h_id, f'Exception: {exception}', False) + if latest_exception: + raise latest_exception def _ext2_deploy(req, helper, env): @@ -118,15 +126,21 @@ def _ext2_deploy(req, helper, env): step += 1 helper.send_step('local', 100, '完成\r\n' if step == 2 else '\r\n') if host_actions: + threads, latest_exception = [], None with futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() + 5)) as executor: - threads = [] for h_id in json.loads(req.host_ids): - threads.append(executor.submit(_deploy_ext2_host, helper, h_id, host_actions, env)) + env = AttrDict(env.items()) + t = executor.submit(_deploy_ext2_host, helper, h_id, host_actions, env) + t.h_id = h_id + threads.append(t) for t in futures.as_completed(threads): exception = t.exception() if exception: - helper.send_error(h_id, f'Exception: {exception}') - raise exception + latest_exception = exception + if not isinstance(exception, SpugError): + helper.send_error(t.h_id, f'Exception: {exception}', False) + if latest_exception: + raise latest_exception else: helper.send_step('local', 100, f'\r\n{human_time()} ** 发布成功 **') @@ -354,10 +368,11 @@ class Helper: def send_info(self, key, message): self._send({'key': key, 'status': 'info', 'data': message}) - def send_error(self, key, message): + def send_error(self, key, message, with_break=True): message = '\r\n' + message self._send({'key': key, 'status': 'error', 'data': message}) - raise Exception(message) + if with_break: + raise SpugError def send_step(self, key, step, data): self._send({'key': key, 'step': step, 'data': data}) @@ -378,10 +393,7 @@ class Helper: def remote(self, key, ssh, command, env=None): code = -1 - try: - for code, out in ssh.exec_command_with_stream(command, environment=env): - self.send_info(key, out) - if code != 0: - self.send_error(key, f'exit code: {code}') - except socket.timeout: - self.send_error(key, 'time out') + for code, out in ssh.exec_command_with_stream(command, environment=env): + self.send_info(key, out) + if code != 0: + self.send_error(key, f'exit code: {code}')