Browse Source

better format of time delta (using seconds2str); increase stability for systemd test-cases

pull/3146/head
sebres 3 years ago
parent
commit
7678f59827
  1. 7
      fail2ban/server/filter.py
  2. 48
      fail2ban/server/mytime.py
  3. 8
      fail2ban/tests/filtertestcase.py
  4. 12
      fail2ban/tests/misctestcase.py

7
fail2ban/server/filter.py

@ -664,13 +664,12 @@ class Filter(JailThread):
# if weird date - we'd simulate now for timeing issue (too large deviation from now):
delta = int(date - MyTime.time())
if abs(delta) > 60:
delta //= 60
# log timing issue as warning once per day:
self._logWarnOnce("_next_simByTimeWarn",
("Detected a log entry %sm %s the current time in operation mode. "
("Detected a log entry %s %s the current time in operation mode. "
"This looks like a %s problem. Treating such entries as if they just happened.",
abs(delta), "before" if delta < 0 else "after",
"latency" if -55 <= delta < 0 else "timezone"
MyTime.seconds2str(abs(delta)), "before" if delta < 0 else "after",
"latency" if -3300 <= delta < 0 else "timezone"
),
("Please check a jail for a timing issue. Line with odd timestamp: %s",
line))

48
fail2ban/server/mytime.py

@ -161,3 +161,51 @@ class MyTime:
val = rexp.sub(rpl, val)
val = MyTime._str2sec_fini.sub(r"\1+\2", val)
return eval(val)
class seconds2str():
"""Converts seconds to string on demand (if string representation needed).
Ex: seconds2str(86400*390) = 1y 3w 4d
seconds2str(86400*368) = 1y 3d
seconds2str(86400*365.5) = 1y
seconds2str(86400*2+3600*7+60*15) = 2d 7h 15m
seconds2str(86400*2+3599) = 2d 1h
seconds2str(3600-5) = 1h
seconds2str(3600-10) = 59m 50s
seconds2str(59) = 59s
"""
def __init__(self, sec):
self.sec = sec
def __str__(self):
# s = str(datetime.timedelta(seconds=int(self.sec)))
# return s if s[-3:] != ":00" else s[:-3]
s = self.sec; r = ""; c = 3
# automatic accuracy: round by large values (upto 1 minute, or 1 day by year):
if s >= 3570:
if s >= 31536000:
s = int(round(float(s)/86400)*86400)
elif s >= 86400:
s = int(round(float(s)/60)*60)
else:
s = int(round(float(s)/10)*10)
for n, m in (
('y', 31536000), # a year as 365*24*60*60 (don't need to consider leap year by this accuracy)
('w', 604800), # a week as 24*60*60*7
('d', 86400), # a day as 24*60*60
('h', 3600), # a hour as 60*60
('m', 60), # a minute
('s', 1) # a second
):
if s >= m:
c -= 1
r += ' ' + str(s//m) + n
s %= m
# stop if no remaining part or no repeat needed (too small granularity):
if not s or not c: break
elif c < 3:
# avoid too small granularity:
c -= 1
if not c: break
#
return r[1:]
def __repr__(self):
return self.__str__()

8
fail2ban/tests/filtertestcase.py

@ -457,7 +457,7 @@ class IgnoreIP(LogCaptureTestCase):
for i in (1,2,3):
self.filter.processLineAndAdd('2019-10-27 02:00:00 fail from 192.0.2.15'); # +3 = 3
self.assertLogged(
"Detected a log entry 60m before the current time in operation mode. This looks like a timezone problem.",
"Detected a log entry 1h before the current time in operation mode. This looks like a timezone problem.",
"Please check a jail for a timing issue.",
"192.0.2.15:1", "192.0.2.15:2", "192.0.2.15:3",
"Total # of detected failures: 3.", all=True, wait=True)
@ -467,7 +467,7 @@ class IgnoreIP(LogCaptureTestCase):
for i in (1,2,3):
self.filter.processLineAndAdd('2019-10-27 04:00:00 GMT fail from 192.0.2.16'); # +3 = 6
self.assertLogged(
"Detected a log entry 120m after the current time in operation mode. This looks like a timezone problem.",
"Detected a log entry 2h after the current time in operation mode. This looks like a timezone problem.",
"Please check a jail for a timing issue.",
"192.0.2.16:1", "192.0.2.16:2", "192.0.2.16:3",
"Total # of detected failures: 6.", all=True, wait=True)
@ -1458,7 +1458,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
if idle:
self.filter.sleeptime /= 100.0
self.filter.idle = True
self.waitForTicks(1)
self.waitForTicks(1)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# Now let's feed it with entries from the file
@ -1540,6 +1540,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
def test_delJournalMatch(self):
self._initFilter()
self.filter.start()
self.waitForTicks(1); # wait for start
# Smoke test for removing of match
# basic full test
@ -1572,6 +1573,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
def test_WrongChar(self):
self._initFilter()
self.filter.start()
self.waitForTicks(1); # wait for start
# Now let's feed it with entries from the file
_copy_lines_to_journal(
self.test_file, self.journal_fields, skip=15, n=4)

12
fail2ban/tests/misctestcase.py

@ -457,3 +457,15 @@ class MyTimeTest(unittest.TestCase):
self.assertEqual(float(str2sec("1 month")) / 60 / 60 / 24, 30.4375)
self.assertEqual(float(str2sec("1 year")) / 60 / 60 / 24, 365.25)
def testSec2Str(self):
sec2str = lambda s: str(MyTime.seconds2str(s))
self.assertEqual(sec2str(86400*390), '1y 3w 4d')
self.assertEqual(sec2str(86400*368), '1y 3d')
self.assertEqual(sec2str(86400*365.49), '1y')
self.assertEqual(sec2str(86400*15), '2w 1d')
self.assertEqual(sec2str(86400*14-10), '2w')
self.assertEqual(sec2str(86400*2+3600*7+60*15), '2d 7h 15m')
self.assertEqual(sec2str(86400*2+3599), '2d 1h')
self.assertEqual(sec2str(3600-5), '1h')
self.assertEqual(sec2str(3600-10), '59m 50s')
self.assertEqual(sec2str(59), '59s')

Loading…
Cancel
Save