|
|
|
@ -32,10 +32,8 @@ from ..client.configurator import Configurator
|
|
|
|
|
from .utils import LogCaptureTestCase
|
|
|
|
|
|
|
|
|
|
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
|
|
|
|
if os.path.exists(os.path.join('config','fail2ban.conf')):
|
|
|
|
|
CONFIG_DIR='config'
|
|
|
|
|
else:
|
|
|
|
|
CONFIG_DIR='/etc/fail2ban'
|
|
|
|
|
STOCK = os.path.exists(os.path.join('config','fail2ban.conf'))
|
|
|
|
|
CONFIG_DIR='config' if STOCK else '/etc/fail2ban'
|
|
|
|
|
|
|
|
|
|
IMPERFECT_CONFIG = os.path.join(os.path.dirname(__file__), 'config')
|
|
|
|
|
|
|
|
|
@ -185,14 +183,15 @@ class JailReaderTest(LogCaptureTestCase):
|
|
|
|
|
self.assertTrue(self._is_logged('Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0'))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testStockSSHJail(self):
|
|
|
|
|
jail = JailReader('sshd', basedir=CONFIG_DIR) # we are running tests from root project dir atm
|
|
|
|
|
self.assertTrue(jail.read())
|
|
|
|
|
self.assertTrue(jail.getOptions())
|
|
|
|
|
self.assertFalse(jail.isEnabled())
|
|
|
|
|
self.assertEqual(jail.getName(), 'sshd')
|
|
|
|
|
jail.setName('ssh-funky-blocker')
|
|
|
|
|
self.assertEqual(jail.getName(), 'ssh-funky-blocker')
|
|
|
|
|
if STOCK:
|
|
|
|
|
def testStockSSHJail(self):
|
|
|
|
|
jail = JailReader('sshd', basedir=CONFIG_DIR) # we are running tests from root project dir atm
|
|
|
|
|
self.assertTrue(jail.read())
|
|
|
|
|
self.assertTrue(jail.getOptions())
|
|
|
|
|
self.assertFalse(jail.isEnabled())
|
|
|
|
|
self.assertEqual(jail.getName(), 'sshd')
|
|
|
|
|
jail.setName('ssh-funky-blocker')
|
|
|
|
|
self.assertEqual(jail.getName(), 'ssh-funky-blocker')
|
|
|
|
|
|
|
|
|
|
def testSplitOption(self):
|
|
|
|
|
# Simple example
|
|
|
|
@ -407,163 +406,163 @@ class JailsReaderTest(LogCaptureTestCase):
|
|
|
|
|
self.assertTrue(self._is_logged("Errors in jail 'missingbitsjail'. Skipping..."))
|
|
|
|
|
self.assertTrue(self._is_logged("No file(s) found for glob /weapons/of/mass/destruction"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testReadStockJailConf(self):
|
|
|
|
|
jails = JailsReader(basedir=CONFIG_DIR) # we are running tests from root project dir atm
|
|
|
|
|
self.assertTrue(jails.read()) # opens fine
|
|
|
|
|
self.assertTrue(jails.getOptions()) # reads fine
|
|
|
|
|
comm_commands = jails.convert()
|
|
|
|
|
# by default None of the jails is enabled and we get no
|
|
|
|
|
# commands to communicate to the server
|
|
|
|
|
self.assertEqual(comm_commands, [])
|
|
|
|
|
|
|
|
|
|
# TODO: make sure this is handled well
|
|
|
|
|
## We should not "read" some bogus jail
|
|
|
|
|
#old_comm_commands = comm_commands[:] # make a copy
|
|
|
|
|
#self.assertRaises(ValueError, jails.getOptions, "BOGUS")
|
|
|
|
|
#self.printLog()
|
|
|
|
|
#self.assertTrue(self._is_logged("No section: 'BOGUS'"))
|
|
|
|
|
## and there should be no side-effects
|
|
|
|
|
#self.assertEqual(jails.convert(), old_comm_commands)
|
|
|
|
|
|
|
|
|
|
allFilters = set()
|
|
|
|
|
|
|
|
|
|
# All jails must have filter and action set
|
|
|
|
|
# TODO: evolve into a parametric test
|
|
|
|
|
for jail in jails.sections():
|
|
|
|
|
if jail == 'INCLUDES':
|
|
|
|
|
continue
|
|
|
|
|
filterName = jails.get(jail, 'filter')
|
|
|
|
|
allFilters.add(filterName)
|
|
|
|
|
self.assertTrue(len(filterName))
|
|
|
|
|
# moreover we must have a file for it
|
|
|
|
|
# and it must be readable as a Filter
|
|
|
|
|
filterReader = FilterReader(filterName, jail, {})
|
|
|
|
|
filterReader.setBaseDir(CONFIG_DIR)
|
|
|
|
|
self.assertTrue(filterReader.read(),"Failed to read filter:" + filterName) # opens fine
|
|
|
|
|
filterReader.getOptions({}) # reads fine
|
|
|
|
|
|
|
|
|
|
# test if filter has failregex set
|
|
|
|
|
self.assertTrue(filterReader._opts.get('failregex', '').strip())
|
|
|
|
|
|
|
|
|
|
actions = jails.get(jail, 'action')
|
|
|
|
|
self.assertTrue(len(actions.strip()))
|
|
|
|
|
|
|
|
|
|
# somewhat duplicating here what is done in JailsReader if
|
|
|
|
|
# the jail is enabled
|
|
|
|
|
for act in actions.split('\n'):
|
|
|
|
|
actName, actOpt = JailReader.extractOptions(act)
|
|
|
|
|
self.assertTrue(len(actName))
|
|
|
|
|
self.assertTrue(isinstance(actOpt, dict))
|
|
|
|
|
if actName == 'iptables-multiport':
|
|
|
|
|
self.assertTrue('port' in actOpt)
|
|
|
|
|
|
|
|
|
|
actionReader = ActionReader(
|
|
|
|
|
actName, jail, {}, basedir=CONFIG_DIR)
|
|
|
|
|
self.assertTrue(actionReader.read())
|
|
|
|
|
actionReader.getOptions({}) # populate _opts
|
|
|
|
|
cmds = actionReader.convert()
|
|
|
|
|
self.assertTrue(len(cmds))
|
|
|
|
|
|
|
|
|
|
# all must have some actionban
|
|
|
|
|
self.assertTrue(actionReader._opts.get('actionban', '').strip())
|
|
|
|
|
|
|
|
|
|
# Verify that all filters found under config/ have a jail
|
|
|
|
|
def testReadStockJailFilterComplete(self):
|
|
|
|
|
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True)
|
|
|
|
|
self.assertTrue(jails.read()) # opens fine
|
|
|
|
|
self.assertTrue(jails.getOptions()) # reads fine
|
|
|
|
|
# grab all filter names
|
|
|
|
|
filters = set(os.path.splitext(os.path.split(a)[1])[0]
|
|
|
|
|
for a in glob.glob(os.path.join('config', 'filter.d', '*.conf'))
|
|
|
|
|
if not a.endswith('common.conf'))
|
|
|
|
|
filters_jail = set(jail.options['filter'] for jail in jails.jails)
|
|
|
|
|
self.maxDiff = None
|
|
|
|
|
self.assertTrue(filters.issubset(filters_jail),
|
|
|
|
|
"More filters exists than are referenced in stock jail.conf %r" % filters.difference(filters_jail))
|
|
|
|
|
self.assertTrue(filters_jail.issubset(filters),
|
|
|
|
|
"Stock jail.conf references non-existent filters %r" % filters_jail.difference(filters))
|
|
|
|
|
|
|
|
|
|
def testReadStockJailConfForceEnabled(self):
|
|
|
|
|
# more of a smoke test to make sure that no obvious surprises
|
|
|
|
|
# on users' systems when enabling shipped jails
|
|
|
|
|
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True) # we are running tests from root project dir atm
|
|
|
|
|
self.assertTrue(jails.read()) # opens fine
|
|
|
|
|
self.assertTrue(jails.getOptions()) # reads fine
|
|
|
|
|
comm_commands = jails.convert(allow_no_files=True)
|
|
|
|
|
|
|
|
|
|
# by default we have lots of jails ;)
|
|
|
|
|
self.assertTrue(len(comm_commands))
|
|
|
|
|
|
|
|
|
|
# and we know even some of them by heart
|
|
|
|
|
for j in ['sshd', 'recidive']:
|
|
|
|
|
# by default we have 'auto' backend ATM
|
|
|
|
|
self.assertTrue(['add', j, 'auto'] in comm_commands)
|
|
|
|
|
# and warn on useDNS
|
|
|
|
|
self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands)
|
|
|
|
|
self.assertTrue(['start', j] in comm_commands)
|
|
|
|
|
|
|
|
|
|
# last commands should be the 'start' commands
|
|
|
|
|
self.assertEqual(comm_commands[-1][0], 'start')
|
|
|
|
|
|
|
|
|
|
for j in jails._JailsReader__jails:
|
|
|
|
|
actions = j._JailReader__actions
|
|
|
|
|
jail_name = j.getName()
|
|
|
|
|
# make sure that all of the jails have actions assigned,
|
|
|
|
|
# otherwise it makes little to no sense
|
|
|
|
|
self.assertTrue(len(actions),
|
|
|
|
|
msg="No actions found for jail %s" % jail_name)
|
|
|
|
|
|
|
|
|
|
# Test for presence of blocktype (in relation to gh-232)
|
|
|
|
|
for action in actions:
|
|
|
|
|
commands = action.convert()
|
|
|
|
|
action_name = action.getName()
|
|
|
|
|
if '<blocktype>' in str(commands):
|
|
|
|
|
# Verify that it is among cInfo
|
|
|
|
|
self.assertTrue('blocktype' in action._initOpts)
|
|
|
|
|
# Verify that we have a call to set it up
|
|
|
|
|
blocktype_present = False
|
|
|
|
|
target_command = ['set', jail_name, 'action', action_name, 'blocktype']
|
|
|
|
|
for command in commands:
|
|
|
|
|
if (len(command) > 5 and
|
|
|
|
|
command[:5] == target_command):
|
|
|
|
|
blocktype_present = True
|
|
|
|
|
continue
|
|
|
|
|
self.assertTrue(
|
|
|
|
|
blocktype_present,
|
|
|
|
|
msg="Found no %s command among %s"
|
|
|
|
|
% (target_command, str(commands)) )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testConfigurator(self):
|
|
|
|
|
configurator = Configurator()
|
|
|
|
|
configurator.setBaseDir(CONFIG_DIR)
|
|
|
|
|
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
|
|
|
|
|
|
|
|
|
|
configurator.readEarly()
|
|
|
|
|
opts = configurator.getEarlyOptions()
|
|
|
|
|
# our current default settings
|
|
|
|
|
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
|
|
|
|
|
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
|
|
|
|
|
|
|
|
|
|
configurator.getOptions()
|
|
|
|
|
configurator.convertToProtocol()
|
|
|
|
|
commands = configurator.getConfigStream()
|
|
|
|
|
# and there is logging information left to be passed into the
|
|
|
|
|
# server
|
|
|
|
|
self.assertEqual(sorted(commands),
|
|
|
|
|
[['set', 'dbfile',
|
|
|
|
|
'/var/lib/fail2ban/fail2ban.sqlite3'],
|
|
|
|
|
['set', 'dbpurgeage', 86400],
|
|
|
|
|
['set', 'loglevel', "INFO"],
|
|
|
|
|
['set', 'logtarget', '/var/log/fail2ban.log']])
|
|
|
|
|
|
|
|
|
|
# and if we force change configurator's fail2ban's baseDir
|
|
|
|
|
# there should be an error message (test visually ;) --
|
|
|
|
|
# otherwise just a code smoke test)
|
|
|
|
|
configurator._Configurator__jails.setBaseDir('/tmp')
|
|
|
|
|
self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp')
|
|
|
|
|
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
|
|
|
|
|
if STOCK:
|
|
|
|
|
def testReadStockJailConf(self):
|
|
|
|
|
jails = JailsReader(basedir=CONFIG_DIR) # we are running tests from root project dir atm
|
|
|
|
|
self.assertTrue(jails.read()) # opens fine
|
|
|
|
|
self.assertTrue(jails.getOptions()) # reads fine
|
|
|
|
|
comm_commands = jails.convert()
|
|
|
|
|
# by default None of the jails is enabled and we get no
|
|
|
|
|
# commands to communicate to the server
|
|
|
|
|
self.assertEqual(comm_commands, [])
|
|
|
|
|
|
|
|
|
|
# TODO: make sure this is handled well
|
|
|
|
|
## We should not "read" some bogus jail
|
|
|
|
|
#old_comm_commands = comm_commands[:] # make a copy
|
|
|
|
|
#self.assertRaises(ValueError, jails.getOptions, "BOGUS")
|
|
|
|
|
#self.printLog()
|
|
|
|
|
#self.assertTrue(self._is_logged("No section: 'BOGUS'"))
|
|
|
|
|
## and there should be no side-effects
|
|
|
|
|
#self.assertEqual(jails.convert(), old_comm_commands)
|
|
|
|
|
|
|
|
|
|
allFilters = set()
|
|
|
|
|
|
|
|
|
|
# All jails must have filter and action set
|
|
|
|
|
# TODO: evolve into a parametric test
|
|
|
|
|
for jail in jails.sections():
|
|
|
|
|
if jail == 'INCLUDES':
|
|
|
|
|
continue
|
|
|
|
|
filterName = jails.get(jail, 'filter')
|
|
|
|
|
allFilters.add(filterName)
|
|
|
|
|
self.assertTrue(len(filterName))
|
|
|
|
|
# moreover we must have a file for it
|
|
|
|
|
# and it must be readable as a Filter
|
|
|
|
|
filterReader = FilterReader(filterName, jail, {})
|
|
|
|
|
filterReader.setBaseDir(CONFIG_DIR)
|
|
|
|
|
self.assertTrue(filterReader.read(),"Failed to read filter:" + filterName) # opens fine
|
|
|
|
|
filterReader.getOptions({}) # reads fine
|
|
|
|
|
|
|
|
|
|
# test if filter has failregex set
|
|
|
|
|
self.assertTrue(filterReader._opts.get('failregex', '').strip())
|
|
|
|
|
|
|
|
|
|
actions = jails.get(jail, 'action')
|
|
|
|
|
self.assertTrue(len(actions.strip()))
|
|
|
|
|
|
|
|
|
|
# somewhat duplicating here what is done in JailsReader if
|
|
|
|
|
# the jail is enabled
|
|
|
|
|
for act in actions.split('\n'):
|
|
|
|
|
actName, actOpt = JailReader.extractOptions(act)
|
|
|
|
|
self.assertTrue(len(actName))
|
|
|
|
|
self.assertTrue(isinstance(actOpt, dict))
|
|
|
|
|
if actName == 'iptables-multiport':
|
|
|
|
|
self.assertTrue('port' in actOpt)
|
|
|
|
|
|
|
|
|
|
actionReader = ActionReader(
|
|
|
|
|
actName, jail, {}, basedir=CONFIG_DIR)
|
|
|
|
|
self.assertTrue(actionReader.read())
|
|
|
|
|
actionReader.getOptions({}) # populate _opts
|
|
|
|
|
cmds = actionReader.convert()
|
|
|
|
|
self.assertTrue(len(cmds))
|
|
|
|
|
|
|
|
|
|
# all must have some actionban
|
|
|
|
|
self.assertTrue(actionReader._opts.get('actionban', '').strip())
|
|
|
|
|
|
|
|
|
|
# Verify that all filters found under config/ have a jail
|
|
|
|
|
def testReadStockJailFilterComplete(self):
|
|
|
|
|
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True)
|
|
|
|
|
self.assertTrue(jails.read()) # opens fine
|
|
|
|
|
self.assertTrue(jails.getOptions()) # reads fine
|
|
|
|
|
# grab all filter names
|
|
|
|
|
filters = set(os.path.splitext(os.path.split(a)[1])[0]
|
|
|
|
|
for a in glob.glob(os.path.join('config', 'filter.d', '*.conf'))
|
|
|
|
|
if not a.endswith('common.conf'))
|
|
|
|
|
filters_jail = set(jail.options['filter'] for jail in jails.jails)
|
|
|
|
|
self.maxDiff = None
|
|
|
|
|
self.assertTrue(filters.issubset(filters_jail),
|
|
|
|
|
"More filters exists than are referenced in stock jail.conf %r" % filters.difference(filters_jail))
|
|
|
|
|
self.assertTrue(filters_jail.issubset(filters),
|
|
|
|
|
"Stock jail.conf references non-existent filters %r" % filters_jail.difference(filters))
|
|
|
|
|
|
|
|
|
|
def testReadStockJailConfForceEnabled(self):
|
|
|
|
|
# more of a smoke test to make sure that no obvious surprises
|
|
|
|
|
# on users' systems when enabling shipped jails
|
|
|
|
|
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True) # we are running tests from root project dir atm
|
|
|
|
|
self.assertTrue(jails.read()) # opens fine
|
|
|
|
|
self.assertTrue(jails.getOptions()) # reads fine
|
|
|
|
|
comm_commands = jails.convert(allow_no_files=True)
|
|
|
|
|
|
|
|
|
|
# by default we have lots of jails ;)
|
|
|
|
|
self.assertTrue(len(comm_commands))
|
|
|
|
|
|
|
|
|
|
# and we know even some of them by heart
|
|
|
|
|
for j in ['sshd', 'recidive']:
|
|
|
|
|
# by default we have 'auto' backend ATM
|
|
|
|
|
self.assertTrue(['add', j, 'auto'] in comm_commands)
|
|
|
|
|
# and warn on useDNS
|
|
|
|
|
self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands)
|
|
|
|
|
self.assertTrue(['start', j] in comm_commands)
|
|
|
|
|
|
|
|
|
|
# last commands should be the 'start' commands
|
|
|
|
|
self.assertEqual(comm_commands[-1][0], 'start')
|
|
|
|
|
|
|
|
|
|
for j in jails._JailsReader__jails:
|
|
|
|
|
actions = j._JailReader__actions
|
|
|
|
|
jail_name = j.getName()
|
|
|
|
|
# make sure that all of the jails have actions assigned,
|
|
|
|
|
# otherwise it makes little to no sense
|
|
|
|
|
self.assertTrue(len(actions),
|
|
|
|
|
msg="No actions found for jail %s" % jail_name)
|
|
|
|
|
|
|
|
|
|
# Test for presence of blocktype (in relation to gh-232)
|
|
|
|
|
for action in actions:
|
|
|
|
|
commands = action.convert()
|
|
|
|
|
action_name = action.getName()
|
|
|
|
|
if '<blocktype>' in str(commands):
|
|
|
|
|
# Verify that it is among cInfo
|
|
|
|
|
self.assertTrue('blocktype' in action._initOpts)
|
|
|
|
|
# Verify that we have a call to set it up
|
|
|
|
|
blocktype_present = False
|
|
|
|
|
target_command = ['set', jail_name, 'action', action_name, 'blocktype']
|
|
|
|
|
for command in commands:
|
|
|
|
|
if (len(command) > 5 and
|
|
|
|
|
command[:5] == target_command):
|
|
|
|
|
blocktype_present = True
|
|
|
|
|
continue
|
|
|
|
|
self.assertTrue(
|
|
|
|
|
blocktype_present,
|
|
|
|
|
msg="Found no %s command among %s"
|
|
|
|
|
% (target_command, str(commands)) )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def testStockConfigurator(self):
|
|
|
|
|
configurator = Configurator()
|
|
|
|
|
configurator.setBaseDir(CONFIG_DIR)
|
|
|
|
|
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
|
|
|
|
|
|
|
|
|
|
configurator.readEarly()
|
|
|
|
|
opts = configurator.getEarlyOptions()
|
|
|
|
|
# our current default settings
|
|
|
|
|
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
|
|
|
|
|
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
|
|
|
|
|
|
|
|
|
|
configurator.getOptions()
|
|
|
|
|
configurator.convertToProtocol()
|
|
|
|
|
commands = configurator.getConfigStream()
|
|
|
|
|
# and there is logging information left to be passed into the
|
|
|
|
|
# server
|
|
|
|
|
self.assertEqual(sorted(commands),
|
|
|
|
|
[['set', 'dbfile',
|
|
|
|
|
'/var/lib/fail2ban/fail2ban.sqlite3'],
|
|
|
|
|
['set', 'dbpurgeage', 86400],
|
|
|
|
|
['set', 'loglevel', "INFO"],
|
|
|
|
|
['set', 'logtarget', '/var/log/fail2ban.log']])
|
|
|
|
|
|
|
|
|
|
# and if we force change configurator's fail2ban's baseDir
|
|
|
|
|
# there should be an error message (test visually ;) --
|
|
|
|
|
# otherwise just a code smoke test)
|
|
|
|
|
configurator._Configurator__jails.setBaseDir('/tmp')
|
|
|
|
|
self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp')
|
|
|
|
|
self.assertEqual(configurator.getBaseDir(), CONFIG_DIR)
|
|
|
|
|
|
|
|
|
|
def testMultipleSameAction(self):
|
|
|
|
|
basedir = tempfile.mkdtemp("fail2ban_conf")
|
|
|
|
|