diff --git a/spug_api/apps/deploy/models.py b/spug_api/apps/deploy/models.py index 104ae65..d992fb2 100644 --- a/spug_api/apps/deploy/models.py +++ b/spug_api/apps/deploy/models.py @@ -8,8 +8,9 @@ class DeployRequest(models.Model, ModelMixin): STATUS = ( ('-3', '发布异常'), ('-1', '已驳回'), - ('1', '待审核'), - ('2', '待发布'), + ('0', '待审核'), + ('1', '待发布'), + ('2', '发布中'), ('3', '发布成功'), ) app = models.ForeignKey(App, on_delete=models.CASCADE) @@ -19,6 +20,7 @@ class DeployRequest(models.Model, ModelMixin): desc = models.CharField(max_length=255, null=True) status = models.CharField(max_length=2, choices=STATUS) reason = models.CharField(max_length=255, null=True) + version = models.CharField(max_length=50, null=True) created_at = models.CharField(max_length=20, default=human_datetime) created_by = models.ForeignKey(User, models.PROTECT, related_name='+') diff --git a/spug_api/apps/deploy/utils.py b/spug_api/apps/deploy/utils.py index e259ca9..5071f60 100644 --- a/spug_api/apps/deploy/utils.py +++ b/spug_api/apps/deploy/utils.py @@ -2,8 +2,7 @@ from django_redis import get_redis_connection from django.conf import settings from libs.utils import AttrDict, human_time from apps.host.models import Host -from datetime import datetime -from threading import Thread +from concurrent import futures import socket import subprocess import json @@ -13,25 +12,30 @@ REPOS_DIR = settings.REPOS_DIR def deploy_dispatch(request, req, token): - now = datetime.now() rds = get_redis_connection() - helper = Helper(rds, token) - helper.send_step('local', 1, f'完成\r\n{human_time()} 发布准备... ') - rds.expire(token, 60 * 60) - env = AttrDict( - APP_NAME=req.app.name, - APP_ID=str(req.app_id), - TASK_NAME=req.name, - TASK_ID=str(req.id), - VERSION=f'{req.app_id}_{req.id}_{now.strftime("%Y%m%d%H%M%S")}', - TIME=str(now.strftime(r'%Y-%m-%d\ %H:%M:%S')) - ) - if req.app.extend == '1': - env.update(json.loads(req.app.extend_obj.custom_envs)) - helper.local(f'cd {REPOS_DIR} && rm -rf {req.app_id}_*') - _ext1_deploy(request, req, helper, env) - else: - _ext2_deploy(request, req, helper, env) + try: + helper = Helper(rds, token) + helper.send_step('local', 1, f'完成\r\n{human_time()} 发布准备... ') + rds.expire(token, 60 * 60) + env = AttrDict( + APP_NAME=req.app.name, + APP_ID=str(req.app_id), + TASK_NAME=req.name, + TASK_ID=str(req.id), + VERSION=req.version + ) + if req.app.extend == '1': + env.update(json.loads(req.app.extend_obj.custom_envs)) + _ext1_deploy(request, req, helper, env) + else: + _ext2_deploy(request, req, helper, env) + req.status = '3' + except Exception as e: + req.status = '-3' + raise e + finally: + rds.close() + req.save() def _ext1_deploy(request, req, helper, env): @@ -44,6 +48,7 @@ def _ext1_deploy(request, req, helper, env): else: tree_ish = extras[1] env.update(TAG=extras[1]) + helper.local(f'cd {REPOS_DIR} && rm -rf {req.app_id}_*') helper.send_step('local', 1, '完成\r\n') if extend.hook_pre_server: @@ -70,8 +75,13 @@ def _ext1_deploy(request, req, helper, env): contain = ' '.join(f'{env.VERSION}/{x}' for x in files) helper.local(f'cd {REPOS_DIR} && tar zcf {env.VERSION}.tar.gz {exclude} {contain}') helper.send_step('local', 6, f'完成') - for h_id in json.loads(req.host_ids): - Thread(target=_deploy_host, args=(helper, h_id, extend, env)).start() + with futures.ThreadPoolExecutor(max_workers=min(16, os.cpu_count() + 4)) as executor: + threads = [] + for h_id in json.loads(req.host_ids): + threads.append(executor.submit(_deploy_host, helper, h_id, extend, env)) + for t in futures.as_completed(threads): + if t.exception(): + raise t.exception() def _ext2_deploy(request, req, helper, env): @@ -89,7 +99,7 @@ def _deploy_host(helper, h_id, extend, env): helper.send_error(host.id, f'please make sure the {extend.dst_dir!r} is not exists.') # clean clean_command = f'ls -rd {env.APP_ID}_* | tail -n +{extend.versions + 1} | xargs rm -rf' - helper.remote(host.id, ssh, f'cd {extend.dst_repo} && {clean_command}') + helper.remote(host.id, ssh, f'cd {extend.dst_repo} && rm -rf {env.VERSION} && {clean_command}') # transfer files tar_gz_file = f'{env.VERSION}.tar.gz' try: @@ -149,7 +159,7 @@ class Helper: self.rds.rpush(self.token, json.dumps({'key': key, 'step': step, 'data': data})) def local(self, command, env=None): - print(f'helper.local: {command!r}') + # print(f'helper.local: {command!r}') task = subprocess.Popen(command, env=env, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while True: message = task.stdout.readline() @@ -160,7 +170,7 @@ class Helper: self.send_error('local', f'exit code: {task.returncode}') def remote(self, key, ssh, command, env=None): - print(f'helper.remote: {command!r} env: {env!r}') + # print(f'helper.remote: {command!r} env: {env!r}') code = -1 try: for code, out in ssh.exec_command_with_stream(command, environment=env): diff --git a/spug_api/apps/deploy/views.py b/spug_api/apps/deploy/views.py index f012376..23f9349 100644 --- a/spug_api/apps/deploy/views.py +++ b/spug_api/apps/deploy/views.py @@ -6,6 +6,7 @@ from apps.deploy.utils import deploy_dispatch from apps.app.models import App from apps.host.models import Host from threading import Thread +from datetime import datetime import json import uuid @@ -44,7 +45,7 @@ class RequestView(View): app = App.objects.filter(pk=form.app_id).first() if not app: return json_response(error='未找到该应用') - form.status = '1' if app.is_audit else '2' + form.status = '0' if app.is_audit else '1' form.extra = json.dumps(form.extra) form.host_ids = json.dumps(form.host_ids) if form.id: @@ -74,14 +75,18 @@ class RequestDetailView(View): req = DeployRequest.objects.filter(pk=r_id).first() if not req: return json_response(error='未找到指定发布申请') - if req.status != '2': + if req.status not in ('1', '-3'): return json_response(error='该申请单当前状态还不能执行发布') hosts = Host.objects.filter(id__in=json.loads(req.host_ids)) token = uuid.uuid4().hex - Thread(target=deploy_dispatch, args=(request, req, token)).start() outputs = {str(x.id): {'data': ''} for x in hosts} outputs.update(local={'data': f'{human_time()} 建立接连... '}) targets = [{'id': x.id, 'title': f'{x.name}({x.hostname}:{x.port})'} for x in hosts] + req.status = '2' + if not req.version: + req.version = f'{req.app_id}_{req.id}_{datetime.now().strftime("%Y%m%d%H%M%S")}' + req.save() + Thread(target=deploy_dispatch, args=(request, req, token)).start() return json_response({'token': token, 'outputs': outputs, 'targets': targets}) def patch(self, request, r_id): @@ -95,11 +100,11 @@ class RequestDetailView(View): return json_response(error='未找到指定申请') if not form.is_pass and not form.reason: return json_response(error='请输入驳回原因') - if req.status != '1': + if req.status != '0': return json_response(error='该申请当前状态不允许审核') req.approve_at = human_datetime() req.approve_by = request.user - req.status = '2' if form.is_pass else '-1' + req.status = '1' if form.is_pass else '-1' req.reason = form.reason req.save() return json_response(error=error)