mirror of https://github.com/fail2ban/fail2ban
minimize influence of implicit conversions errors (between unicode, bytes and str), provide new universal helper `uni_string`, which uses safe explicit conversion to string (also if default encoding is ascii); avoid conversion errors on wrong-chars by replace tags.
parent
227550684a
commit
85fd1854bc
|
@ -49,11 +49,18 @@ if sys.version_info < (3,): # python >= 2.6
|
|||
def __resetDefaultEncoding(encoding):
|
||||
global PREFER_ENC
|
||||
ode = sys.getdefaultencoding().upper()
|
||||
if ode == 'ASCII' or ode != PREFER_ENC.upper():
|
||||
if ode == 'ASCII' and ode != PREFER_ENC.upper():
|
||||
# setdefaultencoding is normally deleted after site initialized, so hack-in using load of sys-module:
|
||||
from imp import load_dynamic as __ldm
|
||||
_sys = __ldm('_sys', 'sys')
|
||||
_sys.setdefaultencoding(encoding)
|
||||
_sys = sys
|
||||
if not hasattr(_sys, "setdefaultencoding"):
|
||||
try:
|
||||
from imp import load_dynamic as __ldm
|
||||
_sys = __ldm('_sys', 'sys')
|
||||
except ImportError: # pragma: no cover (only if load_dynamic fails)
|
||||
reload(sys)
|
||||
_sys = sys
|
||||
if hasattr(_sys, "setdefaultencoding"):
|
||||
_sys.setdefaultencoding(encoding)
|
||||
# override to PREFER_ENC:
|
||||
__resetDefaultEncoding(PREFER_ENC)
|
||||
del __resetDefaultEncoding
|
||||
|
@ -61,11 +68,58 @@ if sys.version_info < (3,): # python >= 2.6
|
|||
# todo: rewrite explicit (and implicit) str-conversions via encode/decode with IO-encoding (sys.stdout.encoding),
|
||||
# e. g. inside tags-replacement by command-actions, etc.
|
||||
|
||||
#
|
||||
# Following "uni_decode", "uni_string" functions unified python independent any
|
||||
# to string converting.
|
||||
#
|
||||
# Typical example resp. work-case for understanding the coding/decoding issues:
|
||||
#
|
||||
# [isinstance('', str), isinstance(b'', str), isinstance(u'', str)]
|
||||
# [True, True, False]; # -- python2
|
||||
# [True, False, True]; # -- python3
|
||||
#
|
||||
if sys.version_info >= (3,):
|
||||
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
||||
try:
|
||||
if isinstance(x, bytes):
|
||||
return x.decode(enc, errors)
|
||||
return x
|
||||
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
||||
if errors != 'strict':
|
||||
raise
|
||||
return uni_decode(x, enc, 'replace')
|
||||
def uni_string(x):
|
||||
if not isinstance(x, bytes):
|
||||
return str(x)
|
||||
return uni_decode(x)
|
||||
else:
|
||||
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
||||
try:
|
||||
if isinstance(x, unicode):
|
||||
return x.encode(enc, errors)
|
||||
return x
|
||||
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
||||
if errors != 'strict':
|
||||
raise
|
||||
return uni_decode(x, enc, 'replace')
|
||||
if sys.getdefaultencoding().upper() != 'UTF-8':
|
||||
def uni_string(x):
|
||||
if not isinstance(x, unicode):
|
||||
return str(x)
|
||||
return uni_decode(x)
|
||||
else:
|
||||
uni_string = str
|
||||
|
||||
|
||||
def _as_bool(val):
|
||||
return bool(val) if not isinstance(val, basestring) \
|
||||
else val.lower() in ('1', 'on', 'true', 'yes')
|
||||
|
||||
|
||||
def formatExceptionInfo():
|
||||
""" Consistently format exception information """
|
||||
cla, exc = sys.exc_info()[:2]
|
||||
return (cla.__name__, str(exc))
|
||||
return (cla.__name__, uni_string(exc))
|
||||
|
||||
|
||||
#
|
||||
|
@ -235,41 +289,6 @@ else:
|
|||
r.update(y)
|
||||
return r
|
||||
|
||||
#
|
||||
# Following "uni_decode" function unified python independent any to string converting
|
||||
#
|
||||
# Typical example resp. work-case for understanding the coding/decoding issues:
|
||||
#
|
||||
# [isinstance('', str), isinstance(b'', str), isinstance(u'', str)]
|
||||
# [True, True, False]; # -- python2
|
||||
# [True, False, True]; # -- python3
|
||||
#
|
||||
if sys.version_info >= (3,):
|
||||
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
||||
try:
|
||||
if isinstance(x, bytes):
|
||||
return x.decode(enc, errors)
|
||||
return x
|
||||
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
||||
if errors != 'strict':
|
||||
raise
|
||||
return uni_decode(x, enc, 'replace')
|
||||
else:
|
||||
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
||||
try:
|
||||
if isinstance(x, unicode):
|
||||
return x.encode(enc, errors)
|
||||
return x
|
||||
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
||||
if errors != 'strict':
|
||||
raise
|
||||
return uni_decode(x, enc, 'replace')
|
||||
|
||||
|
||||
def _as_bool(val):
|
||||
return bool(val) if not isinstance(val, basestring) \
|
||||
else val.lower() in ('1', 'on', 'true', 'yes')
|
||||
|
||||
#
|
||||
# Following function used for parse options from parameter (e.g. `name[p1=0, p2="..."][p3='...']`).
|
||||
#
|
||||
|
@ -347,7 +366,7 @@ def substituteRecursiveTags(inptags, conditional='',
|
|||
if tag in ignore or tag in done: continue
|
||||
# ignore replacing callable items from calling map - should be converted on demand only (by get):
|
||||
if noRecRepl and callable(tags.getRawItem(tag)): continue
|
||||
value = orgval = str(tags[tag])
|
||||
value = orgval = uni_string(tags[tag])
|
||||
# search and replace all tags within value, that can be interpolated using other tags:
|
||||
m = tre_search(value)
|
||||
refCounts = {}
|
||||
|
@ -382,7 +401,7 @@ def substituteRecursiveTags(inptags, conditional='',
|
|||
m = tre_search(value, m.end())
|
||||
continue
|
||||
# if calling map - be sure we've string:
|
||||
if noRecRepl: repl = str(repl)
|
||||
if noRecRepl: repl = uni_string(repl)
|
||||
value = value.replace('<%s>' % rtag, repl)
|
||||
#logSys.log(5, 'value now: %s' % value)
|
||||
# increment reference count:
|
||||
|
|
|
@ -36,7 +36,7 @@ from .failregex import mapTag2Opt
|
|||
from .ipdns import asip, DNSUtils
|
||||
from .mytime import MyTime
|
||||
from .utils import Utils
|
||||
from ..helpers import getLogger, _merge_copy_dicts, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
|
||||
from ..helpers import getLogger, _merge_copy_dicts, uni_string, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
|
||||
|
||||
# Gets the instance of the logger.
|
||||
logSys = getLogger(__name__)
|
||||
|
@ -608,7 +608,7 @@ class CommandAction(ActionBase):
|
|||
if value is None:
|
||||
# fallback (no or default replacement)
|
||||
return ADD_REPL_TAGS_CM.get(tag, m.group())
|
||||
value = str(value) # assure string
|
||||
value = uni_string(value) # assure string
|
||||
if tag in cls._escapedTags:
|
||||
# That one needs to be escaped since its content is
|
||||
# out of our control
|
||||
|
@ -687,7 +687,7 @@ class CommandAction(ActionBase):
|
|||
except KeyError:
|
||||
# fallback (no or default replacement)
|
||||
return ADD_REPL_TAGS_CM.get(tag, m.group())
|
||||
value = str(value) # assure string
|
||||
value = uni_string(value) # assure string
|
||||
# replacement for tag:
|
||||
return escapeVal(tag, value)
|
||||
|
||||
|
@ -701,7 +701,7 @@ class CommandAction(ActionBase):
|
|||
def substTag(m):
|
||||
tag = mapTag2Opt(m.groups()[0])
|
||||
try:
|
||||
value = str(tickData[tag])
|
||||
value = uni_string(tickData[tag])
|
||||
except KeyError:
|
||||
return ""
|
||||
return escapeVal("F_"+tag, value)
|
||||
|
|
Loading…
Reference in New Issue