Merge branch '0.10-fix-decoding-issues' into 0.11

pull/2125/merge
sebres 2018-07-04 20:47:34 +02:00
commit 76cb1c64ce
12 changed files with 244 additions and 121 deletions

View File

@ -7,5 +7,6 @@ source =
[report]
exclude_lines =
pragma: no cover
pragma: systemd no cover
pragma: ?no ?cover
pragma: ?${F2B_PY}.x no ?cover
pragma: ?systemd no ?cover

View File

@ -18,8 +18,9 @@ python:
- pypy3.3-5.5-alpha
before_install:
- echo "running under $TRAVIS_PYTHON_VERSION"
- if [[ $TRAVIS_PYTHON_VERSION == 2* || $TRAVIS_PYTHON_VERSION == pypy* && $TRAVIS_PYTHON_VERSION != pypy3* ]]; then export F2B_PY_2=true && echo "Set F2B_PY_2"; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3* || $TRAVIS_PYTHON_VERSION == pypy3* ]]; then export F2B_PY_3=true && echo "Set F2B_PY_3"; fi
- if [[ $TRAVIS_PYTHON_VERSION == 2* || $TRAVIS_PYTHON_VERSION == pypy* && $TRAVIS_PYTHON_VERSION != pypy3* ]]; then export F2B_PY=2; fi
- if [[ $TRAVIS_PYTHON_VERSION == 3* || $TRAVIS_PYTHON_VERSION == pypy3* ]]; then export F2B_PY=3; fi
- echo "Set F2B_PY=$F2B_PY"
- travis_retry sudo apt-get update -qq
# Set this so sudo executes the correct python binary
# Anything not using sudo will already have the correct environment
@ -31,20 +32,20 @@ install:
# coveralls
- travis_retry pip install coveralls codecov
# dnspython or dnspython3
- if [[ "$F2B_PY_2" ]]; then travis_retry pip install dnspython; fi
- if [[ "$F2B_PY_3" ]]; then travis_retry pip install dnspython3; fi
- if [[ "$F2B_PY" = 2 ]]; then travis_retry pip install dnspython; fi
- if [[ "$F2B_PY" = 3 ]]; then travis_retry pip install dnspython3; fi
# gamin - install manually (not in PyPI) - travis-ci system Python is 2.7
- if [[ $TRAVIS_PYTHON_VERSION == 2.7 ]]; then travis_retry sudo apt-get install -qq python-gamin && cp /usr/share/pyshared/gamin.py /usr/lib/pyshared/python2.7/_gamin.so $VIRTUAL_ENV/lib/python2.7/site-packages/; fi
# pyinotify
- travis_retry pip install pyinotify
before_script:
# Manually execute 2to3 for now
- if [[ "$F2B_PY_3" ]]; then ./fail2ban-2to3; fi
- if [[ "$F2B_PY" = 3 ]]; then ./fail2ban-2to3; fi
script:
# Keep the legacy setup.py test approach of checking coverage for python2
- if [[ "$F2B_PY_2" ]]; then coverage run setup.py test; fi
- if [[ "$F2B_PY" = 2 ]]; then coverage run setup.py test; fi
# Coverage doesn't pick up setup.py test with python3, so run it directly (with same verbosity as from setup)
- if [[ "$F2B_PY_3" ]]; then coverage run bin/fail2ban-testcases --verbosity=2; fi
- if [[ "$F2B_PY" = 3 ]]; then coverage run bin/fail2ban-testcases --verbosity=2; fi
# Use $VENV_BIN (not python) or else sudo will always run the system's python (2.7)
- sudo $VENV_BIN/pip install .
# Doc files should get installed on Travis under Linux

View File

@ -36,14 +36,90 @@ from .server.mytime import MyTime
PREFER_ENC = locale.getpreferredencoding()
# correct preferred encoding if lang not set in environment:
if PREFER_ENC.startswith('ANSI_'): # pragma: no cover
if all((os.getenv(v) in (None, "") for v in ('LANGUAGE', 'LC_ALL', 'LC_CTYPE', 'LANG'))):
if sys.stdout and not sys.stdout.encoding.startswith('ANSI_'):
PREFER_ENC = sys.stdout.encoding
elif all((os.getenv(v) in (None, "") for v in ('LANGUAGE', 'LC_ALL', 'LC_CTYPE', 'LANG'))):
PREFER_ENC = 'UTF-8';
# py-2.x: try to minimize influence of sporadic conversion errors on python 2.x,
# caused by implicit converting of string/unicode (e. g. `str(u"\uFFFD")` produces an error
# if default encoding is 'ascii');
if sys.version_info < (3,): # pragma: 3.x no cover
# correct default (global system) encoding (mostly UTF-8):
def __resetDefaultEncoding(encoding):
global PREFER_ENC
ode = sys.getdefaultencoding().upper()
if ode == 'ASCII' and ode != PREFER_ENC.upper():
# setdefaultencoding is normally deleted after site initialized, so hack-in using load of sys-module:
_sys = sys
if not hasattr(_sys, "setdefaultencoding"):
try:
from imp import load_dynamic as __ldm
_sys = __ldm('_sys', 'sys')
except ImportError: # pragma: no cover - only if load_dynamic fails
reload(sys)
_sys = sys
if hasattr(_sys, "setdefaultencoding"):
_sys.setdefaultencoding(encoding)
# override to PREFER_ENC:
__resetDefaultEncoding(PREFER_ENC)
del __resetDefaultEncoding
# todo: rewrite explicit (and implicit) str-conversions via encode/decode with IO-encoding (sys.stdout.encoding),
# e. g. inside tags-replacement by command-actions, etc.
#
# Following "uni_decode", "uni_string" functions unified python independent any
# to string converting.
#
# Typical example resp. work-case for understanding the coding/decoding issues:
#
# [isinstance('', str), isinstance(b'', str), isinstance(u'', str)]
# [True, True, False]; # -- python2
# [True, False, True]; # -- python3
#
if sys.version_info >= (3,): # pragma: 2.x no cover
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
try:
if isinstance(x, bytes):
return x.decode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
if errors != 'strict':
raise
return x.decode(enc, 'replace')
def uni_string(x):
if not isinstance(x, bytes):
return str(x)
return x.decode(PREFER_ENC, 'replace')
else: # pragma: 3.x no cover
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
try:
if isinstance(x, unicode):
return x.encode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
if errors != 'strict':
raise
return x.encode(enc, 'replace')
if sys.getdefaultencoding().upper() != 'UTF-8': # pragma: no cover - utf-8 is default encoding now
def uni_string(x):
if not isinstance(x, unicode):
return str(x)
return x.encode(PREFER_ENC, 'replace')
else:
uni_string = str
def _as_bool(val):
return bool(val) if not isinstance(val, basestring) \
else val.lower() in ('1', 'on', 'true', 'yes')
def formatExceptionInfo():
""" Consistently format exception information """
cla, exc = sys.exc_info()[:2]
return (cla.__name__, str(exc))
return (cla.__name__, uni_string(exc))
#
@ -213,41 +289,6 @@ else:
r.update(y)
return r
#
# Following "uni_decode" function unified python independent any to string converting
#
# Typical example resp. work-case for understanding the coding/decoding issues:
#
# [isinstance('', str), isinstance(b'', str), isinstance(u'', str)]
# [True, True, False]; # -- python2
# [True, False, True]; # -- python3
#
if sys.version_info >= (3,):
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
try:
if isinstance(x, bytes):
return x.decode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
if errors != 'strict':
raise
return uni_decode(x, enc, 'replace')
else:
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
try:
if isinstance(x, unicode):
return x.encode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
if errors != 'strict':
raise
return uni_decode(x, enc, 'replace')
def _as_bool(val):
return bool(val) if not isinstance(val, basestring) \
else val.lower() in ('1', 'on', 'true', 'yes')
#
# Following function used for parse options from parameter (e.g. `name[p1=0, p2="..."][p3='...']`).
#
@ -325,7 +366,7 @@ def substituteRecursiveTags(inptags, conditional='',
if tag in ignore or tag in done: continue
# ignore replacing callable items from calling map - should be converted on demand only (by get):
if noRecRepl and callable(tags.getRawItem(tag)): continue
value = orgval = str(tags[tag])
value = orgval = uni_string(tags[tag])
# search and replace all tags within value, that can be interpolated using other tags:
m = tre_search(value)
refCounts = {}
@ -360,7 +401,7 @@ def substituteRecursiveTags(inptags, conditional='',
m = tre_search(value, m.end())
continue
# if calling map - be sure we've string:
if noRecRepl: repl = str(repl)
if noRecRepl: repl = uni_string(repl)
value = value.replace('<%s>' % rtag, repl)
#logSys.log(5, 'value now: %s' % value)
# increment reference count:

View File

@ -36,7 +36,7 @@ from .failregex import mapTag2Opt
from .ipdns import asip, DNSUtils
from .mytime import MyTime
from .utils import Utils
from ..helpers import getLogger, _merge_copy_dicts, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
from ..helpers import getLogger, _merge_copy_dicts, uni_string, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
# Gets the instance of the logger.
logSys = getLogger(__name__)
@ -83,6 +83,8 @@ class CallingMap(MutableMapping, object):
The dictionary data which can be accessed to obtain items uncalled
"""
CM_REPR_ITEMS = ()
# immutable=True saves content between actions, without interim copying (save original on demand, recoverable via reset)
__slots__ = ('data', 'storage', 'immutable', '__org_data')
def __init__(self, *args, **kwargs):
@ -98,14 +100,29 @@ class CallingMap(MutableMapping, object):
pass
self.immutable = immutable
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, self._asdict())
def _asrepr(self, calculated=False):
# be sure it is suitable as string, so use str as checker:
return "%s(%r)" % (self.__class__.__name__, self._asdict(calculated, str))
def _asdict(self):
try:
return dict(self)
except:
return dict(self.data, **self.storage)
__repr__ = _asrepr
def _asdict(self, calculated=False, checker=None):
d = dict(self.data, **self.storage)
if not calculated:
return dict((n,v) for n,v in d.iteritems() \
if not callable(v) or n in self.CM_REPR_ITEMS)
for n,v in d.items():
if callable(v):
try:
# calculate:
v = self.__getitem__(n)
# convert if needed:
if checker: checker(v)
# store calculated:
d[n] = v
except: # can't calculate - just ignore it
pass
return d
def getRawItem(self, key):
try:
@ -628,7 +645,7 @@ class CommandAction(ActionBase):
if value is None:
# fallback (no or default replacement)
return ADD_REPL_TAGS_CM.get(tag, m.group())
value = str(value) # assure string
value = uni_string(value) # assure string
if tag in cls._escapedTags:
# That one needs to be escaped since its content is
# out of our control
@ -707,7 +724,7 @@ class CommandAction(ActionBase):
except KeyError:
# fallback (no or default replacement)
return ADD_REPL_TAGS_CM.get(tag, m.group())
value = str(value) # assure string
value = uni_string(value) # assure string
# replacement for tag:
return escapeVal(tag, value)
@ -721,7 +738,7 @@ class CommandAction(ActionBase):
def substTag(m):
tag = mapTag2Opt(m.groups()[0])
try:
value = str(tickData[tag])
value = uni_string(tickData[tag])
except KeyError:
return ""
return escapeVal("F_"+tag, value)

View File

@ -291,6 +291,8 @@ class Actions(JailThread, Mapping):
class ActionInfo(CallingMap):
CM_REPR_ITEMS = ("fid", "raw-ticket")
AI_DICT = {
"ip": lambda self: self.__ticket.getIP(),
"family": lambda self: self['ip'].familyStr,
@ -310,7 +312,9 @@ class Actions(JailThread, Mapping):
"ipmatches": lambda self: "\n".join(self._mi4ip(True).getMatches()),
"ipjailmatches": lambda self: "\n".join(self._mi4ip().getMatches()),
"ipfailures": lambda self: self._mi4ip(True).getAttempt(),
"ipjailfailures": lambda self: self._mi4ip().getAttempt()
"ipjailfailures": lambda self: self._mi4ip().getAttempt(),
# raw ticket info:
"raw-ticket": lambda self: repr(self.__ticket)
}
__slots__ = CallingMap.__slots__ + ('__ticket', '__jail', '__mi4ip')

View File

@ -33,60 +33,77 @@ from threading import RLock
from .mytime import MyTime
from .ticket import FailTicket
from .utils import Utils
from ..helpers import getLogger, PREFER_ENC
from ..helpers import getLogger, uni_string, PREFER_ENC
# Gets the instance of the logger.
logSys = getLogger(__name__)
if sys.version_info >= (3,):
def _json_default(x):
if isinstance(x, set):
x = list(x)
return x
def _json_default(x):
"""Avoid errors on types unknow in json-adapters."""
if isinstance(x, set):
x = list(x)
return uni_string(x)
if sys.version_info >= (3,): # pragma: 2.x no cover
def _json_dumps_safe(x):
try:
x = json.dumps(x, ensure_ascii=False, default=_json_default).encode(
PREFER_ENC, 'replace')
except Exception as e: # pragma: no cover
logSys.error('json dumps failed: %s', e)
except Exception as e:
# adapter handler should be exception-safe, so avoid possible errors in log-handlers (concat, str. conversion, etc)
try:
logSys.error('json dumps failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
except: # pragma: no cover
pass
x = '{}'
return x
def _json_loads_safe(x):
try:
x = json.loads(x.decode(
PREFER_ENC, 'replace'))
except Exception as e: # pragma: no cover
logSys.error('json loads failed: %s', e)
x = json.loads(x.decode(PREFER_ENC, 'replace'))
except Exception as e:
# converter handler should be exception-safe, so avoid possible errors in log-handlers (concat, str. conversion, etc)
try:
logSys.error('json loads failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
except: # pragma: no cover
pass
x = {}
return x
else:
else: # pragma: 3.x no cover
def _normalize(x):
if isinstance(x, dict):
return dict((_normalize(k), _normalize(v)) for k, v in x.iteritems())
elif isinstance(x, (list, set)):
return [_normalize(element) for element in x]
elif isinstance(x, unicode):
return x.encode(PREFER_ENC)
else:
return x
# in 2.x default text_factory is unicode - so return proper unicode here:
return x.encode(PREFER_ENC, 'replace').decode(PREFER_ENC)
elif isinstance(x, basestring):
return x.decode(PREFER_ENC, 'replace')
return x
def _json_dumps_safe(x):
try:
x = json.dumps(_normalize(x), ensure_ascii=False).decode(
PREFER_ENC, 'replace')
except Exception as e: # pragma: no cover
logSys.error('json dumps failed: %s', e)
x = json.dumps(_normalize(x), ensure_ascii=False, default=_json_default)
except Exception as e:
# adapter handler should be exception-safe, so avoid possible errors in log-handlers (concat, str. conversion, etc)
try:
logSys.error('json dumps failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
except: # pragma: no cover
pass
x = '{}'
return x
def _json_loads_safe(x):
try:
x = _normalize(json.loads(x.decode(
PREFER_ENC, 'replace')))
except Exception as e: # pragma: no cover
logSys.error('json loads failed: %s', e)
x = json.loads(x.decode(PREFER_ENC, 'replace'))
except Exception as e:
# converter handler should be exception-safe, so avoid possible errors in log-handlers (concat, str. conversion, etc)
try:
logSys.error('json loads failed: %r', e, exc_info=logSys.getEffectiveLevel() <= 4)
except: # pragma: no cover
pass
x = {}
return x
@ -199,6 +216,8 @@ class Fail2BanDb(object):
self._db = sqlite3.connect(
filename, check_same_thread=False,
detect_types=sqlite3.PARSE_DECLTYPES)
# # to allow use multi-byte utf-8
# self._db.text_factory = str
self._bansMergedCache = {}

View File

@ -147,7 +147,7 @@ class Ticket(object):
self._data['matches'] = matches or []
def getMatches(self):
return [(line if isinstance(line, basestring) else "".join(line)) \
return [(line if not isinstance(line, (list, tuple)) else "".join(line)) \
for line in self._data.get('matches', ())]
@property

View File

@ -567,13 +567,18 @@ class CommandActionTest(LogCaptureTestCase):
'b': lambda self: self['a'] + 6,
'c': ''
})
s = repr(m)
s = repr(m); # only stored values (no calculated)
self.assertNotIn("'a': ", s)
self.assertNotIn("'b': ", s)
self.assertIn("'c': ''", s)
s = m._asrepr(True) # all values (including calculated)
self.assertIn("'a': 5", s)
self.assertIn("'b': 11", s)
self.assertIn("'c': ''", s)
m['c'] = lambda self: self['xxx'] + 7; # unresolvable
s = repr(m)
s = m._asrepr(True)
self.assertIn("'a': 5", s)
self.assertIn("'b': 11", s)
self.assertIn("'c': ", s) # presents as callable

View File

@ -35,10 +35,11 @@ from ..server.ticket import FailTicket
from ..server.actions import Actions, Utils
from .dummyjail import DummyJail
try:
from ..server.database import Fail2BanDb as Fail2BanDb
from ..server import database
Fail2BanDb = database.Fail2BanDb
except ImportError: # pragma: no cover
Fail2BanDb = None
from .utils import LogCaptureTestCase
from .utils import LogCaptureTestCase, logSys as DefLogSys
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
@ -275,30 +276,53 @@ class DatabaseTest(LogCaptureTestCase):
self.testAddJail()
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
tickets = [
FailTicket("127.0.0.1", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.2", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.3", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')])
FailTicket("127.0.0.1", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.2", 0, ['user "test"', u'user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.3", 0, ['user "test"', b'user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.4", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xe4\xf6\xfc\xdf"']),
FailTicket("127.0.0.5", 0, ['user "test"', 'unterminated \xcf']),
FailTicket("127.0.0.6", 0, ['user "test"', u'unterminated \xcf']),
FailTicket("127.0.0.7", 0, ['user "test"', b'unterminated \xcf'])
]
self.db.addBan(self.jail, tickets[0])
self.db.addBan(self.jail, tickets[1])
self.db.addBan(self.jail, tickets[2])
for ticket in tickets:
self.db.addBan(self.jail, ticket)
self.assertNotLogged("json dumps failed")
readtickets = self.db.getBans(jail=self.jail)
self.assertEqual(len(readtickets), 3)
## python 2 or 3 :
invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8', 'replace')
self.assertTrue(
readtickets[0] == FailTicket("127.0.0.1", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
or readtickets[0] == tickets[0]
)
self.assertTrue(
readtickets[1] == FailTicket("127.0.0.2", 0, [invstr, u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')])
or readtickets[1] == tickets[1]
)
self.assertTrue(
readtickets[2] == FailTicket("127.0.0.3", 0, [invstr, 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'])
or readtickets[2] == tickets[2]
)
self.assertNotLogged("json loads failed")
## all tickets available
self.assertEqual(len(readtickets), 7)
## too different to cover all possible constellations for python 2 and 3,
## can replace/ignore some non-ascii chars by json dump/load (unicode/str),
## so check ip and matches count only:
for i, ticket in enumerate(tickets):
DefLogSys.debug('readtickets[%d]: %r', i, readtickets[i].getData())
DefLogSys.debug(' == tickets[%d]: %r', i, ticket.getData())
self.assertEqual(readtickets[i].getIP(), ticket.getIP())
self.assertEqual(len(readtickets[i].getMatches()), len(ticket.getMatches()))
## simulate errors in dumps/loads:
priorEnc = database.PREFER_ENC
try:
database.PREFER_ENC = 'f2b-test::non-existing-encoding'
for ticket in tickets:
self.db.addBan(self.jail, ticket)
self.assertLogged("json dumps failed")
readtickets = self.db.getBans(jail=self.jail)
self.assertLogged("json loads failed")
## despite errors all tickets written and loaded (check adapter-handlers are error-safe):
self.assertEqual(len(readtickets), 14)
finally:
database.PREFER_ENC = priorEnc
def _testAdd3Bans(self):
self.testAddJail()

View File

@ -1256,8 +1256,8 @@ class Fail2banServerTest(Fail2banClientServerBase):
_write_file(lgfn, "w+",
str(int(MyTime.time())) + ' failure "125-000-001" - 192.0.2.1',
str(int(MyTime.time())) + ' failure "125-000-002" - 192.0.2.1',
str(int(MyTime.time())) + ' failure "125-000-003" - 192.0.2.1',
str(int(MyTime.time())) + ' failure "125-000-004" - 192.0.2.1',
str(int(MyTime.time())) + ' failure "125-000-003" - 192.0.2.1 (\xf2\xf0\xe5\xf2\xe8\xe9)',
str(int(MyTime.time())) + ' failure "125-000-004" - 192.0.2.1 (\xf2\xf0\xe5\xf2\xe8\xe9)',
str(int(MyTime.time())) + ' failure "125-000-005" - 192.0.2.1',
)
# check all sessions are banned (and blacklisted in map-file):

View File

@ -33,8 +33,8 @@ from StringIO import StringIO
from utils import LogCaptureTestCase, logSys as DefLogSys
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, uni_decode
from ..helpers import splitwords
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, \
splitwords, uni_decode, uni_string
from ..server.mytime import MyTime
@ -193,6 +193,14 @@ class TestsUtilsTest(LogCaptureTestCase):
self.assertEqual(mbasename("/long/path/base.py"), 'path.base')
self.assertEqual(mbasename("/long/path/base"), 'path.base')
def testUniConverters(self):
self.assertRaises(Exception, uni_decode,
(b'test' if sys.version_info >= (3,) else u'test'), 'f2b-test::non-existing-encoding')
uni_decode((b'test\xcf' if sys.version_info >= (3,) else u'test\xcf'))
uni_string(b'test\xcf')
uni_string('test\xcf')
uni_string(u'test\xcf')
def testTraceBack(self):
# pretty much just a smoke test since tests runners swallow all the detail
@ -304,8 +312,10 @@ class TestsUtilsTest(LogCaptureTestCase):
## assertLogged, assertNotLogged negative case:
self.pruneLog()
logSys.debug('test "xyz"')
self._testAssertionErrorRE(r"All of the .* were found present in the log",
self._testAssertionErrorRE(r".* was found in the log",
self.assertNotLogged, 'test "xyz"')
self._testAssertionErrorRE(r"All of the .* were found present in the log",
self.assertNotLogged, 'test "xyz"', 'test')
self._testAssertionErrorRE(r"was found in the log",
self.assertNotLogged, 'test', 'xyz', all=True)
self._testAssertionErrorRE(r"was not found in the log",

View File

@ -186,7 +186,7 @@ class F2B(DefaultTestOptions):
def __init__(self, opts):
self.__dict__ = opts.__dict__
if self.fast:
if self.fast: # pragma: no cover - normal mode in travis
self.memory_db = True
self.no_gamin = True
self.__dict__['share_config'] = {}
@ -662,7 +662,7 @@ class LogCaptureTestCase(unittest.TestCase):
def truncate(self, size=None):
"""Truncate the internal buffer and records."""
if size:
if size: # pragma: no cover - not implemented now
raise Exception('invalid size argument: %r, should be None or 0' % size)
self._val = ''
with self._lock:
@ -674,8 +674,8 @@ class LogCaptureTestCase(unittest.TestCase):
msg = record.getMessage() + '\n'
try:
self._strm.write(msg)
except UnicodeEncodeError:
self._strm.write(msg.encode('UTF-8'))
except UnicodeEncodeError: # pragma: no cover - normally unreachable now
self._strm.write(msg.encode('UTF-8', 'replace'))
def getvalue(self):
"""Return current buffer as whole string."""
@ -702,7 +702,8 @@ class LogCaptureTestCase(unittest.TestCase):
# submit already emitted (delivered to handle) records:
for record in recs:
self.__write(record)
elif lck: # reset dirty buffer flag (if we can lock, otherwise just next time):
elif lck: # pragma: no cover - too sporadic for coverage
# reset dirty buffer flag (if we can lock, otherwise just next time):
self._dirty &= ~1 # reset dirty buffer flag
self._lock.release()
# cache (outside of log to avoid dead-locking during cross lock within self._strm):
@ -811,7 +812,7 @@ class LogCaptureTestCase(unittest.TestCase):
all : boolean (default False) if True should fail if any of s logged
"""
logged = self._log.getvalue()
if not kwargs.get('all', False):
if len(s) > 1 and not kwargs.get('all', False):
for s_ in s:
if s_ not in logged:
return