Merge pull request #1778 from sebres/fix-pyinotify-dir-rotate

Fix pyinotify directory-based log-rotate
pull/1668/merge
Serg G. Brester 8 years ago committed by GitHub
commit a73b0c0064

@ -895,7 +895,8 @@ class FileFilter(Filter):
# see http://python.org/dev/peps/pep-3151/ # see http://python.org/dev/peps/pep-3151/
except IOError as e: except IOError as e:
logSys.error("Unable to open %s", filename) logSys.error("Unable to open %s", filename)
logSys.exception(e) if e.errno != 2: # errno.ENOENT
logSys.exception(e)
return False return False
except OSError as e: # pragma: no cover - requires race condition to tigger this except OSError as e: # pragma: no cover - requires race condition to tigger this
logSys.error("Error opening %s", filename) logSys.error("Error opening %s", filename)

@ -143,6 +143,8 @@ class FilterGamin(FileFilter):
# Desallocates the resources used by Gamin. # Desallocates the resources used by Gamin.
def __cleanup(self): def __cleanup(self):
if not self.monitor:
return
for filename in self.getLogPaths(): for filename in self.getLogPaths():
self.monitor.stop_watch(filename) self.monitor.stop_watch(filename)
self.monitor = None self.monitor = None

@ -25,19 +25,20 @@ __license__ = "GPL"
import logging import logging
from distutils.version import LooseVersion from distutils.version import LooseVersion
import os
from os.path import dirname, sep as pathsep from os.path import dirname, sep as pathsep
import pyinotify import pyinotify
from .failmanager import FailManagerEmpty from .failmanager import FailManagerEmpty
from .filter import FileFilter from .filter import FileFilter
from .mytime import MyTime from .mytime import MyTime, time
from .utils import Utils from .utils import Utils
from ..helpers import getLogger from ..helpers import getLogger
if not hasattr(pyinotify, '__version__') \ if not hasattr(pyinotify, '__version__') \
or LooseVersion(pyinotify.__version__) < '0.8.3': or LooseVersion(pyinotify.__version__) < '0.8.3': # pragma: no cover
raise ImportError("Fail2Ban requires pyinotify >= 0.8.3") raise ImportError("Fail2Ban requires pyinotify >= 0.8.3")
# Verify that pyinotify is functional on this system # Verify that pyinotify is functional on this system
@ -45,13 +46,18 @@ if not hasattr(pyinotify, '__version__') \
try: try:
manager = pyinotify.WatchManager() manager = pyinotify.WatchManager()
del manager del manager
except Exception as e: except Exception as e: # pragma: no cover
raise ImportError("Pyinotify is probably not functional on this system: %s" raise ImportError("Pyinotify is probably not functional on this system: %s"
% str(e)) % str(e))
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
# Override pyinotify default logger/init-handler:
def _pyinotify_logger_init(): # pragma: no cover
return logSys
pyinotify._logger_init = _pyinotify_logger_init
pyinotify.log = logSys
## ##
# Log reader class. # Log reader class.
@ -72,30 +78,57 @@ class FilterPyinotify(FileFilter):
self.__modified = False self.__modified = False
# Pyinotify watch manager # Pyinotify watch manager
self.__monitor = pyinotify.WatchManager() self.__monitor = pyinotify.WatchManager()
self.__watches = dict() self.__watchFiles = dict()
self.__watchDirs = dict()
self.__pending = dict()
self.__pendingChkTime = 0
self.__pendingNextTime = 0
logSys.debug("Created FilterPyinotify") logSys.debug("Created FilterPyinotify")
def callback(self, event, origin=''): def callback(self, event, origin=''):
logSys.log(7, "[%s] %sCallback for Event: %s", self.jailName, origin, event) logSys.log(7, "[%s] %sCallback for Event: %s", self.jailName, origin, event)
path = event.pathname path = event.pathname
# check watching of this path:
isWF = False
isWD = path in self.__watchDirs
if not isWD and path in self.__watchFiles:
isWF = True
assumeNoDir = False
if event.mask & ( pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO ): if event.mask & ( pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO ):
# skip directories altogether # skip directories altogether
if event.mask & pyinotify.IN_ISDIR: if event.mask & pyinotify.IN_ISDIR:
logSys.debug("Ignoring creation of directory %s", path) logSys.debug("Ignoring creation of directory %s", path)
return return
# check if that is a file we care about # check if that is a file we care about
if not path in self.__watches: if not isWF:
logSys.debug("Ignoring creation of %s we do not monitor", path) logSys.debug("Ignoring creation of %s we do not monitor", path)
return return
else: self._refreshWatcher(path)
# we need to substitute the watcher with a new one, so first elif event.mask & (pyinotify.IN_IGNORED | pyinotify.IN_MOVE_SELF | pyinotify.IN_DELETE_SELF):
# remove old one assumeNoDir = event.mask & (pyinotify.IN_MOVE_SELF | pyinotify.IN_DELETE_SELF)
self._delFileWatcher(path) # fix pyinotify behavior with '-unknown-path' (if target not watched also):
# place a new one if (assumeNoDir and
self._addFileWatcher(path) path.endswith('-unknown-path') and not isWF and not isWD
):
path = path[:-len('-unknown-path')]
isWD = path in self.__watchDirs
# watch was removed for some reasons (log-rotate?):
if isWD and (assumeNoDir or not os.path.isdir(path)):
self._addPending(path, event, isDir=True)
elif not isWF:
for logpath in self.__watchDirs:
if logpath.startswith(path + pathsep) and (assumeNoDir or not os.path.isdir(logpath)):
self._addPending(logpath, event, isDir=True)
if isWF and not os.path.isfile(path):
self._addPending(path, event)
return
# do nothing if idle: # do nothing if idle:
if self.idle: if self.idle:
return return
# be sure we process a file:
if not isWF:
logSys.debug("Ignoring event (%s) of %s we do not monitor", event.maskname, path)
return
self._process_file(path) self._process_file(path)
def _process_file(self, path): def _process_file(self, path):
@ -104,23 +137,97 @@ class FilterPyinotify(FileFilter):
TODO -- RF: TODO -- RF:
this is a common logic and must be shared/provided by FileFilter this is a common logic and must be shared/provided by FileFilter
""" """
self.getFailures(path) if not self.idle:
self.getFailures(path)
try:
while True:
ticket = self.failManager.toBan()
self.jail.putFailTicket(ticket)
except FailManagerEmpty:
self.failManager.cleanup(MyTime.time())
self.__modified = False
def _addPending(self, path, reason, isDir=False):
if path not in self.__pending:
self.__pending[path] = [Utils.DEFAULT_SLEEP_INTERVAL, isDir];
self.__pendingNextTime = 0
if isinstance(reason, pyinotify.Event):
reason = [reason.maskname, reason.pathname]
logSys.log(logging.MSG, "Log absence detected (possibly rotation) for %s, reason: %s of %s",
path, *reason)
def _delPending(self, path):
try: try:
while True: del self.__pending[path]
ticket = self.failManager.toBan() except KeyError: pass
self.jail.putFailTicket(ticket)
except FailManagerEmpty: def _checkPending(self):
self.failManager.cleanup(MyTime.time()) if not self.__pending:
self.__modified = False return
ntm = time.time()
if ntm < self.__pendingNextTime:
return
found = {}
minTime = 60
for path, (retardTM, isDir) in self.__pending.iteritems():
if ntm - self.__pendingChkTime < retardTM:
if minTime > retardTM: minTime = retardTM
continue
chkpath = os.path.isdir if isDir else os.path.isfile
if not chkpath(path): # not found - prolong for next time
if retardTM < 60: retardTM *= 2
if minTime > retardTM: minTime = retardTM
self.__pending[path][0] = retardTM
continue
logSys.log(logging.MSG, "Log presence detected for %s %s",
"directory" if isDir else "file", path)
found[path] = isDir
for path in found:
try:
del self.__pending[path]
except KeyError: pass
self.__pendingChkTime = time.time()
self.__pendingNextTime = self.__pendingChkTime + minTime
# process now because we've missed it in monitoring:
for path, isDir in found.iteritems():
# refresh monitoring of this:
self._refreshWatcher(path, isDir=isDir)
if isDir:
# check all files belong to this dir:
for logpath in self.__watchFiles:
if logpath.startswith(path + pathsep):
# if still no file - add to pending, otherwise refresh and process:
if not os.path.isfile(logpath):
self._addPending(logpath, ('FROM_PARDIR', path))
else:
self._refreshWatcher(logpath)
self._process_file(logpath)
else:
# process (possibly no old events for it from watcher):
self._process_file(path)
def _refreshWatcher(self, oldPath, newPath=None, isDir=False):
if not newPath: newPath = oldPath
# we need to substitute the watcher with a new one, so first
# remove old one and then place a new one
if not isDir:
self._delFileWatcher(oldPath)
self._addFileWatcher(newPath)
else:
self._delDirWatcher(oldPath)
self._addDirWatcher(newPath)
def _addFileWatcher(self, path): def _addFileWatcher(self, path):
# we need to watch also the directory for IN_CREATE
self._addDirWatcher(dirname(path))
# add file watcher:
wd = self.__monitor.add_watch(path, pyinotify.IN_MODIFY) wd = self.__monitor.add_watch(path, pyinotify.IN_MODIFY)
self.__watches.update(wd) self.__watchFiles.update(wd)
logSys.debug("Added file watcher for %s", path) logSys.debug("Added file watcher for %s", path)
def _delFileWatcher(self, path): def _delFileWatcher(self, path):
try: try:
wdInt = self.__watches.pop(path) wdInt = self.__watchFiles.pop(path)
wd = self.__monitor.rm_watch(wdInt) wd = self.__monitor.rm_watch(wdInt)
if wd[wdInt]: if wd[wdInt]:
logSys.debug("Removed file watcher for %s", path) logSys.debug("Removed file watcher for %s", path)
@ -129,19 +236,30 @@ class FilterPyinotify(FileFilter):
pass pass
return False return False
def _addDirWatcher(self, path_dir):
# Add watch for the directory:
if path_dir not in self.__watchDirs:
self.__watchDirs.update(
self.__monitor.add_watch(path_dir, pyinotify.IN_CREATE |
pyinotify.IN_MOVED_TO | pyinotify.IN_MOVE_SELF |
pyinotify.IN_DELETE_SELF | pyinotify.IN_ISDIR))
logSys.debug("Added monitor for the parent directory %s", path_dir)
def _delDirWatcher(self, path_dir):
# Remove watches for the directory:
try:
wdInt = self.__watchDirs.pop(path_dir)
self.__monitor.rm_watch(wdInt)
except KeyError: # pragma: no cover
pass
logSys.debug("Removed monitor for the parent directory %s", path_dir)
## ##
# Add a log file path # Add a log file path
# #
# @param path log file path # @param path log file path
def _addLogPath(self, path): def _addLogPath(self, path):
path_dir = dirname(path)
if not (path_dir in self.__watches):
# we need to watch also the directory for IN_CREATE
self.__watches.update(
self.__monitor.add_watch(path_dir, pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO))
logSys.debug("Added monitor for the parent directory %s", path_dir)
self._addFileWatcher(path) self._addFileWatcher(path)
self._process_file(path) self._process_file(path)
@ -151,40 +269,32 @@ class FilterPyinotify(FileFilter):
# @param path the log file to delete # @param path the log file to delete
def _delLogPath(self, path): def _delLogPath(self, path):
if not self._delFileWatcher(path): if not self._delFileWatcher(path): # pragma: no cover
logSys.error("Failed to remove watch on path: %s", path) logSys.error("Failed to remove watch on path: %s", path)
self._delPending(path)
path_dir = dirname(path) path_dir = dirname(path)
if not len([k for k in self.__watches for k in self.__watchFiles:
if k.startswith(path_dir + pathsep)]): if k.startswith(path_dir + pathsep):
path_dir = None
break
if path_dir:
# Remove watches for the directory # Remove watches for the directory
# since there is no other monitored file under this directory # since there is no other monitored file under this directory
try: self._delDirWatcher(path_dir)
wdInt = self.__watches.pop(path_dir) self._delPending(path_dir)
self.__monitor.rm_watch(wdInt)
except KeyError: # pragma: no cover
pass
logSys.debug("Removed monitor for the parent directory %s", path_dir)
# pyinotify.ProcessEvent default handler: # pyinotify.ProcessEvent default handler:
def __process_default(self, event): def __process_default(self, event):
try: try:
self.callback(event, origin='Default ') self.callback(event, origin='Default ')
except Exception as e: except Exception as e: # pragma: no cover
logSys.error("Error in FilterPyinotify callback: %s", logSys.error("Error in FilterPyinotify callback: %s",
e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG) e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
# incr common error counter:
self.commonError()
self.ticks += 1 self.ticks += 1
# slow check events while idle:
def __check_events(self, *args, **kwargs):
if self.idle:
if Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime
):
pass
self.ticks += 1
return pyinotify.ThreadedNotifier.check_events(self.__notifier, *args, **kwargs)
## ##
# Main loop. # Main loop.
# #
@ -195,25 +305,59 @@ class FilterPyinotify(FileFilter):
prcevent = pyinotify.ProcessEvent() prcevent = pyinotify.ProcessEvent()
prcevent.process_default = self.__process_default prcevent.process_default = self.__process_default
## timeout for pyinotify must be set in milliseconds (our time values are floats contain seconds) ## timeout for pyinotify must be set in milliseconds (our time values are floats contain seconds)
self.__notifier = pyinotify.ThreadedNotifier(self.__monitor, self.__notifier = pyinotify.Notifier(self.__monitor,
prcevent, timeout=self.sleeptime * 1000) prcevent, timeout=self.sleeptime * 1000)
self.__notifier.check_events = self.__check_events
self.__notifier.start()
logSys.debug("[%s] filter started (pyinotifier)", self.jailName) logSys.debug("[%s] filter started (pyinotifier)", self.jailName)
while self.active:
try:
# slow check events while idle:
if self.idle:
if Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime
):
if not self.active: break
# default pyinotify handling using Notifier:
self.__notifier.process_events()
if Utils.wait_for(lambda: not self.active or self.__notifier.check_events(), self.sleeptime):
if not self.active: break
self.__notifier.read_events()
# check pending files/dirs (logrotate ready):
if not self.idle:
self._checkPending()
except Exception as e: # pragma: no cover
if not self.active: # if not active - error by stop...
break
logSys.error("Caught unhandled exception in main cycle: %r", e,
exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
# incr common error counter:
self.commonError()
self.ticks += 1
logSys.debug("[%s] filter exited (pyinotifier)", self.jailName)
self.__notifier = None
return True return True
## ##
# Call super.stop() and then stop the 'Notifier' # Call super.stop() and then stop the 'Notifier'
def stop(self): def stop(self):
if self.__notifier: # stop the notifier
self.__notifier.stop()
# stop filter thread:
super(FilterPyinotify, self).stop() super(FilterPyinotify, self).stop()
# Stop the notifier thread self.join()
self.__notifier.stop()
## ##
# Wait for exit with cleanup. # Wait for exit with cleanup.
def join(self): def join(self):
self.join = lambda *args: 0
self.__cleanup() self.__cleanup()
super(FilterPyinotify, self).join() super(FilterPyinotify, self).join()
logSys.debug("[%s] filter terminated (pyinotifier)", self.jailName) logSys.debug("[%s] filter terminated (pyinotifier)", self.jailName)
@ -223,6 +367,6 @@ class FilterPyinotify(FileFilter):
def __cleanup(self): def __cleanup(self):
if self.__notifier: if self.__notifier:
self.__notifier.join() # to not exit before notifier does if Utils.wait_for(lambda: not self.__notifier, self.sleeptime * 10):
self.__notifier = None self.__notifier = None
self.__monitor = None self.__monitor = None

@ -43,7 +43,7 @@ from ..server.failmanager import FailManagerEmpty
from ..server.ipdns import DNSUtils, IPAddr from ..server.ipdns import DNSUtils, IPAddr
from ..server.mytime import MyTime from ..server.mytime import MyTime
from ..server.utils import Utils, uni_decode from ..server.utils import Utils, uni_decode
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_tmpdir, LogCaptureTestCase
from .dummyjail import DummyJail from .dummyjail import DummyJail
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
@ -942,18 +942,21 @@ def get_monitor_failures_testcase(Filter_):
skip=3, mode='w') skip=3, mode='w')
self.assert_correct_last_attempt(GetFailures.FAILURES_01) self.assert_correct_last_attempt(GetFailures.FAILURES_01)
def test_move_file(self): def _wait4failures(self, count=2):
# if we move file into a new location while it has been open already
self.file.close()
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
n=14, mode='w')
# Poll might need more time # Poll might need more time
self.assertTrue(self.isEmpty(_maxWaitTime(5)), self.assertTrue(self.isEmpty(_maxWaitTime(5)),
"Queue must be empty but it is not: %s." "Queue must be empty but it is not: %s."
% (', '.join([str(x) for x in self.jail.queue]))) % (', '.join([str(x) for x in self.jail.queue])))
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan) self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
Utils.wait_for(lambda: self.filter.failManager.getFailTotal() == 2, _maxWaitTime(10)) Utils.wait_for(lambda: self.filter.failManager.getFailTotal() >= count, _maxWaitTime(10))
self.assertEqual(self.filter.failManager.getFailTotal(), 2) self.assertEqual(self.filter.failManager.getFailTotal(), count)
def test_move_file(self):
# if we move file into a new location while it has been open already
self.file.close()
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
n=14, mode='w')
self._wait4failures()
# move aside, but leaving the handle still open... # move aside, but leaving the handle still open...
os.rename(self.name, self.name + '.bak') os.rename(self.name, self.name + '.bak')
@ -967,6 +970,34 @@ def get_monitor_failures_testcase(Filter_):
self.assert_correct_last_attempt(GetFailures.FAILURES_01) self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 6) self.assertEqual(self.filter.failManager.getFailTotal(), 6)
@with_tmpdir
def test_move_dir(self, tmp):
self.file.close()
self.filter.delLogPath(self.name)
# if we rename parent dir into a new location (simulate directory-base log rotation)
tmpsub1 = os.path.join(tmp, "1")
tmpsub2 = os.path.join(tmp, "2")
os.mkdir(tmpsub1)
self.name = os.path.join(tmpsub1, os.path.basename(self.name))
os.close(os.open(self.name, os.O_CREAT|os.O_APPEND)); # create empty file
self.filter.addLogPath(self.name, autoSeek=False)
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=12, n=1, mode='w')
self.file.close()
self._wait4failures(1)
# rotate whole directory: rename directory 1 as 2:
os.rename(tmpsub1, tmpsub2)
os.mkdir(tmpsub1)
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=12, n=1, mode='w')
self.file.close()
self._wait4failures(2)
# stop before tmpdir deleted (just prevents many monitor events)
self.filter.stop()
def _test_move_into_file(self, interim_kill=False): def _test_move_into_file(self, interim_kill=False):
# if we move a new file into the location of an old (monitored) file # if we move a new file into the location of an old (monitored) file
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, _copy_lines_between_files(GetFailures.FILENAME_01, self.name,

Loading…
Cancel
Save