Fixed detection of directory-based log rotation in the pyinotify backend.

If a directory was moved and the target was not a watched path, the monitoring of the log files inside it could not be continued.

This is now fixed: such files are kept as pending and await monitoring until they (or their directories) appear again (respawn).

Closes gh-1769
pull/1778/head
sebres 2017-05-12 16:51:08 +02:00
parent 31627b796b
commit e340d0d2b2
3 changed files with 124 additions and 17 deletions

View File

@ -895,7 +895,8 @@ class FileFilter(Filter):
# see http://python.org/dev/peps/pep-3151/
except IOError as e:
logSys.error("Unable to open %s", filename)
logSys.exception(e)
if e.errno != 2:
logSys.exception(e)
return False
except OSError as e: # pragma: no cover - requires race condition to tigger this
logSys.error("Error opening %s", filename)

View File

@ -25,13 +25,14 @@ __license__ = "GPL"
import logging
from distutils.version import LooseVersion
import os
from os.path import dirname, sep as pathsep
import pyinotify
from .failmanager import FailManagerEmpty
from .filter import FileFilter
from .mytime import MyTime
from .mytime import MyTime, time
from .utils import Utils
from ..helpers import getLogger
@ -52,6 +53,11 @@ except Exception as e:
# Gets the instance of the logger.
logSys = getLogger(__name__)
# Override pyinotify default logger/init-handler:
def _pyinotify_logger_init():
    """Return this module's logger (``logSys``).

    Meant as a replacement for pyinotify's default logger initialisation,
    so that pyinotify messages go through the same logger as this module.
    """
    return logSys
pyinotify._logger_init = _pyinotify_logger_init
pyinotify.log = logSys
##
# Log reader class.
@ -73,6 +79,9 @@ class FilterPyinotify(FileFilter):
# Pyinotify watch manager
self.__monitor = pyinotify.WatchManager()
self.__watches = dict()
self.__pending = dict()
self.__pendingChkTime = 0
self.__pendingNextTime = 0
logSys.debug("Created FilterPyinotify")
def callback(self, event, origin=''):
@ -84,15 +93,36 @@ class FilterPyinotify(FileFilter):
logSys.debug("Ignoring creation of directory %s", path)
return
# check if that is a file we care about
if not path in self.__watches:
if path not in self.__watches:
logSys.debug("Ignoring creation of %s we do not monitor", path)
return
else:
# we need to substitute the watcher with a new one, so first
# remove old one
self._delFileWatcher(path)
# place a new one
self._addFileWatcher(path)
self._refreshFileWatcher(path)
elif event.mask & (pyinotify.IN_IGNORED | pyinotify.IN_MOVE_SELF | pyinotify.IN_DELETE_SELF):
# fix pyinotify behavior with '-unknown-path' (if target not watched also):
if (event.mask & pyinotify.IN_MOVE_SELF and path not in self.__watches and
path.endswith('-unknown-path')
):
path = path[:-len('-unknown-path')]
# watch was removed for some reasons (log-rotate?):
if not os.path.isfile(path):
for log in self.getLogs():
logpath = log.getFileName()
if logpath.startswith(path):
# check exists (rotated):
if event.mask & pyinotify.IN_MOVE_SELF or not os.path.isfile(logpath):
self._addPendingFile(logpath, event)
else:
path = logpath
break
if path not in self.__watches:
logSys.debug("Ignoring event of %s we do not monitor", path)
return
if not os.path.isfile(path):
if self.containsLogPath(path):
self._addPendingFile(path, event)
logSys.debug("Ignoring watching/rotation event (%s) for %s", event.maskname, path)
return
self._refreshFileWatcher(path)
# do nothing if idle:
if self.idle:
return
@ -113,6 +143,44 @@ class FilterPyinotify(FileFilter):
self.failManager.cleanup(MyTime.time())
self.__modified = False
def _addPendingFile(self, path, event):
    """Remember a vanished log file so that it gets re-checked later.

    The path is stored in the pending map together with its initial
    re-check delay (a tenth of ``self.sleeptime``). A path that is
    already pending is left untouched, so its current delay is kept and
    the absence is reported only once.

    :param path: path of the log file that is currently not available.
    :param event: event that revealed the absence; its ``maskname`` and
        ``pathname`` are included in the log message.
    """
    if path in self.__pending:
        return
    # Force true division: with an integer sleeptime, Python 2 floor
    # division would yield 0, and a zero delay never grows when doubled.
    self.__pending[path] = self.sleeptime / 10.0
    logSys.log(logging.MSG, "Log absence detected (possibly rotation) for %s, reason: %s of %s",
        path, event.maskname, event.pathname)
def _checkPendingFiles(self):
    """Re-check pending (currently missing) log files and resume monitoring.

    Does nothing while there are no pending files or the next check time
    has not been reached. Otherwise every pending path whose delay has
    elapsed since the previous check is tested with ``os.path.isfile``:

    * still missing - its delay is doubled (as long as it is below 60)
      and the path stays pending;
    * present again - its watcher is refreshed, it is removed from the
      pending map and, after the bookkeeping is updated, the file is
      processed at once because its changes were not seen by monitoring.

    The next check is scheduled after the smallest delay of all paths
    that remain pending, capped at 60.
    """
    if not self.__pending:
        return
    ntm = time.time()
    if ntm <= self.__pendingNextTime:
        return
    found = []
    minTime = 60
    # Iterate over a snapshot: ``items()`` exists on Python 2 and 3
    # (``iteritems()`` is Python 2 only) and the map may be updated below.
    for path, retardTM in list(self.__pending.items()):
        if ntm - self.__pendingChkTime <= retardTM:
            # Not due yet, but it must still count for the next wake-up,
            # otherwise the next check could be postponed up to the cap.
            if minTime > retardTM:
                minTime = retardTM
            continue
        if not os.path.isfile(path):
            # not found - prolong for next time
            if retardTM < 60:
                retardTM *= 2
            if minTime > retardTM:
                minTime = retardTM
            self.__pending[path] = retardTM
            continue
        found.append(path)
        self._refreshFileWatcher(path)
    for path in found:
        self.__pending.pop(path, None)
    self.__pendingChkTime = time.time()
    self.__pendingNextTime = self.__pendingChkTime + minTime
    # process now because we have missed it in monitoring:
    for path in found:
        self._process_file(path)
def _refreshFileWatcher(self, oldPath, newPath=None):
    """Replace the watcher of *oldPath* with a freshly created one.

    :param oldPath: path whose current watcher is removed.
    :param newPath: path to watch afterwards; when omitted (or otherwise
        falsy) *oldPath* itself is watched again.
    """
    target = newPath if newPath else oldPath
    # the stale watcher has to be dropped before the new one is placed
    self._delFileWatcher(oldPath)
    self._addFileWatcher(target)
def _addFileWatcher(self, path):
wd = self.__monitor.add_watch(path, pyinotify.IN_MODIFY)
self.__watches.update(wd)
@ -139,7 +207,9 @@ class FilterPyinotify(FileFilter):
if not (path_dir in self.__watches):
# we need to watch also the directory for IN_CREATE
self.__watches.update(
self.__monitor.add_watch(path_dir, pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO))
self.__monitor.add_watch(path_dir, pyinotify.IN_CREATE |
pyinotify.IN_MOVED_TO | pyinotify.IN_MOVE_SELF |
pyinotify.IN_DELETE_SELF | pyinotify.IN_ISDIR))
logSys.debug("Added monitor for the parent directory %s", path_dir)
self._addFileWatcher(path)
@ -177,6 +247,9 @@ class FilterPyinotify(FileFilter):
# slow check events while idle:
def __check_events(self, *args, **kwargs):
# check pending files (logrotate ready):
self._checkPendingFiles()
if self.idle:
if Utils.wait_for(lambda: not self.active or not self.idle,
self.sleeptime * 10, self.sleeptime
@ -209,6 +282,7 @@ class FilterPyinotify(FileFilter):
super(FilterPyinotify, self).stop()
# Stop the notifier thread
self.__notifier.stop()
self.__notifier.stop = lambda *args: 0; # prevent dual stop
##
# Wait for exit with cleanup.

View File

@ -43,7 +43,7 @@ from ..server.failmanager import FailManagerEmpty
from ..server.ipdns import DNSUtils, IPAddr
from ..server.mytime import MyTime
from ..server.utils import Utils, uni_decode
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_tmpdir, LogCaptureTestCase
from .dummyjail import DummyJail
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
@ -942,17 +942,21 @@ def get_monitor_failures_testcase(Filter_):
skip=3, mode='w')
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
def test_move_file(self):
# if we move file into a new location while it has been open already
self.file.close()
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
n=14, mode='w')
def _wait4failures(self, count=2):
# Poll might need more time
self.assertTrue(self.isEmpty(_maxWaitTime(5)),
"Queue must be empty but it is not: %s."
% (', '.join([str(x) for x in self.jail.queue])))
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
Utils.wait_for(lambda: self.filter.failManager.getFailTotal() == 2, _maxWaitTime(10))
Utils.wait_for(lambda: self.filter.failManager.getFailTotal() >= count, _maxWaitTime(10))
self.assertEqual(self.filter.failManager.getFailTotal(), count)
def test_move_file(self):
# if we move file into a new location while it has been open already
self.file.close()
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
n=14, mode='w')
self._wait4failures()
self.assertEqual(self.filter.failManager.getFailTotal(), 2)
# move aside, but leaving the handle still open...
@ -967,6 +971,34 @@ def get_monitor_failures_testcase(Filter_):
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
@with_tmpdir
def test_move_dir(self, tmp):
    """Monitoring must survive a rename of the log's parent directory.

    The log is placed in sub-directory "1" of *tmp*; after one failure is
    seen, "1" is renamed to "2", "1" is created anew and the log is
    written again under the original path, which must yield a second
    failure.
    """
    self.file.close()
    self.filter.delLogPath(self.name)
    # if we rename parent dir into a new location (simulate directory-base log rotation)
    first_dir = os.path.join(tmp, "1")
    second_dir = os.path.join(tmp, "2")
    os.mkdir(first_dir)
    log_basename = os.path.basename(self.name)
    self.name = os.path.join(first_dir, log_basename)
    # create empty file
    fd = os.open(self.name, os.O_CREAT | os.O_APPEND)
    os.close(fd)
    self.filter.addLogPath(self.name, autoSeek=False)

    self.file = _copy_lines_between_files(
        GetFailures.FILENAME_01, self.name, skip=12, n=1, mode='w')
    self.file.close()
    self._wait4failures(1)

    # rotate whole directory: rename directory 1 as 2:
    os.rename(first_dir, second_dir)
    os.mkdir(first_dir)
    self.file = _copy_lines_between_files(
        GetFailures.FILENAME_01, self.name, skip=12, n=1, mode='w')
    self.file.close()
    self._wait4failures(2)
    # stop before tmpdir deleted (just prevents many monitor events)
    self.filter.stop()
def _test_move_into_file(self, interim_kill=False):
# if we move a new file into the location of an old (monitored) file
_copy_lines_between_files(GetFailures.FILENAME_01, self.name,