diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index f8be9dbf..edd88f27 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
- python-version: [3.6, 3.7, 3.8, 3.9, '3.10', '3.11', pypy3.10]
+ python-version: [3.6, 3.7, 3.8, 3.9, '3.10', '3.11', '3.12', pypy3.10]
fail-fast: false
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
@@ -59,6 +59,13 @@ jobs:
python -m pip install systemd-python || echo 'systemd not available'
#readline if available as module:
python -c 'import readline' 2> /dev/null || python -m pip install readline || echo 'readline not available'
+ # asyncore/asynchat:
+ if dpkg --compare-versions "$F2B_PYV" ge 3.12; then
+ #sudo apt-get -y install python${F2B_PY/2/}-setuptools || echo 'setuptools not unavailable'
+ python -m pip install setuptools || echo "can't install setuptools"
+ python -m pip install pyasynchat || echo "can't install pyasynchat";
+ python -m pip install pyasyncore || echo "can't install pyasyncore";
+ fi
- name: Before scripts
run: |
diff --git a/fail2ban/helpers.py b/fail2ban/helpers.py
index c2c22a27..b864fd15 100644
--- a/fail2ban/helpers.py
+++ b/fail2ban/helpers.py
@@ -284,7 +284,7 @@ def splitwords(s):
"""
if not s:
return []
- return list(filter(bool, [v.strip() for v in re.split('[\s,]+', s)]))
+ return list(filter(bool, [v.strip() for v in re.split(r'[\s,]+', s)]))
def _merge_dicts(x, y):
"""Helper to merge dicts.
diff --git a/fail2ban/tests/action_d/test_smtp.py b/fail2ban/tests/action_d/test_smtp.py
index 8d20055a..5802ddea 100644
--- a/fail2ban/tests/action_d/test_smtp.py
+++ b/fail2ban/tests/action_d/test_smtp.py
@@ -18,7 +18,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os
-import smtpd
import threading
import unittest
import re
@@ -29,134 +28,139 @@ else:
import imp
from ..dummyjail import DummyJail
-
from ..utils import CONFIG_DIR, asyncserver, Utils, uni_decode
-class TestSMTPServer(smtpd.SMTPServer):
+try:
+ import smtpd
- def __init__(self, *args):
- smtpd.SMTPServer.__init__(self, *args)
- self.ready = False
+ class TestSMTPServer(smtpd.SMTPServer):
- def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
- self.peer = peer
- self.mailfrom = mailfrom
- self.rcpttos = rcpttos
- self.org_data = data
- # replace new line (with tab or space) for possible mime translations (word wrap),
- self.data = re.sub(r"\n[\t ]", " ", uni_decode(data))
- self.ready = True
+ def __init__(self, *args):
+ smtpd.SMTPServer.__init__(self, *args)
+ self.ready = False
+
+ def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
+ self.peer = peer
+ self.mailfrom = mailfrom
+ self.rcpttos = rcpttos
+ self.org_data = data
+ # replace new line (with tab or space) for possible mime translations (word wrap),
+ self.data = re.sub(r"\n[\t ]", " ", uni_decode(data))
+ self.ready = True
-class SMTPActionTest(unittest.TestCase):
+ class SMTPActionTest(unittest.TestCase):
- def setUp(self):
- """Call before every test case."""
- unittest.F2B.SkipIfCfgMissing(action='smtp.py')
- super(SMTPActionTest, self).setUp()
- self.jail = DummyJail()
- pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
- pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))
- if sys.version_info >= (3, 3):
- customActionModule = importlib.machinery.SourceFileLoader(
- pythonModuleName, pythonModule).load_module()
- else:
- customActionModule = imp.load_source(
- pythonModuleName, pythonModule)
+ def setUp(self):
+ """Call before every test case."""
+ unittest.F2B.SkipIfCfgMissing(action='smtp.py')
+ super(SMTPActionTest, self).setUp()
+ self.jail = DummyJail()
+ pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
+ pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))
+ if sys.version_info >= (3, 3):
+ customActionModule = importlib.machinery.SourceFileLoader(
+ pythonModuleName, pythonModule).load_module()
+ else:
+ customActionModule = imp.load_source(
+ pythonModuleName, pythonModule)
- self.smtpd = TestSMTPServer(("localhost", 0), None)
- port = self.smtpd.socket.getsockname()[1]
+ self.smtpd = TestSMTPServer(("localhost", 0), None)
+ port = self.smtpd.socket.getsockname()[1]
- self.action = customActionModule.Action(
- self.jail, "test", host="localhost:%i" % port)
+ self.action = customActionModule.Action(
+ self.jail, "test", host="localhost:%i" % port)
- ## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop:
- self._active = True
- self._loop_thread = threading.Thread(
- target=asyncserver.loop, kwargs={'active': lambda: self._active})
- self._loop_thread.daemon = True
- self._loop_thread.start()
+ ## because of bug in loop (see loop in asyncserver.py) use it's loop instead of asyncore.loop:
+ self._active = True
+ self._loop_thread = threading.Thread(
+ target=asyncserver.loop, kwargs={'active': lambda: self._active})
+ self._loop_thread.daemon = True
+ self._loop_thread.start()
- def tearDown(self):
- """Call after every test case."""
- self.smtpd.close()
- self._active = False
- self._loop_thread.join()
- super(SMTPActionTest, self).tearDown()
+ def tearDown(self):
+ """Call after every test case."""
+ self.smtpd.close()
+ self._active = False
+ self._loop_thread.join()
+ super(SMTPActionTest, self).tearDown()
- def _exec_and_wait(self, doaction, timeout=3, short=False):
- if short: timeout /= 25
- self.smtpd.ready = False
- doaction()
- Utils.wait_for(lambda: self.smtpd.ready, timeout)
+ def _exec_and_wait(self, doaction, timeout=3, short=False):
+ if short: timeout /= 25
+ self.smtpd.ready = False
+ doaction()
+ Utils.wait_for(lambda: self.smtpd.ready, timeout)
- def testStart(self):
- self._exec_and_wait(self.action.start)
- self.assertEqual(self.smtpd.mailfrom, "fail2ban")
- self.assertEqual(self.smtpd.rcpttos, ["root"])
- self.assertTrue(
- "Subject: [Fail2Ban] %s: started" % self.jail.name
- in self.smtpd.data)
+ def testStart(self):
+ self._exec_and_wait(self.action.start)
+ self.assertEqual(self.smtpd.mailfrom, "fail2ban")
+ self.assertEqual(self.smtpd.rcpttos, ["root"])
+ self.assertTrue(
+ "Subject: [Fail2Ban] %s: started" % self.jail.name
+ in self.smtpd.data)
- def testStop(self):
- self._exec_and_wait(self.action.stop)
- self.assertEqual(self.smtpd.mailfrom, "fail2ban")
- self.assertEqual(self.smtpd.rcpttos, ["root"])
- self.assertTrue(
- "Subject: [Fail2Ban] %s: stopped" %
- self.jail.name in self.smtpd.data)
+ def testStop(self):
+ self._exec_and_wait(self.action.stop)
+ self.assertEqual(self.smtpd.mailfrom, "fail2ban")
+ self.assertEqual(self.smtpd.rcpttos, ["root"])
+ self.assertTrue(
+ "Subject: [Fail2Ban] %s: stopped" %
+ self.jail.name in self.smtpd.data)
- def _testBan(self, restored=False):
- aInfo = {
- 'ip': "127.0.0.2",
- 'failures': 3,
- 'matches': "Test fail 1\n",
- 'ipjailmatches': "Test fail 1\nTest Fail2\n",
- 'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n",
- }
- if restored:
- aInfo['restored'] = 1
+ def _testBan(self, restored=False):
+ aInfo = {
+ 'ip': "127.0.0.2",
+ 'failures': 3,
+ 'matches': "Test fail 1\n",
+ 'ipjailmatches': "Test fail 1\nTest Fail2\n",
+ 'ipmatches': "Test fail 1\nTest Fail2\nTest Fail3\n",
+ }
+ if restored:
+ aInfo['restored'] = 1
- self._exec_and_wait(lambda: self.action.ban(aInfo), short=restored)
- if restored: # no mail, should raises attribute error:
- self.assertRaises(AttributeError, lambda: self.smtpd.mailfrom)
- return
- self.assertEqual(self.smtpd.mailfrom, "fail2ban")
- self.assertEqual(self.smtpd.rcpttos, ["root"])
- subject = "Subject: [Fail2Ban] %s: banned %s" % (
- self.jail.name, aInfo['ip'])
- self.assertIn(subject, self.smtpd.data)
- self.assertIn(
- "%i attempts" % aInfo['failures'], self.smtpd.data)
+ self._exec_and_wait(lambda: self.action.ban(aInfo), short=restored)
+ if restored: # no mail, should raises attribute error:
+ self.assertRaises(AttributeError, lambda: self.smtpd.mailfrom)
+ return
+ self.assertEqual(self.smtpd.mailfrom, "fail2ban")
+ self.assertEqual(self.smtpd.rcpttos, ["root"])
+ subject = "Subject: [Fail2Ban] %s: banned %s" % (
+ self.jail.name, aInfo['ip'])
+ self.assertIn(subject, self.smtpd.data)
+ self.assertIn(
+ "%i attempts" % aInfo['failures'], self.smtpd.data)
- self.action.matches = "matches"
- self._exec_and_wait(lambda: self.action.ban(aInfo))
- self.assertIn(aInfo['matches'], self.smtpd.data)
+ self.action.matches = "matches"
+ self._exec_and_wait(lambda: self.action.ban(aInfo))
+ self.assertIn(aInfo['matches'], self.smtpd.data)
- self.action.matches = "ipjailmatches"
- self._exec_and_wait(lambda: self.action.ban(aInfo))
- self.assertIn(aInfo['ipjailmatches'], self.smtpd.data)
+ self.action.matches = "ipjailmatches"
+ self._exec_and_wait(lambda: self.action.ban(aInfo))
+ self.assertIn(aInfo['ipjailmatches'], self.smtpd.data)
- self.action.matches = "ipmatches"
- self._exec_and_wait(lambda: self.action.ban(aInfo))
- self.assertIn(aInfo['ipmatches'], self.smtpd.data)
-
- def testBan(self):
- self._testBan()
+ self.action.matches = "ipmatches"
+ self._exec_and_wait(lambda: self.action.ban(aInfo))
+ self.assertIn(aInfo['ipmatches'], self.smtpd.data)
+
+ def testBan(self):
+ self._testBan()
- def testNOPByRestored(self):
- self._testBan(restored=True)
+ def testNOPByRestored(self):
+ self._testBan(restored=True)
- def testOptions(self):
- self._exec_and_wait(self.action.start)
- self.assertEqual(self.smtpd.mailfrom, "fail2ban")
- self.assertEqual(self.smtpd.rcpttos, ["root"])
+ def testOptions(self):
+ self._exec_and_wait(self.action.start)
+ self.assertEqual(self.smtpd.mailfrom, "fail2ban")
+ self.assertEqual(self.smtpd.rcpttos, ["root"])
- self.action.fromname = "Test"
- self.action.fromaddr = "test@example.com"
- self.action.toaddr = "test@example.com, test2@example.com"
- self._exec_and_wait(self.action.start)
- self.assertEqual(self.smtpd.mailfrom, "test@example.com")
- self.assertTrue("From: %s <%s>" %
- (self.action.fromname, self.action.fromaddr) in self.smtpd.data)
- self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
+ self.action.fromname = "Test"
+ self.action.fromaddr = "test@example.com"
+ self.action.toaddr = "test@example.com, test2@example.com"
+ self._exec_and_wait(self.action.start)
+ self.assertEqual(self.smtpd.mailfrom, "test@example.com")
+ self.assertTrue("From: %s <%s>" %
+ (self.action.fromname, self.action.fromaddr) in self.smtpd.data)
+ self.assertEqual(set(self.smtpd.rcpttos), set(["test@example.com", "test2@example.com"]))
+
+except ImportError as e:
+ print("I: Skipping smtp tests: %s" % e)
diff --git a/fail2ban/tests/fail2banregextestcase.py b/fail2ban/tests/fail2banregextestcase.py
index 685aa3b0..9a9c7d50 100644
--- a/fail2ban/tests/fail2banregextestcase.py
+++ b/fail2ban/tests/fail2banregextestcase.py
@@ -221,7 +221,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.pruneLog()
self.assertTrue(_test_exec(
"-d", "^Epoch",
- "1490349000 test failed.dns.ch", "^\s*test \S+"
+ "1490349000 test failed.dns.ch", r"^\s*test \S+"
))
self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed', all=True)
self.assertNotLogged('Unable to find a corresponding IP address')
@@ -229,7 +229,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.pruneLog()
self.assertTrue(_test_exec(
"-d", "^Epoch", "-o", "id",
- "1490349000 test this/is/some/path/32", "^\s*test \S+"
+ "1490349000 test this/is/some/path/32", r"^\s*test \S+"
))
self.assertLogged('this/is/some/path/32', all=True)
@@ -439,23 +439,23 @@ class Fail2banRegexTest(LogCaptureTestCase):
# with different ID/IP from failregex (ID/User from first, IP from second message):
self.assertTrue(_test('-o', 'ID:"" | IP: | U:', log,
flt+'[failregex="'
- '^'+prefix+'User \S+ not allowed\n'
- '^'+prefix+'Received disconnect from '
+ '^'+prefix+r'User \S+ not allowed'+'\n'
+ '^'+prefix+r'Received disconnect from '
'"]'))
self.assertLogged('ID:"User root" | IP:192.0.2.76 | U:root')
self.pruneLog()
# with different ID/IP from failregex (User from first, ID and IP from second message):
self.assertTrue(_test('-o', 'ID:"" | IP: | U:', log,
flt+'[failregex="'
- '^'+prefix+'User \S+ not allowed\n'
- '^'+prefix+'Received disconnect from port \d+'
+ '^'+prefix+r'User \S+ not allowed'+'\n'
+ '^'+prefix+r'Received disconnect from port \d+'
'"]'))
self.assertLogged('ID:"192.0.2.76 port 58846" | IP:192.0.2.76 | U:root')
self.pruneLog()
# first with sshd and prefregex:
_test_variants()
# the same without prefregex and MLFID directly in failregex (no merge with prefregex groups):
- _test_variants('common', prefix="\s*\S+ sshd\[\d+\]:\s+")
+ _test_variants('common', prefix=r"\s*\S+ sshd\[\d+\]:\s+")
def testNoDateTime(self):
# datepattern doesn't match:
@@ -541,7 +541,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
'svc[2] connect started 192.0.2.4\n'
'svc[2] connect authorized 192.0.2.4\n'
'svc[2] connect finished 192.0.2.4\n',
- 'common[prefregex="^svc\[\d+\] connect .+$"'
+ r'common[prefregex="^svc\[\d+\] connect .+$"'
', failregex="'
'^started\n'
'^finished \n'
diff --git a/fail2ban/tests/servertestcase.py b/fail2ban/tests/servertestcase.py
index 311341ae..a9e9ed43 100644
--- a/fail2ban/tests/servertestcase.py
+++ b/fail2ban/tests/servertestcase.py
@@ -1372,12 +1372,12 @@ class ServerConfigReaderTests(LogCaptureTestCase):
"`{ nft flush set inet f2b-table addr6-set-j-w-nft-mp 2> /dev/null; } || ",
),
'stop': (
- "`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr-set-j-w-nft-mp\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
- "`nft delete rule inet f2b-table f2b-chain $hdl; done`",
- "`nft delete set inet f2b-table addr-set-j-w-nft-mp`",
- "`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr6-set-j-w-nft-mp\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
- "`nft delete rule inet f2b-table f2b-chain $hdl; done`",
- "`nft delete set inet f2b-table addr6-set-j-w-nft-mp`",
+ r"`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr-set-j-w-nft-mp\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
+ r"`nft delete rule inet f2b-table f2b-chain $hdl; done`",
+ r"`nft delete set inet f2b-table addr-set-j-w-nft-mp`",
+ r"`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr6-set-j-w-nft-mp\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
+ r"`nft delete rule inet f2b-table f2b-chain $hdl; done`",
+ r"`nft delete set inet f2b-table addr6-set-j-w-nft-mp`",
),
'ip4-check': (
r"`nft list chain inet f2b-table f2b-chain | grep -q '@addr-set-j-w-nft-mp[ \t]'`",
@@ -1418,12 +1418,12 @@ class ServerConfigReaderTests(LogCaptureTestCase):
"`{ nft flush set inet f2b-table addr6-set-j-w-nft-ap 2> /dev/null; } || ",
),
'stop': (
- "`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr-set-j-w-nft-ap\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
- "`nft delete rule inet f2b-table f2b-chain $hdl; done`",
- "`nft delete set inet f2b-table addr-set-j-w-nft-ap`",
- "`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr6-set-j-w-nft-ap\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
- "`nft delete rule inet f2b-table f2b-chain $hdl; done`",
- "`nft delete set inet f2b-table addr6-set-j-w-nft-ap`",
+ r"`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr-set-j-w-nft-ap\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
+ r"`nft delete rule inet f2b-table f2b-chain $hdl; done`",
+ r"`nft delete set inet f2b-table addr-set-j-w-nft-ap`",
+ r"`{ nft -a list chain inet f2b-table f2b-chain | grep -oP '@addr6-set-j-w-nft-ap\s+.*\s+\Khandle\s+(\d+)$'; } | while read -r hdl; do`",
+ r"`nft delete rule inet f2b-table f2b-chain $hdl; done`",
+ r"`nft delete set inet f2b-table addr6-set-j-w-nft-ap`",
),
'ip4-check': (
r"""`nft list chain inet f2b-table f2b-chain | grep -q '@addr-set-j-w-nft-ap[ \t]'`""",