@ -22,90 +22,17 @@ __author__ = "Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
__license__ = "GPL"
import logging, os, re, traceback, time, unittest
from os.path import basename, dirname
import logging
import os
import re
import time
import unittest
from StringIO import StringIO
from ..server.mytime import MyTime
logSys = logging.getLogger(__name__)
# Following "traceback" functions are adopted from PyMVPA distributed
# under MIT/Expat and copyright by PyMVPA developers (i.e. me and
# Michael). Hereby I re-license derivative work on these pieces under GPL
# to stay in line with the main Fail2Ban license
def mbasename(s):
"""Custom function to include directory name if filename is too common
Also strip .py at the end
base = basename(s)
if base.endswith('.py'):
base = base[:-3]
if base in set(['base', '__init__']):
base = basename(dirname(s)) + '.' + base
return base
class TraceBack(object):
"""Customized traceback to be included in debug messages
def __init__(self, compress=False):
"""Initialize TrackBack metric
compress : bool
if True then prefix common with previous invocation gets
replaced with ...
self.__prev = ""
self.__compress = compress
def __call__(self):
ftb = traceback.extract_stack(limit=100)[:-2]
entries = [[mbasename(x[0]), dirname(x[0]), str(x[1])] for x in ftb]
entries = [ [e[0], e[2]] for e in entries
if not (e[0] in ['unittest', 'logging.__init__']
or e[1].endswith('/unittest'))]
# lets make it more concise
entries_out = [entries[0]]
for entry in entries[1:]:
if entry[0] == entries_out[-1][0]:
entries_out[-1][1] += ',%s' % entry[1]
sftb = '>'.join(['%s:%s' % (mbasename(x[0]),
x[1]) for x in entries_out])
if self.__compress:
# lets remove part which is common with previous invocation
prev_next = sftb
common_prefix = os.path.commonprefix((self.__prev, sftb))
common_prefix2 = re.sub('>[^>]*$', '', common_prefix)
if common_prefix2 != "":
sftb = '...' + sftb[len(common_prefix2):]
self.__prev = prev_next
return sftb
class FormatterWithTraceBack(logging.Formatter):
"""Custom formatter which expands %(tb) and %(tbc) with tracebacks
TODO: might need locking in case of compressed tracebacks
def __init__(self, fmt, *args, **kwargs):
logging.Formatter.__init__(self, fmt=fmt, *args, **kwargs)
compress = '%(tbc)s' in fmt
self._tb = TraceBack(compress=compress)
def format(self, record):
record.tbc = record.tb = self._tb()
return logging.Formatter.format(self, record)
def mtimesleep():
# no sleep now should be necessary since polling tracks now not only
# mtime but also ino and size
@ -146,7 +73,6 @@ def gatherTests(regexps=None, no_network=False):
if not regexps: # pragma: no cover
tests = unittest.TestSuite()
else: # pragma: no cover
import re
class FilteredTestSuite(unittest.TestSuite):
_regexps = [re.compile(r) for r in regexps]
def addTest(self, suite):