temp: config类

pull/32/head
Apex Liu 2017-05-18 18:33:24 +08:00
parent 7e1a961bd3
commit 1894dbc5b4
1 changed files with 308 additions and 1 deletions

View File

@ -154,6 +154,281 @@ class WebConfig:
return self.cfg['ip']
class AppConfig(dict):
def __init__(self, **kwargs):
super().__init__(**kwargs)
import builtins
if '__app_cfg__' in builtins.__dict__:
raise RuntimeError('AppConfig instance already exists.')
self['_cfg_default'] = {
'common': { # ini 小节名称
'port': { # 名称
'value': '8081', # 内容
'comment': None # 注释
},
'log_file': {
# 'value': None,
'comment': '`log_file` define the log file location. if not set, default location\n'
'to %APPROOT%/log/web.log\n'
'log-file=/var/log/blockchain-dashboard/web.log'
},
'log_level': {
'value': 2,
'comment': 'log_level can be 0 ~ 4, default value is 2.\n'
'LOG_LEVEL_DEBUG 0 log every-thing.\n'
'LOG_LEVEL_VERBOSE 1 log every-thing but without debug message.\n'
'LOG_LEVEL_INFO 2 log infomation/warning/error message.\n'
'LOG_LEVEL_WARN 3 log warning and error message.\n'
'LOG_LEVEL_ERROR 4 log error message only.'
}
}
}
self['_cfg_loaded'] = {}
# self['_kvs'] = AttrDict()
self['_kvs'] = {'_': AttrDict()}
self['_cfg_file'] = ''
def __getattr__(self, name):
if name in self['_kvs']:
return self['_kvs'][name]
else:
if name in self['_kvs']['_']:
return self['_kvs']['_'][name]
else:
return AttrDict()
def __setattr__(self, key, val):
x = key.split('::')
if 1 == len(x):
_sec = '_'
_key = x[0]
elif 2 == len(x):
_sec = x[0]
_key = x[1]
else:
raise RuntimeError('invalid name.')
if _sec not in self['_kvs']:
self['_kvs'][_sec] = {}
self['_kvs'][_sec][_key] = val
def set_kv(self, key, val):
x = key.split('::')
if 1 == len(x):
_sec = '_'
_key = x[0]
elif 2 == len(x):
_sec = x[0]
_key = x[1]
else:
raise RuntimeError('invalid name.')
if _sec not in self['_cfg_loaded']:
self['_cfg_loaded'][_sec] = {}
self['_cfg_loaded'][_sec][_key] = val
self._update_kvs(_sec, _key, val)
def set_default(self, key, val, comment=None):
x = key.split('::')
if 1 == len(x):
_sec = '_'
_key = x[0]
elif 2 == len(x):
_sec = x[0]
_key = x[1]
else:
raise RuntimeError('invalid name.')
if _sec not in self['_cfg_default']:
self['_cfg_default'][_sec] = {}
if _key not in self['_cfg_default'][_sec]:
self['_cfg_default'][_sec][_key] = {}
self['_cfg_default'][_sec][_key]['value'] = val
self['_cfg_default'][_sec][_key]['comment'] = comment
else:
self['_cfg_default'][_sec][_key]['value'] = val
if comment is not None:
self['_cfg_default'][_sec][_key]['comment'] = comment
elif 'comment' not in self['_cfg_default'][_sec][_key]:
self['_cfg_default'][_sec][_key]['comment'] = None
self._update_kvs(_sec, _key, val)
def load(self, cfg_file):
if not os.path.exists(cfg_file):
log.e('configuration file does not exists: [{}]\n'.format(cfg_file))
return False
try:
_cfg = configparser.ConfigParser()
_cfg.read(cfg_file)
except:
log.e('can not load configuration file: [{}]\n'.format(cfg_file))
return False
if 'common' not in _cfg:
log.e('invalid configuration file: [{}]\n'.format(cfg_file))
return False
_comm = _cfg['common']
_tmp_int = _comm.getint('port', -1)
if -1 != _tmp_int:
self.set_kv('common::port', _tmp_int)
_tmp_str = _comm.get('log-file', None)
if _tmp_str is not None:
self.set_kv('common::log_file', _tmp_str)
_tmp_int = _comm.getint('log-level', -1)
if LOG_DEBUG <= _tmp_int <= LOG_ERROR:
self.set_kv('common::log_level', _tmp_int)
# self['server_port'] = _comm.getint('port', -1)
# self['log_file'] = _comm.get('log-file', None)
# if self['log_file'] is not None:
# self['log_path'] = os.path.dirname(self['log_file'])
# log_level, ok = self.cfg_log_level()
# if ok:
# log.set_attribute(min_level=log_level)
self['_cfg_file'] = cfg_file
# self._make_final()
return True
def save(self, cfg_file=None):
if cfg_file is None:
cfg_file = self['_cfg_file']
print('save to', cfg_file)
_save = [
{'common': ['port', 'log_file', 'log_level']},
# {'test': ['abc', 'def']}
]
cnt = ['; codec: utf-8\n']
is_first_section = True
for sections in _save:
for sec_name in sections:
if sec_name in self['_cfg_default'] or sec_name in self['_cfg_loaded']:
if not is_first_section:
cnt.append('\n')
cnt.append('[{}]'.format(sec_name))
is_first_section = False
for k in sections[sec_name]:
have_comment = False
if sec_name in self['_cfg_default'] and k in self['_cfg_default'][sec_name] and 'comment' in self['_cfg_default'][sec_name][k]:
comments = self['_cfg_default'][sec_name][k]['comment']
if comments is not None:
comments = self['_cfg_default'][sec_name][k]['comment'].split('\n')
cnt.append('')
have_comment = True
for comment in comments:
cnt.append('; {}'.format(comment))
if sec_name in self['_cfg_loaded'] and k in self['_cfg_loaded'][sec_name]:
if not have_comment:
cnt.append('')
cnt.append('{}={}'.format(k, self['_cfg_loaded'][sec_name][k]))
cnt.append('\n')
tmp_file = '{}.tmp'.format(cfg_file)
try:
with open(tmp_file, 'w', encoding='utf8') as f:
f.write('\n'.join(cnt))
if os.path.exists(cfg_file):
os.unlink(cfg_file)
os.rename(tmp_file, cfg_file)
return True
except Exception as e:
print(e.__str__())
return False
def _update_kvs(self, section, key, val):
if section not in self['_kvs']:
self['_kvs'][section] = AttrDict()
self['_kvs'][section][key] = val
def get_str(self, key, def_value=None):
x = key.split('::')
if 1 == len(x):
_sec = '_'
_key = x[0]
elif 2 == len(x):
_sec = x[0]
_key = x[1]
else:
return def_value, False
if _sec not in self['_kvs']:
return def_value, False
if _key not in self['_kvs'][_sec]:
return def_value, False
return str(self['_kvs'][_sec][_key]), True
def get_int(self, key, def_value=-1):
x = key.split('::')
if 1 == len(x):
_sec = '_'
_key = x[0]
elif 2 == len(x):
_sec = x[0]
_key = x[1]
else:
return def_value, False
if _sec not in self['_kvs']:
return def_value, False
if _key not in self['_kvs'][_sec]:
return def_value, False
try:
return int(self['_kvs'][_sec][_key]), True
except ValueError as e:
print(e.__str__())
return def_value, False
def get_bool(self, key, def_value=False):
x = key.split('::')
if 1 == len(x):
_sec = '_'
_key = x[0]
elif 2 == len(x):
_sec = x[0]
_key = x[1]
else:
return def_value, False
if _sec not in self['_kvs']:
return def_value, False
if _key not in self['_kvs'][_sec]:
return def_value, False
tmp = str(self['_kvs'][_sec][_key]).lower()
if tmp in ['yes', 'true', '1']:
return True, True
elif tmp in ['no', 'false', '0']:
return False, True
else:
return def_value, False
# def dump_var(obj, indent=' '):
# x = {'data': obj}
# y = json.dumps(x, indent=indent)
# t = y.split('\n')
# t = t[2:-2]
# for i in t:
# print(i[4:])
def app_cfg():
import builtins
if '__web_config__' not in builtins.__dict__:
@ -162,4 +437,36 @@ def app_cfg():
if __name__ == '__main__':
cfg = ConfigFile()
cfg = AppConfig()
# cfg.load('/Users/apex/work/otc-tech/contract-demo/dashboard/config/web.ini')
cfg.set_default('common::log_file', 'E:/test/log/web.log')
cfg.set_default('test::abc', 'this is a test', 'abcd\ndefa\ntttt\n')
cfg.set_kv('test::abc', '1234')
cfg.load('E:/test/config/web.ini')
# cfg = _cfg.config()
cfg.aaa = 'this is aaa'
cfg.bbb = 123
cfg.ccc = False
print('----usage--------------------')
print(cfg.common.port)
print(cfg.get_str('aaa'))
print(cfg.get_str('bbb'))
print(cfg.get_str('ccc'))
print('----usage--------------------')
print(cfg.get_int('aaa'))
print(cfg.get_int('bbb'))
print(cfg.get_int('ccc'))
print('----usage--------------------')
print(cfg.get_bool('aaa'))
print(cfg.get_bool('bbb'))
print(cfg.get_bool('ccc'))
print('----usage--------------------')
print(cfg.common)
print('----usage--------------------')
print(cfg.aaa)
print(cfg.bbb)
print(cfg.ccc)
cfg.save('E:/test/config/web-new.ini')
# cfg.save()