increase default wait operation (sleep time, threshold interval) - avowedly greater inertance, but fewer system load by many jails resp. log files;

waiting with `wait_for` extended with verifying of active flag;
implemented better error handling in some multi-threaded routines;
shutdown of jails rewritten (faster and safer, does not breaks shutdown process if some error occurred);
pull/1557/head
sebres 2016-09-22 14:28:47 +02:00
parent 35ce1166b6
commit d153555a07
10 changed files with 130 additions and 79 deletions

View File

@ -281,9 +281,10 @@ class Actions(JailThread, Mapping):
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
while self.active: while self.active:
if self.idle: if self.idle:
time.sleep(self.sleeptime) Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime)
continue continue
if not Utils.wait_for(self.__checkBan, self.sleeptime): if not Utils.wait_for(lambda: not self.active or self.__checkBan(), self.sleeptime):
self.__checkUnBan() self.__checkUnBan()
self.__flushBan() self.__flushBan()

View File

@ -67,23 +67,28 @@ class RequestHandler(asynchat.async_chat):
# This method is called once we have a complete request. # This method is called once we have a complete request.
def found_terminator(self): def found_terminator(self):
# Pop whole buffer try:
message = self.__buffer # Pop whole buffer
self.__buffer = [] message = self.__buffer
# Joins the buffer items. self.__buffer = []
message = CSPROTO.EMPTY.join(message) # Joins the buffer items.
# Closes the channel if close was received message = CSPROTO.EMPTY.join(message)
if message == CSPROTO.CLOSE: # Closes the channel if close was received
self.close_when_done() if message == CSPROTO.CLOSE:
return self.close_when_done()
# Deserialize return
message = loads(message) # Deserialize
# Gives the message to the transmitter. message = loads(message)
message = self.__transmitter.proceed(message) # Gives the message to the transmitter.
# Serializes the response. message = self.__transmitter.proceed(message)
message = dumps(message, HIGHEST_PROTOCOL) # Serializes the response.
# Sends the response to the client. message = dumps(message, HIGHEST_PROTOCOL)
self.push(message + CSPROTO.END) # Sends the response to the client.
self.push(message + CSPROTO.END)
except Exception as e: # pragma: no cover
logSys.error("Caught unhandled exception: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
def handle_error(self): def handle_error(self):
e1, e2 = formatExceptionInfo() e1, e2 = formatExceptionInfo()

View File

@ -501,14 +501,18 @@ class Filter(JailThread):
except Exception as e: except Exception as e:
logSys.error("Failed to process line: %r, caught exception: %r", line, e, logSys.error("Failed to process line: %r, caught exception: %r", line, e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
# incr error counter, stop processing (going idle) after 100th error : # incr common error counter:
self._errors += 1 self.commonError()
# sleep a little bit (to get around time-related errors):
time.sleep(self.sleeptime) def commonError(self):
if self._errors >= 100: # incr error counter, stop processing (going idle) after 100th error :
logSys.error("Too many errors at once (%s), going idle", self._errors) self._errors += 1
self._errors //= 2 # sleep a little bit (to get around time-related errors):
self.idle = True time.sleep(self.sleeptime)
if self._errors >= 100:
logSys.error("Too many errors at once (%s), going idle", self._errors)
self._errors //= 2
self.idle = True
## ##
# Returns true if the line should be ignored. # Returns true if the line should be ignored.

View File

@ -124,12 +124,13 @@ class FilterGamin(FileFilter):
while self.active: while self.active:
if self.idle: if self.idle:
# wait a little bit here for not idle, to prevent hi-load: # wait a little bit here for not idle, to prevent hi-load:
if not Utils.wait_for(lambda: not self.idle, if not Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime self.sleeptime * 10, self.sleeptime
): ):
self.ticks += 1 self.ticks += 1
continue continue
Utils.wait_for(self._handleEvents, self.sleeptime) Utils.wait_for(lambda: not self.active or self._handleEvents(),
self.sleeptime)
self.ticks += 1 self.ticks += 1
logSys.debug(self.jail.name + ": filter terminated") logSys.debug(self.jail.name + ": filter terminated")
return True return True

View File

@ -101,14 +101,15 @@ class FilterPoll(FileFilter):
logSys.log(6, "Woke up idle=%s with %d files monitored", logSys.log(6, "Woke up idle=%s with %d files monitored",
self.idle, self.getLogCount()) self.idle, self.getLogCount())
if self.idle: if self.idle:
if not Utils.wait_for(lambda: not self.idle, if not Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime self.sleeptime * 10, self.sleeptime
): ):
self.ticks += 1 self.ticks += 1
continue continue
# Get file modification # Get file modification
modlst = [] modlst = []
Utils.wait_for(lambda: self.getModified(modlst), self.sleeptime) Utils.wait_for(lambda: not self.active or self.getModified(modlst),
self.sleeptime)
for filename in modlst: for filename in modlst:
self.getFailures(filename) self.getFailures(filename)
self.__modified = True self.__modified = True
@ -162,7 +163,7 @@ class FilterPoll(FileFilter):
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG) exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
# increase file and common error counters: # increase file and common error counters:
self.__file404Cnt[filename] += 1 self.__file404Cnt[filename] += 1
self._errors += 1 self.commonError()
if self.__file404Cnt[filename] > 50: if self.__file404Cnt[filename] > 50:
logSys.warning("Too many errors. Remove file %r from monitoring process", filename) logSys.warning("Too many errors. Remove file %r from monitoring process", filename)
self.__file404Cnt[filename] = 0 self.__file404Cnt[filename] = 0

View File

@ -178,7 +178,7 @@ class FilterPyinotify(FileFilter):
# slow check events while idle: # slow check events while idle:
def __check_events(self, *args, **kwargs): def __check_events(self, *args, **kwargs):
if self.idle: if self.idle:
if Utils.wait_for(lambda: not self.idle, if Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime self.sleeptime * 10, self.sleeptime
): ):
pass pass

View File

@ -252,40 +252,63 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
while self.active: while self.active:
# wait for records (or for timeout in sleeptime seconds): # wait for records (or for timeout in sleeptime seconds):
self.__journal.wait(self.sleeptime) try:
if self.idle: ## todo: find better method as wait_for to break (e.g. notify) journal.wait(self.sleeptime),
# because journal.wait will returns immediatelly if we have records in journal, ## don't use `journal.close()` for it, because in some python/systemd implementation it may
# just wait a little bit here for not idle, to prevent hi-load: ## cause abnormal program termination
if not Utils.wait_for(lambda: not self.idle, #self.__journal.wait(self.sleeptime) != journal.NOP
self.sleeptime * 10, self.sleeptime ##
): ## wait for entries without sleep in intervals, because "sleeping" in journal.wait:
Utils.wait_for(lambda: not self.active or \
self.__journal.wait(Utils.DEFAULT_SLEEP_INTERVAL) != journal.NOP,
self.sleeptime, 0.00001)
if self.idle:
# because journal.wait will returns immediatelly if we have records in journal,
# just wait a little bit here for not idle, to prevent hi-load:
if not Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime
):
self.ticks += 1
continue
self.__modified = 0
while self.active:
logentry = None
try:
logentry = self.__journal.get_next()
except OSError as e:
logSys.error("Error reading line from systemd journal: %s",
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
self.ticks += 1 self.ticks += 1
continue if logentry:
self.__modified = 0 self.processLineAndAdd(
while self.active: *self.formatJournalEntry(logentry))
logentry = None self.__modified += 1
try: if self.__modified >= 100: # todo: should be configurable
logentry = self.__journal.get_next() break
except OSError as e: else:
logSys.error("Error reading line from systemd journal: %s",
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
self.ticks += 1
if logentry:
self.processLineAndAdd(
*self.formatJournalEntry(logentry))
self.__modified += 1
if self.__modified >= 100: # todo: should be configurable
break break
else: if self.__modified:
try:
while True:
ticket = self.failManager.toBan()
self.jail.putFailTicket(ticket)
except FailManagerEmpty:
self.failManager.cleanup(MyTime.time())
except Exception as e: # pragma: no cover
if not self.active: # if not active - error by stop...
break break
if self.__modified: logSys.error("Caught unhandled exception in main cycle: %r", e,
try: exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
while True: # incr common error counter:
ticket = self.failManager.toBan() self.commonError()
self.jail.putFailTicket(ticket)
except FailManagerEmpty:
self.failManager.cleanup(MyTime.time())
# close journal:
try:
if self.__journal:
self.__journal.close()
except Exception as e: # pragma: no cover
logSys.error("Close journal failed: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
logSys.debug((self.jail is not None and self.jail.name logSys.debug((self.jail is not None and self.jail.name
or "jailless") +" filter terminated") or "jailless") +" filter terminated")
return True return True

View File

@ -239,19 +239,31 @@ class Jail(object):
Once stated, also queries the persistent database to reinstate Once stated, also queries the persistent database to reinstate
any valid bans. any valid bans.
""" """
logSys.debug("Starting jail %r", self.name)
self.filter.start() self.filter.start()
self.actions.start() self.actions.start()
self.restoreCurrentBans() self.restoreCurrentBans()
logSys.info("Jail %r started", self.name) logSys.info("Jail %r started", self.name)
def stop(self): def stop(self, stop=True, join=True):
"""Stop the jail, by stopping filter and actions threads. """Stop the jail, by stopping filter and actions threads.
""" """
self.filter.stop() if stop:
self.actions.stop() logSys.debug("Stopping jail %r", self.name)
self.filter.join() for obj in (self.filter, self.actions):
self.actions.join() try:
logSys.info("Jail '%s' stopped" % self.name) ## signal to stop filter / actions:
if stop:
obj.stop()
## wait for end of threads:
if join:
if obj.isAlive():
obj.join()
except Exception as e:
logSys.error("Stop %r of jail %r failed: %s", obj, self.name, e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
if join:
logSys.info("Jail %r stopped", self.name)
def isAlive(self): def isAlive(self):
"""Check jail "isAlive" by checking filter and actions threads. """Check jail "isAlive" by checking filter and actions threads.

View File

@ -210,14 +210,14 @@ class Server:
if self.__db is not None: if self.__db is not None:
self.__db.addJail(self.__jails[name]) self.__db.addJail(self.__jails[name])
def delJail(self, name, stop=True): def delJail(self, name, stop=True, join=True):
jail = self.__jails[name] jail = self.__jails[name]
if stop and jail.isAlive(): if join or jail.isAlive():
logSys.debug("Stopping jail %r" % name) jail.stop(stop=stop, join=join)
jail.stop() if join:
if self.__db is not None: if self.__db is not None:
self.__db.delJail(jail) self.__db.delJail(jail)
del self.__jails[name] del self.__jails[name]
def startJail(self, name): def startJail(self, name):
with self.__lock: with self.__lock:
@ -237,8 +237,12 @@ class Server:
def stopAllJail(self): def stopAllJail(self):
logSys.info("Stopping all jails") logSys.info("Stopping all jails")
with self.__lock: with self.__lock:
# 1st stop all jails (signal and stop actions/filter thread):
for name in self.__jails.keys(): for name in self.__jails.keys():
self.delJail(name, stop=True) self.delJail(name, stop=True, join=False)
# 2nd wait for end and delete jails:
for name in self.__jails.keys():
self.delJail(name, stop=False, join=True)
def reloadJails(self, name, opts, begin): def reloadJails(self, name, opts, begin):
if begin: if begin:

View File

@ -52,8 +52,8 @@ class Utils():
"""Utilities provide diverse static methods like executes OS shell commands, etc. """Utilities provide diverse static methods like executes OS shell commands, etc.
""" """
DEFAULT_SLEEP_TIME = 0.1 DEFAULT_SLEEP_TIME = 2
DEFAULT_SLEEP_INTERVAL = 0.01 DEFAULT_SLEEP_INTERVAL = 0.2
class Cache(object): class Cache(object):