Merge pull request #1523 from sebres/_0.10/systemd-journal-path-gh-1408 into 0.10

pull/1523/head
sebres 2016-09-06 09:15:06 +02:00
commit 00c08f0cfa
4 changed files with 95 additions and 30 deletions

View File

@@ -224,12 +224,15 @@ class DateDetector(object):
 		if timeMatch:
 			template = timeMatch[1]
 			if template is not None:
-				date = template.getDate(line, timeMatch[0])
-				if date is not None:
-					if logSys.getEffectiveLevel() <= logLevel:
-						logSys.log(logLevel, "Got time %f for \"%r\" using template %s",
-							date[0], date[1].group(), template.name)
-					return date
+				try:
+					date = template.getDate(line, timeMatch[0])
+					if date is not None:
+						if logSys.getEffectiveLevel() <= logLevel:
+							logSys.log(logLevel, "Got time %f for %r using template %s",
+								date[0], date[1].group(), template.name)
+						return date
+				except ValueError:
+					return None
 		with self.__lock:
 			for template in self.__templates:
 				try:
@@ -237,7 +240,7 @@ class DateDetector(object):
 					if date is None:
 						continue
 					if logSys.getEffectiveLevel() <= logLevel:
-						logSys.log(logLevel, "Got time %f for \"%r\" using template %s",
+						logSys.log(logLevel, "Got time %f for %r using template %s",
 							date[0], date[1].group(), template.name)
 					return date
 				except ValueError: # pragma: no cover
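The guarded fast path above mirrors the ValueError handler the fallback loop below it already uses: a cached template whose match parses to an invalid or out-of-range date (such as year 0) now returns None instead of aborting getTime(). A minimal self-contained sketch of that pattern, using datetime.strptime as a stand-in for template.getDate() (the formats and names are illustrative, not the fail2ban API):

from datetime import datetime

TEMPLATES = ["%Y-%m-%d %H:%M:%S", "%b %d %H:%M:%S"]

def detect_date(candidate):
	# Try each template in turn; strptime raises ValueError both for a
	# non-matching format and for an out-of-range date such as year 0,
	# and neither case may abort the whole detection loop.
	for fmt in TEMPLATES:
		try:
			return datetime.strptime(candidate, fmt)
		except ValueError:
			continue  # skip this template, try the next one
	return None

print(detect_date("2005-06-21 16:55:03"))  # parsed by the first template
print(detect_date("0000-12-30 00:00:00"))  # year 0 -> ValueError -> None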

View File

@@ -29,6 +29,7 @@ import logging
 import os
 import re
 import sys
+import time
 
 from .failmanager import FailManagerEmpty, FailManager
 from .ipdns import DNSUtils, IPAddr
@@ -85,6 +86,8 @@ class Filter(JailThread):
 		self.__lastDate = None
 		## External command
 		self.__ignoreCommand = False
+		## Error counter
+		self.__errors = 0
 		## Ticks counter
 		self.ticks = 0
 
@@ -416,26 +419,41 @@ class Filter(JailThread):
 	def processLineAndAdd(self, line, date=None):
 		"""Processes the line for failures and populates failManager
 		"""
-		for element in self.processLine(line, date)[1]:
-			ip = element[1]
-			unixTime = element[2]
-			lines = element[3]
-			fail = {}
-			if len(element) > 4:
-				fail = element[4]
-			logSys.debug("Processing line with time:%s and ip:%s",
-					unixTime, ip)
-			if unixTime < MyTime.time() - self.getFindTime():
-				logSys.debug("Ignore line since time %s < %s - %s",
-					unixTime, MyTime.time(), self.getFindTime())
-				break
-			if self.inIgnoreIPList(ip, log_ignore=True):
-				continue
-			logSys.info(
-				"[%s] Found %s - %s", self.jail.name, ip, datetime.datetime.fromtimestamp(unixTime).strftime("%Y-%m-%d %H:%M:%S")
-			)
-			tick = FailTicket(ip, unixTime, lines, data=fail)
-			self.failManager.addFailure(tick)
+		try:
+			for element in self.processLine(line, date)[1]:
+				ip = element[1]
+				unixTime = element[2]
+				lines = element[3]
+				fail = {}
+				if len(element) > 4:
+					fail = element[4]
+				logSys.debug("Processing line with time:%s and ip:%s",
+						unixTime, ip)
+				if unixTime < MyTime.time() - self.getFindTime():
+					logSys.debug("Ignore line since time %s < %s - %s",
+						unixTime, MyTime.time(), self.getFindTime())
+					break
+				if self.inIgnoreIPList(ip, log_ignore=True):
+					continue
+				logSys.info(
+					"[%s] Found %s - %s", self.jail.name, ip, datetime.datetime.fromtimestamp(unixTime).strftime("%Y-%m-%d %H:%M:%S")
+				)
+				tick = FailTicket(ip, unixTime, lines, data=fail)
+				self.failManager.addFailure(tick)
+			# reset (halve) error counter (successfully processed line):
+			if self.__errors:
+				self.__errors //= 2
+		except Exception as e:
+			logSys.error("Failed to process line: %r, caught exception: %r", line, e,
+				exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
+			# increment error counter, stop processing (go idle) after the 100th error:
+			self.__errors += 1
+			# sleep a little bit (to get around transient, time-related errors):
+			time.sleep(self.sleeptime)
+			if self.__errors >= 100:
+				logSys.error("Too many errors at once (%s), going idle", self.__errors)
+				self.__errors //= 2
+				self.idle = True
 
 	##
 	# Returns true if the line should be ignored.
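The new counter gives the filter a simple self-throttling scheme: every failed line adds one error plus a short sleep, every successfully processed line halves the count, and once 100 errors accumulate the filter logs the condition and goes idle (halving again so it can recover when resumed). A standalone sketch of the same bookkeeping (ErrorCounter is an illustrative helper, not part of fail2ban):

import time

class ErrorCounter:
	"""Sketch of the bookkeeping added to Filter: errors grow by one
	per failed line, shrink by half per processed line, and 100
	accumulated errors trip the idle switch."""

	MAX_ERRORS = 100

	def __init__(self, sleeptime=0.1):
		self.errors = 0
		self.idle = False
		self.sleeptime = sleeptime

	def on_success(self):
		# a successfully processed line halves the error counter:
		if self.errors:
			self.errors //= 2

	def on_error(self):
		self.errors += 1
		# sleep a little bit (to get around transient, time-related errors):
		time.sleep(self.sleeptime)
		if self.errors >= self.MAX_ERRORS:
			# halve before idling, so the filter can recover once resumed:
			self.errors //= 2
			self.idle = True

c = ErrorCounter(sleeptime=0)
for _ in range(100):
	c.on_error()
assert c.idle and c.errors == 50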

View File

@@ -29,3 +29,19 @@ Jun 21 16:55:02 <auth.info> machine kernel: [ 970.699396] @vserver_demo test-
 # -- the same as above with brackets as date ambit --
 # failJSON: { "time": "2005-06-21T16:55:03", "match": true , "host": "192.0.2.3" }
 [Jun 21 16:55:03] <auth.info> machine kernel: [ 970.699396] @vserver_demo test-demo(pam_unix)[13709] [ID 255 test] F2B: failure from 192.0.2.3
+# -- wrong time directly in journal line (the last known date is used):
+# failJSON: { "time": "2005-06-21T16:55:03", "match": true , "host": "192.0.2.1" }
+0000-12-30 00:00:00 server test-demo[47831]: F2B: failure from 192.0.2.1
+# -- wrong time after a newline in the message (plist without escaped newlines):
+# failJSON: { "match": false }
+Jun 22 20:37:04 server test-demo[402]: writeToStorage plist={
+# failJSON: { "match": false }
+    absentCircleWithNoReason = 0;
+# failJSON: { "match": false }
+    applicationDate = "0000-12-30 00:00:00 +0000";
+# failJSON: { "match": false }
+}
+# -- wrong time directly in journal line (the last known date is used):
+# failJSON: { "time": "2005-06-22T20:37:04", "match": true , "host": "192.0.2.2" }
+0000-12-30 00:00:00 server test-demo[47831]: F2B: failure from 192.0.2.2
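These sample lines pin down the fallback behavior: a line whose own timestamp is unparsable (the bogus year 0000) is still matched, but its failure time comes from the last successfully parsed date, which is why the failJSON "time" fields repeat the preceding valid timestamps. A rough sketch of that fallback, assuming a single fixed timestamp format (parse_with_fallback is an illustrative helper, not the DateDetector API):

from datetime import datetime

def parse_with_fallback(lines, fmt="%Y-%m-%d %H:%M:%S"):
	# Lines whose leading timestamp cannot be parsed (e.g. the bogus
	# year 0000, which raises ValueError) inherit the last known date.
	last_known = None
	for line in lines:
		try:
			last_known = datetime.strptime(line[:19], fmt)
		except ValueError:
			pass  # keep last_known from the previous line
		yield last_known, line

log = [
	"2005-06-21 16:55:03 server test-demo[1]: F2B: failure from 192.0.2.3",
	"0000-12-30 00:00:00 server test-demo[2]: F2B: failure from 192.0.2.1",
]
for when, line in parse_with_fallback(log):
	print(when, "<=", line)  # both report 2005-06-21 16:55:03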

View File

@@ -604,6 +604,28 @@ class LogFileMonitor(LogCaptureTestCase):
 		self.filter.getFailures(self.name)
 		self.assertLogged('Unable to open %s' % self.name)
 
+	def testErrorProcessLine(self):
+		self.filter.sleeptime /= 1000.0
+		## produce an error via a non-callable processLine:
+		_org_processLine = self.filter.processLine
+		self.filter.processLine = None
+		for i in range(100):
+			self.file.write("line%d\n" % 1)
+		self.file.flush()
+		for i in range(100):
+			self.filter.getFailures(self.name)
+		self.assertLogged('Failed to process line:')
+		self.assertLogged('Too many errors at once')
+		self.pruneLog()
+		self.assertTrue(self.filter.idle)
+		self.filter.idle = False
+		self.filter.getFailures(self.name)
+		self.filter.processLine = _org_processLine
+		self.file.write("line%d\n" % 1)
+		self.file.flush()
+		self.filter.getFailures(self.name)
+		self.assertNotLogged('Failed to process line:')
+
 	def testRemovingFailRegex(self):
 		self.filter.delFailRegex(0)
 		self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid')
@@ -917,13 +939,17 @@ def get_monitor_failures_testcase(Filter_):
 		# and now remove the LogPath
 		self.filter.delLogPath(self.name)
+		# wait a bit for the filter (backend-threads):
+		self.waitForTicks(2)
 
 		_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
 		# so we should get no more failures detected
-		self.assertTrue(self.isEmpty(_maxWaitTime(10)))
+		self.assertTrue(self.isEmpty(10))
 
 		# but then if we add it back again (no seek to time in FileFilter's, because the file uses the same time)
 		self.filter.addLogPath(self.name, autoSeek=False)
+		# wait a bit for the filter (backend-threads):
+		self.waitForTicks(2)
 		# Tricky catch here is that it should get them from the
 		# tail written before, so let's not copy anything yet
 		#_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
@@ -932,8 +958,10 @@ def get_monitor_failures_testcase(Filter_):
 		# now copy and get even more
 		_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
-		# yoh: not sure why count here is not 9... TODO
-		self.assert_correct_last_attempt(GetFailures.FAILURES_01)#, count=9)
+		# check for 3 failures (not 9), because 6 were already counted above...
+		self.assert_correct_last_attempt(GetFailures.FAILURES_01)
+		# total count in this test:
+		self.assertEqual(self.filter.failManager.getFailTotal(), 12)
 
 	MonitorFailures.__name__ = "MonitorFailures<%s>(%s)" \
 		% (Filter_.__name__, testclass_name) # 'tempfile')
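Both test hunks replace fixed waits with explicit synchronization points: waitForTicks(2) lets the backend threads run before asserting, and isEmpty(10) now takes the timeout directly (previously scaled through _maxWaitTime). The common idiom is poll-until-condition-or-timeout; a generic sketch of it (wait_for and the fail_manager usage are assumed names, not the real test helpers):

import time

def wait_for(condition, timeout=10, interval=0.1):
	# Poll until the condition holds or the timeout expires, instead
	# of sleeping a fixed amount and hoping the thread caught up.
	deadline = time.time() + timeout
	while time.time() < deadline:
		if condition():
			return True
		time.sleep(interval)
	return condition()

# usage sketch: wait until the backend thread has drained all failures
# (fail_manager stands in for filter.failManager, an assumed name here):
# wait_for(lambda: fail_manager.size() == 0, timeout=10)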