fail2ban/fail2ban/tests/servertestcase.py

1390 lines
48 KiB
Python

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Author: Cyril Jaquier
#
__author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import unittest
import time
import tempfile
import os
import locale
import sys
import platform
from ..server.failregex import Regex, FailRegex, RegexException
from ..server import actions as _actions
from ..server.server import Server
from ..server.ipdns import IPAddr
from ..server.jail import Jail
from ..server.jailthread import JailThread
from ..server.utils import Utils
from .utils import LogCaptureTestCase
from ..helpers import getLogger
from .. import version
try:
from ..server import filtersystemd
except ImportError: # pragma: no cover
filtersystemd = None
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
FAST_BACKEND = "polling"
logSys = getLogger("fail2ban")
class TestServer(Server):
def setLogLevel(self, *args, **kwargs):
pass
def setLogTarget(self, *args, **kwargs):
pass
class TransmitterBase(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.transm = self.server._Server__transm
self.tmp_files = []
sock_fd, sock_name = tempfile.mkstemp('fail2ban.sock', 'transmitter')
os.close(sock_fd)
self.tmp_files.append(sock_name)
pidfile_fd, pidfile_name = tempfile.mkstemp(
'fail2ban.pid', 'transmitter')
os.close(pidfile_fd)
self.tmp_files.append(pidfile_name)
self.server.start(sock_name, pidfile_name, force=False)
self.jailName = "TestJail1"
self.server.addJail(self.jailName, FAST_BACKEND)
def tearDown(self):
"""Call after every test case."""
self.server.quit()
for f in self.tmp_files:
if os.path.exists(f):
os.remove(f)
def setGetTest(self, cmd, inValue, outValue=(None,), outCode=0, jail=None, repr_=False):
"""Process set/get commands and compare both return values
with outValue if it was given otherwise with inValue"""
setCmd = ["set", cmd, inValue]
getCmd = ["get", cmd]
if jail is not None:
setCmd.insert(1, jail)
getCmd.insert(1, jail)
# if outValue was not given (now None is allowed return/compare value also)
if outValue == (None,):
outValue = inValue
def v(x):
"""Prepare value for comparison"""
return (repr(x) if repr_ else x)
self.assertEqual(v(self.transm.proceed(setCmd)), v((outCode, outValue)))
if not outCode:
# if we expected to get it set without problem, check new value
self.assertEqual(v(self.transm.proceed(getCmd)), v((0, outValue)))
def setGetTestNOK(self, cmd, inValue, jail=None):
setCmd = ["set", cmd, inValue]
getCmd = ["get", cmd]
if jail is not None:
setCmd.insert(1, jail)
getCmd.insert(1, jail)
# Get initial value before trying invalid value
initValue = self.transm.proceed(getCmd)[1]
self.assertEqual(self.transm.proceed(setCmd)[0], 1)
# Check after failed set that value is same as previous
self.assertEqual(self.transm.proceed(getCmd), (0, initValue))
def jailAddDelTest(self, cmd, values, jail):
cmdAdd = "add" + cmd
cmdDel = "del" + cmd
self.assertEqual(
self.transm.proceed(["get", jail, cmd]), (0, []))
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdAdd, value])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[:n+1]))))
ret = self.transm.proceed(["get", jail, cmd])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[:n+1]))))
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdDel, value])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[n+1:]))))
ret = self.transm.proceed(["get", jail, cmd])
self.assertEqual((ret[0], sorted(map(str, ret[1]))), (0, sorted(map(str, values[n+1:]))))
def jailAddDelRegexTest(self, cmd, inValues, outValues, jail):
cmdAdd = "add" + cmd
cmdDel = "del" + cmd
self.assertEqual(
self.transm.proceed(["get", jail, cmd]), (0, []))
for n, value in enumerate(inValues):
self.assertEqual(
self.transm.proceed(["set", jail, cmdAdd, value]),
(0, outValues[:n+1]))
self.assertEqual(
self.transm.proceed(["get", jail, cmd]),
(0, outValues[:n+1]))
for n, value in enumerate(inValues):
self.assertEqual(
self.transm.proceed(["set", jail, cmdDel, 0]), # First item
(0, outValues[n+1:]))
self.assertEqual(
self.transm.proceed(["get", jail, cmd]),
(0, outValues[n+1:]))
class Transmitter(TransmitterBase):
def setUp(self):
self.server = TestServer()
super(Transmitter, self).setUp()
def testStopServer(self):
self.assertEqual(self.transm.proceed(["stop"]), (0, None))
def testPing(self):
self.assertEqual(self.transm.proceed(["ping"]), (0, "pong"))
def testVersion(self):
self.assertEqual(self.transm.proceed(["version"]), (0, version.version))
def testSleep(self):
if not unittest.F2B.fast:
t0 = time.time()
self.assertEqual(self.transm.proceed(["sleep", "0.1"]), (0, None))
t1 = time.time()
# Approx 0.1 second delay but not faster
dt = t1 - t0
self.assertTrue(0.09 < dt < 0.2, msg="Sleep was %g sec" % dt)
else: # pragma: no cover
self.assertEqual(self.transm.proceed(["sleep", "0.0001"]), (0, None))
def testDatabase(self):
if not unittest.F2B.memory_db:
tmp, tmpFilename = tempfile.mkstemp(".db", "fail2ban_")
else: # pragma: no cover
tmpFilename = ':memory:'
# Jails present, can't change database
self.setGetTestNOK("dbfile", tmpFilename)
self.server.delJail(self.jailName)
self.setGetTest("dbfile", tmpFilename)
# the same file name (again no jails / not changed):
self.setGetTest("dbfile", tmpFilename)
self.setGetTest("dbpurgeage", "600", 600)
self.setGetTestNOK("dbpurgeage", "LIZARD")
# the same file name (again with jails / not changed):
self.server.addJail(self.jailName, FAST_BACKEND)
self.setGetTest("dbfile", tmpFilename)
self.server.delJail(self.jailName)
# Disable database
self.assertEqual(self.transm.proceed(
["set", "dbfile", "None"]),
(0, None))
self.assertEqual(self.transm.proceed(
["get", "dbfile"]),
(0, None))
self.assertEqual(self.transm.proceed(
["set", "dbpurgeage", "500"]),
(0, None))
self.assertEqual(self.transm.proceed(
["get", "dbpurgeage"]),
(0, None))
# the same (again with jails / not changed):
self.server.addJail(self.jailName, FAST_BACKEND)
self.assertEqual(self.transm.proceed(
["set", "dbfile", "None"]),
(0, None))
if not unittest.F2B.memory_db:
os.close(tmp)
os.unlink(tmpFilename)
def testAddJail(self):
jail2 = "TestJail2"
jail3 = "TestJail3"
jail4 = "TestJail4"
self.assertEqual(
self.transm.proceed(["add", jail2, "polling"]), (0, jail2))
self.assertEqual(self.transm.proceed(["add", jail3]), (0, jail3))
self.assertEqual(
self.transm.proceed(["add", jail4, "invalid backend"])[0], 1)
self.assertEqual(
self.transm.proceed(["add", jail4, "auto"]), (0, jail4))
# Duplicate Jail
self.assertEqual(
self.transm.proceed(["add", self.jailName, "polling"])[0], 1)
# All name is reserved
self.assertEqual(
self.transm.proceed(["add", "all", "polling"])[0], 1)
def testStartStopJail(self):
self.assertEqual(
self.transm.proceed(["start", self.jailName]), (0, None))
time.sleep(Utils.DEFAULT_SLEEP_TIME)
# wait until not started (3 seconds as long as any RuntimeError, ex.: RuntimeError('cannot join thread before it is started',)):
self.assertTrue( Utils.wait_for(
lambda: self.server.isAlive(1) and not isinstance(self.transm.proceed(["status", self.jailName]), RuntimeError),
3) )
self.assertEqual(
self.transm.proceed(["stop", self.jailName]), (0, None))
self.assertTrue(self.jailName not in self.server._Server__jails)
def testStartStopAllJail(self):
self.server.addJail("TestJail2", FAST_BACKEND)
self.assertEqual(
self.transm.proceed(["start", self.jailName]), (0, None))
self.assertEqual(
self.transm.proceed(["start", "TestJail2"]), (0, None))
# yoh: workaround for gh-146. I still think that there is some
# race condition and missing locking somewhere, but for now
# giving it a small delay reliably helps to proceed with tests
time.sleep(Utils.DEFAULT_SLEEP_TIME)
self.assertTrue( Utils.wait_for(
lambda: self.server.isAlive(2) and not isinstance(self.transm.proceed(["status", self.jailName]), RuntimeError),
3) )
self.assertEqual(self.transm.proceed(["stop", "all"]), (0, None))
self.assertTrue( Utils.wait_for( lambda: not len(self.server._Server__jails), 3) )
self.assertTrue(self.jailName not in self.server._Server__jails)
self.assertTrue("TestJail2" not in self.server._Server__jails)
def testJailIdle(self):
self.assertEqual(
self.transm.proceed(["set", self.jailName, "idle", "on"]),
(0, True))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "idle", "off"]),
(0, False))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "idle", "CAT"])[0],
1)
def testJailFindTime(self):
self.setGetTest("findtime", "120", 120, jail=self.jailName)
self.setGetTest("findtime", "60", 60, jail=self.jailName)
self.setGetTest("findtime", "30m", 30*60, jail=self.jailName)
self.setGetTest("findtime", "-60", -60, jail=self.jailName)
self.setGetTestNOK("findtime", "Dog", jail=self.jailName)
def testJailBanTime(self):
self.setGetTest("bantime", "600", 600, jail=self.jailName)
self.setGetTest("bantime", "50", 50, jail=self.jailName)
self.setGetTest("bantime", "-50", -50, jail=self.jailName)
self.setGetTest("bantime", "15d 5h 30m", 1315800, jail=self.jailName)
self.setGetTestNOK("bantime", "Cat", jail=self.jailName)
def testDatePattern(self):
self.setGetTest("datepattern", "%%%Y%m%d%H%M%S",
("%%%Y%m%d%H%M%S", "%YearMonthDay24hourMinuteSecond"),
jail=self.jailName)
self.setGetTest(
"datepattern", "Epoch", (None, "Epoch"), jail=self.jailName)
self.setGetTest(
"datepattern", "TAI64N", (None, "TAI64N"), jail=self.jailName)
self.setGetTestNOK("datepattern", "%Cat%a%%%g", jail=self.jailName)
def testJailUseDNS(self):
self.setGetTest("usedns", "yes", jail=self.jailName)
self.setGetTest("usedns", "warn", jail=self.jailName)
self.setGetTest("usedns", "no", jail=self.jailName)
# Safe default should be "no"
value = "Fish"
self.assertEqual(
self.transm.proceed(["set", self.jailName, "usedns", value]),
(0, "no"))
def testJailBanIP(self):
self.server.startJail(self.jailName) # Jail must be started
self.assertEqual(
self.transm.proceed(["set", self.jailName, "banip", "127.0.0.1"]),
(0, "127.0.0.1"))
time.sleep(Utils.DEFAULT_SLEEP_TIME) # Give chance to ban
self.assertEqual(
self.transm.proceed(["set", self.jailName, "banip", "Badger"]),
(0, "Badger")) #NOTE: Is IP address validated? Is DNS Lookup done?
time.sleep(Utils.DEFAULT_SLEEP_TIME) # Give chance to ban
# Unban IP
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "unbanip", "127.0.0.1"]),
(0, "127.0.0.1"))
# Unban IP which isn't banned
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "unbanip", "192.168.1.1"])[0],1)
def testJailMaxRetry(self):
self.setGetTest("maxretry", "5", 5, jail=self.jailName)
self.setGetTest("maxretry", "2", 2, jail=self.jailName)
self.setGetTest("maxretry", "-2", -2, jail=self.jailName)
self.setGetTestNOK("maxretry", "Duck", jail=self.jailName)
def testJailMaxLines(self):
self.setGetTest("maxlines", "5", 5, jail=self.jailName)
self.setGetTest("maxlines", "2", 2, jail=self.jailName)
self.setGetTestNOK("maxlines", "-2", jail=self.jailName)
self.setGetTestNOK("maxlines", "Duck", jail=self.jailName)
def testJailLogEncoding(self):
self.setGetTest("logencoding", "UTF-8", jail=self.jailName)
self.setGetTest("logencoding", "ascii", jail=self.jailName)
self.setGetTest("logencoding", "auto", locale.getpreferredencoding(),
jail=self.jailName)
self.setGetTestNOK("logencoding", "Monkey", jail=self.jailName)
def testJailLogPath(self):
self.jailAddDelTest(
"logpath",
[
os.path.join(TEST_FILES_DIR, "testcase01.log"),
os.path.join(TEST_FILES_DIR, "testcase02.log"),
os.path.join(TEST_FILES_DIR, "testcase03.log"),
],
self.jailName
)
# Try duplicates
value = os.path.join(TEST_FILES_DIR, "testcase04.log")
self.assertEqual(
self.transm.proceed(["set", self.jailName, "addlogpath", value]),
(0, [value]))
# Will silently ignore duplicate
self.assertEqual(
self.transm.proceed(["set", self.jailName, "addlogpath", value]),
(0, [value]))
self.assertEqual(
self.transm.proceed(["get", self.jailName, "logpath"]),
(0, [value]))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "dellogpath", value]),
(0, []))
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addlogpath", value, "tail"]),
(0, [value]))
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addlogpath", value, "head"]),
(0, [value]))
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addlogpath", value, "badger"])[0],
1)
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addlogpath", value, value, value])[0],
1)
def testJailLogPathInvalidFile(self):
# Invalid file
value = "this_file_shouldn't_exist"
result = self.transm.proceed(
["set", self.jailName, "addlogpath", value])
self.assertTrue(isinstance(result[1], IOError))
def testJailLogPathBrokenSymlink(self):
# Broken symlink
name = tempfile.mktemp(prefix='tmp_fail2ban_broken_symlink')
sname = name + '.slink'
os.symlink(name, sname)
result = self.transm.proceed(
["set", self.jailName, "addlogpath", sname])
self.assertTrue(isinstance(result[1], IOError))
os.unlink(sname)
def testJailIgnoreIP(self):
self.jailAddDelTest(
"ignoreip",
[
"127.0.0.1",
"192.168.1.1",
"8.8.8.8",
],
self.jailName
)
# Try duplicates
value = "127.0.0.1"
self.assertEqual(
self.transm.proceed(["set", self.jailName, "addignoreip", value]),
(0, [value]))
# Will allow duplicate
#NOTE: Should duplicates be allowed, or silent ignore like logpath?
self.assertEqual(
self.transm.proceed(["set", self.jailName, "addignoreip", value]),
(0, [value, value]))
self.assertEqual(
self.transm.proceed(["get", self.jailName, "ignoreip"]),
(0, [value, value]))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "delignoreip", value]),
(0, [value]))
def testJailIgnoreCommand(self):
self.setGetTest("ignorecommand", "bin ", jail=self.jailName)
def testJailRegex(self):
self.jailAddDelRegexTest("failregex",
[
"user john at <HOST>",
"Admin user login from <HOST>",
"failed attempt from <HOST> again",
],
[
"user john at %s" % (Regex._resolveHostTag('<HOST>')),
"Admin user login from %s" % (Regex._resolveHostTag('<HOST>')),
"failed attempt from %s again" % (Regex._resolveHostTag('<HOST>')),
],
self.jailName
)
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addfailregex", "No host regex"])[0],
1)
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addfailregex", 654])[0],
1)
def testJailIgnoreRegex(self):
self.jailAddDelRegexTest("ignoreregex",
[
"user john",
"Admin user login from <HOST>",
"Dont match me!",
],
[
"user john",
"Admin user login from %s" % (Regex._resolveHostTag('<HOST>')),
"Dont match me!",
],
self.jailName
)
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addignoreregex", "Invalid [regex"])[0],
1)
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "addignoreregex", 50])[0],
1)
def testStatus(self):
jails = [self.jailName]
self.assertEqual(self.transm.proceed(["status"]),
(0, [('Number of jail', len(jails)), ('Jail list', ", ".join(jails))]))
self.server.addJail("TestJail2", FAST_BACKEND)
jails.append("TestJail2")
self.assertEqual(self.transm.proceed(["status"]),
(0, [('Number of jail', len(jails)), ('Jail list', ", ".join(jails))]))
def testJailStatus(self):
self.assertEqual(self.transm.proceed(["status", self.jailName]),
(0,
[
('Filter', [
('Currently failed', 0),
('Total failed', 0),
('File list', [])]
),
('Actions', [
('Currently banned', 0),
('Total banned', 0),
('Banned IP list', [])]
)
]
)
)
def testJailStatusBasic(self):
self.assertEqual(self.transm.proceed(["status", self.jailName, "basic"]),
(0,
[
('Filter', [
('Currently failed', 0),
('Total failed', 0),
('File list', [])]
),
('Actions', [
('Currently banned', 0),
('Total banned', 0),
('Banned IP list', [])]
)
]
)
)
def testJailStatusBasicKwarg(self):
self.assertEqual(self.transm.proceed(["status", self.jailName, "INVALID"]),
(0,
[
('Filter', [
('Currently failed', 0),
('Total failed', 0),
('File list', [])]
),
('Actions', [
('Currently banned', 0),
('Total banned', 0),
('Banned IP list', [])]
)
]
)
)
def testJailStatusCymru(self):
try:
import dns.exception
import dns.resolver
except ImportError:
value = ['error']
else:
value = []
self.assertEqual(self.transm.proceed(["status", self.jailName, "cymru"]),
(0,
[
('Filter', [
('Currently failed', 0),
('Total failed', 0),
('File list', [])]
),
('Actions', [
('Currently banned', 0),
('Total banned', 0),
('Banned IP list', []),
('Banned ASN list', value),
('Banned Country list', value),
('Banned RIR list', value)]
)
]
)
)
def testAction(self):
action = "TestCaseAction"
cmdList = [
"actionstart",
"actionstop",
"actioncheck",
"actionban",
"actionunban",
]
cmdValueList = [
"Action Start",
"Action Stop",
"Action Check",
"Action Ban",
"Action Unban",
]
self.assertEqual(
self.transm.proceed(["set", self.jailName, "addaction", action]),
(0, action))
self.assertEqual(
self.transm.proceed(
["get", self.jailName, "actions"])[1][0],
action)
for cmd, value in zip(cmdList, cmdValueList):
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "action", action, cmd, value]),
(0, value))
for cmd, value in zip(cmdList, cmdValueList):
self.assertEqual(
self.transm.proceed(["get", self.jailName, "action", action, cmd]),
(0, value))
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "action", action, "KEY", "VALUE"]),
(0, "VALUE"))
self.assertEqual(
self.transm.proceed(
["get", self.jailName, "action", action, "KEY"]),
(0, "VALUE"))
self.assertEqual(
self.transm.proceed(
["get", self.jailName, "action", action, "InvalidKey"])[0],
1)
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "action", action, "timeout", "10"]),
(0, 10))
self.assertEqual(
self.transm.proceed(
["get", self.jailName, "action", action, "timeout"]),
(0, 10))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "delaction", action]),
(0, None))
self.assertEqual(
self.transm.proceed(
["set", self.jailName, "delaction", "Doesn't exist"])[0],1)
def testPythonActionMethodsAndProperties(self):
action = "TestCaseAction"
try:
out = self.transm.proceed(
["set", self.jailName, "addaction", action,
os.path.join(TEST_FILES_DIR, "action.d", "action.py"),
'{"opt1": "value"}'])
self.assertEqual(out, (0, action))
except AssertionError:
if ((2, 6) <= sys.version_info < (2, 6, 5)) \
and '__init__() keywords must be strings' in out[1]:
# known issue http://bugs.python.org/issue2646 in 2.6 series
# since general Fail2Ban warnings are suppressed in normal
# operation -- let's issue Python's native warning here
import warnings
warnings.warn(
"Your version of Python %s seems to experience a known "
"issue forbidding correct operation of Fail2Ban: "
"http://bugs.python.org/issue2646 Upgrade your Python and "
"meanwhile other intestPythonActionMethodsAndProperties will "
"be skipped" % (sys.version))
return
raise
self.assertEqual(
sorted(self.transm.proceed(["get", self.jailName,
"actionproperties", action])[1]),
['opt1', 'opt2'])
self.assertEqual(
self.transm.proceed(["get", self.jailName, "action", action,
"opt1"]),
(0, 'value'))
self.assertEqual(
self.transm.proceed(["get", self.jailName, "action", action,
"opt2"]),
(0, None))
self.assertEqual(
sorted(self.transm.proceed(["get", self.jailName, "actionmethods",
action])[1]),
['ban', 'start', 'stop', 'testmethod', 'unban'])
self.assertEqual(
self.transm.proceed(["set", self.jailName, "action", action,
"testmethod", '{"text": "world!"}']),
(0, 'Hello world! value'))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "action", action,
"opt1", "another value"]),
(0, 'another value'))
self.assertEqual(
self.transm.proceed(["set", self.jailName, "action", action,
"testmethod", '{"text": "world!"}']),
(0, 'Hello world! another value'))
def testNOK(self):
self.assertEqual(self.transm.proceed(["INVALID", "COMMAND"])[0],1)
def testSetNOK(self):
self.assertEqual(
self.transm.proceed(["set", "INVALID", "COMMAND"])[0],1)
def testGetNOK(self):
self.assertEqual(
self.transm.proceed(["get", "INVALID", "COMMAND"])[0],1)
def testStatusNOK(self):
self.assertEqual(
self.transm.proceed(["status", "INVALID", "COMMAND"])[0],1)
def testJournalMatch(self):
if not filtersystemd: # pragma: no cover
if sys.version_info >= (2, 7):
raise unittest.SkipTest(
"systemd python interface not available")
return
jailName = "TestJail2"
self.server.addJail(jailName, "systemd")
values = [
"_SYSTEMD_UNIT=sshd.service",
"TEST_FIELD1=ABC",
"_HOSTNAME=example.com",
]
for n, value in enumerate(values):
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch", value]),
(0, [[val] for val in values[:n+1]]))
for n, value in enumerate(values):
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value]),
(0, [[val] for val in values[n+1:]]))
# Try duplicates
value = "_COMM=sshd"
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch", value]),
(0, [[value]]))
# Duplicates are accepted, as automatically OR'd, and journalctl
# also accepts them without issue.
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch", value]),
(0, [[value], [value]]))
# Remove first instance
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value]),
(0, [[value]]))
# Remove second instance
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value]),
(0, []))
value = [
"_COMM=sshd", "+", "_SYSTEMD_UNIT=sshd.service", "_UID=0"]
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch"] + value),
(0, [["_COMM=sshd"], ["_SYSTEMD_UNIT=sshd.service", "_UID=0"]]))
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch"] + value[:1]),
(0, [["_SYSTEMD_UNIT=sshd.service", "_UID=0"]]))
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch"] + value[2:]),
(0, []))
# Invalid match
value = "This isn't valid!"
result = self.transm.proceed(
["set", jailName, "addjournalmatch", value])
self.assertTrue(isinstance(result[1], ValueError))
# Delete invalid match
value = "FIELD=NotPresent"
result = self.transm.proceed(
["set", jailName, "deljournalmatch", value])
self.assertTrue(isinstance(result[1], ValueError))
class TransmitterLogging(TransmitterBase):
def setUp(self):
self.server = Server()
self.server.setLogTarget("/dev/null")
self.server.setLogLevel("CRITICAL")
self.server.setSyslogSocket("auto")
super(TransmitterLogging, self).setUp()
def testLogTarget(self):
logTargets = []
for _ in xrange(3):
tmpFile = tempfile.mkstemp("fail2ban", "transmitter")
logTargets.append(tmpFile[1])
os.close(tmpFile[0])
for logTarget in logTargets:
self.setGetTest("logtarget", logTarget)
# If path is invalid, do not change logtarget
value = "/this/path/should/not/exist"
self.setGetTestNOK("logtarget", value)
self.transm.proceed(["set", "logtarget", "/dev/null"])
for logTarget in logTargets:
os.remove(logTarget)
self.setGetTest("logtarget", "STDOUT")
self.setGetTest("logtarget", "STDERR")
def testLogTargetSYSLOG(self):
if not os.path.exists("/dev/log") and sys.version_info >= (2, 7):
raise unittest.SkipTest("'/dev/log' not present")
elif not os.path.exists("/dev/log"):
return
self.assertTrue(self.server.getSyslogSocket(), "auto")
self.setGetTest("logtarget", "SYSLOG")
self.assertTrue(self.server.getSyslogSocket(), "/dev/log")
def testSyslogSocket(self):
self.setGetTest("syslogsocket", "/dev/log/NEW/PATH")
def testSyslogSocketNOK(self):
self.setGetTest("syslogsocket", "/this/path/should/not/exist")
self.setGetTestNOK("logtarget", "SYSLOG")
# set back for other tests
self.setGetTest("syslogsocket", "/dev/log")
self.setGetTest("logtarget", "SYSLOG",
**{True: {}, # should work on Linux
False: dict( # expect to fail otherwise
outCode=1,
outValue=Exception('Failed to change log target'),
repr_=True # Exceptions are not comparable apparently
)
}[platform.system() in ('Linux',) and os.path.exists('/dev/log')]
)
def testLogLevel(self):
self.setGetTest("loglevel", "HEAVYDEBUG")
self.setGetTest("loglevel", "DEBUG")
self.setGetTest("loglevel", "INFO")
self.setGetTest("loglevel", "NOTICE")
self.setGetTest("loglevel", "WARNING")
self.setGetTest("loglevel", "ERROR")
self.setGetTest("loglevel", "CRITICAL")
self.setGetTest("loglevel", "cRiTiCaL", "CRITICAL")
self.setGetTestNOK("loglevel", "Bird")
def testFlushLogs(self):
self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "rolled over"))
try:
f, fn = tempfile.mkstemp("fail2ban.log")
os.close(f)
self.server.setLogLevel("WARNING")
self.assertEqual(self.transm.proceed(["set", "logtarget", fn]), (0, fn))
l = getLogger('fail2ban')
l.warning("Before file moved")
try:
f2, fn2 = tempfile.mkstemp("fail2ban.log")
os.close(f2)
os.rename(fn, fn2)
l.warning("After file moved")
self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "rolled over"))
l.warning("After flushlogs")
with open(fn2,'r') as f:
line1 = f.next()
if line1.find('Changed logging target to') >= 0:
line1 = f.next()
self.assertTrue(line1.endswith("Before file moved\n"))
line2 = f.next()
self.assertTrue(line2.endswith("After file moved\n"))
try:
n = f.next()
if n.find("Command: ['flushlogs']") >=0:
self.assertRaises(StopIteration, f.next)
else:
self.fail("Exception StopIteration or Command: ['flushlogs'] expected. Got: %s" % n)
except StopIteration:
pass # on higher debugging levels this is expected
with open(fn,'r') as f:
line1 = f.next()
if line1.find('rollover performed on') >= 0:
line1 = f.next()
self.assertTrue(line1.endswith("After flushlogs\n"))
self.assertRaises(StopIteration, f.next)
f.close()
finally:
os.remove(fn2)
finally:
try:
os.remove(fn)
except OSError:
pass
self.assertEqual(self.transm.proceed(["set", "logtarget", "STDERR"]), (0, "STDERR"))
self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "flushed"))
class JailTests(unittest.TestCase):
def testLongName(self):
# Just a smoke test for now
longname = "veryveryverylongname"
jail = Jail(longname)
self.assertEqual(jail.name, longname)
class RegexTests(unittest.TestCase):
def testInit(self):
# Should raise an Exception upon empty regex
self.assertRaises(RegexException, Regex, '')
self.assertRaises(RegexException, Regex, ' ')
self.assertRaises(RegexException, Regex, '\t')
def testStr(self):
# .replace just to guarantee uniform use of ' or " in the %r
self.assertEqual(str(Regex('a')).replace('"', "'"), "Regex('a')")
# Class name should be proper
self.assertTrue(str(FailRegex('<HOST>')).startswith("FailRegex("))
def testHost(self):
self.assertRaises(RegexException, FailRegex, '')
# Testing obscure case when host group might be missing in the matched pattern,
# e.g. if we made it optional.
fr = FailRegex('%%<HOST>?')
self.assertFalse(fr.hasMatched())
fr.search([('%%',"","")])
self.assertTrue(fr.hasMatched())
self.assertRaises(RegexException, fr.getHost)
class _BadThread(JailThread):
def run(self):
raise RuntimeError('run bad thread exception')
class LoggingTests(LogCaptureTestCase):
def testGetF2BLogger(self):
testLogSys = getLogger("fail2ban.some.string.with.name")
self.assertEqual(testLogSys.parent.name, "fail2ban")
self.assertEqual(testLogSys.name, "fail2ban.name")
def testFail2BanExceptHook(self):
prev_exchook = sys.__excepthook__
x = []
sys.__excepthook__ = lambda *args: x.append(args)
try:
badThread = _BadThread()
badThread.start()
badThread.join()
self.assertLogged("Unhandled exception")
finally:
sys.__excepthook__ = prev_exchook
self.assertEqual(len(x), 1)
self.assertEqual(x[0][0], RuntimeError)
from clientreadertestcase import ActionReader, JailReader, JailsReader, CONFIG_DIR, STOCK
class ServerConfigReaderTests(LogCaptureTestCase):
def __init__(self, *args, **kwargs):
super(ServerConfigReaderTests, self).__init__(*args, **kwargs)
self.__share_cfg = {}
def setUp(self):
"""Call before every test case."""
super(ServerConfigReaderTests, self).setUp()
self._execCmdLst = []
def tearDown(self):
"""Call after every test case."""
super(ServerConfigReaderTests, self).tearDown()
def _executeCmd(self, realCmd, timeout=60):
for l in realCmd.split('\n'):
if not l.startswith('#'):
logSys.debug('exec-cmd: `%s`', l)
else:
logSys.debug(l)
return True
def test_IPAddr(self):
self.assertTrue(IPAddr('192.0.2.1').isIPv4)
self.assertTrue(IPAddr('2001:DB8::').isIPv6)
if STOCK:
def testCheckStockJailActions(self):
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_config=self.__share_cfg) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine
stream = jails.convert(allow_no_files=True)
server = TestServer()
transm = server._Server__transm
cmdHandler = transm._Transmitter__commandHandler
# for cmd in stream:
# print(cmd)
# filter all start commands (we want not start all jails):
for cmd in stream:
if cmd[0] != 'start':
# change to the fast init backend:
if cmd[0] == 'add':
cmd[2] = 'polling'
# change log path to test log of jail (to prevent "Permission denied" on /var/logs/ for test-user):
elif len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addlogpath':
cmd[3] = os.path.join(TEST_FILES_DIR, 'logs', cmd[1])
# add dummy regex to prevent too long compile of all regexp (we don't use it in this test at all):
# [todo sebres] remove `not hasattr(unittest, 'F2B') or `, after merge with "f2b-perfom-prepare-716" ...
elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and (
len(cmd) > 3 and cmd[0] in ('set', 'multi-set') and cmd[2] == 'addfailregex'
):
cmd[0] = "set"
cmd[3] = "DUMMY-REGEX <HOST>"
# command to server, use cmdHandler direct instead of `transm.proceed(cmd)`:
try:
cmdHandler(cmd)
except Exception, e: # pragma: no cover
self.fail("Command %r has failed. Received %r" % (cmd, e))
# jails = server._Server__jails
# for j in jails:
# print(j, jails[j])
def getDefaultJailStream(self, jail, act):
act = act.replace('%(__name__)s', jail)
actName, actOpt = JailReader.extractOptions(act)
stream = [
['add', jail, 'polling'],
# ['set', jail, 'addfailregex', 'DUMMY-REGEX <HOST>'],
]
action = ActionReader(
actName, jail, actOpt,
share_config=self.__share_cfg, basedir=CONFIG_DIR)
self.assertTrue(action.read())
action.getOptions({})
stream.extend(action.convert())
return stream
def testCheckStockCommandActions(self):
# test cases to check valid ipv4/ipv6 action definition, tuple with (('jail', 'action[params]', 'tests', ...)
# where tests is a dictionary contains:
# 'ip4' - should not be found (logged) on ban/unban of IPv6 (negative test),
# 'ip6' - should not be found (logged) on ban/unban of IPv4 (negative test),
# 'start', 'stop' - should be found (logged) on action start/stop,
# etc.
testJailsActions = (
# iptables-multiport --
('j-w-iptables-mp', 'iptables-multiport[name=%(__name__)s, bantime="600", port="http,https", protocol="tcp", chain="INPUT"]', {
'ip4': ('`iptables ', 'icmp-port-unreachable'), 'ip6': ('`ip6tables ', 'icmp6-port-unreachable'),
'start': (
"`iptables -w -N f2b-j-w-iptables-mp`",
"`iptables -w -A f2b-j-w-iptables-mp -j RETURN`",
"`iptables -w -I INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
"`ip6tables -w -N f2b-j-w-iptables-mp`",
"`ip6tables -w -A f2b-j-w-iptables-mp -j RETURN`",
"`ip6tables -w -I INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
),
'stop': (
"`iptables -w -D INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
"`iptables -w -F f2b-j-w-iptables-mp`",
"`iptables -w -X f2b-j-w-iptables-mp`",
"`ip6tables -w -D INPUT -p tcp -m multiport --dports http,https -j f2b-j-w-iptables-mp`",
"`ip6tables -w -F f2b-j-w-iptables-mp`",
"`ip6tables -w -X f2b-j-w-iptables-mp`",
),
'ip4-check': (
r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-mp[ \t]'`""",
),
'ip6-check': (
r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-mp[ \t]'`""",
),
'ip4-ban': (
r"`iptables -w -I f2b-j-w-iptables-mp 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip4-unban': (
r"`iptables -w -D f2b-j-w-iptables-mp -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables-mp 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables-mp -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
# iptables-allports --
('j-w-iptables-ap', 'iptables-allports[name=%(__name__)s, bantime="600", protocol="tcp", chain="INPUT"]', {
'ip4': ('`iptables ', 'icmp-port-unreachable'), 'ip6': ('`ip6tables ', 'icmp6-port-unreachable'),
'start': (
"`iptables -w -N f2b-j-w-iptables-ap`",
"`iptables -w -A f2b-j-w-iptables-ap -j RETURN`",
"`iptables -w -I INPUT -p tcp -j f2b-j-w-iptables-ap`",
"`ip6tables -w -N f2b-j-w-iptables-ap`",
"`ip6tables -w -A f2b-j-w-iptables-ap -j RETURN`",
"`ip6tables -w -I INPUT -p tcp -j f2b-j-w-iptables-ap`",
),
'stop': (
"`iptables -w -D INPUT -p tcp -j f2b-j-w-iptables-ap`",
"`iptables -w -F f2b-j-w-iptables-ap`",
"`iptables -w -X f2b-j-w-iptables-ap`",
"`ip6tables -w -D INPUT -p tcp -j f2b-j-w-iptables-ap`",
"`ip6tables -w -F f2b-j-w-iptables-ap`",
"`ip6tables -w -X f2b-j-w-iptables-ap`",
),
'ip4-check': (
r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-ap[ \t]'`""",
),
'ip6-check': (
r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-ap[ \t]'`""",
),
'ip4-ban': (
r"`iptables -w -I f2b-j-w-iptables-ap 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip4-unban': (
r"`iptables -w -D f2b-j-w-iptables-ap -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables-ap 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
# iptables-ipset-proto6 --
('j-w-iptables-ipset', 'iptables-ipset-proto6[name=%(__name__)s, bantime="600", port="http", protocol="tcp", chain="INPUT"]', {
'ip4': (' f2b-j-w-iptables-ipset ',), 'ip6': (' f2b-j-w-iptables-ipset6 ',),
'start': (
"`ipset create f2b-j-w-iptables-ipset hash:ip timeout 600`",
"`iptables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset create f2b-j-w-iptables-ipset6 hash:ip timeout 600 family inet6`",
"`ip6tables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`",
),
'stop': (
"`iptables -w -D INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset`",
"`ipset destroy f2b-j-w-iptables-ipset`",
"`ip6tables -w -D INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset6`",
"`ipset destroy f2b-j-w-iptables-ipset6`",
),
'ip4-check': (),
'ip6-check': (),
'ip4-ban': (
r"`ipset add f2b-j-w-iptables-ipset 192.0.2.1 timeout 600 -exist`",
),
'ip4-unban': (
r"`ipset del f2b-j-w-iptables-ipset 192.0.2.1 -exist`",
),
'ip6-ban': (
r"`ipset add f2b-j-w-iptables-ipset6 2001:db8:: timeout 600 -exist`",
),
'ip6-unban': (
r"`ipset del f2b-j-w-iptables-ipset6 2001:db8:: -exist`",
),
}),
# iptables-ipset-proto6-allports --
('j-w-iptables-ipset-ap', 'iptables-ipset-proto6-allports[name=%(__name__)s, bantime="600", chain="INPUT"]', {
'ip4': (' f2b-j-w-iptables-ipset-ap ',), 'ip6': (' f2b-j-w-iptables-ipset-ap6 ',),
'start': (
"`ipset create f2b-j-w-iptables-ipset-ap hash:ip timeout 600`",
"`iptables -w -I INPUT -m set --match-set f2b-j-w-iptables-ipset-ap src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset create f2b-j-w-iptables-ipset-ap6 hash:ip timeout 600 family inet6`",
"`ip6tables -w -I INPUT -m set --match-set f2b-j-w-iptables-ipset-ap6 src -j REJECT --reject-with icmp6-port-unreachable`",
),
'stop': (
"`iptables -w -D INPUT -m set --match-set f2b-j-w-iptables-ipset-ap src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset-ap`",
"`ipset destroy f2b-j-w-iptables-ipset-ap`",
"`ip6tables -w -D INPUT -m set --match-set f2b-j-w-iptables-ipset-ap6 src -j REJECT --reject-with icmp6-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset-ap6`",
"`ipset destroy f2b-j-w-iptables-ipset-ap6`",
),
'ip4-check': (),
'ip6-check': (),
'ip4-ban': (
r"`ipset add f2b-j-w-iptables-ipset-ap 192.0.2.1 timeout 600 -exist`",
),
'ip4-unban': (
r"`ipset del f2b-j-w-iptables-ipset-ap 192.0.2.1 -exist`",
),
'ip6-ban': (
r"`ipset add f2b-j-w-iptables-ipset-ap6 2001:db8:: timeout 600 -exist`",
),
'ip6-unban': (
r"`ipset del f2b-j-w-iptables-ipset-ap6 2001:db8:: -exist`",
),
}),
# iptables --
('j-w-iptables', 'iptables[name=%(__name__)s, bantime="600", port="http", protocol="tcp", chain="INPUT"]', {
'ip4': ('`iptables ', 'icmp-port-unreachable'), 'ip6': ('`ip6tables ', 'icmp6-port-unreachable'),
'start': (
"`iptables -w -N f2b-j-w-iptables`",
"`iptables -w -A f2b-j-w-iptables -j RETURN`",
"`iptables -w -I INPUT -p tcp --dport http -j f2b-j-w-iptables`",
"`ip6tables -w -N f2b-j-w-iptables`",
"`ip6tables -w -A f2b-j-w-iptables -j RETURN`",
"`ip6tables -w -I INPUT -p tcp --dport http -j f2b-j-w-iptables`",
),
'stop': (
"`iptables -w -D INPUT -p tcp --dport http -j f2b-j-w-iptables`",
"`iptables -w -F f2b-j-w-iptables`",
"`iptables -w -X f2b-j-w-iptables`",
"`ip6tables -w -D INPUT -p tcp --dport http -j f2b-j-w-iptables`",
"`ip6tables -w -F f2b-j-w-iptables`",
"`ip6tables -w -X f2b-j-w-iptables`",
),
'ip4-check': (
r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables[ \t]'`""",
),
'ip6-check': (
r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables[ \t]'`""",
),
'ip4-ban': (
r"`iptables -w -I f2b-j-w-iptables 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip4-unban': (
r"`iptables -w -D f2b-j-w-iptables -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
# iptables-new --
('j-w-iptables-new', 'iptables-new[name=%(__name__)s, bantime="600", port="http", protocol="tcp", chain="INPUT"]', {
'ip4': ('`iptables ', 'icmp-port-unreachable'), 'ip6': ('`ip6tables ', 'icmp6-port-unreachable'),
'start': (
"`iptables -w -N f2b-j-w-iptables-new`",
"`iptables -w -A f2b-j-w-iptables-new -j RETURN`",
"`iptables -w -I INPUT -m state --state NEW -p tcp --dport http -j f2b-j-w-iptables-new`",
"`ip6tables -w -N f2b-j-w-iptables-new`",
"`ip6tables -w -A f2b-j-w-iptables-new -j RETURN`",
"`ip6tables -w -I INPUT -m state --state NEW -p tcp --dport http -j f2b-j-w-iptables-new`",
),
'stop': (
"`iptables -w -D INPUT -m state --state NEW -p tcp --dport http -j f2b-j-w-iptables-new`",
"`iptables -w -F f2b-j-w-iptables-new`",
"`iptables -w -X f2b-j-w-iptables-new`",
"`ip6tables -w -D INPUT -m state --state NEW -p tcp --dport http -j f2b-j-w-iptables-new`",
"`ip6tables -w -F f2b-j-w-iptables-new`",
"`ip6tables -w -X f2b-j-w-iptables-new`",
),
'ip4-check': (
r"""`iptables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-new[ \t]'`""",
),
'ip6-check': (
r"""`ip6tables -w -n -L INPUT | grep -q 'f2b-j-w-iptables-new[ \t]'`""",
),
'ip4-ban': (
r"`iptables -w -I f2b-j-w-iptables-new 1 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip4-unban': (
r"`iptables -w -D f2b-j-w-iptables-new -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
),
'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables-new 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables-new -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
),
}),
# iptables-xt_recent-echo --
('j-w-iptables-xtre', 'iptables-xt_recent-echo[name=%(__name__)s, bantime="600", chain="INPUT"]', {
'ip4': ('`iptables ', '/f2b-j-w-iptables-xtre`'), 'ip6': ('`ip6tables ', '/f2b-j-w-iptables-xtre6`'),
'start': (
"`if [ `id -u` -eq 0 ];then iptables -w -I INPUT -m recent --update --seconds 3600 --name f2b-j-w-iptables-xtre -j REJECT --reject-with icmp-port-unreachable;fi`",
"`if [ `id -u` -eq 0 ];then ip6tables -w -I INPUT -m recent --update --seconds 3600 --name f2b-j-w-iptables-xtre6 -j REJECT --reject-with icmp6-port-unreachable;fi`",
),
'stop': (
"`echo / > /proc/net/xt_recent/f2b-j-w-iptables-xtre`",
"`if [ `id -u` -eq 0 ];then iptables -w -D INPUT -m recent --update --seconds 3600 --name f2b-j-w-iptables-xtre -j REJECT --reject-with icmp-port-unreachable;fi`",
"`echo / > /proc/net/xt_recent/f2b-j-w-iptables-xtre6`",
"`if [ `id -u` -eq 0 ];then ip6tables -w -D INPUT -m recent --update --seconds 3600 --name f2b-j-w-iptables-xtre6 -j REJECT --reject-with icmp6-port-unreachable;fi`",
),
'ip4-check': (
r"`test -e /proc/net/xt_recent/f2b-j-w-iptables-xtre`",
),
'ip6-check': (
r"`test -e /proc/net/xt_recent/f2b-j-w-iptables-xtre6`",
),
'ip4-ban': (
r"`echo +192.0.2.1 > /proc/net/xt_recent/f2b-j-w-iptables-xtre`",
),
'ip4-unban': (
r"`echo -192.0.2.1 > /proc/net/xt_recent/f2b-j-w-iptables-xtre`",
),
'ip6-ban': (
r"`echo +2001:db8:: > /proc/net/xt_recent/f2b-j-w-iptables-xtre6`",
),
'ip6-unban': (
r"`echo -2001:db8:: > /proc/net/xt_recent/f2b-j-w-iptables-xtre6`",
),
}),
# pf --
('j-w-pf', 'pf[name=%(__name__)s]', {
'ip4': (), 'ip6': (),
'start': (
'`echo "table <f2b-j-w-pf> persist counters" | pfctl -f-`',
'`echo "block proto tcp from <f2b-j-w-pf> to any port any" | pfctl -f-`',
),
'stop': (
'`pfctl -sr 2>/dev/null | grep -v f2b-j-w-pf | pfctl -f-`',
'`pfctl -t f2b-j-w-pf -T flush`',
'`pfctl -t f2b-j-w-pf -T kill`',
),
'ip4-check': ("`pfctl -sr | grep -q f2b-j-w-pf`",),
'ip6-check': ("`pfctl -sr | grep -q f2b-j-w-pf`",),
'ip4-ban': ("`pfctl -t f2b-j-w-pf -T add 192.0.2.1`",),
'ip4-unban': ("`pfctl -t f2b-j-w-pf -T delete 192.0.2.1`",),
'ip6-ban': ("`pfctl -t f2b-j-w-pf -T add 2001:db8::`",),
'ip6-unban': ("`pfctl -t f2b-j-w-pf -T delete 2001:db8::`",),
}),
)
server = TestServer()
transm = server._Server__transm
cmdHandler = transm._Transmitter__commandHandler
for jail, act, tests in testJailsActions:
stream = self.getDefaultJailStream(jail, act)
# for cmd in stream:
# print(cmd)
# filter all start commands (we want not start all jails):
for cmd in stream:
# command to server:
ret, res = transm.proceed(cmd)
self.assertEqual(ret, 0)
jails = server._Server__jails
for jail, act, tests in testJailsActions:
# print(jail, jails[jail])
for a in jails[jail].actions:
action = jails[jail].actions[a]
logSys.debug('# ' + ('=' * 50))
logSys.debug('# == %-44s ==', jail + ' - ' + action._name)
logSys.debug('# ' + ('=' * 50))
self.assertTrue(isinstance(action, _actions.CommandAction))
# wrap default command processor:
action.executeCmd = self._executeCmd
# test start :
logSys.debug('# === start ==='); self.pruneLog()
action.start()
self.assertLogged(*tests['start'], all=True)
# test ban ip4 :
logSys.debug('# === ban-ipv4 ==='); self.pruneLog()
action.ban({'ip': IPAddr('192.0.2.1')})
self.assertLogged(*tests['ip4-check']+tests['ip4-ban'], all=True)
self.assertNotLogged(*tests['ip6'], all=True)
# test unban ip4 :
logSys.debug('# === unban ipv4 ==='); self.pruneLog()
action.unban({'ip': IPAddr('192.0.2.1')})
self.assertLogged(*tests['ip4-check']+tests['ip4-unban'], all=True)
self.assertNotLogged(*tests['ip6'], all=True)
# test ban ip6 :
logSys.debug('# === ban ipv6 ==='); self.pruneLog()
action.ban({'ip': IPAddr('2001:DB8::')})
self.assertLogged(*tests['ip6-check']+tests['ip6-ban'], all=True)
self.assertNotLogged(*tests['ip4'], all=True)
# test unban ip6 :
logSys.debug('# === unban ipv6 ==='); self.pruneLog()
action.unban({'ip': IPAddr('2001:DB8::')})
self.assertLogged(*tests['ip6-check']+tests['ip6-unban'], all=True)
self.assertNotLogged(*tests['ip4'], all=True)
# test stop :
logSys.debug('# === stop ==='); self.pruneLog()
action.stop()
self.assertLogged(*tests['stop'], all=True)