stability: better recognition of rotation (e.g. on a hash collision, the current size and the last known position are now also considered), no hash is computed for an empty file (or for an incomplete first line), etc.;

performance: avoid unnecessary seeks to the start of the file and hash calculations - this now happens only if the file was really rotated (inode changed or size shrank), otherwise no more often than once every 30 seconds;
avoid unneeded log rotation in tests
pull/2337/merge
sebres 2021-03-21 23:35:09 +01:00
parent 725354c793
commit 9bdc4be6cc
5 changed files with 88 additions and 65 deletions

View File

@ -502,7 +502,7 @@ class Fail2BanDb(object):
except TypeError: except TypeError:
firstLineMD5 = None firstLineMD5 = None
if not firstLineMD5 and (pos or md5): if firstLineMD5 is None and (pos or md5 is not None):
cur.execute( cur.execute(
"INSERT OR REPLACE INTO logs(jail, path, firstlinemd5, lastfilepos) " "INSERT OR REPLACE INTO logs(jail, path, firstlinemd5, lastfilepos) "
"VALUES(?, ?, ?, ?)", (jail.name, name, md5, pos)) "VALUES(?, ?, ?, ?)", (jail.name, name, md5, pos))

View File

@ -1155,6 +1155,8 @@ class FileFilter(Filter):
if logSys.getEffectiveLevel() <= logging.DEBUG: if logSys.getEffectiveLevel() <= logging.DEBUG:
logSys.debug("Seek to find time %s (%s), file size %s", date, logSys.debug("Seek to find time %s (%s), file size %s", date,
MyTime.time2str(date), fs) MyTime.time2str(date), fs)
if not fs:
return
minp = container.getPos() minp = container.getPos()
maxp = fs maxp = fs
tryPos = minp tryPos = minp
@ -1281,20 +1283,25 @@ class FileContainer:
self.setEncoding(encoding) self.setEncoding(encoding)
self.__tail = tail self.__tail = tail
self.__handler = None self.__handler = None
self.__pos = 0
self.__pos4hash = 0
self.__hash = ''
self.__hashNextTime = time.time() + 30
# Try to open the file. Raises an exception if an error occurred. # Try to open the file. Raises an exception if an error occurred.
handler = open(filename, 'rb') handler = open(filename, 'rb')
try:
stats = os.fstat(handler.fileno()) stats = os.fstat(handler.fileno())
self.__ino = stats.st_ino self.__ino = stats.st_ino
try: if stats.st_size:
firstLine = handler.readline() firstLine = handler.readline()
# first line available and contains new-line:
if firstLine != firstLine.rstrip('\r\n'):
# Computes the MD5 of the first line. # Computes the MD5 of the first line.
self.__hash = md5sum(firstLine).hexdigest() self.__hash = md5sum(firstLine).hexdigest()
# Start at the beginning of file if tail mode is off. # if tail mode scroll to the end of file
if tail: if tail:
handler.seek(0, 2) handler.seek(0, 2)
self.__pos = handler.tell() self.__pos = handler.tell()
else:
self.__pos = 0
finally: finally:
handler.close() handler.close()
## shows that log is in operation mode (expecting new messages only from here): ## shows that log is in operation mode (expecting new messages only from here):
@ -1304,6 +1311,10 @@ class FileContainer:
return self.__filename return self.__filename
def getFileSize(self): def getFileSize(self):
h = self.__handler
if h is not None:
stats = os.fstat(h.fileno())
return stats.st_size
return os.path.getsize(self.__filename); return os.path.getsize(self.__filename);
def setEncoding(self, encoding): def setEncoding(self, encoding):
@ -1322,38 +1333,54 @@ class FileContainer:
def setPos(self, value): def setPos(self, value):
self.__pos = value self.__pos = value
def open(self): def open(self, forcePos=None):
self.__handler = open(self.__filename, 'rb') h = open(self.__filename, 'rb')
try:
# Set the file descriptor to be FD_CLOEXEC # Set the file descriptor to be FD_CLOEXEC
fd = self.__handler.fileno() fd = h.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFD) flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
myHash = self.__hash
# Stat the file before even attempting to read it # Stat the file before even attempting to read it
stats = os.fstat(self.__handler.fileno()) stats = os.fstat(h.fileno())
if not stats.st_size: rotflg = stats.st_size < self.__pos or stats.st_ino != self.__ino
# yoh: so it is still an empty file -- nothing should be if rotflg or not len(myHash) or time.time() > self.__hashNextTime:
# read from it yet myHash = ''
# print "D: no content -- return" firstLine = h.readline()
return False # Computes the MD5 of the first line (if it is complete)
firstLine = self.__handler.readline() if firstLine != firstLine.rstrip('\r\n'):
# Computes the MD5 of the first line.
myHash = md5sum(firstLine).hexdigest() myHash = md5sum(firstLine).hexdigest()
## print "D: fn=%s hashes=%s/%s inos=%s/%s pos=%s rotate=%s" % ( self.__hashNextTime = time.time() + 30
## self.__filename, self.__hash, myHash, stats.st_ino, self.__ino, self.__pos, elif stats.st_size == self.__pos:
## self.__hash != myHash or self.__ino != stats.st_ino) myHash = self.__hash
## sys.stdout.flush() # Compare size, hash and inode
# Compare hash and inode if rotflg or myHash != self.__hash:
if self.__hash != myHash or self.__ino != stats.st_ino: if self.__hash != '':
logSys.log(logging.MSG, "Log rotation detected for %s", self.__filename) logSys.log(logging.MSG, "Log rotation detected for %s, reason: %r", self.__filename,
self.__hash = myHash (stats.st_size, self.__pos, stats.st_ino, self.__ino, myHash, self.__hash))
self.__ino = stats.st_ino self.__ino = stats.st_ino
self.__pos = 0 self.__pos = 0
self.__hash = myHash
# if nothing to read from file yet (empty or no new data):
if forcePos is not None:
self.__pos = forcePos
elif stats.st_size <= self.__pos:
return False
# Sets the file pointer to the last position. # Sets the file pointer to the last position.
self.__handler.seek(self.__pos) h.seek(self.__pos)
# leave file open (to read content):
self.__handler = h; h = None
finally:
# close (no content or error only)
if h:
h.close(); h = None
return True return True
def seek(self, offs, endLine=True): def seek(self, offs, endLine=True):
h = self.__handler h = self.__handler
if h is None:
self.open(offs)
h = self.__handler
# seek to given position # seek to given position
h.seek(offs, 0) h.seek(offs, 0)
# goto end of next line # goto end of next line
@ -1394,14 +1421,12 @@ class FileContainer:
self.getFileName(), self.getEncoding(), self.__handler.readline()) self.getFileName(), self.getEncoding(), self.__handler.readline())
def close(self): def close(self):
if not self.__handler is None: if self.__handler is not None:
# Saves the last position. # Saves the last position.
self.__pos = self.__handler.tell() self.__pos = self.__handler.tell()
# Closes the file. # Closes the file.
self.__handler.close() self.__handler.close()
self.__handler = None self.__handler = None
## print "D: Closed %s with pos %d" % (handler, self.__pos)
## sys.stdout.flush()
_decode_line_warn = Utils.Cache(maxCount=1000, maxTime=24*60*60); _decode_line_warn = Utils.Cache(maxCount=1000, maxTime=24*60*60);

View File

@ -212,19 +212,20 @@ class DatabaseTest(LogCaptureTestCase):
self.jail.name in self.db.getJailNames(True), self.jail.name in self.db.getJailNames(True),
"Jail not added to database") "Jail not added to database")
def testAddLog(self): def _testAddLog(self):
self.testAddJail() # Jail required self.testAddJail() # Jail required
_, filename = tempfile.mkstemp(".log", "Fail2BanDb_") _, filename = tempfile.mkstemp(".log", "Fail2BanDb_")
self.fileContainer = FileContainer(filename, "utf-8") self.fileContainer = FileContainer(filename, "utf-8")
self.db.addLog(self.jail, self.fileContainer) pos = self.db.addLog(self.jail, self.fileContainer)
self.assertTrue(pos is None); # unknown previously
self.assertIn(filename, self.db.getLogPaths(self.jail)) self.assertIn(filename, self.db.getLogPaths(self.jail))
os.remove(filename) os.remove(filename)
def testUpdateLog(self): def testUpdateLog(self):
self.testAddLog() # Add log file self._testAddLog() # Add log file
# Write some text # Write some text
filename = self.fileContainer.getFileName() filename = self.fileContainer.getFileName()

View File

@ -230,7 +230,7 @@ def _start_params(tmp, use_stock=False, use_stock_cfg=None,
os.symlink(os.path.abspath(pjoin(STOCK_CONF_DIR, n)), pjoin(cfg, n)) os.symlink(os.path.abspath(pjoin(STOCK_CONF_DIR, n)), pjoin(cfg, n))
if create_before_start: if create_before_start:
for n in create_before_start: for n in create_before_start:
_write_file(n % {'tmp': tmp}, 'w', '') _write_file(n % {'tmp': tmp}, 'w')
# parameters (sock/pid and config, increase verbosity, set log, etc.): # parameters (sock/pid and config, increase verbosity, set log, etc.):
vvv, llev = (), "INFO" vvv, llev = (), "INFO"
if unittest.F2B.log_level < logging.INFO: # pragma: no cover if unittest.F2B.log_level < logging.INFO: # pragma: no cover
@ -937,10 +937,8 @@ class Fail2banServerTest(Fail2banClientServerBase):
"Jail 'broken-jail' skipped, because of wrong configuration", all=True) "Jail 'broken-jail' skipped, because of wrong configuration", all=True)
# enable both jails, 3 logs for jail1, etc... # enable both jails, 3 logs for jail1, etc...
# truncate test-log - we should not find unban/ban again by reload:
self.pruneLog("[test-phase 1b]") self.pruneLog("[test-phase 1b]")
_write_jail_cfg(actions=[1,2]) _write_jail_cfg(actions=[1,2])
_write_file(test1log, "w+")
if unittest.F2B.log_level < logging.DEBUG: # pragma: no cover if unittest.F2B.log_level < logging.DEBUG: # pragma: no cover
_out_file(test1log) _out_file(test1log)
self.execCmd(SUCCESS, startparams, "reload") self.execCmd(SUCCESS, startparams, "reload")
@ -1003,7 +1001,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.pruneLog("[test-phase 2b]") self.pruneLog("[test-phase 2b]")
# write new failures: # write new failures:
_write_file(test2log, "w+", *( _write_file(test2log, "a+", *(
(str(int(MyTime.time())) + " error 403 from 192.0.2.2: test 2",) * 3 + (str(int(MyTime.time())) + " error 403 from 192.0.2.2: test 2",) * 3 +
(str(int(MyTime.time())) + " error 403 from 192.0.2.3: test 2",) * 3 + (str(int(MyTime.time())) + " error 403 from 192.0.2.3: test 2",) * 3 +
(str(int(MyTime.time())) + " failure 401 from 192.0.2.4: test 2",) * 3 + (str(int(MyTime.time())) + " failure 401 from 192.0.2.4: test 2",) * 3 +
@ -1062,10 +1060,6 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.assertEqual(self.execCmdDirect(startparams, self.assertEqual(self.execCmdDirect(startparams,
'get', 'test-jail1', 'banned', '192.0.2.3', '192.0.2.9')[1], [1, 0]) 'get', 'test-jail1', 'banned', '192.0.2.3', '192.0.2.9')[1], [1, 0])
# rotate logs:
_write_file(test1log, "w+")
_write_file(test2log, "w+")
# restart jail without unban all: # restart jail without unban all:
self.pruneLog("[test-phase 2c]") self.pruneLog("[test-phase 2c]")
self.execCmd(SUCCESS, startparams, self.execCmd(SUCCESS, startparams,
@ -1183,7 +1177,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
# now write failures again and check already banned (jail1 was alive the whole time) and new bans occurred (jail1 was alive the whole time): # now write failures again and check already banned (jail1 was alive the whole time) and new bans occurred (jail1 was alive the whole time):
self.pruneLog("[test-phase 5]") self.pruneLog("[test-phase 5]")
_write_file(test1log, "w+", *( _write_file(test1log, "a+", *(
(str(int(MyTime.time())) + " failure 401 from 192.0.2.1: test 5",) * 3 + (str(int(MyTime.time())) + " failure 401 from 192.0.2.1: test 5",) * 3 +
(str(int(MyTime.time())) + " error 403 from 192.0.2.5: test 5",) * 3 + (str(int(MyTime.time())) + " error 403 from 192.0.2.5: test 5",) * 3 +
(str(int(MyTime.time())) + " failure 401 from 192.0.2.6: test 5",) * 3 (str(int(MyTime.time())) + " failure 401 from 192.0.2.6: test 5",) * 3
@ -1469,7 +1463,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.pruneLog("[test-phase sendmail-reject]") self.pruneLog("[test-phase sendmail-reject]")
# write log: # write log:
_write_file(lgfn, "w+", *smrej_msg) _write_file(lgfn, "a+", *smrej_msg)
# wait and check it caused banned (and dump in the test-file): # wait and check it caused banned (and dump in the test-file):
self.assertLogged( self.assertLogged(
"[sendmail-reject] Ban 192.0.2.2", "stdout: 'found: 0 / 3, banned: 1 / 1'", "[sendmail-reject] Ban 192.0.2.2", "stdout: 'found: 0 / 3, banned: 1 / 1'",
@ -1597,7 +1591,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
wakeObs = False wakeObs = False
_observer_wait_before_incrban(lambda: wakeObs) _observer_wait_before_incrban(lambda: wakeObs)
# write again (IP already bad): # write again (IP already bad):
_write_file(test1log, "w+", *( _write_file(test1log, "a+", *(
(str(int(MyTime.time())) + " failure 401 from 192.0.2.11: I'm very bad \"hacker\" `` $(echo test)",) * 2 (str(int(MyTime.time())) + " failure 401 from 192.0.2.11: I'm very bad \"hacker\" `` $(echo test)",) * 2
)) ))
# wait for ban: # wait for ban:

View File

@ -195,7 +195,7 @@ def _assert_correct_last_attempt(utest, filter_, output, count=None):
_assert_equal_entries(utest, f, o) _assert_equal_entries(utest, f, o)
def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""): def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line="", lines=None):
"""Copy lines from one file to another (which might be already open) """Copy lines from one file to another (which might be already open)
Returns open fout Returns open fout
@ -212,9 +212,9 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
fin.readline() fin.readline()
# Read # Read
i = 0 i = 0
lines = [] if not lines: lines = []
while n is None or i < n: while n is None or i < n:
l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n') l = fin.readline().decode('UTF-8', 'replace').rstrip('\r\n')
if terminal_line is not None and l == terminal_line: if terminal_line is not None and l == terminal_line:
break break
lines.append(l) lines.append(l)
@ -222,6 +222,7 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
# Write: all at once and flush # Write: all at once and flush
if isinstance(fout, str): if isinstance(fout, str):
fout = open(fout, mode) fout = open(fout, mode)
DefLogSys.debug(' ++ write %d test lines', len(lines))
fout.write('\n'.join(lines)+'\n') fout.write('\n'.join(lines)+'\n')
fout.flush() fout.flush()
if isinstance(in_, str): # pragma: no branch - only used with str in test cases if isinstance(in_, str): # pragma: no branch - only used with str in test cases
@ -253,7 +254,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
# Read/Write # Read/Write
i = 0 i = 0
while n is None or i < n: while n is None or i < n:
l = FileContainer.decode_line(in_, 'UTF-8', fin.readline()).rstrip('\r\n') l = fin.readline().decode('UTF-8', 'replace').rstrip('\r\n')
if terminal_line is not None and l == terminal_line: if terminal_line is not None and l == terminal_line:
break break
journal.send(MESSAGE=l.strip(), **fields) journal.send(MESSAGE=l.strip(), **fields)
@ -1136,13 +1137,15 @@ def get_monitor_failures_testcase(Filter_):
# move aside, but leaving the handle still open... # move aside, but leaving the handle still open...
os.rename(self.name, self.name + '.bak') os.rename(self.name, self.name + '.bak')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14, n=1).close() _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14, n=1,
lines=["Aug 14 11:59:59 [logrotate] rotation 1"]).close()
self.assert_correct_last_attempt(GetFailures.FAILURES_01) self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 3) self.assertEqual(self.filter.failManager.getFailTotal(), 3)
# now remove the moved file # now remove the moved file
_killfile(None, self.name + '.bak') _killfile(None, self.name + '.bak')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3).close() _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=12, n=3,
lines=["Aug 14 11:59:59 [logrotate] rotation 2"]).close()
self.assert_correct_last_attempt(GetFailures.FAILURES_01) self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 6) self.assertEqual(self.filter.failManager.getFailTotal(), 6)
@ -1196,7 +1199,7 @@ def get_monitor_failures_testcase(Filter_):
os.rename(tmpsub1, tmpsub2 + 'a') os.rename(tmpsub1, tmpsub2 + 'a')
os.mkdir(tmpsub1) os.mkdir(tmpsub1)
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name, self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=12, n=1, mode='w') skip=12, n=1, mode='w', lines=["Aug 14 11:59:59 [logrotate] rotation 1"])
self.file.close() self.file.close()
self._wait4failures(2) self._wait4failures(2)
@ -1207,7 +1210,7 @@ def get_monitor_failures_testcase(Filter_):
os.mkdir(tmpsub1) os.mkdir(tmpsub1)
self.waitForTicks(2) self.waitForTicks(2)
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name, self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=12, n=1, mode='w') skip=12, n=1, mode='w', lines=["Aug 14 11:59:59 [logrotate] rotation 2"])
self.file.close() self.file.close()
self._wait4failures(3) self._wait4failures(3)