rewritten idle handling for all filters, test cases extended to work in idle mode also (should not find/process failures)

pull/1523/head
sebres 2016-09-01 19:38:40 +02:00
parent 23c14acec0
commit d974ebd581
7 changed files with 83 additions and 39 deletions

View File

@ -85,6 +85,8 @@ class Filter(JailThread):
self.__lastDate = None self.__lastDate = None
## External command ## External command
self.__ignoreCommand = False self.__ignoreCommand = False
## Ticks counter
self.ticks = 0
self.dateDetector = DateDetector() self.dateDetector = DateDetector()
self.dateDetector.addDefaultTemplate() self.dateDetector.addDefaultTemplate()

View File

@ -69,6 +69,7 @@ class FilterGamin(FileFilter):
logSys.debug("File changed: " + path) logSys.debug("File changed: " + path)
self.__modified = True self.__modified = True
self.ticks += 1
self._process_file(path) self._process_file(path)
def _process_file(self, path): def _process_file(self, path):
@ -105,7 +106,7 @@ class FilterGamin(FileFilter):
def _handleEvents(self): def _handleEvents(self):
ret = False ret = False
mon = self.monitor mon = self.monitor
while mon and mon.event_pending(): while mon and mon.event_pending() > 0:
mon.handle_events() mon.handle_events()
mon = self.monitor mon = self.monitor
ret = True ret = True
@ -122,9 +123,14 @@ class FilterGamin(FileFilter):
# Gamin needs a loop to collect and dispatch events # Gamin needs a loop to collect and dispatch events
while self.active: while self.active:
if self.idle: if self.idle:
time.sleep(self.sleeptime) # wait a little bit here for not idle, to prevent hi-load:
continue if not Utils.wait_for(lambda: not self.idle,
self.sleeptime * 10, self.sleeptime
):
self.ticks += 1
continue
Utils.wait_for(self._handleEvents, self.sleeptime) Utils.wait_for(self._handleEvents, self.sleeptime)
self.ticks += 1
logSys.debug(self.jail.name + ": filter terminated") logSys.debug(self.jail.name + ": filter terminated")
return True return True

View File

@ -30,8 +30,9 @@ import time
from .failmanager import FailManagerEmpty from .failmanager import FailManagerEmpty
from .filter import FileFilter from .filter import FileFilter
from .mytime import MyTime from .mytime import MyTime
from .utils import Utils
from ..helpers import getLogger from ..helpers import getLogger
from ..server.utils import Utils
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -101,8 +102,9 @@ class FilterPoll(FileFilter):
self.idle, self.getLogCount()) self.idle, self.getLogCount())
if self.idle: if self.idle:
if not Utils.wait_for(lambda: not self.idle, if not Utils.wait_for(lambda: not self.idle,
self.sleeptime * 100, self.sleeptime self.sleeptime * 10, self.sleeptime
): ):
self.ticks += 1
continue continue
# Get file modification # Get file modification
modlst = [] modlst = []
@ -111,6 +113,7 @@ class FilterPoll(FileFilter):
self.getFailures(filename) self.getFailures(filename)
self.__modified = True self.__modified = True
self.ticks += 1
if self.__modified: if self.__modified:
try: try:
while True: while True:

View File

@ -32,6 +32,7 @@ import pyinotify
from .failmanager import FailManagerEmpty from .failmanager import FailManagerEmpty
from .filter import FileFilter from .filter import FileFilter
from .mytime import MyTime from .mytime import MyTime
from .utils import Utils
from ..helpers import getLogger from ..helpers import getLogger
@ -92,7 +93,9 @@ class FilterPyinotify(FileFilter):
self._delFileWatcher(path) self._delFileWatcher(path)
# place a new one # place a new one
self._addFileWatcher(path) self._addFileWatcher(path)
# do nothing if idle:
if self.idle:
return
self._process_file(path) self._process_file(path)
def _process_file(self, path): def _process_file(self, path):
@ -159,6 +162,25 @@ class FilterPyinotify(FileFilter):
self.__monitor.rm_watch(wdInt) self.__monitor.rm_watch(wdInt)
logSys.debug("Removed monitor for the parent directory %s", path_dir) logSys.debug("Removed monitor for the parent directory %s", path_dir)
# pyinotify.ProcessEvent default handler:
def __process_default(self, event):
try:
self.callback(event, origin='Default ')
except Exception as e:
logSys.error("Error in FilterPyinotify callback: %s",
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
self.ticks += 1
# slow check events while idle:
def __check_events(self, *args, **kwargs):
if self.idle:
if Utils.wait_for(lambda: not self.idle,
self.sleeptime * 10, self.sleeptime
):
pass
self.ticks += 1
return pyinotify.ThreadedNotifier.check_events(self.__notifier, *args, **kwargs)
## ##
# Main loop. # Main loop.
# #
@ -166,12 +188,13 @@ class FilterPyinotify(FileFilter):
# loop is necessary # loop is necessary
def run(self): def run(self):
prcevent = pyinotify.ProcessEvent()
prcevent.process_default = self.__process_default
self.__notifier = pyinotify.ThreadedNotifier(self.__monitor, self.__notifier = pyinotify.ThreadedNotifier(self.__monitor,
ProcessPyinotify(self)) prcevent, timeout=self.sleeptime)
self.__notifier.check_events = self.__check_events
self.__notifier.start() self.__notifier.start()
logSys.debug("pyinotifier started for %s.", self.jail.name) logSys.debug("pyinotifier started for %s.", self.jail.name)
# TODO: verify that there is nothing really to be done for
# idle jails
return True return True
## ##
@ -191,22 +214,3 @@ class FilterPyinotify(FileFilter):
def __cleanup(self): def __cleanup(self):
self.__notifier = None self.__notifier = None
self.__monitor = None self.__monitor = None
class ProcessPyinotify(pyinotify.ProcessEvent):
def __init__(self, FileFilter, **kargs):
#super(ProcessPyinotify, self).__init__(**kargs)
# for some reason root class _ProcessEvent is old-style (is
# not derived from object), so to play safe let's avoid super
# for now, and call superclass directly
pyinotify.ProcessEvent.__init__(self, **kargs)
self.__FileFilter = FileFilter
pass
# just need default, since using mask on watch to limit events
def process_default(self, event):
try:
self.__FileFilter.callback(event, origin='Default ')
except Exception as e:
logSys.error("Error in FilterPyinotify callback: %s",
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)

View File

@ -33,6 +33,7 @@ if LooseVersion(getattr(journal, '__version__', "0")) < '204':
from .failmanager import FailManagerEmpty from .failmanager import FailManagerEmpty
from .filter import JournalFilter from .filter import JournalFilter
from .mytime import MyTime from .mytime import MyTime
from .utils import Utils
from ..helpers import getLogger, logging, splitwords from ..helpers import getLogger, logging, splitwords
# Gets the instance of the logger. # Gets the instance of the logger.
@ -61,7 +62,6 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
self.__journal = journal.Reader(**jrnlargs) self.__journal = journal.Reader(**jrnlargs)
self.__matches = [] self.__matches = []
self.setDatePattern(None) self.setDatePattern(None)
self.ticks = 0
logSys.debug("Created FilterSystemd") logSys.debug("Created FilterSystemd")
@staticmethod @staticmethod
@ -277,6 +277,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
if not Utils.wait_for(lambda: not self.idle, if not Utils.wait_for(lambda: not self.idle,
self.sleeptime * 10, self.sleeptime self.sleeptime * 10, self.sleeptime
): ):
self.ticks += 1
continue continue
self.__modified = 0 self.__modified = 0
while self.active: while self.active:

View File

@ -720,6 +720,12 @@ class CommonMonitorTestCase(unittest.TestCase):
""" """
return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay)) return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay))
def waitForTicks(self, ticks, delay=2.):
"""Wait up to `delay` sec to assure that it was modified or not
"""
last_ticks = self.filter.ticks
return Utils.wait_for(lambda: self.filter.ticks >= last_ticks + ticks, _maxWaitTime(delay))
def get_monitor_failures_testcase(Filter_): def get_monitor_failures_testcase(Filter_):
"""Generator of TestCase's for different filters/backends """Generator of TestCase's for different filters/backends
@ -776,6 +782,16 @@ def get_monitor_failures_testcase(Filter_):
_assert_correct_last_attempt(self, self.jail, failures, count=count) _assert_correct_last_attempt(self, self.jail, failures, count=count)
def test_grow_file(self): def test_grow_file(self):
self._test_grow_file()
def test_grow_file_in_idle(self):
self._test_grow_file(True)
def _test_grow_file(self, idle=False):
if idle:
self.filter.sleeptime /= 100.0
self.filter.idle = True
self.waitForTicks(1)
# suck in lines from this sample log file # suck in lines from this sample log file
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
@ -787,6 +803,10 @@ def get_monitor_failures_testcase(Filter_):
# since it should have not been enough # since it should have not been enough
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5) _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
if idle:
self.waitForTicks(1)
self.assertTrue(self.isEmpty(1))
return
self.assertTrue(self.isFilled(10)) self.assertTrue(self.isFilled(10))
# so we sleep a bit for it not to become empty, # so we sleep a bit for it not to become empty,
# and meanwhile pass to other thread(s) and filter should # and meanwhile pass to other thread(s) and filter should
@ -958,12 +978,6 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.filter.join() # wait for the thread to terminate self.filter.join() # wait for the thread to terminate
pass pass
def waitForTicks(self, ticks, delay=2.):
"""Wait up to `delay` sec to assure that it was modified or not
"""
last_ticks = self.filter.ticks
return Utils.wait_for(lambda: self.filter.ticks >= last_ticks + ticks, _maxWaitTime(delay))
def _getRuntimeJournal(self): def _getRuntimeJournal(self):
# retrieve current system journal path # retrieve current system journal path
tmp = Utils.executeCmd('find "$(systemd-path system-runtime-logs)" -name system.journal', tmp = Utils.executeCmd('find "$(systemd-path system-runtime-logs)" -name system.journal',
@ -1006,8 +1020,18 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.assertEqual(attempts, test_attempts) self.assertEqual(attempts, test_attempts)
def test_grow_file(self): def test_grow_file(self):
self._test_grow_file()
def test_grow_file_in_idle(self):
self._test_grow_file(True)
def _test_grow_file(self, idle=False):
self._initFilter() self._initFilter()
self.filter.start() self.filter.start()
if idle:
self.filter.sleeptime /= 100.0
self.filter.idle = True
self.waitForTicks(1)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# Now let's feed it with entries from the file # Now let's feed it with entries from the file
@ -1020,6 +1044,10 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
_copy_lines_to_journal( _copy_lines_to_journal(
self.test_file, self.journal_fields, skip=2, n=3) self.test_file, self.journal_fields, skip=2, n=3)
if idle:
self.waitForTicks(1)
self.assertTrue(self.isEmpty(1))
return
self.assertTrue(self.isFilled(10)) self.assertTrue(self.isFilled(10))
# so we sleep for up to 6 sec for it not to become empty, # so we sleep for up to 6 sec for it not to become empty,
# and meanwhile pass to other thread(s) and filter should # and meanwhile pass to other thread(s) and filter should

View File

@ -430,16 +430,16 @@ def gatherTests(regexps=None, opts=None):
# because gamin can be very slow on some platforms (and can produce many failures # because gamin can be very slow on some platforms (and can produce many failures
# with fast sleep interval) - skip it by fast run: # with fast sleep interval) - skip it by fast run:
if unittest.F2B.fast or unittest.F2B.no_gamin: # pragma: no cover if unittest.F2B.fast or unittest.F2B.no_gamin: # pragma: no cover
raise Exception('Skip, fast: %s, no_gamin: %s' % (unittest.F2B.fast, unittest.F2B.no_gamin)) raise ImportError('Skip, fast: %s, no_gamin: %s' % (unittest.F2B.fast, unittest.F2B.no_gamin))
from ..server.filtergamin import FilterGamin from ..server.filtergamin import FilterGamin
filters.append(FilterGamin) filters.append(FilterGamin)
except Exception, e: # pragma: no cover except ImportError, e: # pragma: no cover
logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e) logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e)
try: try:
from ..server.filterpyinotify import FilterPyinotify from ..server.filterpyinotify import FilterPyinotify
filters.append(FilterPyinotify) filters.append(FilterPyinotify)
except Exception, e: # pragma: no cover except ImportError, e: # pragma: no cover
logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e) logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e)
for Filter_ in filters: for Filter_ in filters:
@ -448,7 +448,7 @@ def gatherTests(regexps=None, opts=None):
try: # pragma: systemd no cover try: # pragma: systemd no cover
from ..server.filtersystemd import FilterSystemd from ..server.filtersystemd import FilterSystemd
tests.addTest(unittest.makeSuite(filtertestcase.get_monitor_failures_journal_testcase(FilterSystemd))) tests.addTest(unittest.makeSuite(filtertestcase.get_monitor_failures_journal_testcase(FilterSystemd)))
except Exception, e: # pragma: no cover except ImportError, e: # pragma: no cover
logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e) logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e)
# Server test for logging elements which break logging used to support # Server test for logging elements which break logging used to support