@ -49,7 +49,8 @@ class FilterSamplesRegex(unittest.TestCase):
def setUp ( self ) :
""" Call before every test case. """
super ( FilterSamplesRegex , self ) . setUp ( )
self . filter = None
self . _filters = dict ( )
self . _filterTests = None
setUpMyTime ( )
def tearDown ( self ) :
@ -79,14 +80,20 @@ class FilterSamplesRegex(unittest.TestCase):
RE_WRONG_GREED . search ( ' non-greedy .+? test ' + RE_HOST + ' test vary catch-all .* anchored$ ' ) )
def _readFilter ( self , name , basedir , opts = None ) :
self . filter = Filter ( None )
self . filter . returnRawHost = True
self . filter . checkAllRegex = True
self . filter . checkFindTime = False
self . filter . active = True
def _readFilter ( self , fltName , name , basedir , opts = None ) :
# Check filter with this option combination was already used:
flt = self . _filters . get ( fltName )
if flt :
return flt
# First time:
flt = Filter ( None )
flt . returnRawHost = True
flt . checkAllRegex = True
flt . checkFindTime = False
flt . active = True
# Read filter:
if opts is None : opts = dict ( )
# Check filter exists
opts = opts . copy ( )
filterConf = FilterReader ( name , " jail " , opts ,
basedir = basedir , share_config = unittest . F2B . share_config )
self . assertEqual ( filterConf . getFile ( ) , name )
@ -103,25 +110,28 @@ class FilterSamplesRegex(unittest.TestCase):
self . fail ( ' Unexpected config-token %r in stream ' % ( opt , ) )
for optval in optval :
if opt [ 2 ] == " prefregex " :
self . f i lter . prefRegex = optval
f lt. prefRegex = optval
elif opt [ 2 ] == " addfailregex " :
self . f i lter . addFailRegex ( optval )
f lt. addFailRegex ( optval )
elif opt [ 2 ] == " addignoreregex " :
self . f i lter . addIgnoreRegex ( optval )
f lt. addIgnoreRegex ( optval )
elif opt [ 2 ] == " maxlines " :
self . f i lter . setMaxLines ( optval )
f lt. setMaxLines ( optval )
elif opt [ 2 ] == " datepattern " :
self . f i lter . setDatePattern ( optval )
f lt. setDatePattern ( optval )
# test regexp contains greedy catch-all before <HOST>, that is
# not hard-anchored at end or has not precise sub expression after <HOST>:
regexList = self . f i lter . getFailRegex ( )
regexList = f lt. getFailRegex ( )
for fr in regexList :
if RE_WRONG_GREED . search ( fr ) : # pragma: no cover
raise AssertionError ( " Following regexp of \" %s \" contains greedy catch-all before <HOST>, "
" that is not hard-anchored at end or has not precise sub expression after <HOST>: \n %s " %
( name , str ( fr ) . replace ( RE_HOST , ' <HOST> ' ) ) )
return regexList
( fltName , str ( fr ) . replace ( RE_HOST , ' <HOST> ' ) ) )
# Cache within used filter combinations and return:
flt = [ flt , set ( ) ]
self . _filters [ fltName ] = flt
return flt
def testSampleRegexsFactory ( name , basedir ) :
def testFilter ( self ) :
@ -130,17 +140,10 @@ def testSampleRegexsFactory(name, basedir):
os . path . isfile ( os . path . join ( TEST_FILES_DIR , " logs " , name ) ) ,
" No sample log file available for ' %s ' filter " % name )
regexList = None
regexsUsedIdx = set ( )
regexsUsedRe = set ( )
filenames = [ name ]
regexsUsedRe = set ( )
def _testMissingSamples ( ) :
for failRegexIndex , failRegex in enumerate ( regexList ) :
self . assertTrue (
failRegexIndex in regexsUsedIdx or failRegex in regexsUsedRe ,
" Regex for filter ' %s ' has no samples: %i : %r " %
( name , failRegexIndex , failRegex ) )
# process each test-file (note: array filenames can grow during processing):
i = 0
while i < len ( filenames ) :
filename = filenames [ i ] ; i + = 1 ;
@ -154,13 +157,17 @@ def testSampleRegexsFactory(name, basedir):
faildata = json . loads ( jsonREMatch . group ( 2 ) )
# filterOptions - dict in JSON to control filter options (e. g. mode, etc.):
if jsonREMatch . group ( 1 ) == ' filterOptions ' :
# another filter mode - we should check previous also:
if self . filter is not None :
_testMissingSamples ( )
regexsUsedIdx = set ( ) # clear used indices (possible overlapping by mode change)
# read filter with another setting:
self . filter = None
regexList = self . _readFilter ( name , basedir , opts = faildata )
# following lines with another filter options:
self . _filterTests = [ ]
for opts in ( faildata if isinstance ( faildata , list ) else [ faildata ] ) :
# unique filter name (using options combination):
self . assertTrue ( isinstance ( opts , dict ) )
fltName = opts . get ( ' filterName ' )
if not fltName : fltName = str ( opts ) if opts else ' '
fltName = name + fltName
# read it:
flt = self . _readFilter ( fltName , name , basedir , opts = opts )
self . _filterTests . append ( ( fltName , flt ) )
continue
# addFILE - filename to "include" test-files should be additionally parsed:
if jsonREMatch . group ( 1 ) == ' addFILE ' :
@ -176,65 +183,81 @@ def testSampleRegexsFactory(name, basedir):
else : # pragma: no cover - normally unreachable
faildata = { }
if self . filter is None :
regexList = self . _readFilter ( name , basedir , opts = None )
try :
ret = self . filter . processLine ( line )
if not ret :
# Bypass if filter constraint specified:
if faildata . get ( ' filter ' ) and name != faildata . get ( ' filter ' ) :
continue
# Check line is flagged as none match
self . assertFalse ( faildata . get ( ' match ' , True ) ,
" Line not matched when should have " )
continue
failregex , fid , fail2banTime , fail = ret [ 0 ]
# Bypass no failure helpers-regexp:
if not faildata . get ( ' match ' , False ) and ( fid is None or fail . get ( ' nofail ' ) ) :
regexsUsedIdx . add ( failregex )
regexsUsedRe . add ( regexList [ failregex ] )
continue
# Check line is flagged to match
self . assertTrue ( faildata . get ( ' match ' , False ) ,
" Line matched when shouldn ' t have " )
self . assertEqual ( len ( ret ) , 1 ,
" Multiple regexs matched %r " % ( map ( lambda x : x [ 0 ] , ret ) ) )
# Verify match captures (at least fid/host) and timestamp as expected
for k , v in faildata . iteritems ( ) :
if k not in ( " time " , " match " , " desc " , " filter " ) :
fv = fail . get ( k , None )
# Fallback for backwards compatibility (previously no fid, was host only):
if k == " host " and fv is None :
fv = fid
self . assertEqual ( fv , v )
t = faildata . get ( " time " , None )
try :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S " )
except ValueError :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S. %f " )
# if filter options was not yet specified:
if not self . _filterTests :
fltName = name
flt = self . _readFilter ( fltName , name , basedir , opts = None )
self . _filterTests = [ ( fltName , flt ) ]
jsonTime = time . mktime ( jsonTimeLocal . timetuple ( ) )
# process line using several filter options (if specified in the test-file):
for fltName , flt in self . _filterTests :
flt , regexsUsedIdx = flt
regexList = flt . getFailRegex ( )
jsonTime + = jsonTimeLocal . microsecond / 1000000
try :
ret = flt . processLine ( line )
if not ret :
# Bypass if filter constraint specified:
if faildata . get ( ' filter ' ) and name != faildata . get ( ' filter ' ) :
continue
# Check line is flagged as none match
self . assertFalse ( faildata . get ( ' match ' , True ) ,
" Line not matched when should have " )
continue
self . assertEqual ( fail2banTime , jsonTime ,
" UTC Time mismatch %s ( %s ) != %s ( %s ) (diff %.3f seconds) " %
( fail2banTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( fail2banTime ) ) ,
jsonTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( jsonTime ) ) ,
fail2banTime - jsonTime ) )
failregex , fid , fail2banTime , fail = ret [ 0 ]
# Bypass no failure helpers-regexp:
if not faildata . get ( ' match ' , False ) and ( fid is None or fail . get ( ' nofail ' ) ) :
regexsUsedIdx . add ( failregex )
regexsUsedRe . add ( regexList [ failregex ] )
continue
regexsUsedIdx . add ( failregex )
regexsUsedRe . add ( regexList [ failregex ] )
except AssertionError as e : # pragma: no cover
raise AssertionError ( " %s on: %s : %i , line: \n %s " % (
e , logFile . filename ( ) , logFile . filelineno ( ) , line ) )
# Check line is flagged to match
self . assertTrue ( faildata . get ( ' match ' , False ) ,
" Line matched when shouldn ' t have " )
self . assertEqual ( len ( ret ) , 1 ,
" Multiple regexs matched %r " % ( map ( lambda x : x [ 0 ] , ret ) ) )
# Verify match captures (at least fid/host) and timestamp as expected
for k , v in faildata . iteritems ( ) :
if k not in ( " time " , " match " , " desc " , " filter " ) :
fv = fail . get ( k , None )
# Fallback for backwards compatibility (previously no fid, was host only):
if k == " host " and fv is None :
fv = fid
self . assertEqual ( fv , v )
t = faildata . get ( " time " , None )
try :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S " )
except ValueError :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S. %f " )
jsonTime = time . mktime ( jsonTimeLocal . timetuple ( ) )
jsonTime + = jsonTimeLocal . microsecond / 1000000
self . assertEqual ( fail2banTime , jsonTime ,
" UTC Time mismatch %s ( %s ) != %s ( %s ) (diff %.3f seconds) " %
( fail2banTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( fail2banTime ) ) ,
jsonTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( jsonTime ) ) ,
fail2banTime - jsonTime ) )
_testMissingSamples ( )
regexsUsedIdx . add ( failregex )
regexsUsedRe . add ( regexList [ failregex ] )
except AssertionError as e : # pragma: no cover
raise AssertionError ( " %s : %s on: %s : %i , line: \n %s " % (
fltName , e , logFile . filename ( ) , logFile . filelineno ( ) , line ) )
# check missing samples for regex using each filter-options combination:
for fltName , flt in self . _filters . iteritems ( ) :
flt , regexsUsedIdx = flt
regexList = flt . getFailRegex ( )
for failRegexIndex , failRegex in enumerate ( regexList ) :
self . assertTrue (
failRegexIndex in regexsUsedIdx or failRegex in regexsUsedRe ,
" %s : Regex has no samples: %i : %r " %
( fltName , failRegexIndex , failRegex ) )
return testFilter