mirror of https://github.com/tp4a/teleport
411 lines
12 KiB
Python
411 lines
12 KiB
Python
# -*- coding: utf8 -*-
|
||
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
|
||
from . import colorconsole as cc
|
||
from .env import env
|
||
|
||
if env.is_py2:
|
||
import imp
|
||
elif env.is_py3:
|
||
import importlib
|
||
import importlib.machinery
|
||
|
||
|
||
def _check_download_file(file_name):
|
||
if env.is_win:
|
||
# use 7z to test integrity of downloaded
|
||
ret, output = sys_exec('"{}" t "{}"'.format(env.zip7, file_name), False)
|
||
if 'Everything is Ok' in output:
|
||
return True
|
||
else:
|
||
x = os.path.splitext(file_name)
|
||
# print('ext:', x)
|
||
if x[-1].lower() == '.zip':
|
||
ret, output = sys_exec('zip -T "{}"'.format(file_name), False)
|
||
# print('test .zip:', ret, output)
|
||
if ret == 0:
|
||
return True
|
||
elif x[-1].lower() == '.xz':
|
||
ret, output = sys_exec('xz -t "{}"'.format(file_name), False)
|
||
# print('test .xz:', ret, output)
|
||
if ret == 0:
|
||
return True
|
||
elif x[-1].lower() == '.gz':
|
||
ret, output = sys_exec('gzip -t "{}"'.format(file_name), False)
|
||
# print('test .gz:', ret, output)
|
||
if ret == 0:
|
||
return True
|
||
else:
|
||
cc.w('[fixme] how to test {} on Linux? '.format(x[-1]), end='')
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def download_file(desc, url, target_path, file_name):
|
||
cc.n('download {} ... '.format(desc), end='')
|
||
|
||
local_file_name = os.path.join(target_path, file_name)
|
||
if os.path.exists(local_file_name):
|
||
if not _check_download_file(local_file_name):
|
||
cc.w('already exists but broken, download it again...')
|
||
else:
|
||
cc.w('already exists, skip.')
|
||
return True
|
||
|
||
cc.v('')
|
||
# 因为下载过程会在命令行显示进度,所以不能使用subprocess.Popen()的方式捕获输出,会很难看!
|
||
if env.is_win:
|
||
cmd = '""{}" --no-check-certificate {} -O "{}""'.format(env.wget, url, local_file_name)
|
||
os.system(cmd)
|
||
elif env.is_linux:
|
||
os.system('wget --no-check-certificate {} -O "{}"'.format(url, local_file_name))
|
||
else:
|
||
return False
|
||
|
||
if not os.path.exists(local_file_name) or not _check_download_file(local_file_name):
|
||
cc.e('downloading {} from {} failed.'.format(desc, url))
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def extension_suffixes():
|
||
# imp.get_suffixes()
|
||
# 返回3元组列表(suffix, mode, type), 获得特殊模块的描述
|
||
# .suffix为文件后缀名;
|
||
# mode为打开文件模式;
|
||
# type为文件类型, 1代表PY_SOURCE, 2代表PY_COMPILED, 3代表C_EXTENSION
|
||
|
||
EXTENSION_SUFFIXES = list()
|
||
if env.is_py2:
|
||
suf = imp.get_suffixes()
|
||
for s in suf:
|
||
if s[2] == 3:
|
||
EXTENSION_SUFFIXES.append(s[0])
|
||
else:
|
||
EXTENSION_SUFFIXES = importlib.machinery.EXTENSION_SUFFIXES
|
||
|
||
if env.is_win:
|
||
if '.dll' not in EXTENSION_SUFFIXES:
|
||
EXTENSION_SUFFIXES.append('.dll')
|
||
|
||
elif env.is_linux:
|
||
if '.so' not in EXTENSION_SUFFIXES:
|
||
EXTENSION_SUFFIXES.append('.so')
|
||
|
||
else:
|
||
raise RuntimeError('not support this platform now.')
|
||
|
||
return EXTENSION_SUFFIXES
|
||
|
||
|
||
def remove(*args):
|
||
path = os.path.join(*args)
|
||
|
||
cc.v('remove [%s] ...' % path, end='')
|
||
if not os.path.exists(path):
|
||
cc.v('not exists, skip.')
|
||
return
|
||
|
||
for i in range(5):
|
||
cc.v('.', end='')
|
||
try:
|
||
if os.path.isdir(path):
|
||
shutil.rmtree(path, ignore_errors=True)
|
||
time.sleep(0.5)
|
||
else:
|
||
os.unlink(path)
|
||
except:
|
||
pass
|
||
|
||
if os.path.exists(path):
|
||
time.sleep(1)
|
||
else:
|
||
break
|
||
|
||
if os.path.exists(path):
|
||
cc.e('failed')
|
||
raise RuntimeError('')
|
||
else:
|
||
cc.i('done')
|
||
|
||
|
||
def makedirs(path, exist_ok=True):
|
||
if os.path.exists(path):
|
||
if not exist_ok:
|
||
raise RuntimeError('path already exists: %s' % path)
|
||
else:
|
||
return
|
||
|
||
for i in range(5):
|
||
try:
|
||
os.makedirs(path)
|
||
except:
|
||
time.sleep(1)
|
||
pass
|
||
|
||
if not os.path.exists(path):
|
||
time.sleep(1)
|
||
else:
|
||
break
|
||
|
||
if not os.path.exists(path):
|
||
raise RuntimeError('can not create: %s' % path)
|
||
|
||
|
||
def copy_file(s_path, t_path, f_name, force=True):
|
||
if isinstance(f_name, str):
|
||
f_from = f_name
|
||
f_to = f_name
|
||
elif isinstance(f_name, tuple):
|
||
f_from = f_name[0]
|
||
f_to = f_name[1]
|
||
else:
|
||
raise RuntimeError('utils.copy_file() got invalid param.')
|
||
|
||
s = os.path.join(s_path, f_from)
|
||
t = os.path.join(t_path, f_to)
|
||
if os.path.exists(t):
|
||
if force:
|
||
cc.v(' an exists version found, clean up...')
|
||
remove(t)
|
||
else:
|
||
cc.w(' an exists version found, skip copy.')
|
||
return
|
||
|
||
if not os.path.exists(t_path):
|
||
makedirs(t_path)
|
||
cc.v('copy [%s]\n -> [%s]' % (s, t))
|
||
shutil.copy(s, t)
|
||
|
||
|
||
def copy_ex(s_path, t_path, item_name=None, force=True):
|
||
if item_name is None:
|
||
s = s_path
|
||
t = t_path
|
||
else:
|
||
if isinstance(item_name, str):
|
||
f_from = item_name
|
||
f_to = item_name
|
||
elif isinstance(item_name, tuple):
|
||
f_from = item_name[0]
|
||
f_to = item_name[1]
|
||
else:
|
||
raise RuntimeError('utils.copy_ex() got invalid param.')
|
||
|
||
s = os.path.join(s_path, f_from)
|
||
t = os.path.join(t_path, f_to)
|
||
|
||
if os.path.exists(t):
|
||
if force:
|
||
remove(t)
|
||
else:
|
||
cc.w(t, 'already exists, skip copy.')
|
||
return
|
||
|
||
if os.path.isdir(s):
|
||
cc.v('copy [%s]\n -> [%s]' % (s, t))
|
||
shutil.copytree(s, t)
|
||
else:
|
||
if not os.path.exists(t_path):
|
||
os.makedirs(t_path)
|
||
cc.v('copy [%s]\n -> [%s]' % (s, t))
|
||
shutil.copy(s, t)
|
||
|
||
|
||
def update_file(s_path, t_path, f_name):
|
||
if isinstance(f_name, str):
|
||
f_from = f_name
|
||
f_to = f_name
|
||
elif isinstance(f_name, tuple):
|
||
f_from = f_name[0]
|
||
f_to = f_name[1]
|
||
else:
|
||
raise RuntimeError('utils.update_file() got invalid param.')
|
||
|
||
s = os.path.join(s_path, f_from)
|
||
t = os.path.join(t_path, f_to)
|
||
if not os.path.exists(s):
|
||
cc.w('try update file `%s` but not exists, skip.' % f_from)
|
||
return
|
||
|
||
# TODO: check file MD5 and update time.
|
||
|
||
if os.path.exists(t):
|
||
remove(t)
|
||
|
||
if not os.path.exists(t_path):
|
||
makedirs(t_path)
|
||
cc.v('update [%s]\n -> [%s]' % (s, t))
|
||
shutil.copy(os.path.join(s_path, f_from), t)
|
||
|
||
|
||
def ensure_file_exists(filename):
|
||
if not os.path.exists(filename):
|
||
raise RuntimeError('file not exists: {}'.format(filename))
|
||
if not os.path.isfile(filename):
|
||
raise RuntimeError('path exists but not a file: {}'.format(filename))
|
||
|
||
|
||
def sys_exec(cmd, direct_output=False, output_codec=None):
|
||
if output_codec is None:
|
||
if env.is_win:
|
||
output_codec = 'gb2312'
|
||
else:
|
||
output_codec = 'utf8'
|
||
|
||
p = None
|
||
if env.is_win:
|
||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
|
||
|
||
else:
|
||
p = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
||
universal_newlines=True, shell=True)
|
||
|
||
output = list()
|
||
f = p.stdout
|
||
while True:
|
||
line = f.readline()
|
||
if 0 == len(line):
|
||
break
|
||
|
||
line = line.rstrip('\r\n')
|
||
|
||
if direct_output:
|
||
cc.o((cc.CR_GRAY, line), end='\n')
|
||
|
||
output.append(line)
|
||
|
||
ret = p.wait()
|
||
|
||
return (ret, output)
|
||
|
||
|
||
def msvc_build(sln_file, proj_name, target, platform, force_rebuild):
|
||
if env.msbuild is None:
|
||
raise RuntimeError('where is `msbuild`?')
|
||
|
||
if force_rebuild:
|
||
cmd = '"{}" "{}" "/target:clean" "/property:Configuration={};Platform={}"'.format(env.msbuild, sln_file, target,
|
||
platform)
|
||
ret, _ = sys_exec(cmd, direct_output=True)
|
||
cc.v('ret:', ret)
|
||
|
||
cmd = '"{}" "{}" "/target:{}" "/property:Configuration={};Platform={}"'.format(env.msbuild, sln_file, proj_name,
|
||
target, platform)
|
||
ret, _ = sys_exec(cmd, direct_output=True)
|
||
if ret != 0:
|
||
raise RuntimeError('build MSVC project `{}` failed.'.format(proj_name))
|
||
|
||
|
||
def nsis_build(nsi_file, _define=''):
|
||
if env.nsis is None:
|
||
raise RuntimeError('where is `nsis`?')
|
||
|
||
cmd = '"{}" /V2 {} /X"SetCompressor /SOLID /FINAL lzma" "{}"'.format(env.nsis, _define, nsi_file)
|
||
ret, _ = sys_exec(cmd, direct_output=True)
|
||
if ret != 0:
|
||
raise RuntimeError('make installer with nsis failed. [{}]'.format(nsi_file))
|
||
|
||
|
||
def cmake(work_path, target, force_rebuild, cmake_define=''):
|
||
# I use cmake v3.5 which shipped with CLion.
|
||
if env.cmake is None:
|
||
raise RuntimeError('where is `cmake`?')
|
||
|
||
cc.n('make by cmake', target, sep=': ')
|
||
old_p = os.getcwd()
|
||
|
||
if os.path.exists(work_path):
|
||
if force_rebuild:
|
||
remove(work_path)
|
||
if not os.path.exists(work_path):
|
||
makedirs(work_path)
|
||
|
||
os.chdir(work_path)
|
||
if target == 'debug':
|
||
target = 'Debug'
|
||
else:
|
||
target = 'Release'
|
||
cmd = '"{}" -DCMAKE_BUILD_TYPE={} {} ..;make'.format(env.cmake, target, cmake_define)
|
||
ret, _ = sys_exec(cmd, direct_output=True)
|
||
os.chdir(old_p)
|
||
if ret != 0:
|
||
raise RuntimeError('build with cmake failed, ret={}. [{}]'.format(ret, target))
|
||
|
||
|
||
def strip(filename):
|
||
cc.n('strip binary file', filename)
|
||
if not os.path.exists(filename):
|
||
return False
|
||
cmd = 'strip {}'.format(filename)
|
||
ret, _ = sys_exec(cmd, direct_output=True)
|
||
if ret != 0:
|
||
raise RuntimeError('failed to strip binary file [{}], ret={}.'.format(filename, ret))
|
||
return True
|
||
|
||
|
||
def make_zip(src_path, to_file, from_parent=True):
|
||
cc.v('compress folder into .zip...')
|
||
|
||
src_path = os.path.abspath(src_path)
|
||
_parent = os.path.abspath(os.path.join(src_path, '..'))
|
||
_folder = src_path[len(_parent) + 1:]
|
||
|
||
if env.is_win:
|
||
old_p = os.getcwd()
|
||
if from_parent:
|
||
os.chdir(_parent)
|
||
cmd = '""{}" a "{}" "{}""'.format(env.zip7, to_file, _folder)
|
||
else:
|
||
os.chdir(src_path)
|
||
cmd = '""{}" a "{}" "*""'.format(env.zip7, to_file)
|
||
os.system(cmd)
|
||
os.chdir(old_p)
|
||
elif env.is_linux:
|
||
old_p = os.getcwd()
|
||
if from_parent:
|
||
os.chdir(_parent)
|
||
cmd = 'zip -r "{}" "{}"'.format(to_file, _folder)
|
||
else:
|
||
os.chdir(src_path)
|
||
cmd = 'zip -q -r "{}" ./*'.format(to_file)
|
||
os.system(cmd)
|
||
os.chdir(old_p)
|
||
else:
|
||
raise RuntimeError('not support this platform.')
|
||
|
||
ensure_file_exists(to_file)
|
||
|
||
|
||
def unzip(file_name, to_path):
|
||
if env.is_win:
|
||
cmd = '""{}" x "{}" -o"{}""'.format(env.zip7, file_name, to_path)
|
||
os.system(cmd)
|
||
elif env.is_linux:
|
||
os.system('unzip "{}" -d "{}"'.format(file_name, to_path))
|
||
else:
|
||
raise RuntimeError('not support this platform.')
|
||
|
||
|
||
def make_targz(work_path, folder, to_file):
|
||
cc.v('compress folder into .tar.gz...')
|
||
old_p = os.getcwd()
|
||
|
||
os.chdir(work_path)
|
||
cmd = 'tar zcf "{}" "{}"'.format(to_file, folder)
|
||
ret, _ = sys_exec(cmd, direct_output=True)
|
||
ensure_file_exists(to_file)
|
||
os.chdir(old_p)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# test()
|
||
pass
|