TST: Add tests for transmitter journalmatch

pull/224/head
Steven Hiscocks 2013-05-13 23:42:09 +01:00
parent fe3ed176df
commit c1226afe92
2 changed files with 87 additions and 3 deletions

View File

@ -650,13 +650,13 @@ class FileContainer:
class JournalFilter(Filter): # pragma: systemd no cover
def addJournalMatch(self, match):
def addJournalMatch(self, match): # pragma: no cover - Base class, not used
pass
def delJournalMatch(self, match):
def delJournalMatch(self, match): # pragma: no cover - Base class, not used
pass
def getJournalMatch(self, match):
def getJournalMatch(self, match): # pragma: no cover - Base class, not used
return []
##

View File

@ -28,6 +28,10 @@ import unittest, socket, time, tempfile, os, locale
from fail2ban.server.server import Server
from fail2ban.exceptions import UnknownJailException
try:
from fail2ban.server import filtersystemd
except ImportError:
filtersystemd = None
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
@ -498,6 +502,86 @@ class Transmitter(TransmitterBase):
self.assertEqual(
self.transm.proceed(["status", "INVALID", "COMMAND"])[0],1)
if filtersystemd: # pragma: systemd no cover
def testJournalMatch(self):
jailName = "TestJail2"
self.server.addJail(jailName, "systemd")
self.jailAddDelTest(
"journalmatch",
[
"_SYSTEMD_UNIT=sshd.service",
"TEST_FIELD1=ABC TEST_FIELD2=123",
"_HOSTNAME=example.com",
],
jailName
)
values = [
'"FIELD=Test + Value+ \\\"Test+Value=\'Test"',
'FIELD="Test + Value+ \\\"Test+Value=\'Test"',
]
# Test shell like escaping for spaces
for value in values:
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch", value]),
(0, ["FIELD=Test + Value+ \"Test+Value='Test"]))
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value]),
(0, []))
# Try duplicates
value = "_COMM=sshd"
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch", value]),
(0, [value]))
# Duplicates are accepted, as automatically OR'd, and journalctl
# also accepts them without issue.
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch", value]),
(0, [value, value]))
# Remove first instance
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value]),
(0, [value]))
# Remove second instance
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value]),
(0, []))
# Test splitting of OR'd values
value1, value2 = "_COMM=sshd", "_SYSTEMD_UNIT=sshd.service"
self.assertEqual(
self.transm.proceed(
["set", jailName, "addjournalmatch",
" + ".join([value1, value2])]),
(0, [value1, value2]))
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value1]),
(0, [value2]))
self.assertEqual(
self.transm.proceed(
["set", jailName, "deljournalmatch", value2]),
(0, []))
# Invalid match
value = "This isn't valid!"
result = self.transm.proceed(
["set", jailName, "addjournalmatch", value])
self.assertTrue(isinstance(result[1], ValueError))
# Delete invalid match
value = "FIELD=NotPresent"
result = self.transm.proceed(
["set", jailName, "deljournalmatch", value])
self.assertTrue(isinstance(result[1], ValueError))
class TransmitterLogging(TransmitterBase):
def setUp(self):