@ -998,6 +998,38 @@ class ServerConfigReaderTests(LogCaptureTestCase):
self . assertTrue ( IPAddr ( ' 192.0.2.1 ' ) . isIPv4 )
self . assertTrue ( IPAddr ( ' 2001:DB8:: ' ) . isIPv6 )
def _testExecActions ( self , server ) :
jails = server . _Server__jails
for jail in jails :
# print(jail, jails[jail])
for a in jails [ jail ] . actions :
action = jails [ jail ] . actions [ a ]
logSys . debug ( ' # ' + ( ' = ' * 50 ) )
logSys . debug ( ' # == %-44s == ' , jail + ' - ' + action . _name )
logSys . debug ( ' # ' + ( ' = ' * 50 ) )
# we can currently test only command actions:
if not isinstance ( action , _actions . CommandAction ) : continue
# wrap default command processor, just log if (heavy)debug:
action . executeCmd = self . _executeCmd
# test start :
logSys . debug ( ' # === start === ' ) ; self . pruneLog ( )
action . start ( )
# test ban ip4 :
logSys . debug ( ' # === ban-ipv4 === ' ) ; self . pruneLog ( )
action . ban ( { ' ip ' : IPAddr ( ' 192.0.2.1 ' ) } )
# test unban ip4 :
logSys . debug ( ' # === unban ipv4 === ' ) ; self . pruneLog ( )
action . unban ( { ' ip ' : IPAddr ( ' 192.0.2.1 ' ) } )
# test ban ip6 :
logSys . debug ( ' # === ban ipv6 === ' ) ; self . pruneLog ( )
action . ban ( { ' ip ' : IPAddr ( ' 2001:DB8:: ' ) } )
# test unban ip6 :
logSys . debug ( ' # === unban ipv6 === ' ) ; self . pruneLog ( )
action . unban ( { ' ip ' : IPAddr ( ' 2001:DB8:: ' ) } )
# test stop :
logSys . debug ( ' # === stop === ' ) ; self . pruneLog ( )
action . stop ( )
if STOCK :
def testCheckStockJailActions ( self ) :
@ -1046,6 +1078,10 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# for j in jails:
# print(j, jails[j])
# test default stock actions sepecified in all stock jails:
if not unittest . F2B . fast :
self . _testExecActions ( server )
def getDefaultJailStream ( self , jail , act ) :
act = act . replace ( ' %(__name__)s ' , jail )
actName , actOpt = JailReader . extractOptions ( act )
@ -1061,6 +1097,25 @@ class ServerConfigReaderTests(LogCaptureTestCase):
stream . extend ( action . convert ( ) )
return stream
def testCheckStockAllActions ( self ) :
unittest . F2B . SkipIfFast ( )
import glob
server = TestServer ( )
transm = server . _Server__transm
for actCfg in glob . glob ( os . path . join ( CONFIG_DIR , ' action.d ' , ' *.conf ' ) ) :
act = os . path . basename ( actCfg ) . replace ( ' .conf ' , ' ' )
# transmit artifical jail with each action to the server:
stream = self . getDefaultJailStream ( ' j- ' + act , act )
for cmd in stream :
# command to server:
ret , res = transm . proceed ( cmd )
self . assertEqual ( ret , 0 )
# test executing action commands:
self . _testExecActions ( server )
def testCheckStockCommandActions ( self ) :
# test cases to check valid ipv4/ipv6 action definition, tuple with (('jail', 'action[params]', 'tests', ...)
# where tests is a dictionary contains:
@ -1347,7 +1402,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# for cmd in stream:
# print(cmd)
# filter all start commands (we want not start all jails) :
# transmit jail to the server :
for cmd in stream :
# command to server:
ret , res = transm . proceed ( cmd )