Merge commit '0.8.6-90-g08564bd' into debian-devel

* commit '0.8.6-90-g08564bd':
  ENH: fail2ban-testcases -- custom logging format to ease debugging, non-0 exit code in case of failure
  ENH: Filter's testcases -- rename, del + list again --- a bit unstable, might still fail from time to time
  BF: pyinotify -- monitor the parent directory for IN_CREATE + process freshly added file (Closes gh-44)
  ENH: first working unittest for checking polling and inotify backends
  RF/BF: just use pyinotify.ThreadedNotifier thread in filterpyinotify
  RF: filter.py -- single readline in a loop
  ENH: FilterPoll -- adjusted some msgs + allowed to operate without jail (for testing)
  Minor additional comment to DEVELOP
  ENH: extended test LogfileMonitor
  ENH: add more verbosity levels to be controlled while running unittests
  Added few tests of FileFilter.  yet to place them into a Jail-ed execution test
  DOC: distilling some of server "design" into DEVELOP notes for common good
  ENH: minor, just  trailing spaces/tabs + reformated a string
  ENH: added a basic test for FilterPoll for detection of modifications
  clarified that the are existing test cases and the 'coming soon' is about creating new ones.
  Added beginnings of documentation for developers
  BF: usedns=no was not working at all
  RF: filtertestcase.py to put common testing into a helping subroutine
  ENH: be able to control verbosity from cmdline for fail2ban-testcases
pull/808/head
Yaroslav Halchenko 2012-07-19 14:06:14 -04:00
commit 09dd317b20
7 changed files with 899 additions and 278 deletions

156
DEVELOP Normal file
View File

@ -0,0 +1,156 @@
__ _ _ ___ _
/ _|__ _(_) |_ ) |__ __ _ _ _
| _/ _` | | |/ /| '_ \/ _` | ' \
|_| \__,_|_|_/___|_.__/\__,_|_||_|
================================================================================
How to develop for Fail2Ban
================================================================================
Fail2Ban uses GIT (http://git-scm.com/) distributed source control. This gives
each developer their own complete copy of the entire repository. Developers can
add and switch branches and commit changes when ever they want and then ask a
maintainer to merge their changes.
Fail2Ban uses GitHub (https://github.com/fail2ban/fail2ban) to manage access to
the Git repository. GitHub provides free hosting for open-source projects as
well as a web-based Git repository browser and an issue tracker.
If you are familiar with Python and you have a bug fix or a feature that you
would like to add to Fail2Ban, the best way to do so it to use the GitHub Pull
Request feature. You can find more details on the Fail2Ban wiki
(http://www.fail2ban.org/wiki/index.php/Get_Involved)
Testing
=======
Existing tests can be run by executing `fail2ban-testcases`.
Documentation about creating tests (when tests are required and some guidelines
for creating good tests) will be added soon.
Coding Standards
================
Coming Soon.
Design
======
Fail2Ban was initially developed with Python 2.3 (IIRC). It should
still be compatible with Python 2.4 and such compatibility assurance
makes code ... old-fashioned in many places (RF-Note). In 0.7 the
design went through major refactoring into client/server,
a-thread-per-jail design which made it a bit difficult to follow.
Below you can find a sketchy description of the main components of the
system to orient yourself better.
server/
------
Core classes hierarchy (feel welcome to draw a better/more complete
one)::
-> inheritance
+ delegation
* storage of multiple instances
RF-Note just a note which might be useful to address while doing RF
JailThread -> Filter -> FileFilter -> {FilterPoll, FilterPyinotify, ...}
| | * FileContainer
| + FailManager
| + DateDetector
\- -> Actions
* Actions
+ BanManager
Server
+ Jails
* Jail
+ Filter
* tickets (in __queue)
failmanager.py
~~~~~~~~~~~~~~
FailManager
Keeps track of failures, recorded as 'tickets'. All operations are
done via acquiring a lock
FailManagerEmpty(Exception)
raised by FailManager.toBan after reaching the list of tickets
(RF-Note: asks to become a generator ;) )
filter.py
~~~~~~~~~~
Filter(JailThread)
Wraps (non-threaded) FailManager (and proxies to it quite a bit),
and provides all primary logic for processing new lines, what IPs to
ignore, etc
.failManager [FailManager]
.dateDetector [DateDetector]
.__failRegex [list]
.__ignoreRegex [list]
Contains regular expressions for failures and ignores
.__findTime [numeric]
Used in `processLineAndAdd` to skip old lines
FileFilter(Filter):
Files-aware Filter
.__logPath [list]
keeps the tracked files (added 1-by-1 using addLogPath)
stored as FileContainer's
.getFailures
actually just returns
True
if managed to open and get lines (until empty)
False
if failed to open or absent container matching the filename
FileContainer
Adapter for a file to deal with log rotation.
.open,.close,.readline
RF-Note: readline returns "" with handler absent... shouldn't it be None?
.__pos
Keeps the position pointer
DNSUtils
Utility class for DNS and IP handling
RF-Note: convert to functions within a separate submodule
filter*.py
~~~~~~~~~~
Implementations of FileFilter's for specific backends. Derived
classes should provide an implementation of `run` and usually
override `addLogPath`, `delLogPath` methods. In run() method they all
one way or another provide
try:
while True:
ticket = self.failManager.toBan()
self.jail.putFailTicket(ticket)
except FailManagerEmpty:
self.failManager.cleanup(MyTime.time())
thus channeling "ban tickets" from their failManager to a
corresponding jail.
action.py
~~~~~~~~~
Takes care about executing start/check/ban/unban/stop commands

View File

@ -1,6 +1,8 @@
#!/usr/bin/python
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
"""Script to run Fail2Ban tests battery
"""
# This file is part of Fail2Ban.
#
@ -19,8 +21,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Author: Cyril Jaquier
#
# $Revision$
__author__ = "Cyril Jaquier"
__version__ = "$Revision$"
@ -41,31 +41,73 @@ from testcases import datedetectortestcase
from testcases import actiontestcase
from server.mytime import MyTime
# Set the time to a fixed, known value
# Sun Aug 14 12:00:00 CEST 2005
from optparse import OptionParser, Option
# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
old_TZ = os.environ.get('TZ', None)
os.environ['TZ'] = 'Europe/Zurich'
time.tzset()
MyTime.setTime(1124013600)
def get_opt_parser():
# use module docstring for help output
p = OptionParser(
usage="%s [OPTIONS]\n" % sys.argv[0] + __doc__,
version="%prog " + version)
# Gets the instance of the logger.
p.add_options([
Option('-l', "--log-level", type="choice",
dest="log_level",
choices=('debug', 'info', 'warn', 'error', 'fatal'),
default=None,
help="Log level for the logger to use during running tests"),
])
return p
parser = get_opt_parser()
(opts, files) = parser.parse_args()
assert(not len(files))
#
# Logging
#
logSys = logging.getLogger("fail2ban")
# Numerical level of verbosity corresponding to a log "level"
verbosity = {'debug': 3,
'info': 2,
'warn': 1,
'error': 1,
'fatal': 0,
None: 1}[opts.log_level]
if opts.log_level is not None:
# so we had explicit settings
logSys.setLevel(getattr(logging, opts.log_level.upper()))
else:
# suppress the logging but it would leave unittests' progress dots
# ticking, unless like with '-l fatal' which would be silent
# unless error occurs
logSys.setLevel(getattr(logging, 'FATAL'))
# Add the default logging handler
stdout = logging.StreamHandler(sys.stdout)
# Custom log format for the verbose tests runs
if verbosity > 1:
stdout.setFormatter(logging.Formatter(' %(asctime)-15s %(thread)s %(message)s'))
else:
# just prefix with the space
stdout.setFormatter(logging.Formatter(' %(message)s'))
logSys.addHandler(stdout)
logSys.setLevel(logging.FATAL)
print "Fail2ban " + version + " test suite. Please wait..."
#
# Let know the version
#
if not opts.log_level or opts.log_level != 'fatal':
print "Fail2ban " + version + " test suite. Please wait..."
#
# Gather the tests
#
tests = unittest.TestSuite()
# Filter
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
# Server
#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
#tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
@ -76,16 +118,70 @@ tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
# ClientReader
tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
from server.filterpoll import FilterPoll
filters = [FilterPoll] # always available
# Additional filters available only if external modules are available
# yoh: Since I do not know better way for parametric tests
# with good old unittest
try:
from server.filtergamin import FilterGamin
# That shmug plain doesn't work and stalls things ATM
# filters.append(FilterGamin)
except:
pass
try:
from server.filterpyinotify import FilterPyinotify
filters.append(FilterPyinotify)
except:
pass
for Filter_ in filters:
tests.addTest(unittest.makeSuite(
filtertestcase.get_monitor_failures_testcase(Filter_)))
# yoh: adding them (in particular datadetectortestscase before above
# get_monitor_failures_testcase's makes them fail (probably due
# to additional thread making it busier or smth like
# that)... TODO
# Filter
tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
# DateDetector
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
# Tests runner
testRunner = unittest.TextTestRunner()
testRunner.run(tests)
# Just for the sake of it reset the TZ
# yoh is planing to move all this into setup/teardown methods within tests
os.environ.pop('TZ')
if old_TZ:
os.environ['TZ'] = old_TZ
time.tzset()
#
# Run the tests
#
testRunner = unittest.TextTestRunner(verbosity=verbosity)
try:
# Set the time to a fixed, known value
# Sun Aug 14 12:00:00 CEST 2005
# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
old_TZ = os.environ.get('TZ', None)
os.environ['TZ'] = 'Europe/Zurich'
time.tzset()
MyTime.setTime(1124013600)
tests_results = testRunner.run(tests)
finally:
# Just for the sake of it reset the TZ
# yoh: move all this into setup/teardown methods within tests
os.environ.pop('TZ')
if old_TZ:
os.environ['TZ'] = old_TZ
time.tzset()
if not tests_results.wasSuccessful():
sys.exit(1)

View File

@ -18,7 +18,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Author: Cyril Jaquier
#
#
# $Revision$
__author__ = "Cyril Jaquier"
@ -53,8 +53,8 @@ class Filter(JailThread):
#
# Initialize the filter object with default values.
# @param jail the jail object
def __init__(self, jail):
def __init__(self, jail, useDns='warn'):
JailThread.__init__(self)
## The jail which contains this filter.
self.jail = jail
@ -65,12 +65,12 @@ class Filter(JailThread):
## The regular expression list with expressions to ignore.
self.__ignoreRegex = list()
## Use DNS setting
self.__useDns = "warn"
self.setUseDns(useDns)
## The amount of time to look back.
self.__findTime = 6000
## The ignore IP list.
self.__ignoreIpList = []
self.dateDetector = DateDetector()
self.dateDetector.addDefaultTemplate()
logSys.debug("Created %s" % self)
@ -85,14 +85,14 @@ class Filter(JailThread):
# The regular expression can also match any other pattern than failures
# and thus can be used for many purporse.
# @param value the regular expression
def addFailRegex(self, value):
try:
regex = FailRegex(value)
self.__failRegex.append(regex)
except RegexException, e:
logSys.error(e)
def delFailRegex(self, index):
try:
@ -100,102 +100,109 @@ class Filter(JailThread):
except IndexError:
logSys.error("Cannot remove regular expression. Index %d is not "
"valid" % index)
##
# Get the regular expression which matches the failure.
#
# @return the regular expression
def getFailRegex(self):
failRegex = list()
for regex in self.__failRegex:
failRegex.append(regex.getRegex())
return failRegex
##
# Add the regular expression which matches the failure.
#
# The regular expression can also match any other pattern than failures
# and thus can be used for many purporse.
# @param value the regular expression
def addIgnoreRegex(self, value):
try:
regex = Regex(value)
self.__ignoreRegex.append(regex)
except RegexException, e:
logSys.error(e)
def delIgnoreRegex(self, index):
try:
del self.__ignoreRegex[index]
except IndexError:
logSys.error("Cannot remove regular expression. Index %d is not "
"valid" % index)
##
# Get the regular expression which matches the failure.
#
# @return the regular expression
def getIgnoreRegex(self):
ignoreRegex = list()
for regex in self.__ignoreRegex:
ignoreRegex.append(regex.getRegex())
return ignoreRegex
##
# Set the Use DNS mode
# @param value the usedns mode
def setUseDns(self, value):
if isinstance(value, bool):
value = {True: 'yes', False: 'no'}[value]
value = value.lower() # must be a string by now
if not (value in ('yes', 'no', 'warn')):
logSys.error("Incorrect value %r specified for usedns. "
"Using safe 'no'" % (value,))
value = 'no'
logSys.debug("Setting usedns = %s for %s" % (value, self))
self.__useDns = value
##
# Get the usedns mode
# @return the usedns mode
def getUseDns(self):
return self.__useDns
##
# Set the time needed to find a failure.
#
# This value tells the filter how long it has to take failures into
# account.
# @param value the time
def setFindTime(self, value):
self.__findTime = value
self.failManager.setMaxTime(value)
logSys.info("Set findtime = %s" % value)
##
# Get the time needed to find a failure.
#
# @return the time
def getFindTime(self):
return self.__findTime
##
# Set the maximum retry value.
#
# @param value the retry value
def setMaxRetry(self, value):
self.failManager.setMaxRetry(value)
logSys.info("Set maxRetry = %s" % value)
##
# Get the maximum retry value.
#
# @return the retry value
def getMaxRetry(self):
return self.failManager.getMaxRetry()
##
# Main loop.
#
@ -205,38 +212,38 @@ class Filter(JailThread):
def run(self):
raise Exception("run() is abstract")
##
# Ban an IP - http://blogs.buanzo.com.ar/2009/04/fail2ban-patch-ban-ip-address-manually.html
# Arturo 'Buanzo' Busleiman <buanzo@buanzo.com.ar>
#
# to enable banip fail2ban-client BAN command
def addBannedIP(self, ip):
unixTime = time.time()
for i in xrange(self.failManager.getMaxRetry()):
self.failManager.addFailure(FailTicket(ip, unixTime))
return ip
##
# Add an IP/DNS to the ignore list.
#
# IP addresses in the ignore list are not taken into account
# when finding failures. CIDR mask and DNS are also accepted.
# @param ip IP address to ignore
def addIgnoreIP(self, ip):
logSys.debug("Add " + ip + " to ignore list")
self.__ignoreIpList.append(ip)
def delIgnoreIP(self, ip):
logSys.debug("Remove " + ip + " from ignore list")
self.__ignoreIpList.remove(ip)
def getIgnoreIP(self):
return self.__ignoreIpList
##
# Check if IP address/DNS is in the ignore list.
#
@ -244,7 +251,7 @@ class Filter(JailThread):
# mask in the ignore list.
# @param ip IP address
# @return True if IP address is in ignore list
def inIgnoreIPList(self, ip):
for i in self.__ignoreIpList:
# An empty string is always false
@ -268,9 +275,11 @@ class Filter(JailThread):
if a == b:
return True
return False
def processLine(self, line):
"""Split the time portion from log msg and return findFailures on them
"""
try:
# Decode line to UTF-8
l = line.decode('utf-8')
@ -290,6 +299,8 @@ class Filter(JailThread):
return self.findFailure(timeLine, logLine)
def processLineAndAdd(self, line):
"""Processes the line for failures and populates failManager
"""
for element in self.processLine(line):
ip = element[0]
unixTime = element[1]
@ -339,11 +350,11 @@ class Filter(JailThread):
# The failregex matched.
date = self.dateDetector.getUnixTime(timeLine)
if date == None:
logSys.debug("Found a match for '" + logLine +"' but no "
+ "valid date/time found for '"
+ timeLine + "'. Please contact the "
+ "author in order to get support for this "
+ "format")
logSys.debug("Found a match for %r but no valid date/time "
"found for %r. Please file a detailed issue on"
" https://github.com/fail2ban/fail2ban/issues "
"in order to get support for this format."
% (logLine, timeLine))
else:
try:
host = failRegex.getHost()
@ -356,7 +367,7 @@ class Filter(JailThread):
except RegexException, e:
logSys.error(e)
return failList
##
# Get the status of the filter.
@ -364,20 +375,20 @@ class Filter(JailThread):
# Get some informations about the filter state such as the total
# number of failures.
# @return a list with tuple
def status(self):
ret = [("Currently failed", self.failManager.size()),
("Total failed", self.failManager.getFailTotal())]
ret = [("Currently failed", self.failManager.size()),
("Total failed", self.failManager.getFailTotal())]
return ret
class FileFilter(Filter):
def __init__(self, jail):
Filter.__init__(self, jail)
def __init__(self, jail, **kwargs):
Filter.__init__(self, jail, **kwargs)
## The log file path.
self.__logPath = []
##
# Add a log file path
#
@ -386,12 +397,12 @@ class FileFilter(Filter):
def addLogPath(self, path, tail = False):
container = FileContainer(path, tail)
self.__logPath.append(container)
##
# Delete a log path
#
# @param path the log file to delete
def delLogPath(self, path):
for log in self.__logPath:
if log.getFileName() == path:
@ -402,35 +413,35 @@ class FileFilter(Filter):
# Get the log file path
#
# @return log file path
def getLogPath(self):
return self.__logPath
##
# Check whether path is already monitored.
#
# @param path The path
# @return True if the path is already monitored else False
def containsLogPath(self, path):
for log in self.__logPath:
if log.getFileName() == path:
return True
return False
def getFileContainer(self, path):
for log in self.__logPath:
if log.getFileName() == path:
return log
return None
##
# Gets all the failure in the log file.
#
# Gets all the failure in the log file which are newer than
# MyTime.time()-self.findTime. When a failure is detected, a FailTicket
# is created and is added to the FailManager.
def getFailures(self, filename):
container = self.getFileContainer(filename)
if container == None:
@ -443,18 +454,16 @@ class FileFilter(Filter):
logSys.error("Unable to open %s" % filename)
logSys.exception(e)
return False
line = container.readline()
while not line == "":
if not self._isActive():
# The jail has been stopped
while True:
line = container.readline()
if (line == "") or not self._isActive():
# The jail reached the bottom or has been stopped
break
self.processLineAndAdd(line)
# Read a new line.
line = container.readline()
container.close()
return True
def status(self):
ret = Filter.status(self)
path = [m.getFileName() for m in self.getLogPath()]
@ -478,7 +487,7 @@ except ImportError:
md5sum = md5.new
class FileContainer:
def __init__(self, filename, tail = False):
self.__filename = filename
self.__tail = tail
@ -499,10 +508,10 @@ class FileContainer:
self.__pos = 0
finally:
handler.close()
def getFileName(self):
return self.__filename
def open(self):
self.__handler = open(self.__filename)
# Set the file descriptor to be FD_CLOEXEC
@ -520,12 +529,12 @@ class FileContainer:
self.__pos = 0
# Sets the file pointer to the last position.
self.__handler.seek(self.__pos)
def readline(self):
if self.__handler == None:
return ""
return self.__handler.readline()
def close(self):
if not self.__handler == None:
# Saves the last position.
@ -545,9 +554,9 @@ class FileContainer:
import socket, struct
class DNSUtils:
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")
#@staticmethod
def dnsToIp(dns):
""" Convert a DNS into an IP address using the Python socket module.
@ -560,7 +569,7 @@ class DNSUtils:
% dns)
return list()
dnsToIp = staticmethod(dnsToIp)
#@staticmethod
def searchIP(text):
""" Search if an IP address if directly available and return
@ -572,7 +581,7 @@ class DNSUtils:
else:
return None
searchIP = staticmethod(searchIP)
#@staticmethod
def isValidIP(string):
""" Return true if str is a valid IP
@ -584,32 +593,31 @@ class DNSUtils:
except socket.error:
return False
isValidIP = staticmethod(isValidIP)
#@staticmethod
def textToIp(text, useDns):
""" Return the IP of DNS found in a given text.
"""
if useDns == "no":
return None
else:
ipList = list()
# Search for plain IP
plainIP = DNSUtils.searchIP(text)
if not plainIP is None:
plainIPStr = plainIP.group(0)
if DNSUtils.isValidIP(plainIPStr):
ipList.append(plainIPStr)
if not ipList:
# Try to get IP from possible DNS
ip = DNSUtils.dnsToIp(text)
for e in ip:
ipList.append(e)
if useDns == "warn":
logSys.warning("Determined IP using DNS Reverse Lookup: %s = %s",
text, ipList)
return ipList
ipList = list()
# Search for plain IP
plainIP = DNSUtils.searchIP(text)
if not plainIP is None:
plainIPStr = plainIP.group(0)
if DNSUtils.isValidIP(plainIPStr):
ipList.append(plainIPStr)
# If we are allowed to resolve -- give it a try if nothing was found
if useDns in ("yes", "warn") and not ipList:
# Try to get IP from possible DNS
ip = DNSUtils.dnsToIp(text)
ipList.extend(ip)
if ip and useDns == "warn":
logSys.warning("Determined IP using DNS Reverse Lookup: %s = %s",
text, ipList)
return ipList
textToIp = staticmethod(textToIp)
#@staticmethod
def cidr(i, n):
""" Convert an IP address string with a CIDR mask into a 32-bit
@ -619,14 +627,14 @@ class DNSUtils:
MASK = 0xFFFFFFFFL
return ~(MASK >> n) & MASK & DNSUtils.addr2bin(i)
cidr = staticmethod(cidr)
#@staticmethod
def addr2bin(string):
""" Convert a string IPv4 address into an unsigned integer.
"""
return struct.unpack("!L", socket.inet_aton(string))[0]
addr2bin = staticmethod(addr2bin)
#@staticmethod
def bin2addr(addr):
""" Convert a numeric IPv4 address into string n.n.n.n form.

View File

@ -17,14 +17,13 @@
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Author: Cyril Jaquier
#
# $Revision$
# Author: Cyril Jaquier, Yaroslav Halchenko
#
__author__ = "Cyril Jaquier"
__author__ = "Cyril Jaquier, Yaroslav Halchenko"
__version__ = "$Revision$"
__date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
__license__ = "GPL"
from failmanager import FailManagerEmpty
@ -50,7 +49,7 @@ class FilterPoll(FileFilter):
#
# Initialize the filter object with default values.
# @param jail the jail object
def __init__(self, jail):
FileFilter.__init__(self, jail)
self.__modified = False
@ -71,13 +70,13 @@ class FilterPoll(FileFilter):
self.__lastModTime[path] = 0
self.__file404Cnt[path] = 0
FileFilter.addLogPath(self, path, tail)
logSys.info("Added logfile = %s" % path)
logSys.info("Added logfile = %s" % path)
##
# Delete a log path
#
# @param path the log file to delete
def delLogPath(self, path):
if not self.containsLogPath(path):
logSys.error(path + " is not monitored")
@ -86,7 +85,7 @@ class FilterPoll(FileFilter):
del self.__file404Cnt[path]
FileFilter.delLogPath(self, path)
logSys.info("Removed logfile = %s" % path)
##
# Main loop.
#
@ -100,8 +99,9 @@ class FilterPoll(FileFilter):
if not self.getIdle():
# Get file modification
for container in self.getLogPath():
if self.isModified(container.getFileName()):
self.getFailures(container.getFileName())
filename = container.getFileName()
if self.isModified(filename):
self.getFailures(filename)
self.__modified = True
if self.__modified:
@ -116,7 +116,8 @@ class FilterPoll(FileFilter):
time.sleep(self.getSleepTime())
else:
time.sleep(self.getSleepTime())
logSys.debug(self.jail.getName() + ": filter terminated")
logSys.debug((self.jail and self.jail.getName() or "jailless") +
" filter terminated")
return True
##
@ -124,7 +125,7 @@ class FilterPoll(FileFilter):
#
# Checks if the log file has been modified using os.stat().
# @return True if log file has been modified
def isModified(self, filename):
try:
logStats = os.stat(filename)
@ -135,11 +136,15 @@ class FilterPoll(FileFilter):
logSys.debug(filename + " has been modified")
self.__lastModTime[filename] = logStats.st_mtime
return True
except OSError:
logSys.error("Unable to get stat on " + filename)
self.__file404Cnt[filename] = self.__file404Cnt[filename] + 1
except OSError, e:
logSys.error("Unable to get stat on %s because of: %s"
% (filename, e))
self.__file404Cnt[filename] += 1
if self.__file404Cnt[filename] > 2:
logSys.warn("Too much read error. Set the jail idle")
self.jail.setIdle(True)
logSys.warn("Too many errors. Setting the jail idle")
if self.jail:
self.jail.setIdle(True)
else:
logSys.warn("No jail is assigned to %s" % self)
self.__file404Cnt[filename] = 0
return False

View File

@ -29,6 +29,8 @@ from mytime import MyTime
import time, logging, pyinotify
from os.path import dirname, sep as pathsep
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.filter")
@ -77,10 +79,21 @@ class FilterPyinotify(FileFilter):
else:
wd = self.__monitor.add_watch(path, pyinotify.IN_MODIFY)
self.__watches.update(wd)
FileFilter.addLogPath(self, path, tail)
logSys.info("Added logfile = %s" % path)
##
path_dir = dirname(path)
if not (path_dir in self.__watches):
# we need to watch also the directory for IN_CREATE
self.__watches.update(
self.__monitor.add_watch(path_dir, pyinotify.IN_CREATE))
logSys.debug("Monitor also parent directory %s" % path_dir)
# sniff the file
self.callback(path)
##
# Delete a log path
#
# @param path the log file to delete
@ -98,45 +111,49 @@ class FilterPyinotify(FileFilter):
else:
logSys.error("Failed to remove watch on path: %s", path)
path_dir = dirname(path)
if not len([k for k in self.__watches
if k.startswith(path_dir + pathsep)]):
# Remove watches for the directory
# since there is no other monitored file under this directory
wdInt = self.__watches.pop(path_dir)
_ = self.__monitor.rm_watch(wdInt)
logSys.debug("Remove monitor for the parent directory %s" % path_dir)
##
# Main loop.
#
# This function is the main loop of the thread. It checks if the
# file has been modified and looks for failures.
# @return True when the thread exits nicely
# Since all detection is offloaded to pyinotifier -- no manual
# loop is necessary
def run(self):
self.setActive(True)
self.__notifier = pyinotify.ThreadedNotifier(self.__monitor,
ProcessPyinotify(self))
self.__notifier.start()
while self._isActive():
if not self.getIdle():
self.__notifier.process_events()
if self.__notifier.check_events():
self.__notifier.read_events()
else:
time.sleep(self.getSleepTime())
# Cleanup pyinotify
self.__cleanup()
logSys.debug(self.jail.getName() + ": filter terminated")
logSys.debug("pyinotifier started for %s." % self.jail.getName())
# TODO: verify that there is nothing really to be done for
# idle jails
return True
##
# Call super.stop() and then stop the 'Notifier'
def stop(self):
# Call super to set __isRunning
super(FilterPyinotify, self).stop()
# Now stop the Notifier, otherwise we're deadlocked
# Stop the notifier thread
self.__notifier.stop()
self.__notifier.join() # to not exit before notifier does
self.__cleanup() # for pedantic ones
##
# Deallocates the resources used by pyinotify.
def __cleanup(self):
del self.__notifier
del self.__monitor
self.__notifier = None
self.__monitor = None
class ProcessPyinotify(pyinotify.ProcessEvent):

View File

@ -0,0 +1,2 @@
Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2
Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2

View File

@ -17,29 +17,103 @@
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Author: Cyril Jaquier
#
# $Revision$
# Fail2Ban developers
__author__ = "Cyril Jaquier"
__version__ = "$Revision$"
__date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
__license__ = "GPL"
import unittest
import os
import time
import tempfile
from server.filterpoll import FilterPoll
from server.filter import FileFilter, DNSUtils
from server.failmanager import FailManager
from server.failmanager import FailManagerEmpty
#
# Useful helpers
#
def _killfile(f, name):
try:
f.close()
except:
pass
try:
os.unlink(name)
except:
pass
def _assert_equal_entries(utest, found, output, count=None):
"""Little helper to unify comparisons with the target entries
and report helpful failure reports instead of millions of seconds ;)
"""
utest.assertEqual(found[0], output[0]) # IP
utest.assertEqual(found[1], count or output[1]) # count
found_time, output_time = \
time.localtime(found[2]),\
time.localtime(output[2])
utest.assertEqual(found_time, output_time)
if len(output) > 3 and count is None: # match matches
# do not check if custom count (e.g. going through them twice)
utest.assertEqual(repr(found[3]), repr(output[3]))
def _assert_correct_last_attempt(utest, filter_, output, count=None):
"""Additional helper to wrap most common test case
Test filter to contain target ticket
"""
if isinstance(filter_, DummyJail):
ticket = filter_.getFailTicket()
else:
# when we are testing without jails
ticket = filter_.failManager.toBan()
attempts = ticket.getAttempt()
date = ticket.getTime()
ip = ticket.getIP()
matches = ticket.getMatches()
found = (ip, attempts, date, matches)
_assert_equal_entries(utest, found, output, count)
def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line=""):
"""Copy lines from one file to another (which might be already open)
Returns open fout
"""
if isinstance(fin, str):
fin = open(fin, 'r')
if isinstance(fout, str):
fout = open(fout, mode)
# Skip
for i in xrange(skip):
_ = fin.readline()
# Read/Write
i = 0
while n is None or i < n:
l = fin.readline()
if terminal_line is not None and l == terminal_line:
break
fout.write(l)
fout.flush()
i += 1
# to give other threads possibly some time to crunch
time.sleep(0.1)
return fout
#
# Actual tests
#
class IgnoreIP(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__filter = FileFilter(None)
self.filter = FileFilter(None)
def tearDown(self):
"""Call after every test case."""
@ -47,20 +121,22 @@ class IgnoreIP(unittest.TestCase):
def testIgnoreIPOK(self):
ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
for ip in ipList:
self.__filter.addIgnoreIP(ip)
self.assertTrue(self.__filter.inIgnoreIPList(ip))
self.filter.addIgnoreIP(ip)
self.assertTrue(self.filter.inIgnoreIPList(ip))
# Test DNS
self.__filter.addIgnoreIP("www.epfl.ch")
self.assertTrue(self.__filter.inIgnoreIPList("128.178.50.12"))
self.filter.addIgnoreIP("www.epfl.ch")
self.assertTrue(self.filter.inIgnoreIPList("128.178.50.12"))
def testIgnoreIPNOK(self):
ipList = "", "999.999.999.999", "abcdef", "192.168.0."
for ip in ipList:
self.__filter.addIgnoreIP(ip)
self.assertFalse(self.__filter.inIgnoreIPList(ip))
self.filter.addIgnoreIP(ip)
self.assertFalse(self.filter.inIgnoreIPList(ip))
# Test DNS
self.__filter.addIgnoreIP("www.epfl.ch")
self.assertFalse(self.__filter.inIgnoreIPList("127.177.50.10"))
self.filter.addIgnoreIP("www.epfl.ch")
self.assertFalse(self.filter.inIgnoreIPList("127.177.50.10"))
class LogFile(unittest.TestCase):
@ -69,17 +145,304 @@ class LogFile(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.__filter = FilterPoll(None)
self.__filter.addLogPath(LogFile.FILENAME)
self.filter = FilterPoll(None)
self.filter.addLogPath(LogFile.FILENAME)
def tearDown(self):
"""Call after every test case."""
pass
#def testOpen(self):
# self.__filter.openLogFile(LogFile.FILENAME)
# self.filter.openLogFile(LogFile.FILENAME)
def testIsModified(self):
self.assertTrue(self.__filter.isModified(LogFile.FILENAME))
self.assertTrue(self.filter.isModified(LogFile.FILENAME))
class LogFileMonitor(unittest.TestCase):
"""Few more tests for FilterPoll API
"""
def setUp(self):
"""Call before every test case."""
self.filter = self.name = 'NA'
_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
self.file = open(self.name, 'a')
self.filter = FilterPoll(None)
self.filter.addLogPath(self.name)
self.filter.setActive(True)
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
def tearDown(self):
_killfile(self.file, self.name)
pass
def isModified(self, delay=2.):
"""Wait up to `delay` sec to assure that it was modified or not
"""
time0 = time.time()
while time.time() < time0 + delay:
if self.filter.isModified(self.name):
return True
time.sleep(0.1)
return False
def notModified(self):
# shorter wait time for not modified status
return not self.isModified(0.4)
def testNewChangeViaIsModified(self):
# it is a brand new one -- so first we think it is modified
self.assertTrue(self.isModified())
# but not any longer
self.assertTrue(self.notModified())
self.assertTrue(self.notModified())
for i in range(4): # few changes
# unless we write into it
self.file.write("line%d\n" % i)
self.file.flush()
self.assertTrue(self.isModified())
self.assertTrue(self.notModified())
os.rename(self.name, self.name + '.old')
# we are not signaling as modified whenever
# it gets away
self.assertTrue(self.notModified())
f = open(self.name, 'a')
self.assertTrue(self.isModified())
self.assertTrue(self.notModified())
f.write("line%d\n" % i)
f.flush()
self.assertTrue(self.isModified())
self.assertTrue(self.notModified())
_killfile(f, self.name)
_killfile(self.name, self.name + '.old')
pass
def testNewChangeViaGetFailures_simple(self):
# suck in lines from this sample log file
self.filter.getFailures(self.name)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# Now let's feed it with entries from the file
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
self.filter.getFailures(self.name)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# and it should have not been enough
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
self.filter.getFailures(self.name)
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
def testNewChangeViaGetFailures_rewrite(self):
#
# if we rewrite the file at once
self.file.close()
_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
self.filter.getFailures(self.name)
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
# What if file gets overridden
# yoh: skip so we skip those 2 identical lines which our
# filter "marked" as the known beginning, otherwise it
# would not detect "rotation"
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=3, mode='w')
self.filter.getFailures(self.name)
#self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
def testNewChangeViaGetFailures_move(self):
#
# if we move file into a new location while it has been open already
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
n=14, mode='w')
self.filter.getFailures(self.name)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
self.assertEqual(self.filter.failManager.getFailTotal(), 2)
# move aside, but leaving the handle still open...
os.rename(self.name, self.name + '.bak')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
self.filter.getFailures(self.name)
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
from threading import Lock
class DummyJail(object):
"""A simple 'jail' to suck in all the tickets generated by Filter's
"""
def __init__(self):
self.lock = Lock()
self.queue = []
def __len__(self):
try:
self.lock.acquire()
return len(self.queue)
finally:
self.lock.release()
def putFailTicket(self, ticket):
try:
self.lock.acquire()
self.queue.append(ticket)
finally:
self.lock.release()
def getFailTicket(self):
try:
self.lock.acquire()
return self.queue.pop()
finally:
self.lock.release()
def getName(self):
return "DummyJail #%s with %d tickets" % (id(self), len(self))
def get_monitor_failures_testcase(Filter_):
"""Generator of TestCase's for different filters/backends
"""
class MonitorFailures(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.filter = self.name = 'NA'
_, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
self.file = open(self.name, 'a')
self.jail = DummyJail()
self.filter = Filter_(self.jail)
self.filter.addLogPath(self.name)
self.filter.setActive(True)
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.filter.start()
#print "D: started filter %s" % self.filter
def tearDown(self):
#print "D: SLEEPING A BIT"
#import time; time.sleep(5)
#print "D: TEARING DOWN"
self.filter.stop()
#print "D: WAITING FOR FILTER TO STOP"
self.filter.join() # wait for the thread to terminate
#print "D: KILLING THE FILE"
#_killfile(self.file, self.name)
pass
def __str__(self):
return "MonitorFailures%s(%s)" \
% (Filter_, hasattr(self, 'name') and self.name or 'tempfile')
def isFilled(self, delay=2.):
"""Wait up to `delay` sec to assure that it was modified or not
"""
time0 = time.time()
while time.time() < time0 + delay:
if len(self.jail):
return True
time.sleep(0.1)
return False
def isEmpty(self, delay=0.4):
# shorter wait time for not modified status
return not self.isFilled(delay)
def assert_correct_last_attempt(self, failures, count=None):
self.assertTrue(self.isFilled(10)) # give Filter a chance to react
_assert_correct_last_attempt(self, self.jail, failures, count=count)
def test_grow_file(self):
# suck in lines from this sample log file
#self.filter.getFailures(self.name)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# Now let's feed it with entries from the file
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
# and our dummy jail is empty as well
self.assertFalse(len(self.jail))
# since it should have not been enough
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
self.isFilled(6)
# so we sleep for up to 2 sec for it not to become empty,
# and meanwhile pass to other thread(s) and filter should
# have gathered new failures and passed them into the
# DummyJail
self.assertEqual(len(self.jail), 1)
# and there should be no "stuck" ticket in failManager
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(len(self.jail), 0)
#return
# just for fun let's copy all of them again and see if that results
# in a new ban
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
def test_rewrite_file(self):
#
# if we rewrite the file at once
self.file.close()
_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
# What if file gets overridden
# yoh: skip so we skip those 2 identical lines which our
# filter "marked" as the known beginning, otherwise it
# would not detect "rotation"
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
skip=3, mode='w')
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
def test_move_file(self):
#
# if we move file into a new location while it has been open already
self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
n=14, mode='w')
self.isFilled(6)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
self.assertEqual(self.filter.failManager.getFailTotal(), 2) # Fails with Poll from time to time
# move aside, but leaving the handle still open...
os.rename(self.name, self.name + '.bak')
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14)
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
def test_delLogPath(self):
# Smoke test for removing of the path from being watched
# basic full test
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
# and now remove the LogPath
self.filter.delLogPath(self.name)
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
# so we should get no more failures detected
self.assertTrue(self.isEmpty(2))
# but then if we add it back again
self.filter.addLogPath(self.name)
# Tricky catch here is that it should get them from the
# tail written before, so let's not copy anything yet
#_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
# we should detect the failures
self.assert_correct_last_attempt(GetFailures.FAILURES_01, count=6) # was needed if we write twice above
# now copy and get even more
_copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
# yoh: not sure why count here is not 9... TODO
self.assert_correct_last_attempt(GetFailures.FAILURES_01)#, count=9)
return MonitorFailures
class GetFailures(unittest.TestCase):
@ -88,148 +451,122 @@ class GetFailures(unittest.TestCase):
FILENAME_02 = "testcases/files/testcase02.log"
FILENAME_03 = "testcases/files/testcase03.log"
FILENAME_04 = "testcases/files/testcase04.log"
FILENAME_USEDNS = "testcases/files/testcase-usedns.log"
# so that they could be reused by other tests
FAILURES_01 = ('193.168.0.128', 3, 1124013599.0,
['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
def setUp(self):
"""Call before every test case."""
self.__filter = FileFilter(None)
self.__filter.setActive(True)
self.filter = FileFilter(None)
self.filter.setActive(True)
# TODO Test this
#self.__filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
#self.__filter.setTimePattern("%b %d %H:%M:%S")
#self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
#self.filter.setTimePattern("%b %d %H:%M:%S")
def tearDown(self):
"""Call after every test case."""
def _assertEqualEntries(self, found, output):
"""Little helper to unify comparisons with the target entries
and report helpful failure reports instead of millions of seconds ;)
"""
self.assertEqual(found[:2], output[:2])
found_time, output_time = \
time.localtime(found[2]),\
time.localtime(output[2])
self.assertEqual(found_time, output_time)
if len(found) > 3: # match matches
self.assertEqual(found[3], output[3])
def testGetFailures01(self):
output = ('193.168.0.128', 3, 1124013599.0,
['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128\n']*3)
self.filter.addLogPath(GetFailures.FILENAME_01)
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.filter.getFailures(GetFailures.FILENAME_01)
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
self.__filter.addLogPath(GetFailures.FILENAME_01)
self.__filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_01)
ticket = self.__filter.failManager.toBan()
attempts = ticket.getAttempt()
date = ticket.getTime()
ip = ticket.getIP()
matches = ticket.getMatches()
found = (ip, attempts, date, matches)
self._assertEqualEntries(found, output)
def testGetFailures02(self):
output = ('141.3.81.106', 4, 1124013539.0,
['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2\n'
% m for m in 53, 54, 57, 58])
self.__filter.addLogPath(GetFailures.FILENAME_02)
self.__filter.addFailRegex("Failed .* from <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_02)
ticket = self.__filter.failManager.toBan()
attempts = ticket.getAttempt()
date = ticket.getTime()
ip = ticket.getIP()
matches = ticket.getMatches()
found = (ip, attempts, date, matches)
self._assertEqualEntries(found, output)
self.filter.addLogPath(GetFailures.FILENAME_02)
self.filter.addFailRegex("Failed .* from <HOST>")
self.filter.getFailures(GetFailures.FILENAME_02)
_assert_correct_last_attempt(self, self.filter, output)
def testGetFailures03(self):
output = ('203.162.223.135', 6, 1124013544.0)
self.__filter.addLogPath(GetFailures.FILENAME_03)
self.__filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
self.__filter.getFailures(GetFailures.FILENAME_03)
ticket = self.__filter.failManager.toBan()
attempts = ticket.getAttempt()
date = ticket.getTime()
ip = ticket.getIP()
found = (ip, attempts, date)
self._assertEqualEntries(found, output)
self.filter.addLogPath(GetFailures.FILENAME_03)
self.filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
self.filter.getFailures(GetFailures.FILENAME_03)
_assert_correct_last_attempt(self, self.filter, output)
def testGetFailures04(self):
output = [('212.41.96.186', 4, 1124013600.0),
('212.41.96.185', 4, 1124013598.0)]
self.__filter.addLogPath(GetFailures.FILENAME_04)
self.__filter.addFailRegex("Invalid user .* <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_04)
self.filter.addLogPath(GetFailures.FILENAME_04)
self.filter.addFailRegex("Invalid user .* <HOST>")
self.filter.getFailures(GetFailures.FILENAME_04)
try:
for i in range(2):
ticket = self.__filter.failManager.toBan()
attempts = ticket.getAttempt()
date = ticket.getTime()
ip = ticket.getIP()
found = (ip, attempts, date)
self.assertEqual(found, output[i])
for i, out in enumerate(output):
_assert_correct_last_attempt(self, self.filter, out)
except FailManagerEmpty:
pass
def testGetFailuresUseDNS(self):
# We should still catch failures with usedns = no ;-)
output_yes = ('192.0.43.10', 2, 1124013539.0,
['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2\n',
'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
output_no = ('192.0.43.10', 1, 1124013539.0,
['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
# Actually no exception would be raised -- it will be just set to 'no'
#self.assertRaises(ValueError,
# FileFilter, None, useDns='wrong_value_for_useDns')
for useDns, output in (('yes', output_yes),
('no', output_no),
('warn', output_yes)):
filter_ = FileFilter(None, useDns=useDns)
filter_.setActive(True)
filter_.failManager.setMaxRetry(1) # we might have just few failures
filter_.addLogPath(GetFailures.FILENAME_USEDNS)
filter_.addFailRegex("Failed .* from <HOST>")
filter_.getFailures(GetFailures.FILENAME_USEDNS)
_assert_correct_last_attempt(self, filter_, output)
def testGetFailuresMultiRegex(self):
output = ('141.3.81.106', 8, 1124013541.0)
self.__filter.addLogPath(GetFailures.FILENAME_02)
self.__filter.addFailRegex("Failed .* from <HOST>")
self.__filter.addFailRegex("Accepted .* from <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_02)
ticket = self.__filter.failManager.toBan()
self.filter.addLogPath(GetFailures.FILENAME_02)
self.filter.addFailRegex("Failed .* from <HOST>")
self.filter.addFailRegex("Accepted .* from <HOST>")
self.filter.getFailures(GetFailures.FILENAME_02)
_assert_correct_last_attempt(self, self.filter, output)
attempts = ticket.getAttempt()
date = ticket.getTime()
ip = ticket.getIP()
found = (ip, attempts, date)
self._assertEqualEntries(found, output)
def testGetFailuresIgnoreRegex(self):
output = ('141.3.81.106', 8, 1124013541.0)
self.__filter.addLogPath(GetFailures.FILENAME_02)
self.__filter.addFailRegex("Failed .* from <HOST>")
self.__filter.addFailRegex("Accepted .* from <HOST>")
self.__filter.addIgnoreRegex("for roehl")
self.__filter.getFailures(GetFailures.FILENAME_02)
self.assertRaises(FailManagerEmpty, self.__filter.failManager.toBan)
self.filter.addLogPath(GetFailures.FILENAME_02)
self.filter.addFailRegex("Failed .* from <HOST>")
self.filter.addFailRegex("Accepted .* from <HOST>")
self.filter.addIgnoreRegex("for roehl")
self.filter.getFailures(GetFailures.FILENAME_02)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
class DNSUtilsTests(unittest.TestCase):
def testUseDns(self):
res = DNSUtils.textToIp('www.example.com', 'no')
self.assertEqual(res, None)
self.assertEqual(res, [])
res = DNSUtils.textToIp('www.example.com', 'warn')
self.assertEqual(res, ['192.0.43.10'])
res = DNSUtils.textToIp('www.example.com', 'yes')
self.assertEqual(res, ['192.0.43.10'])
def testTextToIp(self):
# Test hostnames
hostnames = [