teleport/build/builder/core/utils.py

411 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf8 -*-
import os
import shutil
import subprocess
import sys
import time
from . import colorconsole as cc
from .env import env
if env.is_py2:
import imp
elif env.is_py3:
import importlib
import importlib.machinery
def _check_download_file(file_name):
if env.is_win:
# use 7z to test integrity of downloaded
ret, output = sys_exec('"{}" t "{}"'.format(env.zip7, file_name), False)
if 'Everything is Ok' in output:
return True
else:
x = os.path.splitext(file_name)
# print('ext:', x)
if x[-1].lower() == '.zip':
ret, output = sys_exec('zip -T "{}"'.format(file_name), False)
# print('test .zip:', ret, output)
if ret == 0:
return True
elif x[-1].lower() == '.xz':
ret, output = sys_exec('xz -t "{}"'.format(file_name), False)
# print('test .xz:', ret, output)
if ret == 0:
return True
elif x[-1].lower() == '.gz':
ret, output = sys_exec('gzip -t "{}"'.format(file_name), False)
# print('test .gz:', ret, output)
if ret == 0:
return True
else:
cc.w('[fixme] how to test {} on Linux? '.format(x[-1]), end='')
return True
return False
def download_file(desc, url, target_path, file_name):
cc.n('download {} ... '.format(desc), end='')
local_file_name = os.path.join(target_path, file_name)
if os.path.exists(local_file_name):
if not _check_download_file(local_file_name):
cc.w('already exists but broken, download it again...')
else:
cc.w('already exists, skip.')
return True
cc.v('')
# 因为下载过程会在命令行显示进度所以不能使用subprocess.Popen()的方式捕获输出,会很难看!
if env.is_win:
cmd = '""{}" --no-check-certificate {} -O "{}""'.format(env.wget, url, local_file_name)
os.system(cmd)
elif env.is_linux:
os.system('wget --no-check-certificate {} -O "{}"'.format(url, local_file_name))
else:
return False
if not os.path.exists(local_file_name) or not _check_download_file(local_file_name):
cc.e('downloading {} from {} failed.'.format(desc, url))
return False
return True
def extension_suffixes():
# imp.get_suffixes()
# 返回3元组列表(suffix, mode, type), 获得特殊模块的描述
# .suffix为文件后缀名;
# mode为打开文件模式;
# type为文件类型, 1代表PY_SOURCE, 2代表PY_COMPILED, 3代表C_EXTENSION
EXTENSION_SUFFIXES = list()
if env.is_py2:
suf = imp.get_suffixes()
for s in suf:
if s[2] == 3:
EXTENSION_SUFFIXES.append(s[0])
else:
EXTENSION_SUFFIXES = importlib.machinery.EXTENSION_SUFFIXES
if env.is_win:
if '.dll' not in EXTENSION_SUFFIXES:
EXTENSION_SUFFIXES.append('.dll')
elif env.is_linux:
if '.so' not in EXTENSION_SUFFIXES:
EXTENSION_SUFFIXES.append('.so')
else:
raise RuntimeError('not support this platform now.')
return EXTENSION_SUFFIXES
def remove(*args):
path = os.path.join(*args)
cc.v('remove [%s] ...' % path, end='')
if not os.path.exists(path):
cc.v('not exists, skip.')
return
for i in range(5):
cc.v('.', end='')
try:
if os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
time.sleep(0.5)
else:
os.unlink(path)
except:
pass
if os.path.exists(path):
time.sleep(1)
else:
break
if os.path.exists(path):
cc.e('failed')
raise RuntimeError('')
else:
cc.i('done')
def makedirs(path, exist_ok=True):
if os.path.exists(path):
if not exist_ok:
raise RuntimeError('path already exists: %s' % path)
else:
return
for i in range(5):
try:
os.makedirs(path)
except:
time.sleep(1)
pass
if not os.path.exists(path):
time.sleep(1)
else:
break
if not os.path.exists(path):
raise RuntimeError('can not create: %s' % path)
def copy_file(s_path, t_path, f_name, force=True):
if isinstance(f_name, str):
f_from = f_name
f_to = f_name
elif isinstance(f_name, tuple):
f_from = f_name[0]
f_to = f_name[1]
else:
raise RuntimeError('utils.copy_file() got invalid param.')
s = os.path.join(s_path, f_from)
t = os.path.join(t_path, f_to)
if os.path.exists(t):
if force:
cc.v(' an exists version found, clean up...')
remove(t)
else:
cc.w(' an exists version found, skip copy.')
return
if not os.path.exists(t_path):
makedirs(t_path)
cc.v('copy [%s]\n -> [%s]' % (s, t))
shutil.copy(s, t)
def copy_ex(s_path, t_path, item_name=None, force=True):
if item_name is None:
s = s_path
t = t_path
else:
if isinstance(item_name, str):
f_from = item_name
f_to = item_name
elif isinstance(item_name, tuple):
f_from = item_name[0]
f_to = item_name[1]
else:
raise RuntimeError('utils.copy_ex() got invalid param.')
s = os.path.join(s_path, f_from)
t = os.path.join(t_path, f_to)
if os.path.exists(t):
if force:
remove(t)
else:
cc.w(t, 'already exists, skip copy.')
return
if os.path.isdir(s):
cc.v('copy [%s]\n -> [%s]' % (s, t))
shutil.copytree(s, t)
else:
if not os.path.exists(t_path):
os.makedirs(t_path)
cc.v('copy [%s]\n -> [%s]' % (s, t))
shutil.copy(s, t)
def update_file(s_path, t_path, f_name):
if isinstance(f_name, str):
f_from = f_name
f_to = f_name
elif isinstance(f_name, tuple):
f_from = f_name[0]
f_to = f_name[1]
else:
raise RuntimeError('utils.update_file() got invalid param.')
s = os.path.join(s_path, f_from)
t = os.path.join(t_path, f_to)
if not os.path.exists(s):
cc.w('try update file `%s` but not exists, skip.' % f_from)
return
# TODO: check file MD5 and update time.
if os.path.exists(t):
remove(t)
if not os.path.exists(t_path):
makedirs(t_path)
cc.v('update [%s]\n -> [%s]' % (s, t))
shutil.copy(os.path.join(s_path, f_from), t)
def ensure_file_exists(filename):
if not os.path.exists(filename):
raise RuntimeError('file not exists: {}'.format(filename))
if not os.path.isfile(filename):
raise RuntimeError('path exists but not a file: {}'.format(filename))
def sys_exec(cmd, direct_output=False, output_codec=None):
if output_codec is None:
if env.is_win:
output_codec = 'gb2312'
else:
output_codec = 'utf8'
p = None
if env.is_win:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
else:
p = subprocess.Popen(cmd, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True, shell=True)
output = list()
f = p.stdout
while True:
line = f.readline()
if 0 == len(line):
break
line = line.rstrip('\r\n')
if direct_output:
cc.o((cc.CR_GRAY, line), end='\n')
output.append(line)
ret = p.wait()
return (ret, output)
def msvc_build(sln_file, proj_name, target, platform, force_rebuild):
if env.msbuild is None:
raise RuntimeError('where is `msbuild`?')
if force_rebuild:
cmd = '"{}" "{}" "/target:clean" "/property:Configuration={};Platform={}"'.format(env.msbuild, sln_file, target,
platform)
ret, _ = sys_exec(cmd, direct_output=True)
cc.v('ret:', ret)
cmd = '"{}" "{}" "/target:{}" "/property:Configuration={};Platform={}"'.format(env.msbuild, sln_file, proj_name,
target, platform)
ret, _ = sys_exec(cmd, direct_output=True)
if ret != 0:
raise RuntimeError('build MSVC project `{}` failed.'.format(proj_name))
def nsis_build(nsi_file, _define=''):
if env.nsis is None:
raise RuntimeError('where is `nsis`?')
cmd = '"{}" /V2 {} /X"SetCompressor /SOLID /FINAL lzma" "{}"'.format(env.nsis, _define, nsi_file)
ret, _ = sys_exec(cmd, direct_output=True)
if ret != 0:
raise RuntimeError('make installer with nsis failed. [{}]'.format(nsi_file))
def cmake(work_path, target, force_rebuild, cmake_define=''):
# I use cmake v3.5 which shipped with CLion.
if env.cmake is None:
raise RuntimeError('where is `cmake`?')
cc.n('make by cmake', target, sep=': ')
old_p = os.getcwd()
if os.path.exists(work_path):
if force_rebuild:
remove(work_path)
if not os.path.exists(work_path):
makedirs(work_path)
os.chdir(work_path)
if target == 'debug':
target = 'Debug'
else:
target = 'Release'
cmd = '"{}" -DCMAKE_BUILD_TYPE={} {} ..;make'.format(env.cmake, target, cmake_define)
ret, _ = sys_exec(cmd, direct_output=True)
os.chdir(old_p)
if ret != 0:
raise RuntimeError('build with cmake failed, ret={}. [{}]'.format(ret, target))
def strip(filename):
cc.n('strip binary file', filename)
if not os.path.exists(filename):
return False
cmd = 'strip {}'.format(filename)
ret, _ = sys_exec(cmd, direct_output=True)
if ret != 0:
raise RuntimeError('failed to strip binary file [{}], ret={}.'.format(filename, ret))
return True
def make_zip(src_path, to_file, from_parent=True):
cc.v('compress folder into .zip...')
src_path = os.path.abspath(src_path)
_parent = os.path.abspath(os.path.join(src_path, '..'))
_folder = src_path[len(_parent) + 1:]
if env.is_win:
old_p = os.getcwd()
if from_parent:
os.chdir(_parent)
cmd = '""{}" a "{}" "{}""'.format(env.zip7, to_file, _folder)
else:
os.chdir(src_path)
cmd = '""{}" a "{}" "*""'.format(env.zip7, to_file)
os.system(cmd)
os.chdir(old_p)
elif env.is_linux:
old_p = os.getcwd()
if from_parent:
os.chdir(_parent)
cmd = 'zip -r "{}" "{}"'.format(to_file, _folder)
else:
os.chdir(src_path)
cmd = 'zip -q -r "{}" ./*'.format(to_file)
os.system(cmd)
os.chdir(old_p)
else:
raise RuntimeError('not support this platform.')
ensure_file_exists(to_file)
def unzip(file_name, to_path):
if env.is_win:
cmd = '""{}" x "{}" -o"{}""'.format(env.zip7, file_name, to_path)
os.system(cmd)
elif env.is_linux:
os.system('unzip "{}" -d "{}"'.format(file_name, to_path))
else:
raise RuntimeError('not support this platform.')
def make_targz(work_path, folder, to_file):
cc.v('compress folder into .tar.gz...')
old_p = os.getcwd()
os.chdir(work_path)
cmd = 'tar zcf "{}" "{}"'.format(to_file, folder)
ret, _ = sys_exec(cmd, direct_output=True)
ensure_file_exists(to_file)
os.chdir(old_p)
if __name__ == '__main__':
# test()
pass