combine smtpd and aiosmtpd tests; encapsulate smtp facilities to setUpClass/tearDownClass (behaves like a singleton, doesn't start smtp server per test); don't generate cert every time (too slow by RSA:2048, use short ECC:256 instead);

drastically speedup all smtp-action tests
pull/3268/head
sebres 2023-12-30 21:27:35 +01:00
parent b161e55ca7
commit 0c2edfacb0
2 changed files with 201 additions and 200 deletions

View File

@ -68,6 +68,11 @@ jobs:
#python -m pip install pyasynchat || echo "can't install pyasynchat";
#python -m pip install pyasyncore || echo "can't install pyasyncore";
fi
# aiosmtpd in test_smtp (for 3.10+, no need to test it everywhere):
if dpkg --compare-versions "$F2B_PYV" ge 3.10; then
#sudo apt-get -y install python${F2B_PY/2/}-aiosmtpd || echo 'aiosmtpd not available'
python -m pip install aiosmtpd || echo 'aiosmtpd not available'
fi
- name: Before scripts
run: |

View File

@ -30,6 +30,92 @@ else:
from ..dummyjail import DummyJail
from ..utils import CONFIG_DIR, asyncserver, Utils, uni_decode
class _SMTPActionTestCase():
def _reset_smtpd(self):
for a in ('mailfrom', 'org_data', 'data'):
if hasattr(self.smtpd, a): delattr(self.smtpd, a)
self.ready = False
def _exec_and_wait(self, doaction, timeout=3, short=False):
if short: timeout /= 25
self.smtpd.ready = False
doaction()
Utils.wait_for(lambda: self.smtpd.ready, timeout)
def testStart(self):
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.action.ssl = False # ensure it works without TLS as a sanity check
self.assertTrue(
"Subject: [Fail2Ban] %s: started" % self.jail.name
in self.smtpd.data)
def testStop(self):
self._exec_and_wait(self.action.stop)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.assertTrue(
"Subject: [Fail2Ban] %s: stopped" %
self.jail.name in self.smtpd.data)
def _testBan(self, restored=False):
aInfo = {
'ip': "127.0.0.2",
'failures': 3,
'matches': "Test fail 1\n",
'ipjailmatches': "Test fail 1\nTest Fail2\n",
'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n",
}
if restored:
aInfo['restored'] = 1
self._exec_and_wait(lambda: self.action.ban(aInfo), short=restored)
if restored: # no mail, should raises attribute error:
self.assertRaises(AttributeError, lambda: self.smtpd.mailfrom)
return
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
subject = "Subject: [Fail2Ban] %s: banned %s" % (
self.jail.name, aInfo['ip'])
self.assertIn(subject, self.smtpd.data)
self.assertIn(
"%i attempts" % aInfo['failures'], self.smtpd.data)
self.action.matches = "matches"
self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['matches'], self.smtpd.data)
self.action.matches = "ipjailmatches"
self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['ipjailmatches'], self.smtpd.data)
self.action.matches = "ipmatches"
self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['ipmatches'], self.smtpd.data)
def testBan(self):
self._testBan()
def testNOPByRestored(self):
self._testBan(restored=True)
def testOptions(self):
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.action.fromname = "Test"
self.action.fromaddr = "test@example.com"
self.action.toaddr = "test@example.com, test2@example.com"
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "test@example.com")
self.assertTrue("From: %s <%s>" %
(self.action.fromname, self.action.fromaddr) in self.smtpd.data)
self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
try:
import smtpd
@ -49,7 +135,29 @@ try:
self.ready = True
class SMTPActionTest(unittest.TestCase):
class SMTPActionTest(unittest.TestCase, _SMTPActionTestCase):
def setUpClass():
"""Call before tests."""
unittest.F2B.SkipIfCfgMissing(action='smtp.py')
cls = SMTPActionTest
cls.smtpd = TestSMTPServer(("localhost", 0), None)
cls.port = cls.smtpd.socket.getsockname()[1]
## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop:
cls._active = True
cls._loop_thread = threading.Thread(
target=asyncserver.loop, kwargs={'active': lambda: cls._active})
cls._loop_thread.daemon = True
cls._loop_thread.start()
def tearDownClass():
"""Call after tests."""
cls = SMTPActionTest
cls.smtpd.close()
cls._active = False
cls._loop_thread.join()
def setUp(self):
"""Call before every test case."""
@ -58,110 +166,17 @@ try:
self.jail = DummyJail()
pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))
if sys.version_info >= (3, 3):
customActionModule = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module()
else:
customActionModule = imp.load_source(
pythonModuleName, pythonModule)
self.smtpd = TestSMTPServer(("localhost", 0), None)
port = self.smtpd.socket.getsockname()[1]
customActionModule = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module()
self.action = customActionModule.Action(
self.jail, "test", host="localhost:%i" % port)
## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop:
self._active = True
self._loop_thread = threading.Thread(
target=asyncserver.loop, kwargs={'active': lambda: self._active})
self._loop_thread.daemon = True
self._loop_thread.start()
self.jail, "test", host="localhost:%i" % self.port)
def tearDown(self):
"""Call after every test case."""
self.smtpd.close()
self._active = False
self._loop_thread.join()
self._reset_smtpd()
super(SMTPActionTest, self).tearDown()
def _exec_and_wait(self, doaction, timeout=3, short=False):
if short: timeout /= 25
self.smtpd.ready = False
doaction()
Utils.wait_for(lambda: self.smtpd.ready, timeout)
def testStart(self):
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.assertTrue(
"Subject: [Fail2Ban] %s: started" % self.jail.name
in self.smtpd.data)
def testStop(self):
self._exec_and_wait(self.action.stop)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.assertTrue(
"Subject: [Fail2Ban] %s: stopped" %
self.jail.name in self.smtpd.data)
def _testBan(self, restored=False):
aInfo = {
'ip': "127.0.0.2",
'failures': 3,
'matches': "Test fail 1\n",
'ipjailmatches': "Test fail 1\nTest Fail2\n",
'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n",
}
if restored:
aInfo['restored'] = 1
self._exec_and_wait(lambda: self.action.ban(aInfo), short=restored)
if restored: # no mail, should raises attribute error:
self.assertRaises(AttributeError, lambda: self.smtpd.mailfrom)
return
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
subject = "Subject: [Fail2Ban] %s: banned %s" % (
self.jail.name, aInfo['ip'])
self.assertIn(subject, self.smtpd.data)
self.assertIn(
"%i attempts" % aInfo['failures'], self.smtpd.data)
self.action.matches = "matches"
self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['matches'], self.smtpd.data)
self.action.matches = "ipjailmatches"
self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['ipjailmatches'], self.smtpd.data)
self.action.matches = "ipmatches"
self._exec_and_wait(lambda: self.action.ban(aInfo))
self.assertIn(aInfo['ipmatches'], self.smtpd.data)
def testBan(self):
self._testBan()
def testNOPByRestored(self):
self._testBan(restored=True)
def testOptions(self):
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.action.fromname = "Test"
self.action.fromaddr = "test@example.com"
self.action.toaddr = "test@example.com, test2@example.com"
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "test@example.com")
self.assertTrue("From: %s <%s>" %
(self.action.fromname, self.action.fromaddr) in self.smtpd.data)
self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
except ImportError as e:
print("I: Skipping smtp tests: %s" % e)
@ -169,12 +184,10 @@ except ImportError as e:
try:
import asyncio
from aiosmtpd.controller import Controller
import os
import socket
import ssl
import tempfile
from OpenSSL import crypto
class TestSMTPHandler:
def __init__(self, *args):
self.ready = False
@ -183,7 +196,9 @@ try:
self.peer = session.peer
self.mailfrom = envelope.mail_from
self.rcpttos = envelope.rcpt_tos
self.data = envelope.content.decode()
self.org_data = envelope.content.decode()
# normalize CRLF -> LF:
self.data = re.sub(r"\r\n", "\n", uni_decode(self.org_data))
self.ready = True
return '250 OK'
@ -191,56 +206,91 @@ try:
print(error)
return '542 Internal server error'
class AIOSMTPActionTest(unittest.TestCase):
def create_temp_self_signed_cert(self):
class AIOSMTPActionTest(unittest.TestCase, _SMTPActionTestCase):
@classmethod
def create_temp_self_signed_cert(cls):
"""
https://aliceh75.github.io/testing-asyncio-with-ssl
Create a self signed SSL certificate in temporary files for host
'localhost'
Returns a tuple containing the certificate file name and the key
file name.
It is the caller's responsibility to delete the files after use
The cert (ECC:256, 100years) created with:
openssl req -x509 -out /tmp/f2b-localhost.crt -keyout /tmp/f2b-localhost.key -days 36500 -newkey ec:<(openssl ecparam -name prime256v1) -nodes -sha256 \
-subj '/CN=localhost' -extensions EXT -config <( \
printf "[dn]\nCN=localhost\n[req]\ndistinguished_name = dn\n[EXT]\nsubjectAltName=DNS:localhost\nkeyUsage=digitalSignature\nextendedKeyUsage=serverAuth" \
)
cat /tmp/f2b-localhost.*
rm /tmp/f2b-localhost.*
"""
# create a key pair
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
if hasattr(cls, 'crtfiles'): return cls.crtfiles
cls.crtfiles = crtfiles = (tempfile.mktemp(".crt", "f2b_cert_"), tempfile.mktemp(".key", "f2b_cert_"))
with open(crtfiles[0], 'w') as f:
f.write(
'-----BEGIN CERTIFICATE-----\n'
'MIIBhDCCASugAwIBAgIUCuW168kD3G7XrpFwGHwE6vGfoJkwCgYIKoZIzj0EAwIw\n'
'FDESMBAGA1UEAwwJbG9jYWxob3N0MCAXDTIzMTIzMDE3NDUzNFoYDzIxMjMxMjA2\n'
'MTc0NTM0WjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwWTATBgcqhkjOPQIBBggqhkjO\n'
'PQMBBwNCAARDa8BO/UE4axzvnOQ/pCc/ZTp351X1TqIfjEFaMoZOItz1/MW3ZCuS\n'
'2vuby3rMn0WZ59RWVotBqA6lcMVcgDq3o1kwVzAUBgNVHREEDTALgglsb2NhbGhv\n'
'c3QwCwYDVR0PBAQDAgeAMBMGA1UdJQQMMAoGCCsGAQUFBwMBMB0GA1UdDgQWBBS8\n'
'kH1Ucuq+wlex5DxxHDe1kKGdcjAKBggqhkjOPQQDAgNHADBEAiBmv05+BvXWMzLg\n'
'TtF4McoQNrU/0TTKhV8o+mgd+47tMAIgaaSNRnfjGIfJMbXg7Bh53qOIu5+lnm1b\n'
'ySygMgFmePs=\n'
'-----END CERTIFICATE-----\n'
)
with open(crtfiles[1], 'w') as f:
f.write(
'-----BEGIN PRIVATE KEY-----\n'
'MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgoBGcojKPZMYut7aP\n'
'JGe2GW+2lVV0zJpgCsZ7816a9uqhRANCAARDa8BO/UE4axzvnOQ/pCc/ZTp351X1\n'
'TqIfjEFaMoZOItz1/MW3ZCuS2vuby3rMn0WZ59RWVotBqA6lcMVcgDq3\n'
'-----END PRIVATE KEY-----\n'
)
# return file names
return crtfiles
# create a self-signed cert
cert = crypto.X509()
cert.get_subject().C = "UK"
cert.get_subject().ST = "London"
cert.get_subject().L = "London"
cert.get_subject().O = "myapp"
cert.get_subject().OU = "myapp"
cert.get_subject().CN = 'localhost'
cert.set_serial_number(1000)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.sign(key, 'sha1')
@classmethod
def _del_cert(cls):
if hasattr(cls, 'crtfiles') and cls.crtfiles:
for f in cls.crtfiles:
try:
os.unlink(f)
except FileNotFoundError: pass
cls.crtfiles = None
# Save certificate in temporary file
(cert_file_fd, cert_file_name) = tempfile.mkstemp(suffix='.crt', prefix='cert')
cert_file = os.fdopen(cert_file_fd, 'wb')
cert_file.write(
crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
)
cert_file.close()
@staticmethod
def _free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('localhost', 0))
return s.getsockname()[1]
# Save key in temporary file
(key_file_fd, key_file_name) = tempfile.mkstemp(suffix='.key', prefix='cert')
key_file = os.fdopen(key_file_fd, 'wb')
key_file.write(
crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
)
key_file.close()
def setUpClass():
"""Call before tests."""
unittest.F2B.SkipIfCfgMissing(action='smtp.py')
# Return file names
return (cert_file_name, key_file_name)
cert_file, cert_key = AIOSMTPActionTest.create_temp_self_signed_cert()
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(cert_file, cert_key)
cls = AIOSMTPActionTest
cls.port = cls._free_port()
cls.smtpd = TestSMTPHandler()
cls.controller = Controller(cls.smtpd, hostname='localhost', server_hostname='localhost', port=cls.port,
server_kwargs={'tls_context': ssl_context, 'require_starttls': False})
# Run the event loop in a separate thread.
cls.controller.start()
def tearDownClass():
"""Call after tests."""
cls = AIOSMTPActionTest
cls.controller.stop()
cls._del_cert()
def setUp(self):
"""Call before every test case."""
unittest.F2B.SkipIfCfgMissing(action='smtp.py')
@ -248,72 +298,18 @@ try:
self.jail = DummyJail()
pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))
if sys.version_info >= (3, 3):
customActionModule = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module()
else:
customActionModule = imp.load_source(
pythonModuleName, pythonModule)
cert_file, cert_key = self.create_temp_self_signed_cert()
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(cert_file, cert_key)
port = 8025
self.smtpd = TestSMTPHandler()
self.controller = Controller(self.smtpd, hostname='localhost', server_hostname='localhost', port=port, server_kwargs={'tls_context': ssl_context, 'require_starttls': False})
# Run the event loop in a separate thread.
self.controller.start()
customActionModule = importlib.machinery.SourceFileLoader(
pythonModuleName, pythonModule).load_module()
self.action = customActionModule.Action(
self.jail, "test", host="localhost:%i" % port)
self.jail, "test", host="localhost:%i" % self.port)
## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop:
self._active = True
self._loop_thread = threading.Thread(
target=asyncserver.loop, kwargs={'active': lambda: self._active})
self._loop_thread.daemon = True
self._loop_thread.start()
self.action.ssl = True
def tearDown(self):
"""Call after every test case."""
self.controller.stop()
self._active = False
self._loop_thread.join()
self._reset_smtpd()
super(AIOSMTPActionTest, self).tearDown()
def _exec_and_wait(self, doaction, timeout=3, short=False):
if short: timeout /= 25
self.smtpd.ready = False
doaction()
Utils.wait_for(lambda: self.smtpd.ready, timeout)
def testStart(self):
"""
Make sure aiosmtpd starts without TLS as a sanity check
"""
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.assertTrue(
"Subject: [Fail2Ban] %s: started" % self.jail.name
in self.smtpd.data)
def testOptionsTls(self):
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "fail2ban")
self.assertEqual(self.smtpd.rcpttos, ["root"])
self.action.fromname = "Test"
self.action.fromaddr = "test@example.com"
self.action.toaddr = "test@example.com, test2@example.com"
self.action.ssl = True # Important part
self._exec_and_wait(self.action.start)
self.assertEqual(self.smtpd.mailfrom, "test@example.com")
self.assertTrue("From: %s <%s>" %
(self.action.fromname, self.action.fromaddr) in self.smtpd.data)
self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
except ImportError as e:
print("I: Skipping SSL smtp tests: %s" % e)
print("I: Skipping SSL smtp tests: %s" % e)