mirror of https://github.com/fail2ban/fail2ban
Merge pull request #2171 from sebres/0.10-fix-decoding-issues
commit
857d6954c4
|
@ -7,5 +7,6 @@ source =
|
||||||
|
|
||||||
[report]
|
[report]
|
||||||
exclude_lines =
|
exclude_lines =
|
||||||
pragma: no cover
|
pragma: ?no ?cover
|
||||||
pragma: systemd no cover
|
pragma: ?${F2B_PY}.x no ?cover
|
||||||
|
pragma: ?systemd no ?cover
|
||||||
|
|
15
.travis.yml
15
.travis.yml
|
@ -18,8 +18,9 @@ python:
|
||||||
- pypy3.3-5.5-alpha
|
- pypy3.3-5.5-alpha
|
||||||
before_install:
|
before_install:
|
||||||
- echo "running under $TRAVIS_PYTHON_VERSION"
|
- echo "running under $TRAVIS_PYTHON_VERSION"
|
||||||
- if [[ $TRAVIS_PYTHON_VERSION == 2* || $TRAVIS_PYTHON_VERSION == pypy* && $TRAVIS_PYTHON_VERSION != pypy3* ]]; then export F2B_PY_2=true && echo "Set F2B_PY_2"; fi
|
- if [[ $TRAVIS_PYTHON_VERSION == 2* || $TRAVIS_PYTHON_VERSION == pypy* && $TRAVIS_PYTHON_VERSION != pypy3* ]]; then export F2B_PY=2; fi
|
||||||
- if [[ $TRAVIS_PYTHON_VERSION == 3* || $TRAVIS_PYTHON_VERSION == pypy3* ]]; then export F2B_PY_3=true && echo "Set F2B_PY_3"; fi
|
- if [[ $TRAVIS_PYTHON_VERSION == 3* || $TRAVIS_PYTHON_VERSION == pypy3* ]]; then export F2B_PY=3; fi
|
||||||
|
- echo "Set F2B_PY=$F2B_PY"
|
||||||
- travis_retry sudo apt-get update -qq
|
- travis_retry sudo apt-get update -qq
|
||||||
# Set this so sudo executes the correct python binary
|
# Set this so sudo executes the correct python binary
|
||||||
# Anything not using sudo will already have the correct environment
|
# Anything not using sudo will already have the correct environment
|
||||||
|
@ -31,20 +32,20 @@ install:
|
||||||
# coveralls
|
# coveralls
|
||||||
- travis_retry pip install coveralls codecov
|
- travis_retry pip install coveralls codecov
|
||||||
# dnspython or dnspython3
|
# dnspython or dnspython3
|
||||||
- if [[ "$F2B_PY_2" ]]; then travis_retry pip install dnspython; fi
|
- if [[ "$F2B_PY" = 2 ]]; then travis_retry pip install dnspython; fi
|
||||||
- if [[ "$F2B_PY_3" ]]; then travis_retry pip install dnspython3; fi
|
- if [[ "$F2B_PY" = 3 ]]; then travis_retry pip install dnspython3; fi
|
||||||
# gamin - install manually (not in PyPI) - travis-ci system Python is 2.7
|
# gamin - install manually (not in PyPI) - travis-ci system Python is 2.7
|
||||||
- if [[ $TRAVIS_PYTHON_VERSION == 2.7 ]]; then travis_retry sudo apt-get install -qq python-gamin && cp /usr/share/pyshared/gamin.py /usr/lib/pyshared/python2.7/_gamin.so $VIRTUAL_ENV/lib/python2.7/site-packages/; fi
|
- if [[ $TRAVIS_PYTHON_VERSION == 2.7 ]]; then travis_retry sudo apt-get install -qq python-gamin && cp /usr/share/pyshared/gamin.py /usr/lib/pyshared/python2.7/_gamin.so $VIRTUAL_ENV/lib/python2.7/site-packages/; fi
|
||||||
# pyinotify
|
# pyinotify
|
||||||
- travis_retry pip install pyinotify
|
- travis_retry pip install pyinotify
|
||||||
before_script:
|
before_script:
|
||||||
# Manually execute 2to3 for now
|
# Manually execute 2to3 for now
|
||||||
- if [[ "$F2B_PY_3" ]]; then ./fail2ban-2to3; fi
|
- if [[ "$F2B_PY" = 3 ]]; then ./fail2ban-2to3; fi
|
||||||
script:
|
script:
|
||||||
# Keep the legacy setup.py test approach of checking coverage for python2
|
# Keep the legacy setup.py test approach of checking coverage for python2
|
||||||
- if [[ "$F2B_PY_2" ]]; then coverage run setup.py test; fi
|
- if [[ "$F2B_PY" = 2 ]]; then coverage run setup.py test; fi
|
||||||
# Coverage doesn't pick up setup.py test with python3, so run it directly (with same verbosity as from setup)
|
# Coverage doesn't pick up setup.py test with python3, so run it directly (with same verbosity as from setup)
|
||||||
- if [[ "$F2B_PY_3" ]]; then coverage run bin/fail2ban-testcases --verbosity=2; fi
|
- if [[ "$F2B_PY" = 3 ]]; then coverage run bin/fail2ban-testcases --verbosity=2; fi
|
||||||
# Use $VENV_BIN (not python) or else sudo will always run the system's python (2.7)
|
# Use $VENV_BIN (not python) or else sudo will always run the system's python (2.7)
|
||||||
- sudo $VENV_BIN/pip install .
|
- sudo $VENV_BIN/pip install .
|
||||||
# Doc files should get installed on Travis under Linux
|
# Doc files should get installed on Travis under Linux
|
||||||
|
|
|
@ -38,6 +38,13 @@ ver. 0.10.4-dev-1 (20??/??/??) - development edition
|
||||||
* `filter.d/dovecot.conf`: failregex enhancement to catch sql password mismatch errors (gh-2153);
|
* `filter.d/dovecot.conf`: failregex enhancement to catch sql password mismatch errors (gh-2153);
|
||||||
* `action.d/hostsdeny.conf`: fix parameter in config (dynamic parameters stating with '_' are protected
|
* `action.d/hostsdeny.conf`: fix parameter in config (dynamic parameters stating with '_' are protected
|
||||||
and don't allowed in command-actions), see gh-2114;
|
and don't allowed in command-actions), see gh-2114;
|
||||||
|
* decoding stability fix by wrong encoded characters like utf-8 surrogate pairs, etc (gh-2171):
|
||||||
|
- fail2ban running in the preferred encoding now (as default encoding also within python 2.x), mostly
|
||||||
|
`UTF-8` in opposite to `ascii` previously, so minimizes influence of implicit conversions errors;
|
||||||
|
- actions: avoid possible conversion errors on wrong-chars by replace tags;
|
||||||
|
- database: improve adapter/converter handlers working on invalid characters in sense of json and/or sqlite-database;
|
||||||
|
additionally both are exception-safe now, so avoid possible locking of database (closes gh-2137);
|
||||||
|
- logging in fail2ban is process-wide exception-safe now.
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
|
|
||||||
|
|
|
@ -18,16 +18,16 @@
|
||||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
if sys.version_info < (2, 7):
|
if sys.version_info < (2, 7): # pragma: no cover
|
||||||
raise ImportError("badips.py action requires Python >= 2.7")
|
raise ImportError("badips.py action requires Python >= 2.7")
|
||||||
import json
|
import json
|
||||||
import threading
|
import threading
|
||||||
import logging
|
import logging
|
||||||
if sys.version_info >= (3, ):
|
if sys.version_info >= (3, ): # pragma: 2.x no cover
|
||||||
from urllib.request import Request, urlopen
|
from urllib.request import Request, urlopen
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
from urllib.error import HTTPError
|
from urllib.error import HTTPError
|
||||||
else:
|
else: # pragma: 3.x no cover
|
||||||
from urllib2 import Request, urlopen, HTTPError
|
from urllib2 import Request, urlopen, HTTPError
|
||||||
from urllib import urlencode
|
from urllib import urlencode
|
||||||
|
|
||||||
|
|
|
@ -36,14 +36,90 @@ from .server.mytime import MyTime
|
||||||
PREFER_ENC = locale.getpreferredencoding()
|
PREFER_ENC = locale.getpreferredencoding()
|
||||||
# correct preferred encoding if lang not set in environment:
|
# correct preferred encoding if lang not set in environment:
|
||||||
if PREFER_ENC.startswith('ANSI_'): # pragma: no cover
|
if PREFER_ENC.startswith('ANSI_'): # pragma: no cover
|
||||||
if all((os.getenv(v) in (None, "") for v in ('LANGUAGE', 'LC_ALL', 'LC_CTYPE', 'LANG'))):
|
if sys.stdout and not sys.stdout.encoding.startswith('ANSI_'):
|
||||||
|
PREFER_ENC = sys.stdout.encoding
|
||||||
|
elif all((os.getenv(v) in (None, "") for v in ('LANGUAGE', 'LC_ALL', 'LC_CTYPE', 'LANG'))):
|
||||||
PREFER_ENC = 'UTF-8';
|
PREFER_ENC = 'UTF-8';
|
||||||
|
|
||||||
|
# py-2.x: try to minimize influence of sporadic conversion errors on python 2.x,
|
||||||
|
# caused by implicit converting of string/unicode (e. g. `str(u"\uFFFD")` produces an error
|
||||||
|
# if default encoding is 'ascii');
|
||||||
|
if sys.version_info < (3,): # pragma: 3.x no cover
|
||||||
|
# correct default (global system) encoding (mostly UTF-8):
|
||||||
|
def __resetDefaultEncoding(encoding):
|
||||||
|
global PREFER_ENC
|
||||||
|
ode = sys.getdefaultencoding().upper()
|
||||||
|
if ode == 'ASCII' and ode != PREFER_ENC.upper():
|
||||||
|
# setdefaultencoding is normally deleted after site initialized, so hack-in using load of sys-module:
|
||||||
|
_sys = sys
|
||||||
|
if not hasattr(_sys, "setdefaultencoding"):
|
||||||
|
try:
|
||||||
|
from imp import load_dynamic as __ldm
|
||||||
|
_sys = __ldm('_sys', 'sys')
|
||||||
|
except ImportError: # pragma: no cover - only if load_dynamic fails
|
||||||
|
reload(sys)
|
||||||
|
_sys = sys
|
||||||
|
if hasattr(_sys, "setdefaultencoding"):
|
||||||
|
_sys.setdefaultencoding(encoding)
|
||||||
|
# override to PREFER_ENC:
|
||||||
|
__resetDefaultEncoding(PREFER_ENC)
|
||||||
|
del __resetDefaultEncoding
|
||||||
|
|
||||||
|
# todo: rewrite explicit (and implicit) str-conversions via encode/decode with IO-encoding (sys.stdout.encoding),
|
||||||
|
# e. g. inside tags-replacement by command-actions, etc.
|
||||||
|
|
||||||
|
#
|
||||||
|
# Following "uni_decode", "uni_string" functions unified python independent any
|
||||||
|
# to string converting.
|
||||||
|
#
|
||||||
|
# Typical example resp. work-case for understanding the coding/decoding issues:
|
||||||
|
#
|
||||||
|
# [isinstance('', str), isinstance(b'', str), isinstance(u'', str)]
|
||||||
|
# [True, True, False]; # -- python2
|
||||||
|
# [True, False, True]; # -- python3
|
||||||
|
#
|
||||||
|
if sys.version_info >= (3,): # pragma: 2.x no cover
|
||||||
|
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
||||||
|
try:
|
||||||
|
if isinstance(x, bytes):
|
||||||
|
return x.decode(enc, errors)
|
||||||
|
return x
|
||||||
|
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
||||||
|
if errors != 'strict':
|
||||||
|
raise
|
||||||
|
return x.decode(enc, 'replace')
|
||||||
|
def uni_string(x):
|
||||||
|
if not isinstance(x, bytes):
|
||||||
|
return str(x)
|
||||||
|
return x.decode(PREFER_ENC, 'replace')
|
||||||
|
else: # pragma: 3.x no cover
|
||||||
|
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
||||||
|
try:
|
||||||
|
if isinstance(x, unicode):
|
||||||
|
return x.encode(enc, errors)
|
||||||
|
return x
|
||||||
|
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
||||||
|
if errors != 'strict':
|
||||||
|
raise
|
||||||
|
return x.encode(enc, 'replace')
|
||||||
|
if sys.getdefaultencoding().upper() != 'UTF-8': # pragma: no cover - utf-8 is default encoding now
|
||||||
|
def uni_string(x):
|
||||||
|
if not isinstance(x, unicode):
|
||||||
|
return str(x)
|
||||||
|
return x.encode(PREFER_ENC, 'replace')
|
||||||
|
else:
|
||||||
|
uni_string = str
|
||||||
|
|
||||||
|
|
||||||
|
def _as_bool(val):
|
||||||
|
return bool(val) if not isinstance(val, basestring) \
|
||||||
|
else val.lower() in ('1', 'on', 'true', 'yes')
|
||||||
|
|
||||||
|
|
||||||
def formatExceptionInfo():
|
def formatExceptionInfo():
|
||||||
""" Consistently format exception information """
|
""" Consistently format exception information """
|
||||||
cla, exc = sys.exc_info()[:2]
|
cla, exc = sys.exc_info()[:2]
|
||||||
return (cla.__name__, str(exc))
|
return (cla.__name__, uni_string(exc))
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
|
@ -126,6 +202,35 @@ class FormatterWithTraceBack(logging.Formatter):
|
||||||
return logging.Formatter.format(self, record)
|
return logging.Formatter.format(self, record)
|
||||||
|
|
||||||
|
|
||||||
|
__origLog = logging.Logger._log
|
||||||
|
def __safeLog(self, level, msg, args, **kwargs):
|
||||||
|
"""Safe log inject to avoid possible errors by unsafe log-handlers,
|
||||||
|
concat, str. conversion, representation fails, etc.
|
||||||
|
|
||||||
|
Used to intrude exception-safe _log-method instead of _log-method
|
||||||
|
of Logger class to be always safe by logging and to get more-info about.
|
||||||
|
|
||||||
|
See testSafeLogging test-case for more information. At least the errors
|
||||||
|
covered in phase 3 seems to affected in all known pypy/python versions
|
||||||
|
until now.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# if isEnabledFor(level) already called...
|
||||||
|
__origLog(self, level, msg, args, **kwargs)
|
||||||
|
except Exception as e: # pragma: no cover - unreachable if log-handler safe in this python-version
|
||||||
|
try:
|
||||||
|
for args in (
|
||||||
|
("logging failed: %r on %s", (e, uni_string(msg))),
|
||||||
|
(" args: %r", ([uni_string(a) for a in args],))
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
__origLog(self, level, *args)
|
||||||
|
except: # pragma: no cover
|
||||||
|
pass
|
||||||
|
except: # pragma: no cover
|
||||||
|
pass
|
||||||
|
logging.Logger._log = __safeLog
|
||||||
|
|
||||||
def getLogger(name):
|
def getLogger(name):
|
||||||
"""Get logging.Logger instance with Fail2Ban logger name convention
|
"""Get logging.Logger instance with Fail2Ban logger name convention
|
||||||
"""
|
"""
|
||||||
|
@ -213,41 +318,6 @@ else:
|
||||||
r.update(y)
|
r.update(y)
|
||||||
return r
|
return r
|
||||||
|
|
||||||
#
|
|
||||||
# Following "uni_decode" function unified python independent any to string converting
|
|
||||||
#
|
|
||||||
# Typical example resp. work-case for understanding the coding/decoding issues:
|
|
||||||
#
|
|
||||||
# [isinstance('', str), isinstance(b'', str), isinstance(u'', str)]
|
|
||||||
# [True, True, False]; # -- python2
|
|
||||||
# [True, False, True]; # -- python3
|
|
||||||
#
|
|
||||||
if sys.version_info >= (3,):
|
|
||||||
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
|
||||||
try:
|
|
||||||
if isinstance(x, bytes):
|
|
||||||
return x.decode(enc, errors)
|
|
||||||
return x
|
|
||||||
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
|
||||||
if errors != 'strict':
|
|
||||||
raise
|
|
||||||
return uni_decode(x, enc, 'replace')
|
|
||||||
else:
|
|
||||||
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
|
|
||||||
try:
|
|
||||||
if isinstance(x, unicode):
|
|
||||||
return x.encode(enc, errors)
|
|
||||||
return x
|
|
||||||
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
|
|
||||||
if errors != 'strict':
|
|
||||||
raise
|
|
||||||
return uni_decode(x, enc, 'replace')
|
|
||||||
|
|
||||||
|
|
||||||
def _as_bool(val):
|
|
||||||
return bool(val) if not isinstance(val, basestring) \
|
|
||||||
else val.lower() in ('1', 'on', 'true', 'yes')
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Following function used for parse options from parameter (e.g. `name[p1=0, p2="..."][p3='...']`).
|
# Following function used for parse options from parameter (e.g. `name[p1=0, p2="..."][p3='...']`).
|
||||||
#
|
#
|
||||||
|
@ -325,7 +395,7 @@ def substituteRecursiveTags(inptags, conditional='',
|
||||||
if tag in ignore or tag in done: continue
|
if tag in ignore or tag in done: continue
|
||||||
# ignore replacing callable items from calling map - should be converted on demand only (by get):
|
# ignore replacing callable items from calling map - should be converted on demand only (by get):
|
||||||
if noRecRepl and callable(tags.getRawItem(tag)): continue
|
if noRecRepl and callable(tags.getRawItem(tag)): continue
|
||||||
value = orgval = str(tags[tag])
|
value = orgval = uni_string(tags[tag])
|
||||||
# search and replace all tags within value, that can be interpolated using other tags:
|
# search and replace all tags within value, that can be interpolated using other tags:
|
||||||
m = tre_search(value)
|
m = tre_search(value)
|
||||||
refCounts = {}
|
refCounts = {}
|
||||||
|
@ -360,7 +430,7 @@ def substituteRecursiveTags(inptags, conditional='',
|
||||||
m = tre_search(value, m.end())
|
m = tre_search(value, m.end())
|
||||||
continue
|
continue
|
||||||
# if calling map - be sure we've string:
|
# if calling map - be sure we've string:
|
||||||
if noRecRepl: repl = str(repl)
|
if noRecRepl: repl = uni_string(repl)
|
||||||
value = value.replace('<%s>' % rtag, repl)
|
value = value.replace('<%s>' % rtag, repl)
|
||||||
#logSys.log(5, 'value now: %s' % value)
|
#logSys.log(5, 'value now: %s' % value)
|
||||||
# increment reference count:
|
# increment reference count:
|
||||||
|
|
|
@ -36,7 +36,7 @@ from .failregex import mapTag2Opt
|
||||||
from .ipdns import asip, DNSUtils
|
from .ipdns import asip, DNSUtils
|
||||||
from .mytime import MyTime
|
from .mytime import MyTime
|
||||||
from .utils import Utils
|
from .utils import Utils
|
||||||
from ..helpers import getLogger, _merge_copy_dicts, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
|
from ..helpers import getLogger, _merge_copy_dicts, uni_string, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
|
||||||
|
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
@ -83,6 +83,8 @@ class CallingMap(MutableMapping, object):
|
||||||
The dictionary data which can be accessed to obtain items uncalled
|
The dictionary data which can be accessed to obtain items uncalled
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
CM_REPR_ITEMS = ()
|
||||||
|
|
||||||
# immutable=True saves content between actions, without interim copying (save original on demand, recoverable via reset)
|
# immutable=True saves content between actions, without interim copying (save original on demand, recoverable via reset)
|
||||||
__slots__ = ('data', 'storage', 'immutable', '__org_data')
|
__slots__ = ('data', 'storage', 'immutable', '__org_data')
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
@ -98,14 +100,29 @@ class CallingMap(MutableMapping, object):
|
||||||
pass
|
pass
|
||||||
self.immutable = immutable
|
self.immutable = immutable
|
||||||
|
|
||||||
def __repr__(self):
|
def _asrepr(self, calculated=False):
|
||||||
return "%s(%r)" % (self.__class__.__name__, self._asdict())
|
# be sure it is suitable as string, so use str as checker:
|
||||||
|
return "%s(%r)" % (self.__class__.__name__, self._asdict(calculated, str))
|
||||||
|
|
||||||
def _asdict(self):
|
__repr__ = _asrepr
|
||||||
|
|
||||||
|
def _asdict(self, calculated=False, checker=None):
|
||||||
|
d = dict(self.data, **self.storage)
|
||||||
|
if not calculated:
|
||||||
|
return dict((n,v) for n,v in d.iteritems() \
|
||||||
|
if not callable(v) or n in self.CM_REPR_ITEMS)
|
||||||
|
for n,v in d.items():
|
||||||
|
if callable(v):
|
||||||
try:
|
try:
|
||||||
return dict(self)
|
# calculate:
|
||||||
except:
|
v = self.__getitem__(n)
|
||||||
return dict(self.data, **self.storage)
|
# convert if needed:
|
||||||
|
if checker: checker(v)
|
||||||
|
# store calculated:
|
||||||
|
d[n] = v
|
||||||
|
except: # can't calculate - just ignore it
|
||||||
|
pass
|
||||||
|
return d
|
||||||
|
|
||||||
def getRawItem(self, key):
|
def getRawItem(self, key):
|
||||||
try:
|
try:
|
||||||
|
@ -156,7 +173,7 @@ class CallingMap(MutableMapping, object):
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return len(self.data)
|
return len(self.data)
|
||||||
|
|
||||||
def copy(self): # pargma: no cover
|
def copy(self): # pragma: no cover
|
||||||
return self.__class__(_merge_copy_dicts(self.data, self.storage))
|
return self.__class__(_merge_copy_dicts(self.data, self.storage))
|
||||||
|
|
||||||
|
|
||||||
|
@ -591,7 +608,7 @@ class CommandAction(ActionBase):
|
||||||
if value is None:
|
if value is None:
|
||||||
# fallback (no or default replacement)
|
# fallback (no or default replacement)
|
||||||
return ADD_REPL_TAGS_CM.get(tag, m.group())
|
return ADD_REPL_TAGS_CM.get(tag, m.group())
|
||||||
value = str(value) # assure string
|
value = uni_string(value) # assure string
|
||||||
if tag in cls._escapedTags:
|
if tag in cls._escapedTags:
|
||||||
# That one needs to be escaped since its content is
|
# That one needs to be escaped since its content is
|
||||||
# out of our control
|
# out of our control
|
||||||
|
@ -670,7 +687,7 @@ class CommandAction(ActionBase):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# fallback (no or default replacement)
|
# fallback (no or default replacement)
|
||||||
return ADD_REPL_TAGS_CM.get(tag, m.group())
|
return ADD_REPL_TAGS_CM.get(tag, m.group())
|
||||||
value = str(value) # assure string
|
value = uni_string(value) # assure string
|
||||||
# replacement for tag:
|
# replacement for tag:
|
||||||
return escapeVal(tag, value)
|
return escapeVal(tag, value)
|
||||||
|
|
||||||
|
@ -684,7 +701,7 @@ class CommandAction(ActionBase):
|
||||||
def substTag(m):
|
def substTag(m):
|
||||||
tag = mapTag2Opt(m.groups()[0])
|
tag = mapTag2Opt(m.groups()[0])
|
||||||
try:
|
try:
|
||||||
value = str(tickData[tag])
|
value = uni_string(tickData[tag])
|
||||||
except KeyError:
|
except KeyError:
|
||||||
return ""
|
return ""
|
||||||
return escapeVal("F_"+tag, value)
|
return escapeVal("F_"+tag, value)
|
||||||
|
|
|
@ -290,6 +290,8 @@ class Actions(JailThread, Mapping):
|
||||||
|
|
||||||
class ActionInfo(CallingMap):
|
class ActionInfo(CallingMap):
|
||||||
|
|
||||||
|
CM_REPR_ITEMS = ("fid", "raw-ticket")
|
||||||
|
|
||||||
AI_DICT = {
|
AI_DICT = {
|
||||||
"ip": lambda self: self.__ticket.getIP(),
|
"ip": lambda self: self.__ticket.getIP(),
|
||||||
"family": lambda self: self['ip'].familyStr,
|
"family": lambda self: self['ip'].familyStr,
|
||||||
|
@ -307,7 +309,9 @@ class Actions(JailThread, Mapping):
|
||||||
"ipmatches": lambda self: "\n".join(self._mi4ip(True).getMatches()),
|
"ipmatches": lambda self: "\n".join(self._mi4ip(True).getMatches()),
|
||||||
"ipjailmatches": lambda self: "\n".join(self._mi4ip().getMatches()),
|
"ipjailmatches": lambda self: "\n".join(self._mi4ip().getMatches()),
|
||||||
"ipfailures": lambda self: self._mi4ip(True).getAttempt(),
|
"ipfailures": lambda self: self._mi4ip(True).getAttempt(),
|
||||||
"ipjailfailures": lambda self: self._mi4ip().getAttempt()
|
"ipjailfailures": lambda self: self._mi4ip().getAttempt(),
|
||||||
|
# raw ticket info:
|
||||||
|
"raw-ticket": lambda self: repr(self.__ticket)
|
||||||
}
|
}
|
||||||
|
|
||||||
__slots__ = CallingMap.__slots__ + ('__ticket', '__jail', '__mi4ip')
|
__slots__ = CallingMap.__slots__ + ('__ticket', '__jail', '__mi4ip')
|
||||||
|
@ -319,7 +323,7 @@ class Actions(JailThread, Mapping):
|
||||||
self.immutable = immutable
|
self.immutable = immutable
|
||||||
self.data = data
|
self.data = data
|
||||||
|
|
||||||
def copy(self): # pargma: no cover
|
def copy(self): # pragma: no cover
|
||||||
return self.__class__(self.__ticket, self.__jail, self.immutable, self.data.copy())
|
return self.__class__(self.__ticket, self.__jail, self.immutable, self.data.copy())
|
||||||
|
|
||||||
def _mi4ip(self, overalljails=False):
|
def _mi4ip(self, overalljails=False):
|
||||||
|
|
|
@ -33,60 +33,65 @@ from threading import RLock
|
||||||
from .mytime import MyTime
|
from .mytime import MyTime
|
||||||
from .ticket import FailTicket
|
from .ticket import FailTicket
|
||||||
from .utils import Utils
|
from .utils import Utils
|
||||||
from ..helpers import getLogger, PREFER_ENC
|
from ..helpers import getLogger, uni_string, PREFER_ENC
|
||||||
|
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
|
||||||
if sys.version_info >= (3,):
|
|
||||||
def _json_default(x):
|
def _json_default(x):
|
||||||
|
"""Avoid errors on types unknown in json-adapters."""
|
||||||
if isinstance(x, set):
|
if isinstance(x, set):
|
||||||
x = list(x)
|
x = list(x)
|
||||||
return x
|
return uni_string(x)
|
||||||
|
|
||||||
|
if sys.version_info >= (3,): # pragma: 2.x no cover
|
||||||
def _json_dumps_safe(x):
|
def _json_dumps_safe(x):
|
||||||
try:
|
try:
|
||||||
x = json.dumps(x, ensure_ascii=False, default=_json_default).encode(
|
x = json.dumps(x, ensure_ascii=False, default=_json_default).encode(
|
||||||
PREFER_ENC, 'replace')
|
PREFER_ENC, 'replace')
|
||||||
except Exception as e: # pragma: no cover
|
except Exception as e:
|
||||||
logSys.error('json dumps failed: %s', e)
|
# adapter handler should be exception-safe
|
||||||
|
logSys.error('json dumps failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
|
||||||
x = '{}'
|
x = '{}'
|
||||||
return x
|
return x
|
||||||
|
|
||||||
def _json_loads_safe(x):
|
def _json_loads_safe(x):
|
||||||
try:
|
try:
|
||||||
x = json.loads(x.decode(
|
x = json.loads(x.decode(PREFER_ENC, 'replace'))
|
||||||
PREFER_ENC, 'replace'))
|
except Exception as e:
|
||||||
except Exception as e: # pragma: no cover
|
# converter handler should be exception-safe
|
||||||
logSys.error('json loads failed: %s', e)
|
logSys.error('json loads failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
|
||||||
x = {}
|
x = {}
|
||||||
return x
|
return x
|
||||||
else:
|
else: # pragma: 3.x no cover
|
||||||
def _normalize(x):
|
def _normalize(x):
|
||||||
if isinstance(x, dict):
|
if isinstance(x, dict):
|
||||||
return dict((_normalize(k), _normalize(v)) for k, v in x.iteritems())
|
return dict((_normalize(k), _normalize(v)) for k, v in x.iteritems())
|
||||||
elif isinstance(x, (list, set)):
|
elif isinstance(x, (list, set)):
|
||||||
return [_normalize(element) for element in x]
|
return [_normalize(element) for element in x]
|
||||||
elif isinstance(x, unicode):
|
elif isinstance(x, unicode):
|
||||||
return x.encode(PREFER_ENC)
|
# in 2.x default text_factory is unicode - so return proper unicode here:
|
||||||
else:
|
return x.encode(PREFER_ENC, 'replace').decode(PREFER_ENC)
|
||||||
|
elif isinstance(x, basestring):
|
||||||
|
return x.decode(PREFER_ENC, 'replace')
|
||||||
return x
|
return x
|
||||||
|
|
||||||
def _json_dumps_safe(x):
|
def _json_dumps_safe(x):
|
||||||
try:
|
try:
|
||||||
x = json.dumps(_normalize(x), ensure_ascii=False).decode(
|
x = json.dumps(_normalize(x), ensure_ascii=False, default=_json_default)
|
||||||
PREFER_ENC, 'replace')
|
except Exception as e:
|
||||||
except Exception as e: # pragma: no cover
|
# adapter handler should be exception-safe
|
||||||
logSys.error('json dumps failed: %s', e)
|
logSys.error('json dumps failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
|
||||||
x = '{}'
|
x = '{}'
|
||||||
return x
|
return x
|
||||||
|
|
||||||
def _json_loads_safe(x):
|
def _json_loads_safe(x):
|
||||||
try:
|
try:
|
||||||
x = _normalize(json.loads(x.decode(
|
x = json.loads(x.decode(PREFER_ENC, 'replace'))
|
||||||
PREFER_ENC, 'replace')))
|
except Exception as e:
|
||||||
except Exception as e: # pragma: no cover
|
# converter handler should be exception-safe
|
||||||
logSys.error('json loads failed: %s', e)
|
logSys.error('json loads failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
|
||||||
x = {}
|
x = {}
|
||||||
return x
|
return x
|
||||||
|
|
||||||
|
@ -184,6 +189,8 @@ class Fail2BanDb(object):
|
||||||
self._db = sqlite3.connect(
|
self._db = sqlite3.connect(
|
||||||
filename, check_same_thread=False,
|
filename, check_same_thread=False,
|
||||||
detect_types=sqlite3.PARSE_DECLTYPES)
|
detect_types=sqlite3.PARSE_DECLTYPES)
|
||||||
|
# # to allow use multi-byte utf-8
|
||||||
|
# self._db.text_factory = str
|
||||||
|
|
||||||
self._bansMergedCache = {}
|
self._bansMergedCache = {}
|
||||||
|
|
||||||
|
|
|
@ -138,7 +138,7 @@ class Ticket(object):
|
||||||
self._data['matches'] = matches or []
|
self._data['matches'] = matches or []
|
||||||
|
|
||||||
def getMatches(self):
|
def getMatches(self):
|
||||||
return [(line if isinstance(line, basestring) else "".join(line)) \
|
return [(line if not isinstance(line, (list, tuple)) else "".join(line)) \
|
||||||
for line in self._data.get('matches', ())]
|
for line in self._data.get('matches', ())]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -29,9 +29,9 @@ from ..dummyjail import DummyJail
|
||||||
from ..servertestcase import IPAddr
|
from ..servertestcase import IPAddr
|
||||||
from ..utils import LogCaptureTestCase, CONFIG_DIR
|
from ..utils import LogCaptureTestCase, CONFIG_DIR
|
||||||
|
|
||||||
if sys.version_info >= (3, ):
|
if sys.version_info >= (3, ): # pragma: 2.x no cover
|
||||||
from urllib.error import HTTPError, URLError
|
from urllib.error import HTTPError, URLError
|
||||||
else:
|
else: # pragma: 3.x no cover
|
||||||
from urllib2 import HTTPError, URLError
|
from urllib2 import HTTPError, URLError
|
||||||
|
|
||||||
def skip_if_not_available(f):
|
def skip_if_not_available(f):
|
||||||
|
|
|
@ -567,13 +567,18 @@ class CommandActionTest(LogCaptureTestCase):
|
||||||
'b': lambda self: self['a'] + 6,
|
'b': lambda self: self['a'] + 6,
|
||||||
'c': ''
|
'c': ''
|
||||||
})
|
})
|
||||||
s = repr(m)
|
s = repr(m); # only stored values (no calculated)
|
||||||
|
self.assertNotIn("'a': ", s)
|
||||||
|
self.assertNotIn("'b': ", s)
|
||||||
|
self.assertIn("'c': ''", s)
|
||||||
|
|
||||||
|
s = m._asrepr(True) # all values (including calculated)
|
||||||
self.assertIn("'a': 5", s)
|
self.assertIn("'a': 5", s)
|
||||||
self.assertIn("'b': 11", s)
|
self.assertIn("'b': 11", s)
|
||||||
self.assertIn("'c': ''", s)
|
self.assertIn("'c': ''", s)
|
||||||
|
|
||||||
m['c'] = lambda self: self['xxx'] + 7; # unresolvable
|
m['c'] = lambda self: self['xxx'] + 7; # unresolvable
|
||||||
s = repr(m)
|
s = m._asrepr(True)
|
||||||
self.assertIn("'a': 5", s)
|
self.assertIn("'a': 5", s)
|
||||||
self.assertIn("'b': 11", s)
|
self.assertIn("'b': 11", s)
|
||||||
self.assertIn("'c': ", s) # presents as callable
|
self.assertIn("'c': ", s) # presents as callable
|
||||||
|
|
|
@ -35,10 +35,11 @@ from ..server.ticket import FailTicket
|
||||||
from ..server.actions import Actions, Utils
|
from ..server.actions import Actions, Utils
|
||||||
from .dummyjail import DummyJail
|
from .dummyjail import DummyJail
|
||||||
try:
|
try:
|
||||||
from ..server.database import Fail2BanDb as Fail2BanDb
|
from ..server import database
|
||||||
|
Fail2BanDb = database.Fail2BanDb
|
||||||
except ImportError: # pragma: no cover
|
except ImportError: # pragma: no cover
|
||||||
Fail2BanDb = None
|
Fail2BanDb = None
|
||||||
from .utils import LogCaptureTestCase
|
from .utils import LogCaptureTestCase, logSys as DefLogSys
|
||||||
|
|
||||||
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
||||||
|
|
||||||
|
@ -238,30 +239,61 @@ class DatabaseTest(LogCaptureTestCase):
|
||||||
self.testAddJail()
|
self.testAddJail()
|
||||||
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
|
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
|
||||||
tickets = [
|
tickets = [
|
||||||
FailTicket("127.0.0.1", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
|
FailTicket("127.0.0.1", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
|
||||||
FailTicket("127.0.0.2", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
|
FailTicket("127.0.0.2", 0, ['user "test"', u'user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
|
||||||
FailTicket("127.0.0.3", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')])
|
FailTicket("127.0.0.3", 0, ['user "test"', b'user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
|
||||||
|
FailTicket("127.0.0.4", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xe4\xf6\xfc\xdf"']),
|
||||||
|
FailTicket("127.0.0.5", 0, ['user "test"', 'unterminated \xcf']),
|
||||||
|
FailTicket("127.0.0.6", 0, ['user "test"', u'unterminated \xcf']),
|
||||||
|
FailTicket("127.0.0.7", 0, ['user "test"', b'unterminated \xcf'])
|
||||||
]
|
]
|
||||||
self.db.addBan(self.jail, tickets[0])
|
for ticket in tickets:
|
||||||
self.db.addBan(self.jail, tickets[1])
|
self.db.addBan(self.jail, ticket)
|
||||||
self.db.addBan(self.jail, tickets[2])
|
|
||||||
|
self.assertNotLogged("json dumps failed")
|
||||||
|
|
||||||
readtickets = self.db.getBans(jail=self.jail)
|
readtickets = self.db.getBans(jail=self.jail)
|
||||||
self.assertEqual(len(readtickets), 3)
|
|
||||||
## python 2 or 3 :
|
self.assertNotLogged("json loads failed")
|
||||||
invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8', 'replace')
|
|
||||||
self.assertTrue(
|
## all tickets available
|
||||||
readtickets[0] == FailTicket("127.0.0.1", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
|
self.assertEqual(len(readtickets), 7)
|
||||||
or readtickets[0] == tickets[0]
|
|
||||||
)
|
## too different to cover all possible constellations for python 2 and 3,
|
||||||
self.assertTrue(
|
## can replace/ignore some non-ascii chars by json dump/load (unicode/str),
|
||||||
readtickets[1] == FailTicket("127.0.0.2", 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')])
|
## so check ip and matches count only:
|
||||||
or readtickets[1] == tickets[1]
|
for i, ticket in enumerate(tickets):
|
||||||
)
|
DefLogSys.debug('readtickets[%d]: %r', i, readtickets[i].getData())
|
||||||
self.assertTrue(
|
DefLogSys.debug(' == tickets[%d]: %r', i, ticket.getData())
|
||||||
readtickets[2] == FailTicket("127.0.0.3", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
|
self.assertEqual(readtickets[i].getIP(), ticket.getIP())
|
||||||
or readtickets[2] == tickets[2]
|
self.assertEqual(len(readtickets[i].getMatches()), len(ticket.getMatches()))
|
||||||
)
|
|
||||||
|
self.pruneLog('[test-phase 2] simulate errors')
|
||||||
|
## simulate errors in dumps/loads:
|
||||||
|
priorEnc = database.PREFER_ENC
|
||||||
|
try:
|
||||||
|
database.PREFER_ENC = 'f2b-test::non-existing-encoding'
|
||||||
|
|
||||||
|
for ticket in tickets:
|
||||||
|
self.db.addBan(self.jail, ticket)
|
||||||
|
|
||||||
|
self.assertLogged("json dumps failed")
|
||||||
|
|
||||||
|
readtickets = self.db.getBans(jail=self.jail)
|
||||||
|
|
||||||
|
self.assertLogged("json loads failed")
|
||||||
|
|
||||||
|
## despite errors all tickets written and loaded (check adapter-handlers are error-safe):
|
||||||
|
self.assertEqual(len(readtickets), 14)
|
||||||
|
finally:
|
||||||
|
database.PREFER_ENC = priorEnc
|
||||||
|
|
||||||
|
## check the database is still operable (not locked) after all the errors:
|
||||||
|
self.pruneLog('[test-phase 3] still operable?')
|
||||||
|
self.db.addBan(self.jail, FailTicket("127.0.0.8"))
|
||||||
|
readtickets = self.db.getBans(jail=self.jail)
|
||||||
|
self.assertEqual(len(readtickets), 15)
|
||||||
|
self.assertNotLogged("json loads failed", "json dumps failed")
|
||||||
|
|
||||||
def _testAdd3Bans(self):
|
def _testAdd3Bans(self):
|
||||||
self.testAddJail()
|
self.testAddJail()
|
||||||
|
|
|
@ -1223,8 +1223,8 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
||||||
_write_file(lgfn, "w+",
|
_write_file(lgfn, "w+",
|
||||||
str(int(MyTime.time())) + ' failure "125-000-001" - 192.0.2.1',
|
str(int(MyTime.time())) + ' failure "125-000-001" - 192.0.2.1',
|
||||||
str(int(MyTime.time())) + ' failure "125-000-002" - 192.0.2.1',
|
str(int(MyTime.time())) + ' failure "125-000-002" - 192.0.2.1',
|
||||||
str(int(MyTime.time())) + ' failure "125-000-003" - 192.0.2.1',
|
str(int(MyTime.time())) + ' failure "125-000-003" - 192.0.2.1 (\xf2\xf0\xe5\xf2\xe8\xe9)',
|
||||||
str(int(MyTime.time())) + ' failure "125-000-004" - 192.0.2.1',
|
str(int(MyTime.time())) + ' failure "125-000-004" - 192.0.2.1 (\xf2\xf0\xe5\xf2\xe8\xe9)',
|
||||||
str(int(MyTime.time())) + ' failure "125-000-005" - 192.0.2.1',
|
str(int(MyTime.time())) + ' failure "125-000-005" - 192.0.2.1',
|
||||||
)
|
)
|
||||||
# check all sessions are banned (and blacklisted in map-file):
|
# check all sessions are banned (and blacklisted in map-file):
|
||||||
|
|
|
@ -33,8 +33,8 @@ from StringIO import StringIO
|
||||||
|
|
||||||
from utils import LogCaptureTestCase, logSys as DefLogSys
|
from utils import LogCaptureTestCase, logSys as DefLogSys
|
||||||
|
|
||||||
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, uni_decode
|
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, \
|
||||||
from ..helpers import splitwords
|
splitwords, uni_decode, uni_string
|
||||||
from ..server.mytime import MyTime
|
from ..server.mytime import MyTime
|
||||||
|
|
||||||
|
|
||||||
|
@ -193,6 +193,56 @@ class TestsUtilsTest(LogCaptureTestCase):
|
||||||
self.assertEqual(mbasename("/long/path/base.py"), 'path.base')
|
self.assertEqual(mbasename("/long/path/base.py"), 'path.base')
|
||||||
self.assertEqual(mbasename("/long/path/base"), 'path.base')
|
self.assertEqual(mbasename("/long/path/base"), 'path.base')
|
||||||
|
|
||||||
|
def testUniConverters(self):
|
||||||
|
self.assertRaises(Exception, uni_decode,
|
||||||
|
(b'test' if sys.version_info >= (3,) else u'test'), 'f2b-test::non-existing-encoding')
|
||||||
|
uni_decode((b'test\xcf' if sys.version_info >= (3,) else u'test\xcf'))
|
||||||
|
uni_string(b'test\xcf')
|
||||||
|
uni_string('test\xcf')
|
||||||
|
uni_string(u'test\xcf')
|
||||||
|
|
||||||
|
def testSafeLogging(self):
|
||||||
|
# logging should be exception-safe, to avoid possible errors (concat, str. conversion, representation failures, etc)
|
||||||
|
logSys = DefLogSys
|
||||||
|
class Test:
|
||||||
|
def __init__(self, err=1):
|
||||||
|
self.err = err
|
||||||
|
def __repr__(self):
|
||||||
|
if self.err:
|
||||||
|
raise Exception('no represenation for test!')
|
||||||
|
else:
|
||||||
|
return u'conv-error (\xf2\xf0\xe5\xf2\xe8\xe9), unterminated utf \xcf'
|
||||||
|
test = Test()
|
||||||
|
logSys.log(logging.NOTICE, "test 1a: %r", test)
|
||||||
|
self.assertLogged("Traceback", "no represenation for test!")
|
||||||
|
self.pruneLog()
|
||||||
|
logSys.notice("test 1b: %r", test)
|
||||||
|
self.assertLogged("Traceback", "no represenation for test!")
|
||||||
|
|
||||||
|
self.pruneLog('[phase 2] test error conversion by encoding %s' % sys.getdefaultencoding())
|
||||||
|
test = Test(0)
|
||||||
|
# this may produce coversion error on ascii default encoding:
|
||||||
|
#str(test)
|
||||||
|
logSys.log(logging.NOTICE, "test 2a: %r, %s", test, test)
|
||||||
|
self.assertLogged("test 2a", "Error by logging handler", all=False)
|
||||||
|
logSys.notice("test 2b: %r, %s", test, test)
|
||||||
|
self.assertLogged("test 2b", "Error by logging handler", all=False)
|
||||||
|
|
||||||
|
self.pruneLog('[phase 3] test unexpected error in handler')
|
||||||
|
class _ErrorHandler(logging.Handler):
|
||||||
|
def handle(self, record):
|
||||||
|
raise Exception('error in handler test!')
|
||||||
|
_org_handler = logSys.handlers
|
||||||
|
try:
|
||||||
|
logSys.handlers = list(logSys.handlers)
|
||||||
|
logSys.handlers += [_ErrorHandler()]
|
||||||
|
logSys.log(logging.NOTICE, "test 3a")
|
||||||
|
logSys.notice("test 3b")
|
||||||
|
finally:
|
||||||
|
logSys.handlers = _org_handler
|
||||||
|
# we should reach this line without errors!
|
||||||
|
self.pruneLog('OK')
|
||||||
|
|
||||||
def testTraceBack(self):
|
def testTraceBack(self):
|
||||||
# pretty much just a smoke test since tests runners swallow all the detail
|
# pretty much just a smoke test since tests runners swallow all the detail
|
||||||
|
|
||||||
|
@ -304,8 +354,10 @@ class TestsUtilsTest(LogCaptureTestCase):
|
||||||
## assertLogged, assertNotLogged negative case:
|
## assertLogged, assertNotLogged negative case:
|
||||||
self.pruneLog()
|
self.pruneLog()
|
||||||
logSys.debug('test "xyz"')
|
logSys.debug('test "xyz"')
|
||||||
self._testAssertionErrorRE(r"All of the .* were found present in the log",
|
self._testAssertionErrorRE(r".* was found in the log",
|
||||||
self.assertNotLogged, 'test "xyz"')
|
self.assertNotLogged, 'test "xyz"')
|
||||||
|
self._testAssertionErrorRE(r"All of the .* were found present in the log",
|
||||||
|
self.assertNotLogged, 'test "xyz"', 'test')
|
||||||
self._testAssertionErrorRE(r"was found in the log",
|
self._testAssertionErrorRE(r"was found in the log",
|
||||||
self.assertNotLogged, 'test', 'xyz', all=True)
|
self.assertNotLogged, 'test', 'xyz', all=True)
|
||||||
self._testAssertionErrorRE(r"was not found in the log",
|
self._testAssertionErrorRE(r"was not found in the log",
|
||||||
|
@ -364,13 +416,10 @@ class TestsUtilsTest(LogCaptureTestCase):
|
||||||
|
|
||||||
def testLazyLogging(self):
|
def testLazyLogging(self):
|
||||||
logSys = DefLogSys
|
logSys = DefLogSys
|
||||||
if unittest.F2B.log_lazy:
|
logSys.debug('lazy logging: %r', unittest.F2B.log_lazy)
|
||||||
# wrong logging syntax will throw an error lazy (on demand):
|
# wrong logging syntax will don't throw an error anymore (logged now):
|
||||||
logSys.debug('test', 1, 2, 3)
|
logSys.notice('test', 1, 2, 3)
|
||||||
self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: self.assertNotLogged('test'))
|
self.assertLogged('not all arguments converted')
|
||||||
else: # pragma: no cover
|
|
||||||
# wrong logging syntax will throw an error directly:
|
|
||||||
self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: logSys.debug('test', 1, 2, 3))
|
|
||||||
|
|
||||||
|
|
||||||
class MyTimeTest(unittest.TestCase):
|
class MyTimeTest(unittest.TestCase):
|
||||||
|
|
|
@ -186,7 +186,7 @@ class F2B(DefaultTestOptions):
|
||||||
|
|
||||||
def __init__(self, opts):
|
def __init__(self, opts):
|
||||||
self.__dict__ = opts.__dict__
|
self.__dict__ = opts.__dict__
|
||||||
if self.fast:
|
if self.fast: # pragma: no cover - normal mode in travis
|
||||||
self.memory_db = True
|
self.memory_db = True
|
||||||
self.no_gamin = True
|
self.no_gamin = True
|
||||||
self.__dict__['share_config'] = {}
|
self.__dict__['share_config'] = {}
|
||||||
|
@ -658,7 +658,7 @@ class LogCaptureTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def truncate(self, size=None):
|
def truncate(self, size=None):
|
||||||
"""Truncate the internal buffer and records."""
|
"""Truncate the internal buffer and records."""
|
||||||
if size:
|
if size: # pragma: no cover - not implemented now
|
||||||
raise Exception('invalid size argument: %r, should be None or 0' % size)
|
raise Exception('invalid size argument: %r, should be None or 0' % size)
|
||||||
self._val = ''
|
self._val = ''
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
@ -667,11 +667,14 @@ class LogCaptureTestCase(unittest.TestCase):
|
||||||
self._strm.truncate(0)
|
self._strm.truncate(0)
|
||||||
|
|
||||||
def __write(self, record):
|
def __write(self, record):
|
||||||
|
try:
|
||||||
msg = record.getMessage() + '\n'
|
msg = record.getMessage() + '\n'
|
||||||
try:
|
try:
|
||||||
self._strm.write(msg)
|
self._strm.write(msg)
|
||||||
except UnicodeEncodeError:
|
except UnicodeEncodeError: # pragma: no cover - normally unreachable now
|
||||||
self._strm.write(msg.encode('UTF-8'))
|
self._strm.write(msg.encode('UTF-8', 'replace'))
|
||||||
|
except Exception as e: # pragma: no cover - normally unreachable
|
||||||
|
self._strm.write('Error by logging handler: %r' % e)
|
||||||
|
|
||||||
def getvalue(self):
|
def getvalue(self):
|
||||||
"""Return current buffer as whole string."""
|
"""Return current buffer as whole string."""
|
||||||
|
@ -698,7 +701,8 @@ class LogCaptureTestCase(unittest.TestCase):
|
||||||
# submit already emitted (delivered to handle) records:
|
# submit already emitted (delivered to handle) records:
|
||||||
for record in recs:
|
for record in recs:
|
||||||
self.__write(record)
|
self.__write(record)
|
||||||
elif lck: # reset dirty buffer flag (if we can lock, otherwise just next time):
|
elif lck: # pragma: no cover - too sporadic for coverage
|
||||||
|
# reset dirty buffer flag (if we can lock, otherwise just next time):
|
||||||
self._dirty &= ~1 # reset dirty buffer flag
|
self._dirty &= ~1 # reset dirty buffer flag
|
||||||
self._lock.release()
|
self._lock.release()
|
||||||
# cache (outside of log to avoid dead-locking during cross lock within self._strm):
|
# cache (outside of log to avoid dead-locking during cross lock within self._strm):
|
||||||
|
@ -807,7 +811,7 @@ class LogCaptureTestCase(unittest.TestCase):
|
||||||
all : boolean (default False) if True should fail if any of s logged
|
all : boolean (default False) if True should fail if any of s logged
|
||||||
"""
|
"""
|
||||||
logged = self._log.getvalue()
|
logged = self._log.getvalue()
|
||||||
if not kwargs.get('all', False):
|
if len(s) > 1 and not kwargs.get('all', False):
|
||||||
for s_ in s:
|
for s_ in s:
|
||||||
if s_ not in logged:
|
if s_ not in logged:
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue