@ -34,7 +34,10 @@ import unittest
from . . server . failregex import Regex
from . . server . failregex import Regex
from . . server . filter import Filter
from . . server . filter import Filter
from . . client . filterreader import FilterReader
from . . client . filterreader import FilterReader
from . utils import setUpMyTime , tearDownMyTime , CONFIG_DIR
from . utils import setUpMyTime , tearDownMyTime , TEST_NOW , CONFIG_DIR
# test-time in UTC as string in isoformat (2005-08-14T10:00:00):
TEST_NOW_STR = datetime . datetime . utcfromtimestamp ( TEST_NOW ) . isoformat ( )
TEST_CONFIG_DIR = os . path . join ( os . path . dirname ( __file__ ) , " config " )
TEST_CONFIG_DIR = os . path . join ( os . path . dirname ( __file__ ) , " config " )
TEST_FILES_DIR = os . path . join ( os . path . dirname ( __file__ ) , " files " )
TEST_FILES_DIR = os . path . join ( os . path . dirname ( __file__ ) , " files " )
@ -173,7 +176,7 @@ def testSampleRegexsFactory(name, basedir):
fltName = name + fltName
fltName = name + fltName
# read it:
# read it:
flt = self . _readFilter ( fltName , name , basedir , opts = opts )
flt = self . _readFilter ( fltName , name , basedir , opts = opts )
self . _filterTests . append ( ( fltName , flt ))
self . _filterTests . append ( ( fltName , flt , opts ))
continue
continue
# addFILE - filename to "include" test-files should be additionally parsed:
# addFILE - filename to "include" test-files should be additionally parsed:
if jsonREMatch . group ( 1 ) == ' addFILE ' :
if jsonREMatch . group ( 1 ) == ' addFILE ' :
@ -194,17 +197,25 @@ def testSampleRegexsFactory(name, basedir):
if not self . _filterTests :
if not self . _filterTests :
fltName = name
fltName = name
flt = self . _readFilter ( fltName , name , basedir , opts = None )
flt = self . _readFilter ( fltName , name , basedir , opts = None )
self . _filterTests = [ ( fltName , flt )]
self . _filterTests = [ ( fltName , flt , { } )]
# process line using several filter options (if specified in the test-file):
# process line using several filter options (if specified in the test-file):
for fltName , flt in self . _filterTests :
for fltName , flt , opts in self . _filterTests :
flt , regexsUsedIdx = flt
flt , regexsUsedIdx = flt
regexList = flt . getFailRegex ( )
regexList = flt . getFailRegex ( )
failregex = - 1
failregex = - 1
try :
try :
fail = { }
fail = { }
ret = flt . processLine ( line )
# for logtype "journal" we don't need parse timestamp (simulate real systemd-backend handling):
checktime = True
if opts . get ( ' logtype ' ) != ' journal ' :
ret = flt . processLine ( line )
else : # simulate journal processing, time is known from journal (formatJournalEntry):
checktime = False
if opts . get ( ' test.prefix-line ' ) : # journal backends creates common prefix-line:
line = opts . get ( ' test.prefix-line ' ) + line
ret = flt . processLine ( ( ' ' , TEST_NOW_STR , line . rstrip ( ' \r \n ' ) ) , TEST_NOW )
if not ret :
if not ret :
# Bypass if filter constraint specified:
# Bypass if filter constraint specified:
if faildata . get ( ' filter ' ) and name != faildata . get ( ' filter ' ) :
if faildata . get ( ' filter ' ) and name != faildata . get ( ' filter ' ) :
@ -245,20 +256,18 @@ def testSampleRegexsFactory(name, basedir):
self . assertEqual ( fv , v )
self . assertEqual ( fv , v )
t = faildata . get ( " time " , None )
t = faildata . get ( " time " , None )
try :
if checktime or t is not None :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S " )
try :
except ValueError :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S " )
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S. %f " )
except ValueError :
jsonTimeLocal = datetime . datetime . strptime ( t , " % Y- % m- %d T % H: % M: % S. %f " )
jsonTime = time . mktime ( jsonTimeLocal . timetuple ( ) )
jsonTime = time . mktime ( jsonTimeLocal . timetuple ( ) )
jsonTime + = jsonTimeLocal . microsecond / 1000000
jsonTime + = jsonTimeLocal . microsecond / 1000000
self . assertEqual ( fail2banTime , jsonTime ,
" UTC Time mismatch %s ( %s ) != %s ( %s ) (diff %.3f seconds) " %
self . assertEqual ( fail2banTime , jsonTime ,
( fail2banTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( fail2banTime ) ) ,
" UTC Time mismatch %s ( %s ) != %s ( %s ) (diff %.3f seconds) " %
jsonTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( jsonTime ) ) ,
( fail2banTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( fail2banTime ) ) ,
fail2banTime - jsonTime ) )
jsonTime , time . strftime ( " % Y- % m- %d T % H: % M: % S " , time . gmtime ( jsonTime ) ) ,
fail2banTime - jsonTime ) )
regexsUsedIdx . add ( failregex )
regexsUsedIdx . add ( failregex )
regexsUsedRe . add ( regexList [ failregex ] )
regexsUsedRe . add ( regexList [ failregex ] )