mirror of https://github.com/fail2ban/fail2ban
Merge pull request #592 from kwirk/python-action-tests
TST+BF: Add tests for python actions, including test for smtp.pypull/593/head
commit
819df889d8
|
@ -98,8 +98,8 @@ class SMTPAction(ActionBase):
|
||||||
Email address to use for from address in email.
|
Email address to use for from address in email.
|
||||||
Default "fail2ban".
|
Default "fail2ban".
|
||||||
dest : str, optional
|
dest : str, optional
|
||||||
Email addresses of intended recipient(s) in comma delimited
|
Email addresses of intended recipient(s) in comma space ", "
|
||||||
format. Default "root".
|
delimited format. Default "root".
|
||||||
matches : str, optional
|
matches : str, optional
|
||||||
Type of matches to be included from ban in email. Can be one
|
Type of matches to be included from ban in email. Can be one
|
||||||
of "matches", "ipmatches" or "ipjailmatches". Default None
|
of "matches", "ipmatches" or "ipjailmatches". Default None
|
||||||
|
@ -159,7 +159,7 @@ class SMTPAction(ActionBase):
|
||||||
if self.user and self.password:
|
if self.user and self.password:
|
||||||
smtp.login(self.user, self.password)
|
smtp.login(self.user, self.password)
|
||||||
failed_recipients = smtp.sendmail(
|
failed_recipients = smtp.sendmail(
|
||||||
self.fromaddr, self.toaddr, msg.as_string())
|
self.fromaddr, self.toaddr.split(", "), msg.as_string())
|
||||||
except smtplib.SMTPConnectError:
|
except smtplib.SMTPConnectError:
|
||||||
self._logSys.error("Error connecting to host '%s'", self.host)
|
self._logSys.error("Error connecting to host '%s'", self.host)
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -98,7 +98,8 @@ class Actions(JailThread, Mapping):
|
||||||
if pythonModule is None:
|
if pythonModule is None:
|
||||||
action = CommandAction(self._jail, name)
|
action = CommandAction(self._jail, name)
|
||||||
else:
|
else:
|
||||||
pythonModuleName = os.path.basename(pythonModule.strip(".py"))
|
pythonModuleName = os.path.splitext(
|
||||||
|
os.path.basename(pythonModule))[0]
|
||||||
if sys.version_info >= (3, 3):
|
if sys.version_info >= (3, 3):
|
||||||
customActionModule = importlib.machinery.SourceFileLoader(
|
customActionModule = importlib.machinery.SourceFileLoader(
|
||||||
pythonModuleName, pythonModule).load_module()
|
pythonModuleName, pythonModule).load_module()
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
||||||
|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
||||||
|
|
||||||
|
# This file is part of Fail2Ban.
|
||||||
|
#
|
||||||
|
# Fail2Ban is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# Fail2Ban is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with Fail2Ban; if not, write to the Free Software
|
||||||
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
|
||||||
|
__author__ = "Steven Hiscocks"
|
||||||
|
__copyright__ = "Copyright (c) 2014 Steven Hiscocks"
|
||||||
|
__license__ = "GPL"
|
|
@ -0,0 +1,133 @@
|
||||||
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
||||||
|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
||||||
|
|
||||||
|
# This file is part of Fail2Ban.
|
||||||
|
#
|
||||||
|
# Fail2Ban is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# Fail2Ban is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with Fail2Ban; if not, write to the Free Software
|
||||||
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
|
||||||
|
import os
|
||||||
|
import smtpd
|
||||||
|
import asyncore
|
||||||
|
import threading
|
||||||
|
import unittest
|
||||||
|
import sys
|
||||||
|
if sys.version_info >= (3, 3):
|
||||||
|
import importlib
|
||||||
|
else:
|
||||||
|
import imp
|
||||||
|
|
||||||
|
from ..dummyjail import DummyJail
|
||||||
|
|
||||||
|
if os.path.exists('config/fail2ban.conf'):
|
||||||
|
CONFIG_DIR = "config"
|
||||||
|
else:
|
||||||
|
CONFIG_DIR='/etc/fail2ban'
|
||||||
|
|
||||||
|
class TestSMTPServer(smtpd.SMTPServer):
|
||||||
|
|
||||||
|
def process_message(self, peer, mailfrom, rcpttos, data):
|
||||||
|
self.peer = peer
|
||||||
|
self.mailfrom = mailfrom
|
||||||
|
self.rcpttos = rcpttos
|
||||||
|
self.data = data
|
||||||
|
|
||||||
|
class SMTPActionTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Call before every test case."""
|
||||||
|
self.jail = DummyJail()
|
||||||
|
pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
|
||||||
|
pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))
|
||||||
|
if sys.version_info >= (3, 3):
|
||||||
|
customActionModule = importlib.machinery.SourceFileLoader(
|
||||||
|
pythonModuleName, pythonModule).load_module()
|
||||||
|
else:
|
||||||
|
customActionModule = imp.load_source(
|
||||||
|
pythonModuleName, pythonModule)
|
||||||
|
|
||||||
|
self.smtpd = TestSMTPServer(("localhost", 0), None)
|
||||||
|
port = self.smtpd.socket.getsockname()[1]
|
||||||
|
|
||||||
|
self.action = customActionModule.Action(
|
||||||
|
self.jail, "test", host="127.0.0.1:%i" % port)
|
||||||
|
|
||||||
|
self._loop_thread = threading.Thread(
|
||||||
|
target=asyncore.loop, kwargs={'timeout': 1})
|
||||||
|
self._loop_thread.start()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""Call after every test case."""
|
||||||
|
self.smtpd.close()
|
||||||
|
self._loop_thread.join()
|
||||||
|
|
||||||
|
def testStart(self):
|
||||||
|
self.action.start()
|
||||||
|
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
|
||||||
|
self.assertEqual(self.smtpd.rcpttos, ["root"])
|
||||||
|
self.assertTrue(
|
||||||
|
"Subject: [Fail2Ban] %s: started" % self.jail.getName()
|
||||||
|
in self.smtpd.data)
|
||||||
|
|
||||||
|
def testStop(self):
|
||||||
|
self.action.stop()
|
||||||
|
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
|
||||||
|
self.assertEqual(self.smtpd.rcpttos, ["root"])
|
||||||
|
self.assertTrue(
|
||||||
|
"Subject: [Fail2Ban] %s: stopped" %
|
||||||
|
self.jail.getName() in self.smtpd.data)
|
||||||
|
|
||||||
|
def testBan(self):
|
||||||
|
aInfo = {
|
||||||
|
'ip': "127.0.0.2",
|
||||||
|
'failures': 3,
|
||||||
|
'matches': "Test fail 1\n",
|
||||||
|
'ipjailmatches': "Test fail 1\nTest Fail2\n",
|
||||||
|
'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n",
|
||||||
|
}
|
||||||
|
|
||||||
|
self.action.ban(aInfo)
|
||||||
|
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
|
||||||
|
self.assertEqual(self.smtpd.rcpttos, ["root"])
|
||||||
|
self.assertTrue(
|
||||||
|
"Subject: [Fail2Ban] %s: banned %s" %
|
||||||
|
(self.jail.getName(), aInfo['ip']) in self.smtpd.data)
|
||||||
|
self.assertTrue(
|
||||||
|
"%i attempts" % aInfo['failures'] in self.smtpd.data)
|
||||||
|
|
||||||
|
self.action.matches = "matches"
|
||||||
|
self.action.ban(aInfo)
|
||||||
|
self.assertTrue(aInfo['matches'] in self.smtpd.data)
|
||||||
|
|
||||||
|
self.action.matches = "ipjailmatches"
|
||||||
|
self.action.ban(aInfo)
|
||||||
|
self.assertTrue(aInfo['ipjailmatches'] in self.smtpd.data)
|
||||||
|
|
||||||
|
self.action.matches = "ipmatches"
|
||||||
|
self.action.ban(aInfo)
|
||||||
|
self.assertTrue(aInfo['ipmatches'] in self.smtpd.data)
|
||||||
|
|
||||||
|
def testOptions(self):
|
||||||
|
self.action.start()
|
||||||
|
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
|
||||||
|
self.assertEqual(self.smtpd.rcpttos, ["root"])
|
||||||
|
|
||||||
|
self.action.fromname = "Test"
|
||||||
|
self.action.fromaddr = "test@example.com"
|
||||||
|
self.action.toaddr = "test@example.com, test2@example.com"
|
||||||
|
self.action.start()
|
||||||
|
self.assertEqual(self.smtpd.mailfrom, "test@example.com")
|
||||||
|
self.assertTrue("From: %s <%s>" %
|
||||||
|
(self.action.fromname, self.action.fromaddr) in self.smtpd.data)
|
||||||
|
self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
|
|
@ -24,12 +24,15 @@ __license__ = "GPL"
|
||||||
|
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
|
from ..server.actions import Actions
|
||||||
|
|
||||||
class DummyJail(object):
|
class DummyJail(object):
|
||||||
"""A simple 'jail' to suck in all the tickets generated by Filter's
|
"""A simple 'jail' to suck in all the tickets generated by Filter's
|
||||||
"""
|
"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.lock = Lock()
|
self.lock = Lock()
|
||||||
self.queue = []
|
self.queue = []
|
||||||
|
self.actions = Actions(self)
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -202,6 +202,17 @@ def gatherTests(regexps=None, no_network=False):
|
||||||
# Filter Regex tests with sample logs
|
# Filter Regex tests with sample logs
|
||||||
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
|
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
|
||||||
|
|
||||||
|
#
|
||||||
|
# Python action testcases
|
||||||
|
#
|
||||||
|
testloader = unittest.TestLoader()
|
||||||
|
from . import action_d
|
||||||
|
for file_ in os.listdir(
|
||||||
|
os.path.abspath(os.path.dirname(action_d.__file__))):
|
||||||
|
if file_.startswith("test_") and file_.endswith(".py"):
|
||||||
|
tests.addTest(testloader.loadTestsFromName(
|
||||||
|
"%s.%s" % (action_d.__name__, os.path.splitext(file_)[0])))
|
||||||
|
|
||||||
#
|
#
|
||||||
# Extensive use-tests of different available filters backends
|
# Extensive use-tests of different available filters backends
|
||||||
#
|
#
|
||||||
|
|
Loading…
Reference in New Issue