From d153555a07ec81dabe4678ea06c8e644afc8ec40 Mon Sep 17 00:00:00 2001 From: sebres Date: Thu, 22 Sep 2016 14:28:47 +0200 Subject: [PATCH] increase default wait operation (sleep time, threshold interval) - avowedly greater inertance, but fewer system load by many jails resp. log files; waiting with `wait_for` extended with verifying of active flag; implemented better error handling in some multi-threaded routines; shutdown of jails rewritten (faster and safer, does not breaks shutdown process if some error occurred); --- fail2ban/server/actions.py | 5 +- fail2ban/server/asyncserver.py | 39 ++++++++------ fail2ban/server/filter.py | 20 ++++--- fail2ban/server/filtergamin.py | 5 +- fail2ban/server/filterpoll.py | 7 +-- fail2ban/server/filterpyinotify.py | 2 +- fail2ban/server/filtersystemd.py | 83 +++++++++++++++++++----------- fail2ban/server/jail.py | 24 ++++++--- fail2ban/server/server.py | 20 ++++--- fail2ban/server/utils.py | 4 +- 10 files changed, 130 insertions(+), 79 deletions(-) diff --git a/fail2ban/server/actions.py b/fail2ban/server/actions.py index bc8684b5..eed481d7 100644 --- a/fail2ban/server/actions.py +++ b/fail2ban/server/actions.py @@ -281,9 +281,10 @@ class Actions(JailThread, Mapping): exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) while self.active: if self.idle: - time.sleep(self.sleeptime) + Utils.wait_for(lambda: not self.active or not self.idle, + self.sleeptime * 10, self.sleeptime) continue - if not Utils.wait_for(self.__checkBan, self.sleeptime): + if not Utils.wait_for(lambda: not self.active or self.__checkBan(), self.sleeptime): self.__checkUnBan() self.__flushBan() diff --git a/fail2ban/server/asyncserver.py b/fail2ban/server/asyncserver.py index ffb20503..a489b585 100644 --- a/fail2ban/server/asyncserver.py +++ b/fail2ban/server/asyncserver.py @@ -67,23 +67,28 @@ class RequestHandler(asynchat.async_chat): # This method is called once we have a complete request. def found_terminator(self): - # Pop whole buffer - message = self.__buffer - self.__buffer = [] - # Joins the buffer items. - message = CSPROTO.EMPTY.join(message) - # Closes the channel if close was received - if message == CSPROTO.CLOSE: - self.close_when_done() - return - # Deserialize - message = loads(message) - # Gives the message to the transmitter. - message = self.__transmitter.proceed(message) - # Serializes the response. - message = dumps(message, HIGHEST_PROTOCOL) - # Sends the response to the client. - self.push(message + CSPROTO.END) + try: + # Pop whole buffer + message = self.__buffer + self.__buffer = [] + # Joins the buffer items. + message = CSPROTO.EMPTY.join(message) + # Closes the channel if close was received + if message == CSPROTO.CLOSE: + self.close_when_done() + return + # Deserialize + message = loads(message) + # Gives the message to the transmitter. + message = self.__transmitter.proceed(message) + # Serializes the response. + message = dumps(message, HIGHEST_PROTOCOL) + # Sends the response to the client. + self.push(message + CSPROTO.END) + except Exception as e: # pragma: no cover + logSys.error("Caught unhandled exception: %r", e, + exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) + def handle_error(self): e1, e2 = formatExceptionInfo() diff --git a/fail2ban/server/filter.py b/fail2ban/server/filter.py index 08f3e77d..ac64fb86 100644 --- a/fail2ban/server/filter.py +++ b/fail2ban/server/filter.py @@ -501,14 +501,18 @@ class Filter(JailThread): except Exception as e: logSys.error("Failed to process line: %r, caught exception: %r", line, e, exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) - # incr error counter, stop processing (going idle) after 100th error : - self._errors += 1 - # sleep a little bit (to get around time-related errors): - time.sleep(self.sleeptime) - if self._errors >= 100: - logSys.error("Too many errors at once (%s), going idle", self._errors) - self._errors //= 2 - self.idle = True + # incr common error counter: + self.commonError() + + def commonError(self): + # incr error counter, stop processing (going idle) after 100th error : + self._errors += 1 + # sleep a little bit (to get around time-related errors): + time.sleep(self.sleeptime) + if self._errors >= 100: + logSys.error("Too many errors at once (%s), going idle", self._errors) + self._errors //= 2 + self.idle = True ## # Returns true if the line should be ignored. diff --git a/fail2ban/server/filtergamin.py b/fail2ban/server/filtergamin.py index 39793067..a8d03ebd 100644 --- a/fail2ban/server/filtergamin.py +++ b/fail2ban/server/filtergamin.py @@ -124,12 +124,13 @@ class FilterGamin(FileFilter): while self.active: if self.idle: # wait a little bit here for not idle, to prevent hi-load: - if not Utils.wait_for(lambda: not self.idle, + if not Utils.wait_for(lambda: not self.active or not self.idle, self.sleeptime * 10, self.sleeptime ): self.ticks += 1 continue - Utils.wait_for(self._handleEvents, self.sleeptime) + Utils.wait_for(lambda: not self.active or self._handleEvents(), + self.sleeptime) self.ticks += 1 logSys.debug(self.jail.name + ": filter terminated") return True diff --git a/fail2ban/server/filterpoll.py b/fail2ban/server/filterpoll.py index f66be42b..d0660f15 100644 --- a/fail2ban/server/filterpoll.py +++ b/fail2ban/server/filterpoll.py @@ -101,14 +101,15 @@ class FilterPoll(FileFilter): logSys.log(6, "Woke up idle=%s with %d files monitored", self.idle, self.getLogCount()) if self.idle: - if not Utils.wait_for(lambda: not self.idle, + if not Utils.wait_for(lambda: not self.active or not self.idle, self.sleeptime * 10, self.sleeptime ): self.ticks += 1 continue # Get file modification modlst = [] - Utils.wait_for(lambda: self.getModified(modlst), self.sleeptime) + Utils.wait_for(lambda: not self.active or self.getModified(modlst), + self.sleeptime) for filename in modlst: self.getFailures(filename) self.__modified = True @@ -162,7 +163,7 @@ class FilterPoll(FileFilter): exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) # increase file and common error counters: self.__file404Cnt[filename] += 1 - self._errors += 1 + self.commonError() if self.__file404Cnt[filename] > 50: logSys.warning("Too many errors. Remove file %r from monitoring process", filename) self.__file404Cnt[filename] = 0 diff --git a/fail2ban/server/filterpyinotify.py b/fail2ban/server/filterpyinotify.py index 1e08f250..9b444844 100644 --- a/fail2ban/server/filterpyinotify.py +++ b/fail2ban/server/filterpyinotify.py @@ -178,7 +178,7 @@ class FilterPyinotify(FileFilter): # slow check events while idle: def __check_events(self, *args, **kwargs): if self.idle: - if Utils.wait_for(lambda: not self.idle, + if Utils.wait_for(lambda: not self.active or not self.idle, self.sleeptime * 10, self.sleeptime ): pass diff --git a/fail2ban/server/filtersystemd.py b/fail2ban/server/filtersystemd.py index f9f9395c..a29085ba 100644 --- a/fail2ban/server/filtersystemd.py +++ b/fail2ban/server/filtersystemd.py @@ -252,40 +252,63 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover while self.active: # wait for records (or for timeout in sleeptime seconds): - self.__journal.wait(self.sleeptime) - if self.idle: - # because journal.wait will returns immediatelly if we have records in journal, - # just wait a little bit here for not idle, to prevent hi-load: - if not Utils.wait_for(lambda: not self.idle, - self.sleeptime * 10, self.sleeptime - ): + try: + ## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime), + ## don't use `journal.close()` for it, because in some python/systemd implementation it may + ## cause abnormal program termination + #self.__journal.wait(self.sleeptime) != journal.NOP + ## + ## wait for entries without sleep in intervals, because "sleeping" in journal.wait: + Utils.wait_for(lambda: not self.active or \ + self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP, + self.sleeptime, 0.00001) + if self.idle: + # because journal.wait will returns immediatelly if we have records in journal, + # just wait a little bit here for not idle, to prevent hi-load: + if not Utils.wait_for(lambda: not self.active or not self.idle, + self.sleeptime * 10, self.sleeptime + ): + self.ticks += 1 + continue + self.__modified = 0 + while self.active: + logentry = None + try: + logentry = self.__journal.get_next() + except OSError as e: + logSys.error("Error reading line from systemd journal: %s", + e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) self.ticks += 1 - continue - self.__modified = 0 - while self.active: - logentry = None - try: - logentry = self.__journal.get_next() - except OSError as e: - logSys.error("Error reading line from systemd journal: %s", - e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) - self.ticks += 1 - if logentry: - self.processLineAndAdd( - *self.formatJournalEntry(logentry)) - self.__modified += 1 - if self.__modified >= 100: # todo: should be configurable + if logentry: + self.processLineAndAdd( + *self.formatJournalEntry(logentry)) + self.__modified += 1 + if self.__modified >= 100: # todo: should be configurable + break + else: break - else: + if self.__modified: + try: + while True: + ticket = self.failManager.toBan() + self.jail.putFailTicket(ticket) + except FailManagerEmpty: + self.failManager.cleanup(MyTime.time()) + except Exception as e: # pragma: no cover + if not self.active: # if not active - error by stop... break - if self.__modified: - try: - while True: - ticket = self.failManager.toBan() - self.jail.putFailTicket(ticket) - except FailManagerEmpty: - self.failManager.cleanup(MyTime.time()) + logSys.error("Caught unhandled exception in main cycle: %r", e, + exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) + # incr common error counter: + self.commonError() + # close journal: + try: + if self.__journal: + self.__journal.close() + except Exception as e: # pragma: no cover + logSys.error("Close journal failed: %r", e, + exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) logSys.debug((self.jail is not None and self.jail.name or "jailless") +" filter terminated") return True diff --git a/fail2ban/server/jail.py b/fail2ban/server/jail.py index e70eaddd..aa4a38cc 100644 --- a/fail2ban/server/jail.py +++ b/fail2ban/server/jail.py @@ -239,19 +239,31 @@ class Jail(object): Once stated, also queries the persistent database to reinstate any valid bans. """ + logSys.debug("Starting jail %r", self.name) self.filter.start() self.actions.start() self.restoreCurrentBans() logSys.info("Jail %r started", self.name) - def stop(self): + def stop(self, stop=True, join=True): """Stop the jail, by stopping filter and actions threads. """ - self.filter.stop() - self.actions.stop() - self.filter.join() - self.actions.join() - logSys.info("Jail '%s' stopped" % self.name) + if stop: + logSys.debug("Stopping jail %r", self.name) + for obj in (self.filter, self.actions): + try: + ## signal to stop filter / actions: + if stop: + obj.stop() + ## wait for end of threads: + if join: + if obj.isAlive(): + obj.join() + except Exception as e: + logSys.error("Stop %r of jail %r failed: %s", obj, self.name, e, + exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) + if join: + logSys.info("Jail %r stopped", self.name) def isAlive(self): """Check jail "isAlive" by checking filter and actions threads. diff --git a/fail2ban/server/server.py b/fail2ban/server/server.py index 97345331..a21b1fce 100644 --- a/fail2ban/server/server.py +++ b/fail2ban/server/server.py @@ -210,14 +210,14 @@ class Server: if self.__db is not None: self.__db.addJail(self.__jails[name]) - def delJail(self, name, stop=True): + def delJail(self, name, stop=True, join=True): jail = self.__jails[name] - if stop and jail.isAlive(): - logSys.debug("Stopping jail %r" % name) - jail.stop() - if self.__db is not None: - self.__db.delJail(jail) - del self.__jails[name] + if join or jail.isAlive(): + jail.stop(stop=stop, join=join) + if join: + if self.__db is not None: + self.__db.delJail(jail) + del self.__jails[name] def startJail(self, name): with self.__lock: @@ -237,8 +237,12 @@ class Server: def stopAllJail(self): logSys.info("Stopping all jails") with self.__lock: + # 1st stop all jails (signal and stop actions/filter thread): for name in self.__jails.keys(): - self.delJail(name, stop=True) + self.delJail(name, stop=True, join=False) + # 2nd wait for end and delete jails: + for name in self.__jails.keys(): + self.delJail(name, stop=False, join=True) def reloadJails(self, name, opts, begin): if begin: diff --git a/fail2ban/server/utils.py b/fail2ban/server/utils.py index a8a00257..74406363 100644 --- a/fail2ban/server/utils.py +++ b/fail2ban/server/utils.py @@ -52,8 +52,8 @@ class Utils(): """Utilities provide diverse static methods like executes OS shell commands, etc. """ - DEFAULT_SLEEP_TIME = 0.1 - DEFAULT_SLEEP_INTERVAL = 0.01 + DEFAULT_SLEEP_TIME = 2 + DEFAULT_SLEEP_INTERVAL = 0.2 class Cache(object):