mirror of https://github.com/openspug/spug
232 lines
7.9 KiB
Python
232 lines
7.9 KiB
Python
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
|
|
# Copyright: (c) <spug.dev@gmail.com>
|
|
# Released under the AGPL-3.0 License.
|
|
from django.conf import settings
|
|
from django.template.defaultfilters import filesizeformat
|
|
from django_redis import get_redis_connection
|
|
from libs.utils import SpugError, human_time, render_str, human_seconds_time
|
|
from apps.config.utils import update_config_by_var
|
|
from collections import defaultdict
|
|
import time
|
|
import json
|
|
import os
|
|
import re
|
|
|
|
|
|
class KitMixin:
|
|
regex = re.compile(r'^((\r\n)*)(.*?)((\r\n)*)$', re.DOTALL)
|
|
|
|
@classmethod
|
|
def term_message(cls, message, color_mode='info', with_time=False):
|
|
prefix = f'{human_time()} ' if with_time else ''
|
|
if color_mode == 'info':
|
|
mode = '36m'
|
|
elif color_mode == 'warn':
|
|
mode = '33m'
|
|
elif color_mode == 'error':
|
|
mode = '31m'
|
|
elif color_mode == 'success':
|
|
mode = '32m'
|
|
else:
|
|
raise TypeError
|
|
|
|
return cls.regex.sub(fr'\1\033[{mode}{prefix}\3\033[0m\4', message)
|
|
|
|
|
|
class Helper(KitMixin):
|
|
def __init__(self, rds, rds_key):
|
|
self.rds = rds
|
|
self.rds_key = rds_key
|
|
self.buffers = defaultdict(str)
|
|
self.flags = defaultdict(bool)
|
|
self.already_clear = False
|
|
self.files = {}
|
|
|
|
def __del__(self):
|
|
self.clear()
|
|
|
|
@classmethod
|
|
def make(cls, rds, rds_key):
|
|
rds.delete(rds_key)
|
|
return cls(rds, rds_key)
|
|
|
|
@classmethod
|
|
def fill_outputs(cls, outputs, deploy_key):
|
|
rds = get_redis_connection()
|
|
key_ttl = rds.ttl(deploy_key)
|
|
counter, hit_keys = 0, set()
|
|
if key_ttl > 30 or key_ttl == -1:
|
|
data = rds.lrange(deploy_key, counter, counter + 9)
|
|
while data:
|
|
for item in data:
|
|
counter += 1
|
|
item = json.loads(item.decode())
|
|
key = item['key']
|
|
if key in outputs:
|
|
hit_keys.add(key)
|
|
if 'data' in item:
|
|
outputs[key]['data'] += item['data']
|
|
if 'status' in item:
|
|
outputs[key]['status'] = item['status']
|
|
data = rds.lrange(deploy_key, counter, counter + 9)
|
|
|
|
for key in outputs.keys():
|
|
if key in hit_keys:
|
|
continue
|
|
file_name = os.path.join(settings.DEPLOY_DIR, f'{deploy_key}:{key}')
|
|
if not os.path.exists(file_name):
|
|
continue
|
|
with open(file_name, newline='\r\n') as f:
|
|
line = f.readline()
|
|
while line:
|
|
status, data = line.split(',', 1)
|
|
if data:
|
|
outputs[key]['data'] += data
|
|
if status:
|
|
outputs[key]['status'] = status
|
|
line = f.readline()
|
|
return counter
|
|
|
|
def get_file(self, key):
|
|
if key in self.files:
|
|
return self.files[key]
|
|
file = open(os.path.join(settings.DEPLOY_DIR, f'{self.rds_key}:{key}'), 'w')
|
|
self.files[key] = file
|
|
return file
|
|
|
|
def get_cross_env(self, key):
|
|
file = os.path.join(settings.DEPLOY_DIR, key)
|
|
if os.path.exists(file):
|
|
with open(file, 'r') as f:
|
|
return json.loads(f.read())
|
|
return {}
|
|
|
|
def set_cross_env(self, key, envs):
|
|
file_envs = {}
|
|
for k, v in envs.items():
|
|
if k == 'SPUG_SET':
|
|
try:
|
|
update_config_by_var(v)
|
|
except SpugError as e:
|
|
self.send_error(key, f'{e}')
|
|
elif k.startswith('SPUG_GEV_'):
|
|
file_envs[k] = v
|
|
|
|
file = os.path.join(settings.DEPLOY_DIR, key)
|
|
with open(file, 'w') as f:
|
|
f.write(json.dumps(file_envs))
|
|
|
|
def save_pid(self, pid, key):
|
|
self.rds.set(f'PID:{self.rds_key}:{key}', pid, 3600)
|
|
|
|
def parse_filter_rule(self, data: str, sep='\n', env=None):
|
|
data, files = data.strip(), []
|
|
if data:
|
|
for line in data.split(sep):
|
|
line = line.strip()
|
|
if line and not line.startswith('#'):
|
|
files.append(render_str(line, env))
|
|
return files
|
|
|
|
def send(self, key, data, *, status=''):
|
|
message = {'key': key, 'data': data}
|
|
if status:
|
|
message['status'] = status
|
|
self.rds.rpush(self.rds_key, json.dumps(message))
|
|
|
|
file = self.get_file(key)
|
|
for idx, line in enumerate(data.split('\r\n')):
|
|
if idx != 0:
|
|
tmp = [status, self.buffers[key] + '\r\n']
|
|
file.write(','.join(tmp))
|
|
file.flush()
|
|
self.buffers[key] = ''
|
|
self.flags[key] = False
|
|
if line:
|
|
for idx2, item in enumerate(line.split('\r')):
|
|
if idx2 != 0:
|
|
self.flags[key] = True
|
|
if item:
|
|
if self.flags[key]:
|
|
self.buffers[key] = item
|
|
self.flags[key] = False
|
|
else:
|
|
self.buffers[key] += item
|
|
|
|
def send_clear(self, key):
|
|
self.send(key, '\033[2J\033[3J\033[1;1H')
|
|
|
|
def send_info(self, key, message, status='', with_time=True):
|
|
message = self.term_message(message, 'info', with_time)
|
|
self.send(key, message, status=status)
|
|
|
|
def send_warn(self, key, message, status=''):
|
|
message = self.term_message(message, 'warn')
|
|
self.send(key, message, status=status)
|
|
|
|
def send_success(self, key, message, with_time=True, start_time=None):
|
|
if start_time:
|
|
message += f', 耗时: {human_seconds_time(time.time() - start_time)}'
|
|
message = self.term_message(f'\r\n** {message} **', 'success', with_time)
|
|
self.send(key, message, status='success')
|
|
|
|
def send_error(self, key, message, with_break=False):
|
|
message = self.term_message(f'\r\n{message}', 'error')
|
|
self.send(key, message, status='error')
|
|
if with_break:
|
|
raise SpugError
|
|
|
|
def send_status(self, key, status):
|
|
self.send(key, '', status=status)
|
|
|
|
def clear(self):
|
|
if self.already_clear:
|
|
return
|
|
self.already_clear = True
|
|
for key, value in self.buffers.items():
|
|
if value:
|
|
file = self.get_file(key)
|
|
file.write(f',{value}')
|
|
for file in self.files.values():
|
|
file.close()
|
|
if self.rds.ttl(self.rds_key) == -1:
|
|
self.rds.expire(self.rds_key, 60)
|
|
|
|
def progress_callback(self, key):
|
|
def func(n, t):
|
|
message = f'\r {filesizeformat(n):<8}/{filesizeformat(t):>8} '
|
|
self.send(key, message)
|
|
|
|
self.send(key, '\r\n')
|
|
return func
|
|
|
|
def remote_exec(self, key, ssh, command, env=None):
|
|
code = -1
|
|
for code, out in ssh.exec_command_with_stream(command, environment=env):
|
|
self.send(key, out)
|
|
if code != 0:
|
|
self.send_error(key, f'exit code: {code}')
|
|
return code == 0
|
|
|
|
def local_exec(self, key, et, command):
|
|
code = -1
|
|
for code, out in et.exec_command_with_stream(command):
|
|
self.send(key, out)
|
|
if code != 0:
|
|
self.send_error(key, f'exit code: {code}')
|
|
return code == 0
|
|
|
|
def get_dynamic_envs(self, key, ssh):
|
|
code, out = ssh.exec_command('env')
|
|
if code == 0:
|
|
envs = {}
|
|
for line in out.splitlines():
|
|
if not line.startswith('SPUG_SET_'):
|
|
continue
|
|
k, v = line.split('=', 1)
|
|
envs[k.replace('SPUG_SET_', '', 1)] = v
|
|
envs[k] = v
|
|
return True, envs
|
|
self.send_error(key, f'获取动态环境变量失败: {out}')
|
|
return False, None
|