Merge branch 'fix-readline-multibyte'

pull/2337/merge
sebres 2021-03-25 12:13:18 +01:00
commit d8e450cf12
10 changed files with 273 additions and 112 deletions

View File

@ -18,12 +18,17 @@ ver. 1.0.1-dev-1 (20??/??/??) - development nightly edition
different from 0) in case of unsane environment.
### Fixes
* readline fixed to consider interim new-line character as part of code point in multi-byte logs
(e. g. unicode encoding like utf-16be, utf-16le);
* `filter.d/drupal-auth.conf` more strict regex, extended to match "Login attempt failed from" (gh-2742)
### New Features and Enhancements
* `actioncheck` behavior is changed now (gh-488), so invariant check as well as restore or repair
of sane environment (in case of recognized unsane state) would only occur on action errors (e. g.
if ban or unban operations are exiting with other code as 0)
* better recognition of log rotation, better performance by reopen: avoid unnecessary seek to begin of file
(and hash calculation)
* file filter reads only complete lines (ended with new-line) now, so waits for end of line (for its completion)
ver. 0.11.2 (2020/11/23) - heal-the-world-with-security-tools

View File

@ -289,9 +289,6 @@ class Fail2banRegex(object):
def output(self, line):
if not self._opts.out: output(line)
def decode_line(self, line):
return FileContainer.decode_line('<LOG>', self._encoding, line)
def encode_line(self, line):
return line.encode(self._encoding, 'ignore')
@ -723,10 +720,6 @@ class Fail2banRegex(object):
return True
def file_lines_gen(self, hdlr):
for line in hdlr:
yield self.decode_line(line)
def start(self, args):
cmd_log, cmd_regex = args[:2]
@ -745,10 +738,10 @@ class Fail2banRegex(object):
if os.path.isfile(cmd_log):
try:
hdlr = open(cmd_log, 'rb')
test_lines = FileContainer(cmd_log, self._encoding, doOpen=True)
self.output( "Use log file : %s" % cmd_log )
self.output( "Use encoding : %s" % self._encoding )
test_lines = self.file_lines_gen(hdlr)
except IOError as e: # pragma: no cover
output( e )
return False

View File

@ -502,7 +502,7 @@ class Fail2BanDb(object):
except TypeError:
firstLineMD5 = None
if not firstLineMD5 and (pos or md5):
if firstLineMD5 is None and (pos or md5 is not None):
cur.execute(
"INSERT OR REPLACE INTO logs(jail, path, firstlinemd5, lastfilepos) "
"VALUES(?, ?, ?, ?)", (jail.name, name, md5, pos))

View File

@ -1131,14 +1131,14 @@ class FileFilter(Filter):
while not self.idle:
line = log.readline()
if not self.active: break; # jail has been stopped
if not line:
if line is None:
# The jail reached the bottom, simply set in operation for this log
# (since we are first time at end of file, growing is only possible after modifications):
log.inOperation = True
break
# acquire in operation from log and process:
self.inOperation = inOperation if inOperation is not None else log.inOperation
self.processLineAndAdd(line.rstrip('\r\n'))
self.processLineAndAdd(line)
finally:
log.close()
db = self.jail.database
@ -1155,6 +1155,8 @@ class FileFilter(Filter):
if logSys.getEffectiveLevel() <= logging.DEBUG:
logSys.debug("Seek to find time %s (%s), file size %s", date,
MyTime.time2str(date), fs)
if not fs:
return
minp = container.getPos()
maxp = fs
tryPos = minp
@ -1178,8 +1180,8 @@ class FileFilter(Filter):
dateTimeMatch = None
nextp = None
while True:
line = container.readline()
if not line:
line = container.readline(False)
if line is None:
break
(timeMatch, template) = self.dateDetector.matchTime(line)
if timeMatch:
@ -1276,25 +1278,34 @@ except ImportError: # pragma: no cover
class FileContainer:
def __init__(self, filename, encoding, tail=False):
def __init__(self, filename, encoding, tail=False, doOpen=False):
self.__filename = filename
self.waitForLineEnd = True
self.setEncoding(encoding)
self.__tail = tail
self.__handler = None
self.__pos = 0
self.__pos4hash = 0
self.__hash = ''
self.__hashNextTime = time.time() + 30
# Try to open the file. Raises an exception if an error occurred.
handler = open(filename, 'rb')
stats = os.fstat(handler.fileno())
self.__ino = stats.st_ino
if doOpen: # fail2ban-regex only (don't need to reopen it and check for rotation)
self.__handler = handler
return
try:
firstLine = handler.readline()
# Computes the MD5 of the first line.
self.__hash = md5sum(firstLine).hexdigest()
# Start at the beginning of file if tail mode is off.
if tail:
handler.seek(0, 2)
self.__pos = handler.tell()
else:
self.__pos = 0
stats = os.fstat(handler.fileno())
self.__ino = stats.st_ino
if stats.st_size:
firstLine = handler.readline()
# first line available and contains new-line:
if firstLine != firstLine.rstrip(b'\r\n'):
# Computes the MD5 of the first line.
self.__hash = md5sum(firstLine).hexdigest()
# if tail mode scroll to the end of file
if tail:
handler.seek(0, 2)
self.__pos = handler.tell()
finally:
handler.close()
## shows that log is in operation mode (expecting new messages only from here):
@ -1304,6 +1315,10 @@ class FileContainer:
return self.__filename
def getFileSize(self):
h = self.__handler
if h is not None:
stats = os.fstat(h.fileno())
return stats.st_size
return os.path.getsize(self.__filename);
def setEncoding(self, encoding):
@ -1322,38 +1337,54 @@ class FileContainer:
def setPos(self, value):
self.__pos = value
def open(self):
self.__handler = open(self.__filename, 'rb')
# Set the file descriptor to be FD_CLOEXEC
fd = self.__handler.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
# Stat the file before even attempting to read it
stats = os.fstat(self.__handler.fileno())
if not stats.st_size:
# yoh: so it is still an empty file -- nothing should be
# read from it yet
# print "D: no content -- return"
return False
firstLine = self.__handler.readline()
# Computes the MD5 of the first line.
myHash = md5sum(firstLine).hexdigest()
## print "D: fn=%s hashes=%s/%s inos=%s/%s pos=%s rotate=%s" % (
## self.__filename, self.__hash, myHash, stats.st_ino, self.__ino, self.__pos,
## self.__hash != myHash or self.__ino != stats.st_ino)
## sys.stdout.flush()
# Compare hash and inode
if self.__hash != myHash or self.__ino != stats.st_ino:
logSys.log(logging.MSG, "Log rotation detected for %s", self.__filename)
self.__hash = myHash
self.__ino = stats.st_ino
self.__pos = 0
# Sets the file pointer to the last position.
self.__handler.seek(self.__pos)
def open(self, forcePos=None):
h = open(self.__filename, 'rb')
try:
# Set the file descriptor to be FD_CLOEXEC
fd = h.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
myHash = self.__hash
# Stat the file before even attempting to read it
stats = os.fstat(h.fileno())
rotflg = stats.st_size < self.__pos or stats.st_ino != self.__ino
if rotflg or not len(myHash) or time.time() > self.__hashNextTime:
myHash = ''
firstLine = h.readline()
# Computes the MD5 of the first line (if it is complete)
if firstLine != firstLine.rstrip(b'\r\n'):
myHash = md5sum(firstLine).hexdigest()
self.__hashNextTime = time.time() + 30
elif stats.st_size == self.__pos:
myHash = self.__hash
# Compare size, hash and inode
if rotflg or myHash != self.__hash:
if self.__hash != '':
logSys.log(logging.MSG, "Log rotation detected for %s, reason: %r", self.__filename,
(stats.st_size, self.__pos, stats.st_ino, self.__ino, myHash, self.__hash))
self.__ino = stats.st_ino
self.__pos = 0
self.__hash = myHash
# if nothing to read from file yet (empty or no new data):
if forcePos is not None:
self.__pos = forcePos
elif stats.st_size <= self.__pos:
return False
# Sets the file pointer to the last position.
h.seek(self.__pos)
# leave file open (to read content):
self.__handler = h; h = None
finally:
# close (no content or error only)
if h:
h.close(); h = None
return True
def seek(self, offs, endLine=True):
h = self.__handler
if h is None:
self.open(offs)
h = self.__handler
# seek to given position
h.seek(offs, 0)
# goto end of next line
@ -1371,6 +1402,9 @@ class FileContainer:
try:
return line.decode(enc, 'strict')
except (UnicodeDecodeError, UnicodeEncodeError) as e:
# avoid warning if got incomplete end of line (e. g. '\n' in "...[0A" followed by "00]..." for utf-16le:
if (e.end == len(line) and line[e.start] in b'\r\n'):
return line[0:e.start].decode(enc, 'replace')
global _decode_line_warn
lev = 7
if not _decode_line_warn.get(filename, 0):
@ -1379,29 +1413,85 @@ class FileContainer:
logSys.log(lev,
"Error decoding line from '%s' with '%s'.", filename, enc)
if logSys.getEffectiveLevel() <= lev:
logSys.log(lev, "Consider setting logencoding=utf-8 (or another appropriate"
" encoding) for this jail. Continuing"
" to process line ignoring invalid characters: %r",
logSys.log(lev,
"Consider setting logencoding to appropriate encoding for this jail. "
"Continuing to process line ignoring invalid characters: %r",
line)
# decode with replacing error chars:
line = line.decode(enc, 'replace')
return line
def readline(self):
def readline(self, complete=True):
"""Read line from file
In opposite to pythons readline it doesn't return new-line,
so returns either the line if line is complete (and complete=True) or None
if line is not complete (and complete=True) or there is no content to read.
If line is complete (and complete is True), it also shift current known
position to begin of next line.
Also it is safe against interim new-line bytes (e. g. part of multi-byte char)
in given encoding.
"""
if self.__handler is None:
return ""
return FileContainer.decode_line(
self.getFileName(), self.getEncoding(), self.__handler.readline())
# read raw bytes up to \n char:
b = self.__handler.readline()
if not b:
return None
bl = len(b)
# convert to log-encoding (new-line char could disappear if it is part of multi-byte sequence):
r = FileContainer.decode_line(
self.getFileName(), self.getEncoding(), b)
# trim new-line at end and check the line was written complete (contains a new-line):
l = r.rstrip('\r\n')
if complete:
if l == r:
# try to fill buffer in order to find line-end in log encoding:
fnd = 0
while 1:
r = self.__handler.readline()
if not r:
break
b += r
bl += len(r)
# convert to log-encoding:
r = FileContainer.decode_line(
self.getFileName(), self.getEncoding(), b)
# ensure new-line is not in the middle (buffered 2 strings, e. g. in utf-16le it is "...[0A"+"00]..."):
e = r.find('\n')
if e >= 0 and e != len(r)-1:
l, r = r[0:e], r[0:e+1]
# back to bytes and get offset to seek after NL:
r = r.encode(self.getEncoding(), 'replace')
self.__handler.seek(-bl+len(r), 1)
return l
# trim new-line at end and check the line was written complete (contains a new-line):
l = r.rstrip('\r\n')
if l != r:
return l
if self.waitForLineEnd:
# not fulfilled - seek back and return:
self.__handler.seek(-bl, 1)
return None
return l
def close(self):
if not self.__handler is None:
# Saves the last position.
if self.__handler is not None:
# Saves the last real position.
self.__pos = self.__handler.tell()
# Closes the file.
self.__handler.close()
self.__handler = None
## print "D: Closed %s with pos %d" % (handler, self.__pos)
## sys.stdout.flush()
def __iter__(self):
return self
def next(self):
line = self.readline()
if line is None:
self.close()
raise StopIteration
return line
_decode_line_warn = Utils.Cache(maxCount=1000, maxTime=24*60*60);

View File

@ -332,11 +332,9 @@ class Utils():
timeout_expr = lambda: time.time() > time0
else:
timeout_expr = timeout
if not interval:
interval = Utils.DEFAULT_SLEEP_INTERVAL
if timeout_expr():
break
stm = min(stm + interval, Utils.DEFAULT_SLEEP_TIME)
stm = min(stm + (interval or Utils.DEFAULT_SLEEP_INTERVAL), Utils.DEFAULT_SLEEP_TIME)
time.sleep(stm)
return ret

View File

@ -212,19 +212,20 @@ class DatabaseTest(LogCaptureTestCase):
self.jail.name in self.db.getJailNames(True),
"Jail not added to database")
def testAddLog(self):
def _testAddLog(self):
self.testAddJail() # Jail required
_, filename = tempfile.mkstemp(".log", "Fail2BanDb_")
self.fileContainer = FileContainer(filename, "utf-8")
self.db.addLog(self.jail, self.fileContainer)
pos = self.db.addLog(self.jail, self.fileContainer)
self.assertTrue(pos is None); # unknown previously
self.assertIn(filename, self.db.getLogPaths(self.jail))
os.remove(filename)
def testUpdateLog(self):
self.testAddLog() # Add log file
self._testAddLog() # Add log file
# Write some text
filename = self.fileContainer.getFileName()

View File

@ -230,7 +230,7 @@ def _start_params(tmp, use_stock=False, use_stock_cfg=None,
os.symlink(os.path.abspath(pjoin(STOCK_CONF_DIR, n)), pjoin(cfg, n))
if create_before_start:
for n in create_before_start:
_write_file(n % {'tmp': tmp}, 'w', '')
_write_file(n % {'tmp': tmp}, 'w')
# parameters (sock/pid and config, increase verbosity, set log, etc.):
vvv, llev = (), "INFO"
if unittest.F2B.log_level < logging.INFO: # pragma: no cover
@ -937,10 +937,8 @@ class Fail2banServerTest(Fail2banClientServerBase):
"Jail 'broken-jail' skipped, because of wrong configuration", all=True)
# enable both jails, 3 logs for jail1, etc...
# truncate test-log - we should not find unban/ban again by reload:
self.pruneLog("[test-phase 1b]")
_write_jail_cfg(actions=[1,2])
_write_file(test1log, "w+")
if unittest.F2B.log_level < logging.DEBUG: # pragma: no cover
_out_file(test1log)
self.execCmd(SUCCESS, startparams, "reload")
@ -1003,7 +1001,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.pruneLog("[test-phase 2b]")
# write new failures:
_write_file(test2log, "w+", *(
_write_file(test2log, "a+", *(
(str(int(MyTime.time())) + " error 403 from 192.0.2.2: test 2",) * 3 +
(str(int(MyTime.time())) + " error 403 from 192.0.2.3: test 2",) * 3 +
(str(int(MyTime.time())) + " failure 401 from 192.0.2.4: test 2",) * 3 +
@ -1062,10 +1060,6 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.assertEqual(self.execCmdDirect(startparams,
'get', 'test-jail1', 'banned', '192.0.2.3', '192.0.2.9')[1], [1, 0])
# rotate logs:
_write_file(test1log, "w+")
_write_file(test2log, "w+")
# restart jail without unban all:
self.pruneLog("[test-phase 2c]")
self.execCmd(SUCCESS, startparams,
@ -1183,7 +1177,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
# now write failures again and check already banned (jail1 was alive the whole time) and new bans occurred (jail1 was alive the whole time):
self.pruneLog("[test-phase 5]")
_write_file(test1log, "w+", *(
_write_file(test1log, "a+", *(
(str(int(MyTime.time())) + " failure 401 from 192.0.2.1: test 5",) * 3 +
(str(int(MyTime.time())) + " error 403 from 192.0.2.5: test 5",) * 3 +
(str(int(MyTime.time())) + " failure 401 from 192.0.2.6: test 5",) * 3
@ -1469,7 +1463,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.pruneLog("[test-phase sendmail-reject]")
# write log:
_write_file(lgfn, "w+", *smrej_msg)
_write_file(lgfn, "a+", *smrej_msg)
# wait and check it caused banned (and dump in the test-file):
self.assertLogged(
"[sendmail-reject] Ban 192.0.2.2", "stdout: 'found: 0 / 3, banned: 1 / 1'",
@ -1597,7 +1591,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
wakeObs = False
_observer_wait_before_incrban(lambda: wakeObs)
# write again (IP already bad):
_write_file(test1log, "w+", *(
_write_file(test1log, "a+", *(
(str(int(MyTime.time())) + " failure 401 from 192.0.2.11: I'm very bad \"hacker\" `` $(echo test)",) * 2
))
# wait for ban:

View File

@ -25,6 +25,7 @@ __license__ = "GPL"
import os
import sys
import tempfile
import unittest
from ..client import fail2banregex
@ -80,6 +81,11 @@ def _test_exec_command_line(*args):
sys.stderr = _org['stderr']
return _exit_code
def _reset():
# reset global warn-counter:
from ..server.filter import _decode_line_warn
_decode_line_warn.clear()
STR_00 = "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0"
STR_00_NODT = "[sshd] error: PAM: Authentication failure for kevin from 192.0.2.0"
@ -122,6 +128,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
"""Call before every test case."""
LogCaptureTestCase.setUp(self)
setUpMyTime()
_reset()
def tearDown(self):
"""Call after every test case."""
@ -454,14 +461,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
FILENAME_ZZZ_GEN, FILENAME_ZZZ_GEN
))
def _reset(self):
# reset global warn-counter:
from ..server.filter import _decode_line_warn
_decode_line_warn.clear()
def testWronChar(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset()
self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
@ -477,7 +478,6 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronCharDebuggex(self):
unittest.F2B.SkipIfCfgMissing(stock=True)
self._reset()
self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages
"--datepattern", r"^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
@ -490,6 +490,36 @@ class Fail2banRegexTest(LogCaptureTestCase):
self.assertLogged('https://')
def testNLCharAsPartOfUniChar(self):
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='uni')
# test two multi-byte encodings (both contains `\x0A` in either \x02\x0A or \x0A\x02):
for enc in ('utf-16be', 'utf-16le'):
self.pruneLog("[test-phase encoding=%s]" % enc)
try:
fout = open(fname, 'wb')
# test on unicode string containing \x0A as part of uni-char,
# it must produce exactly 2 lines (both are failures):
for l in (
u'1490349000 \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n',
u'1490349000 \u20AC Failed auth: invalid user TestI from 192.0.2.2\n'
):
fout.write(l.encode(enc))
fout.close()
self.assertTrue(_test_exec(
"-l", "notice", # put down log-level, because of too many debug-messages
"--encoding", enc,
"--datepattern", r"^EPOCH",
fname, r"Failed .* from <HOST>",
))
self.assertLogged(" encoding : %s" % enc,
"Lines: 2 lines, 0 ignored, 2 matched, 0 missed", all=True)
self.assertNotLogged("Missed line(s)")
finally:
fout.close()
os.unlink(fname)
def testExecCmdLine_Usage(self):
self.assertNotEqual(_test_exec_command_line(), 0)
self.pruneLog()

View File

@ -195,7 +195,7 @@ def _assert_correct_last_attempt(utest, filter_, output, count=None):
_assert_equal_entries(utest, f, o)
def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""):
def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line="", lines=None):
"""Copy lines from one file to another (which might be already open)
Returns open fout
@ -212,9 +212,9 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
fin.readline()
# Read
i = 0
lines = []
if not lines: lines = []
while n is None or i < n:
l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n')
l = fin.readline().decode('UTF-8', 'replace').rstrip('\r\n')
if terminal_line is not None and l == terminal_line:
break
lines.append(l)
@ -222,6 +222,7 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
# Write: all at once and flush
if isinstance(fout, str):
fout = open(fout, mode)
DefLogSys.debug(' ++ write %d test lines', len(lines))
fout.write('\n'.join(lines)+'\n')
fout.flush()
if isinstance(in_, str): # pragma: no branch - only used with str in test cases
@ -253,7 +254,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
# Read/Write
i = 0
while n is None or i < n:
l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n')
l = fin.readline().decode('UTF-8', 'replace').rstrip('\r\n')
if terminal_line is not None and l == terminal_line:
break
journal.send(MESSAGE=l.strip(), **fields)
@ -643,6 +644,19 @@ class LogFile(LogCaptureTestCase):
self.filter = FilterPoll(None)
self.assertRaises(IOError, self.filter.addLogPath, LogFile.MISSING)
def testDecodeLineWarn(self):
# incomplete line (missing byte at end), warning is suppressed:
l = u"correct line\n"
r = l.encode('utf-16le')
self.assertEqual(FileContainer.decode_line('TESTFILE', 'utf-16le', r), l)
self.assertEqual(FileContainer.decode_line('TESTFILE', 'utf-16le', r[0:-1]), l[0:-1])
self.assertNotLogged('Error decoding line')
# complete line (incorrect surrogate in the middle), warning is there:
r = b"incorrect \xc8\x0a line\n"
l = r.decode('utf-8', 'replace')
self.assertEqual(FileContainer.decode_line('TESTFILE', 'utf-8', r), l)
self.assertLogged('Error decoding line')
class LogFileFilterPoll(unittest.TestCase):
@ -1136,13 +1150,15 @@ def get_monitor_failures_testcase(Filter_):
# move aside, but leaving the handle still open...
os.rename(self.name, self.name + '.bak')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14, n=1).close()
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14, n=1,
lines=["Aug 14 11:59:59 [logrotate] rotation 1"]).close()
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
# now remove the moved file
_killfile(None, self.name + '.bak')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3).close()
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3,
lines=["Aug 14 11:59:59 [logrotate] rotation 2"]).close()
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
@ -1196,7 +1212,7 @@ def get_monitor_failures_testcase(Filter_):
os.rename(tmpsub1, tmpsub2 + 'a')
os.mkdir(tmpsub1)
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=12, n=1, mode='w')
skip=12, n=1, mode='w', lines=["Aug 14 11:59:59 [logrotate] rotation 1"])
self.file.close()
self._wait4failures(2)
@ -1207,7 +1223,7 @@ def get_monitor_failures_testcase(Filter_):
os.mkdir(tmpsub1)
self.waitForTicks(2)
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=12, n=1, mode='w')
skip=12, n=1, mode='w', lines=["Aug 14 11:59:59 [logrotate] rotation 2"])
self.file.close()
self._wait4failures(3)
@ -1630,16 +1646,49 @@ class GetFailures(LogCaptureTestCase):
def testCRLFFailures01(self):
# We first adjust logfile/failures to end with CR+LF
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
# poor man unix2dos:
fin, fout = open(GetFailures.FILENAME_01, 'rb'), open(fname, 'wb')
for l in fin.read().splitlines():
fout.write(l + b'\r\n')
fin.close()
fout.close()
try:
# poor man unix2dos:
fin, fout = open(GetFailures.FILENAME_01, 'rb'), open(fname, 'wb')
for l in fin.read().splitlines():
fout.write(l + b'\r\n')
fin.close()
fout.close()
# now see if we should be getting the "same" failures
self.testGetFailures01(filename=fname)
_killfile(fout, fname)
# now see if we should be getting the "same" failures
self.testGetFailures01(filename=fname)
finally:
_killfile(fout, fname)
def testNLCharAsPartOfUniChar(self):
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='uni')
# test two multi-byte encodings (both contains `\x0A` in either \x02\x0A or \x0A\x02):
for enc in ('utf-16be', 'utf-16le'):
self.pruneLog("[test-phase encoding=%s]" % enc)
try:
fout = open(fname, 'wb')
tm = int(time.time())
# test on unicode string containing \x0A as part of uni-char,
# it must produce exactly 2 lines (both are failures):
for l in (
u'%s \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n' % tm,
u'%s \u20AC Failed auth: invalid user TestI from 192.0.2.2\n' % tm
):
fout.write(l.encode(enc))
fout.close()
self.filter.setLogEncoding(enc)
self.filter.addLogPath(fname, autoSeek=0)
self.filter.setDatePattern((r'^EPOCH',))
self.filter.addFailRegex(r"Failed .* from <HOST>")
self.filter.getFailures(fname)
self.assertLogged(
"[DummyJail] Found 192.0.2.1",
"[DummyJail] Found 192.0.2.2", all=True, wait=True)
finally:
_killfile(fout, fname)
self.filter.delLogPath(fname)
# must find 4 failures and generate 2 tickets (2 IPs with each 2 failures):
self.assertEqual(self.filter.failManager.getFailCount(), (2, 4))
def testGetFailures02(self):
output = ('141.3.81.106', 4, 1124013539.0,

View File

@ -23,7 +23,6 @@ __copyright__ = "Copyright (c) 2013 Steven Hiscocks"
__license__ = "GPL"
import datetime
import fileinput
import inspect
import json
import os
@ -156,12 +155,15 @@ def testSampleRegexsFactory(name, basedir):
i = 0
while i < len(filenames):
filename = filenames[i]; i += 1;
logFile = fileinput.FileInput(os.path.join(TEST_FILES_DIR, "logs",
filename), mode='rb')
logFile = FileContainer(os.path.join(TEST_FILES_DIR, "logs",
filename), 'UTF-8', doOpen=True)
# avoid errors if no NL char at end of test log-file:
logFile.waitForLineEnd = False
ignoreBlock = False
lnnum = 0
for line in logFile:
line = FileContainer.decode_line(logFile.filename(), 'UTF-8', line)
lnnum += 1
jsonREMatch = re.match("^#+ ?(failJSON|(?:file|filter)Options|addFILE):(.+)$", line)
if jsonREMatch:
try:
@ -201,9 +203,8 @@ def testSampleRegexsFactory(name, basedir):
# failJSON - faildata contains info of the failure to check it.
except ValueError as e: # pragma: no cover - we've valid json's
raise ValueError("%s: %s:%i" %
(e, logFile.filename(), logFile.filelineno()))
(e, logFile.getFileName(), lnnum))
line = next(logFile)
line = FileContainer.decode_line(logFile.filename(), 'UTF-8', line)
elif ignoreBlock or line.startswith("#") or not line.strip():
continue
else: # pragma: no cover - normally unreachable
@ -298,7 +299,7 @@ def testSampleRegexsFactory(name, basedir):
import pprint
raise AssertionError("%s: %s on: %s:%i, line:\n %s\nregex (%s):\n %s\n"
"faildata: %s\nfail: %s" % (
fltName, e, logFile.filename(), logFile.filelineno(),
fltName, e, logFile.getFileName(), lnnum,
line, failregex, regexList[failregex] if failregex != -1 else None,
'\n'.join(pprint.pformat(faildata).splitlines()),
'\n'.join(pprint.pformat(fail).splitlines())))