|
|
|
@ -31,6 +31,7 @@ import tempfile
|
|
|
|
|
import shutil |
|
|
|
|
import sys |
|
|
|
|
import time |
|
|
|
|
import threading |
|
|
|
|
import unittest |
|
|
|
|
|
|
|
|
|
from cStringIO import StringIO |
|
|
|
@ -63,7 +64,8 @@ os.putenv('PYTHONPATH', os.path.dirname(os.path.dirname(os.path.dirname(
|
|
|
|
|
class DefaultTestOptions(optparse.Values): |
|
|
|
|
def __init__(self): |
|
|
|
|
self.__dict__ = { |
|
|
|
|
'log_level': None, 'log_traceback': None, 'full_traceback': None, |
|
|
|
|
'log_level': None, 'log_lazy': True, |
|
|
|
|
'log_traceback': None, 'full_traceback': None, |
|
|
|
|
'fast': False, 'memory_db': False, 'no_gamin': False, |
|
|
|
|
'no_network': False, 'negate_re': False |
|
|
|
|
} |
|
|
|
@ -123,7 +125,7 @@ def initProcess(opts):
|
|
|
|
|
return opts; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class F2B(optparse.Values): |
|
|
|
|
class F2B(DefaultTestOptions): |
|
|
|
|
def __init__(self, opts): |
|
|
|
|
self.__dict__ = opts.__dict__ |
|
|
|
|
if self.fast: |
|
|
|
@ -448,7 +450,7 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
|
|
|
|
|
|
|
|
|
|
class LogCaptureTestCase(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
class _MemHandler(logging.StreamHandler): |
|
|
|
|
class _MemHandler(logging.Handler): |
|
|
|
|
"""Logging handler helper |
|
|
|
|
|
|
|
|
|
Affords not to delegate logging to StreamHandler at all, |
|
|
|
@ -457,34 +459,54 @@ class LogCaptureTestCase(unittest.TestCase):
|
|
|
|
|
the log level set to DEBUG. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
|
def __init__(self, lazy=True): |
|
|
|
|
self._lock = threading.Lock() |
|
|
|
|
self._val = None |
|
|
|
|
self._recs = list() |
|
|
|
|
self._strm = StringIO() |
|
|
|
|
logging.StreamHandler.__init__(self, self._strm) |
|
|
|
|
logging.Handler.__init__(self) |
|
|
|
|
if lazy: |
|
|
|
|
self.handle = self._handle_lazy |
|
|
|
|
|
|
|
|
|
def truncate(self, size=None): |
|
|
|
|
"""Truncate the internal buffer and records.""" |
|
|
|
|
if size: |
|
|
|
|
raise Exception('invalid size argument: %r, should be None or 0' % size) |
|
|
|
|
with self._lock: |
|
|
|
|
self._strm.truncate(0) |
|
|
|
|
self._val = None |
|
|
|
|
self._recs = list() |
|
|
|
|
|
|
|
|
|
def __write(self, record): |
|
|
|
|
msg = record.getMessage() + '\n' |
|
|
|
|
try: |
|
|
|
|
self._strm.write(msg) |
|
|
|
|
except UnicodeEncodeError: |
|
|
|
|
self._strm.write(msg.encode('UTF-8')) |
|
|
|
|
|
|
|
|
|
def getvalue(self): |
|
|
|
|
"""Return current buffer as whole string.""" |
|
|
|
|
with self._lock: |
|
|
|
|
# cached: |
|
|
|
|
if self._val is not None: |
|
|
|
|
return self._val |
|
|
|
|
# submit already emitted (delivered to handle) records: |
|
|
|
|
for record in self._recs: |
|
|
|
|
logging.StreamHandler.handle(self, record) |
|
|
|
|
self.__write(record) |
|
|
|
|
self._recs = list() |
|
|
|
|
# cache and return: |
|
|
|
|
self._val = self._strm.getvalue() |
|
|
|
|
return self._val |
|
|
|
|
|
|
|
|
|
def handle(self, record): |
|
|
|
|
"""Handle the specified record.""" |
|
|
|
|
def handle(self, record): # pragma: no cover |
|
|
|
|
"""Handle the specified record direct (not lazy)""" |
|
|
|
|
with self._lock: |
|
|
|
|
self._val = None |
|
|
|
|
self.__write(record) |
|
|
|
|
|
|
|
|
|
def _handle_lazy(self, record): |
|
|
|
|
"""Lazy handle the specified record on demand""" |
|
|
|
|
with self._lock: |
|
|
|
|
self._val = None |
|
|
|
|
self._recs.append(record) |
|
|
|
|
|
|
|
|
@ -498,7 +520,7 @@ class LogCaptureTestCase(unittest.TestCase):
|
|
|
|
|
self._old_level = logSys.level |
|
|
|
|
self._old_handlers = logSys.handlers |
|
|
|
|
# Let's log everything into a string |
|
|
|
|
self._log = LogCaptureTestCase._MemHandler() |
|
|
|
|
self._log = LogCaptureTestCase._MemHandler(unittest.F2B.log_lazy) |
|
|
|
|
logSys.handlers = [self._log] |
|
|
|
|
if self._old_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)! |
|
|
|
|
print("") |
|
|
|
|