py3.12: ignore smtpd based tests (if no smtpd module)

test-3.13.0-alpha.2
sebres 2023-12-11 21:45:34 +01:00
parent 70aef2c3c6
commit ef208e9149
1 changed files with 114 additions and 110 deletions

View File

@ -18,7 +18,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os import os
import smtpd
import threading import threading
import unittest import unittest
import re import re
@ -29,134 +28,139 @@ else:
import imp import imp
from ..dummyjail import DummyJail from ..dummyjail import DummyJail
from ..utils import CONFIG_DIR, asyncserver, Utils, uni_decode from ..utils import CONFIG_DIR, asyncserver, Utils, uni_decode
class TestSMTPServer(smtpd.SMTPServer): try:
import smtpd
def __init__(self, *args): class TestSMTPServer(smtpd.SMTPServer):
smtpd.SMTPServer.__init__(self, *args)
self.ready = False
def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): def __init__(self, *args):
self.peer = peer smtpd.SMTPServer.__init__(self, *args)
self.mailfrom = mailfrom self.ready = False
self.rcpttos = rcpttos
self.org_data = data def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
# replace new line (with tab or space) for possible mime translations (word wrap), self.peer = peer
self.data = re.sub(r"\n[\t ]", " ", uni_decode(data)) self.mailfrom = mailfrom
self.ready = True self.rcpttos = rcpttos
self.org_data = data
# replace new line (with tab or space) for possible mime translations (word wrap),
self.data = re.sub(r"\n[\t ]", " ", uni_decode(data))
self.ready = True
class SMTPActionTest(unittest.TestCase): class SMTPActionTest(unittest.TestCase):
def setUp(self): def setUp(self):
"""Call before every test case.""" """Call before every test case."""
unittest.F2B.SkipIfCfgMissing(action='smtp.py') unittest.F2B.SkipIfCfgMissing(action='smtp.py')
super(SMTPActionTest, self).setUp() super(SMTPActionTest, self).setUp()
self.jail = DummyJail() self.jail = DummyJail()
pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py") pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
pythonModuleName = os.path.basename(pythonModule.rstrip(".py")) pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))
if sys.version_info >= (3, 3): if sys.version_info >= (3, 3):
customActionModule = importlib.machinery.SourceFileLoader( customActionModule = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module() pythonModuleName, pythonModule).load_module()
else: else:
customActionModule = imp.load_source( customActionModule = imp.load_source(
pythonModuleName, pythonModule) pythonModuleName, pythonModule)
self.smtpd = TestSMTPServer(("localhost", 0), None) self.smtpd = TestSMTPServer(("localhost", 0), None)
port = self.smtpd.socket.getsockname()[1] port = self.smtpd.socket.getsockname()[1]
self.action = customActionModule.Action( self.action = customActionModule.Action(
self.jail, "test", host="localhost:%i" % port) self.jail, "test", host="localhost:%i" % port)
## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop: ## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop:
self._active = True self._active = True
self._loop_thread = threading.Thread( self._loop_thread = threading.Thread(
target=asyncserver.loop, kwargs={'active': lambda: self._active}) target=asyncserver.loop, kwargs={'active': lambda: self._active})
self._loop_thread.daemon = True self._loop_thread.daemon = True
self._loop_thread.start() self._loop_thread.start()
def tearDown(self): def tearDown(self):
"""Call after every test case.""" """Call after every test case."""
self.smtpd.close() self.smtpd.close()
self._active = False self._active = False
self._loop_thread.join() self._loop_thread.join()
super(SMTPActionTest, self).tearDown() super(SMTPActionTest, self).tearDown()
def _exec_and_wait(self, doaction, timeout=3, short=False): def _exec_and_wait(self, doaction, timeout=3, short=False):
if short: timeout /= 25 if short: timeout /= 25
self.smtpd.ready = False self.smtpd.ready = False
doaction() doaction()
Utils.wait_for(lambda: self.smtpd.ready, timeout) Utils.wait_for(lambda: self.smtpd.ready, timeout)
def testStart(self): def testStart(self):
self._exec_and_wait(self.action.start) self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban") self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"]) self.assertEqual(self.smtpd.rcpttos, ["root"])
self.assertTrue( self.assertTrue(
"Subject: [Fail2Ban] %s: started" % self.jail.name "Subject: [Fail2Ban] %s: started" % self.jail.name
in self.smtpd.data) in self.smtpd.data)
def testStop(self): def testStop(self):
self._exec_and_wait(self.action.stop) self._exec_and_wait(self.action.stop)
self.assertEqual(self.smtpd.mailfrom, "fail2ban") self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"]) self.assertEqual(self.smtpd.rcpttos, ["root"])
self.assertTrue( self.assertTrue(
"Subject: [Fail2Ban] %s: stopped" % "Subject: [Fail2Ban] %s: stopped" %
self.jail.name in self.smtpd.data) self.jail.name in self.smtpd.data)
def _testBan(self, restored=False): def _testBan(self, restored=False):
aInfo = { aInfo = {
'ip': "127.0.0.2", 'ip': "127.0.0.2",
'failures': 3, 'failures': 3,
'matches': "Test fail 1\n", 'matches': "Test fail 1\n",
'ipjailmatches': "Test fail 1\nTest Fail2\n", 'ipjailmatches': "Test fail 1\nTest Fail2\n",
'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n", 'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n",
} }
if restored: if restored:
aInfo['restored'] = 1 aInfo['restored'] = 1
self._exec_and_wait(lambda: self.action.ban(aInfo), short=restored) self._exec_and_wait(lambda: self.action.ban(aInfo), short=restored)
if restored: # no mail, should raises attribute error: if restored: # no mail, should raises attribute error:
self.assertRaises(AttributeError, lambda: self.smtpd.mailfrom) self.assertRaises(AttributeError, lambda: self.smtpd.mailfrom)
return return
self.assertEqual(self.smtpd.mailfrom, "fail2ban") self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"]) self.assertEqual(self.smtpd.rcpttos, ["root"])
subject = "Subject: [Fail2Ban] %s: banned %s" % ( subject = "Subject: [Fail2Ban] %s: banned %s" % (
self.jail.name, aInfo['ip']) self.jail.name, aInfo['ip'])
self.assertIn(subject, self.smtpd.data) self.assertIn(subject, self.smtpd.data)
self.assertIn( self.assertIn(
"%i attempts" % aInfo['failures'], self.smtpd.data) "%i attempts" % aInfo['failures'], self.smtpd.data)
self.action.matches = "matches" self.action.matches = "matches"
self._exec_and_wait(lambda: self.action.ban(aInfo)) self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['matches'], self.smtpd.data) self.assertIn(aInfo['matches'], self.smtpd.data)
self.action.matches = "ipjailmatches" self.action.matches = "ipjailmatches"
self._exec_and_wait(lambda: self.action.ban(aInfo)) self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['ipjailmatches'], self.smtpd.data) self.assertIn(aInfo['ipjailmatches'], self.smtpd.data)
self.action.matches = "ipmatches" self.action.matches = "ipmatches"
self._exec_and_wait(lambda: self.action.ban(aInfo)) self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['ipmatches'], self.smtpd.data) self.assertIn(aInfo['ipmatches'], self.smtpd.data)
def testBan(self): def testBan(self):
self._testBan() self._testBan()
def testNOPByRestored(self): def testNOPByRestored(self):
self._testBan(restored=True) self._testBan(restored=True)
def testOptions(self): def testOptions(self):
self._exec_and_wait(self.action.start) self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban") self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"]) self.assertEqual(self.smtpd.rcpttos, ["root"])
self.action.fromname = "Test" self.action.fromname = "Test"
self.action.fromaddr = "test@example.com" self.action.fromaddr = "test@example.com"
self.action.toaddr = "test@example.com, test2@example.com" self.action.toaddr = "test@example.com, test2@example.com"
self._exec_and_wait(self.action.start) self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "test@example.com") self.assertEqual(self.smtpd.mailfrom, "test@example.com")
self.assertTrue("From: %s <%s>" % self.assertTrue("From: %s <%s>" %
(self.action.fromname, self.action.fromaddr) in self.smtpd.data) (self.action.fromname, self.action.fromaddr) in self.smtpd.data)
self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"])) self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
except ImportError as e:
print("I: Skipping smtp tests: %s" % e)