diff --git a/config/filter.d/mysqld-auth.conf b/config/filter.d/mysqld-auth.conf index 97b37920..930c9b5a 100644 --- a/config/filter.d/mysqld-auth.conf +++ b/config/filter.d/mysqld-auth.conf @@ -17,7 +17,7 @@ before = common.conf _daemon = mysqld -failregex = ^%(__prefix_line)s(?:(?:\d{6}|\d{4}-\d{2}-\d{2})[ T]\s?\d{1,2}:\d{2}:\d{2} )?(?:\d+ )?\[\w+\] (?:\[[^\]]+\] )*Access denied for user '[^']+'@'' (to database '[^']*'|\(using password: (YES|NO)\))*\s*$ +failregex = ^%(__prefix_line)s(?:(?:\d{6}|\d{4}-\d{2}-\d{2})[ T]\s?\d{1,2}:\d{2}:\d{2} )?(?:\d+ )?\[\w+\] (?:\[[^\]]+\] )*Access denied for user '[^']+'@'' (to database '[^']*'|\(using password: (YES|NO)\))*\s*$ ignoreregex = diff --git a/fail2ban/server/database.py b/fail2ban/server/database.py index 0dd9acb6..ed736a7a 100644 --- a/fail2ban/server/database.py +++ b/fail2ban/server/database.py @@ -489,22 +489,24 @@ class Fail2BanDb(object): If log was already present in database, value of last position in the log file; else `None` """ + return self._addLog(cur, jail, container.getFileName(), container.getPos(), container.getHash()) + + def _addLog(self, cur, jail, name, pos=0, md5=None): lastLinePos = None cur.execute( "SELECT firstlinemd5, lastfilepos FROM logs " "WHERE jail=? AND path=?", - (jail.name, container.getFileName())) + (jail.name, name)) try: firstLineMD5, lastLinePos = cur.fetchone() except TypeError: - firstLineMD5 = False + firstLineMD5 = None - cur.execute( - "INSERT OR REPLACE INTO logs(jail, path, firstlinemd5, lastfilepos) " - "VALUES(?, ?, ?, ?)", - (jail.name, container.getFileName(), - container.getHash(), container.getPos())) - if container.getHash() != firstLineMD5: + if not firstLineMD5 and (pos or md5): + cur.execute( + "INSERT OR REPLACE INTO logs(jail, path, firstlinemd5, lastfilepos) " + "VALUES(?, ?, ?, ?)", (jail.name, name, md5, pos)) + if md5 is not None and md5 != firstLineMD5: lastLinePos = None return lastLinePos @@ -533,7 +535,7 @@ class Fail2BanDb(object): return set(row[0] for row in cur.fetchmany()) @commitandrollback - def updateLog(self, cur, *args, **kwargs): + def updateLog(self, cur, jail, container): """Updates hash and last position in log file. Parameters @@ -543,14 +545,48 @@ class Fail2BanDb(object): container : FileContainer File container of the log file being updated. """ - self._updateLog(cur, *args, **kwargs) + self._updateLog(cur, jail, container.getFileName(), container.getPos(), container.getHash()) - def _updateLog(self, cur, jail, container): + def _updateLog(self, cur, jail, name, pos, md5): cur.execute( "UPDATE logs SET firstlinemd5=?, lastfilepos=? " - "WHERE jail=? AND path=?", - (container.getHash(), container.getPos(), - jail.name, container.getFileName())) + "WHERE jail=? AND path=?", (md5, pos, jail.name, name)) + # be sure it is set (if not available): + if not cur.rowcount: + cur.execute( + "INSERT OR REPLACE INTO logs(jail, path, firstlinemd5, lastfilepos) " + "VALUES(?, ?, ?, ?)", (jail.name, name, md5, pos)) + + @commitandrollback + def getJournalPos(self, cur, jail, name, time=0, iso=None): + """Get journal position from database. + + Parameters + ---------- + jail : Jail + Jail of which the journal belongs to. + name, time, iso : + Journal name (typically systemd-journal) and last known time. + + Returns + ------- + int (or float) + Last position (as time) if it was already present in database; else `None` + """ + return self._addLog(cur, jail, name, time, iso); # no hash, just time as iso + + @commitandrollback + def updateJournal(self, cur, jail, name, time, iso): + """Updates last position (as time) of journal. + + Parameters + ---------- + jail : Jail + Jail of which the journal belongs to. + name, time, iso : + Journal name (typically systemd-journal) and last known time. + """ + self._updateLog(cur, jail, name, time, iso); # no hash, just time as iso @commitandrollback def addBan(self, cur, jail, ticket): @@ -754,7 +790,8 @@ class Fail2BanDb(object): if overalljails or jail is None: query += " GROUP BY ip ORDER BY timeofban DESC LIMIT 1" cur = self._db.cursor() - return cur.execute(query, queryArgs) + # repack iterator as long as in lock: + return list(cur.execute(query, queryArgs)) def _getCurrentBans(self, cur, jail = None, ip = None, forbantime=None, fromtime=None): queryArgs = [] diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index c2a72598..870b3058 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -190,6 +190,13 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover def getJournalReader(self): return self.__journal + def getJrnEntTime(self, logentry): + """ Returns time of entry as tuple (ISO-str, Posix).""" + date = logentry.get('_SOURCE_REALTIME_TIMESTAMP') + if date is None: + date = logentry.get('__REALTIME_TIMESTAMP') + return (date.isoformat(), time.mktime(date.timetuple()) + date.microsecond/1.0E6) + ## # Format journal log entry into syslog style # @@ -222,9 +229,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover logelements[-1] += v logelements[-1] += ":" if logelements[-1] == "kernel:": - if '_SOURCE_MONOTONIC_TIMESTAMP' in logentry: - monotonic = logentry.get('_SOURCE_MONOTONIC_TIMESTAMP') - else: + monotonic = logentry.get('_SOURCE_MONOTONIC_TIMESTAMP') + if monotonic is None: monotonic = logentry.get('__MONOTONIC_TIMESTAMP')[0] logelements.append("[%12.6f]" % monotonic.total_seconds()) msg = logentry.get('MESSAGE','') @@ -235,13 +241,11 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover logline = " ".join(logelements) - date = logentry.get('_SOURCE_REALTIME_TIMESTAMP', - logentry.get('__REALTIME_TIMESTAMP')) + date = self.getJrnEntTime(logentry) logSys.log(5, "[%s] Read systemd journal entry: %s %s", self.jailName, - date.isoformat(), logline) + date[0], logline) ## use the same type for 1st argument: - return ((logline[:0], date.isoformat(), logline.replace('\n', '\\n')), - time.mktime(date.timetuple()) + date.microsecond/1.0E6) + return ((logline[:0], date[0], logline.replace('\n', '\\n')), date[1]) def seekToTime(self, date): if not isinstance(date, datetime.datetime): @@ -262,9 +266,12 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover "Jail regexs will be checked against all journal entries, " "which is not advised for performance reasons.") - # Seek to now - findtime in journal - start_time = datetime.datetime.now() - \ - datetime.timedelta(seconds=int(self.getFindTime())) + # Try to obtain the last known time (position of journal) + start_time = 0 + if self.jail.database is not None: + start_time = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0 + # Seek to max(last_known_time, now - findtime) in journal + start_time = max( start_time, MyTime.time() - int(self.getFindTime()) ) self.seekToTime(start_time) # Move back one entry to ensure do not end up in dead space # if start time beyond end of journal @@ -303,8 +310,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) self.ticks += 1 if logentry: - self.processLineAndAdd( - *self.formatJournalEntry(logentry)) + line = self.formatJournalEntry(logentry) + self.processLineAndAdd(*line) self.__modified += 1 if self.__modified >= 100: # todo: should be configurable break @@ -313,6 +320,9 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover if self.__modified: self.performBan() self.__modified = 0 + # update position in log (time and iso string): + if self.jail.database is not None: + self.jail.database.updateJournal(self.jail, 'systemd-journal', line[1], line[0][1]) except Exception as e: # pragma: no cover if not self.active: # if not active - error by stop... break diff --git a/fail2ban/server/jail.py b/fail2ban/server/jail.py index 9453e205..ce9968a8 100644 --- a/fail2ban/server/jail.py +++ b/fail2ban/server/jail.py @@ -161,6 +161,10 @@ class Jail(object): """ return self.__db + @database.setter + def database(self, value): + self.__db = value; + @property def filter(self): """The filter which the jail is using to monitor log files. diff --git a/fail2ban/server/observer.py b/fail2ban/server/observer.py index bd7cbe4a..c19549ba 100644 --- a/fail2ban/server/observer.py +++ b/fail2ban/server/observer.py @@ -146,9 +146,11 @@ class ObserverThread(JailThread): def pulse_notify(self): """Notify wakeup (sets /and resets/ notify event) """ - if not self._paused and self._notify: - self._notify.set() - #self._notify.clear() + if not self._paused: + n = self._notify + if n: + n.set() + #n.clear() def add(self, *event): """Add a event to queue and notify thread to wake up. @@ -237,6 +239,7 @@ class ObserverThread(JailThread): break ## end of main loop - exit logSys.info("Observer stopped, %s events remaining.", len(self._queue)) + self._notify = None #print("Observer stopped, %s events remaining." % len(self._queue)) except Exception as e: logSys.error('Observer stopped after error: %s', e, exc_info=True) @@ -262,9 +265,8 @@ class ObserverThread(JailThread): if not self.active: super(ObserverThread, self).start() - def stop(self): + def stop(self, wtime=5, forceQuit=True): if self.active and self._notify: - wtime = 5 logSys.info("Observer stop ... try to end queue %s seconds", wtime) #print("Observer stop ....") # just add shutdown job to make possible wait later until full (events remaining) @@ -276,10 +278,15 @@ class ObserverThread(JailThread): #self.pulse_notify() self._notify = None # wait max wtime seconds until full (events remaining) - self.wait_empty(wtime) - n.clear() - self.active = False - self.wait_idle(0.5) + if self.wait_empty(wtime) or forceQuit: + n.clear() + self.active = False; # leave outer (active) loop + self._paused = True; # leave inner (queue) loop + self.__db = None + else: + self._notify = n + return self.wait_idle(min(wtime, 0.5)) and not self.is_full + return True @property def is_full(self): diff --git a/fail2ban/server/server.py b/fail2ban/server/server.py index 15265822..feb3b399 100644 --- a/fail2ban/server/server.py +++ b/fail2ban/server/server.py @@ -193,23 +193,26 @@ class Server: signal.signal(s, sh) # Give observer a small chance to complete its work before exit - if Observers.Main is not None: - Observers.Main.stop() + obsMain = Observers.Main + if obsMain is not None: + if obsMain.stop(forceQuit=False): + obsMain = None + Observers.Main = None # Now stop all the jails self.stopAllJail() + # Stop observer ultimately + if obsMain is not None: + obsMain.stop() + # Explicit close database (server can leave in a thread, # so delayed GC can prevent commiting changes) if self.__db: self.__db.close() self.__db = None - # Stop observer and exit - if Observers.Main is not None: - Observers.Main.stop() - Observers.Main = None - # Stop async + # Stop async and exit if self.__asyncServer is not None: self.__asyncServer.stop() self.__asyncServer = None diff --git a/fail2ban/tests/databasetestcase.py b/fail2ban/tests/databasetestcase.py index a1df2993..9a5e9fa1 100644 --- a/fail2ban/tests/databasetestcase.py +++ b/fail2ban/tests/databasetestcase.py @@ -262,6 +262,15 @@ class DatabaseTest(LogCaptureTestCase): self.db.addLog(self.jail, self.fileContainer), None) os.remove(filename) + def testUpdateJournal(self): + self.testAddJail() # Jail required + # not yet updated: + self.assertEqual(self.db.getJournalPos(self.jail, 'systemd-journal'), None) + # update 3 times (insert and 2 updates) and check it was set (and overwritten): + for t in (1500000000, 1500000001, 1500000002): + self.db.updateJournal(self.jail, 'systemd-journal', t, 'TEST'+str(t)) + self.assertEqual(self.db.getJournalPos(self.jail, 'systemd-journal'), t) + def testAddBan(self): self.testAddJail() ticket = FailTicket("127.0.0.1", 0, ["abc\n"]) diff --git a/fail2ban/tests/dummyjail.py b/fail2ban/tests/dummyjail.py index ec960290..9bd1bcaf 100644 --- a/fail2ban/tests/dummyjail.py +++ b/fail2ban/tests/dummyjail.py @@ -40,7 +40,6 @@ class DummyJail(Jail): self.lock = Lock() self.queue = [] super(DummyJail, self).__init__(name=name, backend=backend) - self.__db = None self.__actions = DummyActions(self) def __len__(self): @@ -74,14 +73,6 @@ class DummyJail(Jail): def idle(self, value): pass - @property - def database(self): - return self.__db; - - @database.setter - def database(self, value): - self.__db = value; - @property def actions(self): return self.__actions; diff --git a/fail2ban/tests/fail2banclienttestcase.py b/fail2ban/tests/fail2banclienttestcase.py index 5caa4dd9..95f73ed3 100644 --- a/fail2ban/tests/fail2banclienttestcase.py +++ b/fail2ban/tests/fail2banclienttestcase.py @@ -343,6 +343,7 @@ def with_foreground_server_thread(startextra={}): # to wait for end of server, default accept any exit code, because multi-threaded, # thus server can exit in-between... def _stopAndWaitForServerEnd(code=(SUCCESS, FAILED)): + tearDownMyTime() # if seems to be down - try to catch end phase (wait a bit for end:True to recognize down state): if not phase.get('end', None) and not os.path.exists(pjoin(tmp, "f2b.pid")): Utils.wait_for(lambda: phase.get('end', None) is not None, MID_WAITTIME) @@ -1570,6 +1571,37 @@ class Fail2banServerTest(Fail2banClientServerBase): self.assertLogged( "192.0.2.11", "+ 600 =", all=True, wait=MID_WAITTIME) + # test stop with busy observer: + self.pruneLog("[test-phase end) stop on busy observer]") + tearDownMyTime() + a = {'state': 0} + obsMain = Observers.Main + def _long_action(): + logSys.info('++ observer enters busy state ...') + a['state'] = 1 + Utils.wait_for(lambda: a['state'] == 2, MAX_WAITTIME) + obsMain.db_purge(); # does nothing (db is already None) + logSys.info('-- observer leaves busy state.') + obsMain.add('call', _long_action) + obsMain.add('call', lambda: None) + # wait observer enter busy state: + Utils.wait_for(lambda: a['state'] == 1, MAX_WAITTIME) + # overwrite default wait time (normally 5 seconds): + obsMain_stop = obsMain.stop + def _stop(wtime=(0.01 if unittest.F2B.fast else 0.1), forceQuit=True): + return obsMain_stop(wtime, forceQuit) + obsMain.stop = _stop + # stop server and wait for end: + self.stopAndWaitForServerEnd(SUCCESS) + # check observer and db state: + self.assertNotLogged('observer leaves busy state') + self.assertFalse(obsMain.idle) + self.assertEqual(obsMain._ObserverThread__db, None) + # server is exited without wait for observer, stop it now: + a['state'] = 2 + self.assertLogged('observer leaves busy state', wait=True) + obsMain.join() + # test multiple start/stop of the server (threaded in foreground) -- if False: # pragma: no cover @with_foreground_server_thread() diff --git a/fail2ban/tests/files/logs/mysqld-auth b/fail2ban/tests/files/logs/mysqld-auth index 0b0827f9..29faeb71 100644 --- a/fail2ban/tests/files/logs/mysqld-auth +++ b/fail2ban/tests/files/logs/mysqld-auth @@ -33,3 +33,7 @@ Sep 16 21:30:32 catinthehat mysqld: 130916 21:30:32 [Warning] Access denied for 2019-09-06T01:45:18 srv mysqld: 2019-09-06 1:45:18 140581192722176 [Warning] Access denied for user 'global'@'192.0.2.2' (using password: YES) # failJSON: { "time": "2019-09-24T13:16:50", "match": true , "host": "192.0.2.3", "desc": "ISO timestamp within log message" } 2019-09-24T13:16:50 srv mysqld[1234]: 2019-09-24 13:16:50 8756 [Warning] Access denied for user 'root'@'192.0.2.3' (using password: YES) + +# filterOptions: [{"logtype": "file"}, {"logtype": "short"}, {"logtype": "journal"}] +# failJSON: { "match": true , "host": "192.0.2.1", "user":"root", "desc": "mariadb 10.4 log format, gh-2611" } +2020-01-16 21:34:14 4644 [Warning] Access denied for user 'root'@'192.0.2.1' (using password: YES) diff --git a/fail2ban/tests/filtertestcase.py b/fail2ban/tests/filtertestcase.py index d5f78dee..35785a58 100644 --- a/fail2ban/tests/filtertestcase.py +++ b/fail2ban/tests/filtertestcase.py @@ -43,7 +43,8 @@ from ..server.failmanager import FailManagerEmpty from ..server.ipdns import asip, getfqdn, DNSUtils, IPAddr from ..server.mytime import MyTime from ..server.utils import Utils, uni_decode -from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_tmpdir, LogCaptureTestCase, \ +from .databasetestcase import getFail2BanDb +from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_alt_time, with_tmpdir, LogCaptureTestCase, \ logSys as DefLogSys, CONFIG_DIR as STOCK_CONF_DIR from .dummyjail import DummyJail @@ -1397,6 +1398,52 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover self.test_file, self.journal_fields, skip=5, n=4) self.assert_correct_ban("193.168.0.128", 3) + @with_alt_time + def test_grow_file_with_db(self): + + def _gen_falure(ip): + # insert new failures ans check it is monitored: + fields = self.journal_fields + fields.update(TEST_JOURNAL_FIELDS) + journal.send(MESSAGE="error: PAM: Authentication failure for test from "+ip, **fields) + self.waitForTicks(1) + self.assert_correct_ban(ip, 1) + + # coverage for update log: + self.jail.database = getFail2BanDb(':memory:') + self.jail.database.addJail(self.jail) + MyTime.setTime(time.time()) + self._test_grow_file() + # stop: + self.filter.stop() + self.filter.join() + MyTime.setTime(time.time() + 2) + # update log manually (should cause a seek to end of log without wait for next second): + self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time(), 'TEST') + # check seek to last (simulated) position succeeds (without bans of previous copied tickets): + self._failTotal = 0 + self._initFilter() + self.filter.setMaxRetry(1) + self.filter.start() + self.waitForTicks(1) + # check new IP but no old IPs found: + _gen_falure("192.0.2.5") + self.assertFalse(self.jail.getFailTicket()) + + # now the same with increased time (check now - findtime case): + self.filter.stop() + self.filter.join() + MyTime.setTime(time.time() + 10000) + self._failTotal = 0 + self._initFilter() + self.filter.setMaxRetry(1) + self.filter.start() + self.waitForTicks(1) + MyTime.setTime(time.time() + 3) + # check new IP but no old IPs found: + _gen_falure("192.0.2.6") + self.assertFalse(self.jail.getFailTicket()) + def test_delJournalMatch(self): self._initFilter() self.filter.start() diff --git a/fail2ban/tests/samplestestcase.py b/fail2ban/tests/samplestestcase.py index 1039b65e..0bbd05f5 100644 --- a/fail2ban/tests/samplestestcase.py +++ b/fail2ban/tests/samplestestcase.py @@ -223,11 +223,9 @@ def testSampleRegexsFactory(name, basedir): try: fail = {} # for logtype "journal" we don't need parse timestamp (simulate real systemd-backend handling): - checktime = True if opts.get('logtype') != 'journal': ret = flt.processLine(line) else: # simulate journal processing, time is known from journal (formatJournalEntry): - checktime = False if opts.get('test.prefix-line'): # journal backends creates common prefix-line: line = opts.get('test.prefix-line') + line ret = flt.processLine(('', TEST_NOW_STR, line.rstrip('\r\n')), TEST_NOW) @@ -271,7 +269,7 @@ def testSampleRegexsFactory(name, basedir): self.assertEqual(fv, v) t = faildata.get("time", None) - if checktime or t is not None: + if t is not None: try: jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S") except ValueError: