teleport/dist/server/script/core/utils.py

145 lines
3.3 KiB
Python

# -*- coding: utf8 -*-
import os
import platform
import shutil
import subprocess
import sys
import time
from . import colorconsole as cc
from .env import env
def remove(*args):
    """Delete a file, symbolic link or directory tree, retrying on failure.

    The positional arguments are joined with ``os.path.join`` to form the
    path to delete.  Progress is reported through the ``cc`` console logger.
    If the path does not exist the function logs that and returns.

    Up to 5 attempts are made, pausing between attempts while the path is
    still present.

    :param args: path components, joined with ``os.path.join``.
    :raises RuntimeError: if the path still exists after all attempts.
    """
    path = os.path.join(*args)
    cc.v(' - remove [%s] ... ' % path, end='')

    # lexists() is also true for a dangling symlink, which exists() misses.
    if not os.path.lexists(path):
        cc.v('not exists, skip.')
        return

    for _ in range(5):
        cc.v('.', end='')
        try:
            # isdir() follows symlinks, but rmtree() refuses to operate on a
            # symlink (and ignore_errors=True would hide that refusal), so a
            # link that points at a directory must be unlinked instead.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path, ignore_errors=True)
                # Short pause before re-checking that the tree is gone.
                time.sleep(0.5)
            else:
                os.unlink(path)
        except OSError:
            # Best effort: the check below decides whether to retry.
            pass

        if os.path.lexists(path):
            time.sleep(1)
        else:
            break

    if os.path.lexists(path):
        cc.e('[failed]')
        raise RuntimeError('can not remove: %s' % path)
    else:
        cc.i('[done]')
def make_dirs(path, exist_ok=True):
    """Create directory ``path`` (including missing parents), with retries.

    :param path: directory to create.
    :param exist_ok: when true (the default) an already existing ``path`` is
        accepted silently; when false it is treated as an error.
    :raises RuntimeError: if ``path`` already exists and ``exist_ok`` is
        false, or if the directory still does not exist after 5 attempts.
    """
    if os.path.exists(path):
        if not exist_ok:
            raise RuntimeError('path already exists: %s' % path)
        return

    attempts = 5
    for attempt in range(attempts):
        try:
            os.makedirs(path)
        except OSError:
            # Only filesystem errors are retried; anything else (including
            # KeyboardInterrupt) propagates.  The existence check below
            # decides whether another attempt is needed.
            pass

        if os.path.exists(path):
            return

        # Wait once before the next attempt; no point sleeping after the
        # final one.
        if attempt < attempts - 1:
            time.sleep(1)

    raise RuntimeError('can not create: %s' % path)
def copy_ex(s_path, t_path, item_name=None, force=True):
    """Copy a file or a directory tree from ``s_path`` to ``t_path``.

    :param s_path: source path, or the source base directory when
        ``item_name`` is given.
    :param t_path: target path, or the target base directory when
        ``item_name`` is given.
    :param item_name: ``None`` to copy ``s_path`` to ``t_path`` directly; a
        string to copy the item of that name from ``s_path`` into ``t_path``;
        or a tuple whose first two elements are the source name and the
        target name.
    :param force: when true an existing target is removed before copying;
        when false an existing target is left alone and the copy is skipped.
    :raises RuntimeError: if ``item_name`` is neither ``None``, a string nor
        a tuple.
    """
    if item_name is None:
        src, dst = s_path, t_path
    else:
        if isinstance(item_name, str):
            name_from = name_to = item_name
        elif isinstance(item_name, tuple):
            name_from = item_name[0]
            name_to = item_name[1]
        else:
            raise RuntimeError('utils.copy_ex() got invalid param.')
        src = os.path.join(s_path, name_from)
        dst = os.path.join(t_path, name_to)

    if os.path.exists(dst):
        if not force:
            cc.w(dst, 'already exists, skip copy.')
            return
        remove(dst)

    # Directories are copied as a whole tree.
    if os.path.isdir(src):
        cc.v(' - copy [%s]\n -> [%s]' % (src, dst))
        shutil.copytree(src, dst)
        return

    # Anything else is copied as a single file; make sure t_path exists first.
    if not os.path.exists(t_path):
        os.makedirs(t_path)
    cc.v(' - copy [%s]\n -> [%s]' % (src, dst))
    shutil.copy(src, dst)
def ensure_file_exists(filename):
    """Verify that ``filename`` names an existing regular file.

    :param filename: path to check.
    :raises RuntimeError: if the path does not exist, or exists but is not a
        file.
    """
    if os.path.isfile(filename):
        return

    # Not a file: tell apart "missing" from "present but of another kind".
    if os.path.exists(filename):
        raise RuntimeError('path exists but not a file: {}'.format(filename))
    raise RuntimeError('file not exists: {}'.format(filename))
def sys_exec(cmd, direct_output=False, output_codec=None):
    """Run ``cmd`` through the shell and collect its output line by line.

    stderr is merged into stdout.  Each line has its trailing CR/LF
    characters stripped before it is stored (and optionally echoed).

    NOTE: ``cmd`` is executed with ``shell=True``; it must not be built from
    untrusted input.

    :param cmd: the command line to execute.
    :param direct_output: when true, every line is also echoed through
        ``cc.v`` as soon as it is read.
    :param output_codec: encoding used to decode the command's output;
        undecodable bytes are replaced rather than raising.  When ``None``
        the locale's preferred encoding is used with strict error handling.
    :return: a tuple ``(ret, output)`` where ``ret`` is the process exit code
        and ``output`` is the list of output lines.
    """
    import io  # local import: only needed here to decode the child's output

    popen_kwargs = dict(stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
    if not env.is_win:
        popen_kwargs['close_fds'] = True

    # Be lenient only when the caller chose the codec explicitly; the
    # default path keeps strict decoding with the locale encoding.
    decode_errors = 'replace' if output_codec is not None else None

    output = []
    # The context managers guarantee that the pipe is closed and the child
    # is reaped even if reading the output is interrupted.
    with subprocess.Popen(cmd, **popen_kwargs) as p:
        # The default newline handling of TextIOWrapper gives universal
        # newlines, i.e. the same line splitting as text-mode Popen.
        with io.TextIOWrapper(p.stdout, encoding=output_codec, errors=decode_errors) as reader:
            for line in reader:
                line = line.rstrip('\r\n')
                if direct_output:
                    cc.v(line, end='\n')
                output.append(line)
        ret = p.wait()

    return ret, output
# Running this module directly is a no-op.
if __name__ == '__main__':
    pass