Merge pull request #1583 from sebres/_0.10/fix-datedetector-grave-fix-v2

0.10/datedetector grave fix
pull/1580/merge
Serg G. Brester 2016-11-28 17:37:36 +01:00 committed by GitHub
commit 8d9fe5d3da
61 changed files with 1184 additions and 530 deletions

View File

@ -13,6 +13,15 @@ TODO: implementing of options resp. other tasks from PR #1346
### Fixes
* [Grave] memory leak's fixed (gh-1277, gh-1234)
* [Grave] Misleading date patterns defined more precisely (using extended syntax
`%Ex[mdHMS]` for exact two-digit match or e. g. `%ExY` as more precise year
pattern, within same century of last year and the next 3 years)
* [Grave] extends date detector template with distance (position of match in
log-line), to prevent grave collision using (re)ordered template list (e.g.
find-spot of wrong date-match inside foreign input, misleading date patterns
by ambiguous formats, etc.)
* Distance collision check always prefers template with shortest distance
(left for right) if date pattern is not anchored
* Tricky bug fix: last position of log file will be never retrieved (gh-795),
because of CASCADE all log entries will be deleted from logs table together with jail,
if used "INSERT OR REPLACE" statement
@ -39,6 +48,11 @@ TODO: implementing of options resp. other tasks from PR #1346
- if fail2ban running as systemd-service, for logging to the systemd-journal,
the `logtarget` could be set to STDOUT
- value `logtarget` for system targets allowed also in lowercase (stdout, stderr, syslog, etc.)
* Fixed UTC/GMT named time zone, using `%Z` and `%z` patterns
(special case with 0 zone offset, see gh-1575)
* `filter.d/freeswitch.conf`
- Optional prefixes (server, daemon, dual time) if systemd daemon logs used (gh-1548)
- User part rewritten to accept IPv6 resp. domain after "@" (gh-1548)
### New Features
* IPv6 support:
@ -135,6 +149,22 @@ fail2ban-client set loglevel INFO
nevertheless, as long as one jail was successful configured (gh-1619)
Message about wrong jail configuration logged in client log (stdout, systemd
journal etc.) and in server log with error level
* More precise date template handling (WARNING: theoretically possible incompatibilities):
- datedetector rewritten more strict as earlier;
- default templates can be specified exacter using prefix/suffix syntax (via `datepattern`);
- more as one date pattern can be specified using option `datepattern` now
(new-line separated);
- some default options like `datepattern` can be specified directly in
section `[Definition]`, that avoids contrary usage of unnecessarily `[Init]`
section, because of performance (each extra section costs time);
- option `datepattern` can be specified in jail also (e. g. jails without filters
or custom log-format, new-line separated for multiple patterns);
- if first unnamed group specified in pattern, only this will be cut out from
search log-line (e. g.: `^date:[({DATE})]` will cut out only datetime match
pattern, and leaves `date:[] ...` for searching in filter);
- faster match and fewer searching of appropriate templates
(DateDetector.matchTime calls rarer DateTemplate.matchDate now);
- several standard filters extended with exact prefixed or anchored date templates;
* fail2ban-testcases:
- `assertLogged` extended with parameter wait (to wait up to specified timeout,
before we throw assert exception) + test cases rewritten using that

View File

@ -9,6 +9,8 @@ failregex = ^\s[+-]\d{4} \S+ \d{3}0[1-9] \S+ <HOST>:\d+ [\d.]+:\d+ \d+ \d+ \d+\s
ignoreregex =
datepattern = {^LN-BEG}
# DEV Notes:
# http://www.3proxy.ru/howtoe.asp#ERRORS indicates that 01-09 are
# all authentication problems (%E field)

View File

@ -14,6 +14,9 @@ failregex = ^<HOST> -.*"(GET|POST|HEAD).*HTTP.*"(?:%(badbots)s|%(badbotscustom)s
ignoreregex =
datepattern = ^[^\[]*\[({DATE})
{^LN-BEG}
# DEV Notes:
# List of bad bots fetched from http://www.user-agents.org
# Generated on Thu Nov 7 14:23:35 PST 2013 by files/gen_badbots.

View File

@ -10,6 +10,8 @@ after = apache-common.local
_apache_error_client = \[\] \[(:?error|\S+:\S+)\]( \[pid \d+(:\S+ \d+)?\])? \[client <HOST>(:\d{1,5})?\]
datepattern = {^LN-BEG}
# Common prefix for [error] apache messages which also would include <HOST>
# Depending on the version it could be
# 2.2: [Sat Jun 01 11:23:08 2013] [error] [client 1.2.3.4]

View File

@ -6,6 +6,8 @@ failregex = ^<HOST> .*Googlebot.*$
ignoreregex =
datepattern = ^[^\[]*\[({DATE})
{^LN-BEG}
# DEV Notes:
#

View File

@ -3,16 +3,15 @@
#
# The knocking request must have a referer.
[INCLUDES]
before = apache-common.conf
[Definition]
failregex = ^<HOST> - \w+ \[\] "GET <knocking_url> HTTP/1\.[01]" 200 \d+ ".*" "[^-].*"$
ignoreregex =
datepattern = ^[^\[]*\[({DATE})
{^LN-BEG}
[Init]
knocking_url = /knocking/

View File

@ -20,6 +20,9 @@ failregex = ^(:? \[SSL-out\])? <HOST> max sender authentication errors \(\d{,3}\
ignoreregex =
datepattern = {^LN-BEG}%%b-%%d-%%Exy %%H:%%M:%%S
{^LN-BEG}
# DEV Notes:
# V1 Examples matches:
# Apr-27-13 02:33:09 Blocking 217.194.197.97 - too much AUTH errors (41);

View File

@ -31,6 +31,7 @@ failregex = ^%(__prefix_line)s%(log_prefix)s Registration from '[^']*' failed fo
ignoreregex =
datepattern = {^LN-BEG}
# Author: Xavier Devlamynck / Daniel Black
#

View File

@ -61,4 +61,7 @@ __prefix_line = %(__date_ambit)s?\s*(?:%(__bsd_syslog_verbose)s\s+)?(?:%(__hostn
# pam_ldap
__pam_auth = pam_unix
# standardly all formats using prefix have line-begin anchored date:
datepattern = {^LN-BEG}
# Author: Yaroslav Halchenko

View File

@ -8,8 +8,6 @@ failregex = ^: Bad Rcon: "rcon \d+ "\S+" sv_contact ".*?"" from "<HOST>:\d+"$
ignoreregex =
[Init]
datepattern = ^L %%d/%%m/%%Y - %%H:%%M:%%S

View File

@ -15,5 +15,7 @@ failregex = ^%(__prefix_line)sLOGIN FAILED, user=.*, ip=\[<HOST>\]$
ignoreregex =
datepattern = {^LN-BEG}
# Author: Christoph Haas
# Modified by: Cyril Jaquier

View File

@ -13,7 +13,6 @@ failregex = ^: \'<HOST>\' \d{1,3} failed login attempt(s)?. \s*
ignoreregex =
[Init]
datepattern = ^%%Y:%%m:%%d-%%H:%%M:%%S
#

View File

@ -17,10 +17,11 @@ failregex = ^%(__prefix_line)s(%(__pam_auth)s(\(dovecot:auth\))?:)?\s+authentica
ignoreregex =
[Init]
journalmatch = _SYSTEMD_UNIT=dovecot.service
datepattern = {^LN-BEG}TAI64N
{^LN-BEG}
# DEV Notes:
# * the first regex is essentially a copy of pam-generic.conf
# * Probably doesn't do dovecot sql/ldap backends properly (resolved in edit 21/03/2016)

View File

@ -25,8 +25,6 @@ failregex = ^=INFO REPORT==== ===\nI\(<0\.\d+\.0>:ejabberd_c2s:\d+\) : \([^)]+\
#
ignoreregex =
[Init]
# "maxlines" is number of log lines to buffer for multi-line regex searches
maxlines = 2
@ -35,3 +33,8 @@ maxlines = 2
# Values: TEXT
#
journalmatch =
#datepattern = ^(?:=[^=]+={3,} )?({DATE})
# explicit time format using prefix =...==== and no date in second string begins with I(...)...
datepattern = ^(?:=[^=]+={3,} )?(%%ExY(?P<_sep>[-/.])%%m(?P=_sep)%%d[T ]%%H:%%M:%%S(?:[.,]%%f)?(?:\s*%%z)?)
^I\(()**

View File

@ -8,13 +8,26 @@
# IP addresses on your LAN.
#
[INCLUDES]
# Read common prefixes. If any customizations available -- read them from
# common.local
before = common.conf
[Definition]
failregex = ^\.\d+ \[WARNING\] sofia_reg\.c:\d+ SIP auth (failure|challenge) \((REGISTER|INVITE)\) on sofia profile \'[^']+\' for \[.*\] from ip <HOST>$
^\.\d+ \[WARNING\] sofia_reg\.c:\d+ Can't find user \[\d+@\d+\.\d+\.\d+\.\d+\] from <HOST>$
_daemon = freeswitch
# Prefix contains common prefix line (server, daemon, etc.) and 2 datetimes if used systemd backend
_pref_line = ^%(__prefix_line)s(?:\d+-\d+-\d+ \d+:\d+:\d+\.\d+)?
failregex = %(_pref_line)s \[WARNING\] sofia_reg\.c:\d+ SIP auth (failure|challenge) \((REGISTER|INVITE)\) on sofia profile \'[^']+\' for \[[^\]]*\] from ip <HOST>$
%(_pref_line)s \[WARNING\] sofia_reg\.c:\d+ Can't find user \[[^@]+@[^\]]+\] from <HOST>$
ignoreregex =
datepattern = {^LN-BEG}
# Author: Rupa SChomaker, soapee01, Daniel Black
# https://freeswitch.org/confluence/display/FREESWITCH/Fail2Ban
# Thanks to Jim on mailing list of samples and guidance

View File

@ -17,6 +17,9 @@ failregex = ^.*\nWARNING: Authentication attempt from <HOST> for user "[^"]*" fa
#
ignoreregex =
[Init]
# "maxlines" is number of log lines to buffer for multi-line regex searches
maxlines = 2
datepattern = ^%%b %%d, %%ExY %%I:%%M:%%S %%p
^WARNING:()**
{^LN-BEG}

View File

@ -9,8 +9,6 @@ failregex = ^ SMTP Spam attack detected from <HOST>,
ignoreregex =
[Init]
datepattern = ^\[%%d/%%b/%%Y %%H:%%M:%%S\]
# DEV NOTES:

View File

@ -13,7 +13,7 @@ before = common.conf
_daemon = monit
# Regexp for previous (accessing monit httpd) and new (access denied) versions
failregex = ^\[[A-Z]+\s+\]\s*error\s*:\s*Warning:\s+Client '<HOST>' supplied (?:unknown user '[^']+'|wrong password for user '[^']*') accessing monit httpd$
failregex = ^\[\s*\]\s*error\s*:\s*Warning:\s+Client '<HOST>' supplied (?:unknown user '[^']+'|wrong password for user '[^']*') accessing monit httpd$
^%(__prefix_line)s\w+: access denied -- client <HOST>: (?:unknown user '[^']+'|wrong password for user '[^']*'|empty password)$
# Ignore login with empty user (first connect, no user specified)

View File

@ -15,13 +15,14 @@ _daemon = murmurd
# variable in your server config file (murmur.ini / mumble-server.ini).
_usernameregex = [^>]+
_prefix = <W>[\n\s]*(\.\d{3})?\s+\d+ => <\d+:%(_usernameregex)s\(-1\)> Rejected connection from <HOST>:\d+:
_prefix = \s+\d+ => <\d+:%(_usernameregex)s\(-1\)> Rejected connection from <HOST>:\d+:
failregex = ^%(_prefix)s Invalid server password$
^%(_prefix)s Wrong certificate or password for existing user$
ignoreregex =
datepattern = ^<W>{DATE}
# DEV Notes:
#

View File

@ -13,6 +13,9 @@ failregex = ^<HOST> \- \S+ \[\] \"(GET|POST|HEAD) \/<block> \S+\" 404 .+$
ignoreregex =
datepattern = {^LN-BEG}%%ExY(?P<_sep>[-/.])%%m(?P=_sep)%%d[T ]%%H:%%M:%%S(?:[.,]%%f)?(?:\s*%%z)?
^[^\[]*\[({DATE})
{^LN-BEG}
# DEV Notes:
# Based on apache-botsearch filter

View File

@ -8,6 +8,8 @@ failregex = ^ \[error\] \d+#\d+: \*\d+ user "\S+":? (password mismatch|was not f
ignoreregex =
datepattern = {^LN-BEG}
# DEV NOTES:
# Based on samples in https://github.com/fail2ban/fail2ban/pull/43/files
# Extensive search of all nginx auth failures not done yet.

View File

@ -43,3 +43,4 @@ failregex = ^\s*\[error\] \d+#\d+: \*\d+ limiting requests, excess: [\d\.]+ by z
ignoreregex =
datepattern = {^LN-BEG}

View File

@ -26,3 +26,6 @@ failregex = ^%(__prefix_line)sinfo: ratelimit block .* query <HOST> TYPE255$
^%(__prefix_line)sinfo: .* <HOST> refused, no acl matches\.$
ignoreregex =
datepattern = {^LN-BEG}Epoch
{^LN-BEG}

View File

@ -9,7 +9,6 @@
[Definition]
failregex = ^<HOST>\s+-\s+-\s+\[\]\s+"[A-Z]+ .*" 401 \d+\s*$
[Init]
datepattern = %%d/%%b[^/]*/%%Y:%%H:%%M:%%S %%z

View File

@ -52,10 +52,12 @@ before = common.conf
# Note that you MUST have LOG_FORMAT=4 for this to work!
#
failregex = ^.*tr="[A-Z]+\|[0-9.]+\|\d+\|<HOST>\|\d+" ap="[^"]*" mi="Bad password" us="[^"]*" di="535 5.7.8 Bad username or password( \(Authentication failed\))?\."/>$
failregex = tr="[A-Z]+\|[0-9.]+\|\d+\|<HOST>\|\d+" ap="[^"]*" mi="Bad password" us="[^"]*" di="535 5.7.8 Bad username or password( \(Authentication failed\))?\."/>$
# Option: ignoreregex
# Notes.: regex to ignore. If this regex matches, the line is ignored.
# Values: TEXT
#
ignoreregex =
datepattern = ^<co ts="{DATE}"\s+

View File

@ -18,3 +18,6 @@ ignoreregex =
# http://blogs.buanzo.com.ar/2009/04/fail2ban-filter-for-php-injection-attacks.html#comment-1489
#
# Author: Arturo 'Buanzo' Busleiman <buanzo@buanzo.com.ar>
datepattern = ^[^\[]*\[({DATE})
{^LN-BEG}

View File

@ -8,5 +8,8 @@ failregex = \/<HOST> Port\: [0-9]+ (TCP|UDP) Blocked$
ignoreregex =
datepattern = {^LN-BEG}Epoch
{^LN-BEG}
# Author: Pacop <pacoparu@gmail.com>

View File

@ -18,4 +18,6 @@ failregex = ^type=%(_type)s msg=audit\(:\d+\): (user )?pid=\d+ uid=%(_uid)s auid
ignoreregex =
datepattern = EPOCH
# Author: Daniel Black

View File

@ -6,7 +6,12 @@
failregex = ^ sogod \[\d+\]: SOGoRootPage Login from '<HOST>' for user '.*' might not have worked( - password policy: \d* grace: -?\d* expire: -?\d* bound: -?\d*)?\s*$
ignoreregex =
ignoreregex = "^<ADDR>"
datepattern = {^LN-BEG}%%ExY(?P<_sep>[-/.])%%m(?P=_sep)%%d[T ]%%H:%%M:%%S(?:[.,]%%f)?(?:\s*%%z)?
{^LN-BEG}(?:%%a )?%%b %%d %%H:%%M:%%S(?:\.%%f)?(?: %%ExY)?
^[^\[]*\[({DATE})
{^LN-BEG}
#
# DEV Notes:

View File

@ -9,5 +9,8 @@ failregex = ^\s+\d\s<HOST>\s+[A-Z_]+_DENIED/403 .*$
ignoreregex =
datepattern = {^LN-BEG}Epoch
{^LN-BEG}
# Author: Daniel Black

View File

@ -5,8 +5,6 @@ failregex = ^ \[LOGIN_ERROR\].*from <HOST>: Unknown user or password incorrect\.
ignoreregex =
[Init]
datepattern = ^%%m/%%d/%%Y %%H:%%M:%%S
# DEV NOTES:

View File

@ -38,13 +38,13 @@ failregex = ^%(__prefix_line)s(?:error: PAM: )?[aA]uthentication (?:failure|erro
ignoreregex =
[Init]
# "maxlines" is number of log lines to buffer for multi-line regex searches
maxlines = 10
journalmatch = _SYSTEMD_UNIT=sshd.service + _COMM=sshd
datepattern = {^LN-BEG}
# DEV Notes:
#
# "Failed \S+ for .*? from <HOST>..." failregex uses non-greedy catch-all because

View File

@ -10,6 +10,9 @@ failregex = ^[\da-f]{5,} [\da-f]{5,} (-- none --|.*?)( \d+(\.\d+)?(h|m|s|ms)){0
ignoreregex =
datepattern = ^[^-]+ -- [^-]+ -- - ({DATE})
{^LN-BEG}
# Author: Mika (mkl) from Tine20.org forum: https://www.tine20.org/forum/viewtopic.php?f=2&t=15688&p=54766
# Editor: Daniel Black
# Advisor: Lars Kneschke

View File

@ -122,15 +122,15 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues
p.add_options([
Option("-d", "--datepattern",
help="set custom pattern used to match date/times"),
Option("-e", "--encoding",
Option("-e", "--encoding", default=PREFER_ENC,
help="File encoding. Default: system locale"),
Option("-r", "--raw", action='store_true',
Option("-r", "--raw", action='store_true', default=False,
help="Raw hosts, don't resolve dns"),
Option("--usedns", action='store', default=None,
help="DNS specified replacement of tags <HOST> in regexp "
"('yes' - matches all form of hosts, 'no' - IP addresses only)"),
Option("-L", "--maxlines", type=int, default=0,
help="maxlines for multi-line regex"),
help="maxlines for multi-line regex."),
Option("-m", "--journalmatch",
help="journalctl style matches overriding filter file. "
"\"systemd-journal\" only"),
@ -143,6 +143,8 @@ Report bugs to https://github.com/fail2ban/fail2ban/issues
help="Increase verbosity"),
Option("--verbosity", action="store", dest="verbose", type=int,
help="Set numerical level of verbosity (0..4)"),
Option("--verbose-date", "--VD", action='store_true',
help="Verbose date patterns/regex in output"),
Option("-D", "--debuggex", action='store_true',
help="Produce debuggex.com urls for debugging there"),
Option("--print-no-missed", action='store_true',
@ -215,14 +217,8 @@ class LineStats(object):
class Fail2banRegex(object):
def __init__(self, opts):
self._verbose = opts.verbose
self._debuggex = opts.debuggex
self._maxlines = 20
self._print_no_missed = opts.print_no_missed
self._print_no_ignored = opts.print_no_ignored
self._print_all_matched = opts.print_all_matched
self._print_all_missed = opts.print_all_missed
self._print_all_ignored = opts.print_all_ignored
# set local protected memebers from given options:
self.__dict__.update(dict(('_'+o,v) for o,v in opts.__dict__.iteritems()))
self._maxlines_set = False # so we allow to override maxlines in cmdline
self._datepattern_set = False
self._journalmatch = None
@ -236,26 +232,23 @@ class Fail2banRegex(object):
if opts.maxlines:
self.setMaxLines(opts.maxlines)
else:
self._maxlines = 20
if opts.journalmatch is not None:
self.setJournalMatch(opts.journalmatch.split())
if opts.datepattern:
self.setDatePattern(opts.datepattern)
if opts.encoding:
self.encoding = opts.encoding
else:
self.encoding = PREFER_ENC
self.raw = True if opts.raw else False
if opts.usedns:
self._filter.setUseDns(opts.usedns)
self._filter.returnRawHost = self.raw
self._filter.returnRawHost = opts.raw
self._filter.checkFindTime = False
self._filter.checkAllRegex = True
def decode_line(self, line):
return FileContainer.decode_line('<LOG>', self.encoding, line)
return FileContainer.decode_line('<LOG>', self._encoding, line)
def encode_line(self, line):
return line.encode(self.encoding, 'ignore')
return line.encode(self._encoding, 'ignore')
def setDatePattern(self, pattern):
if not self._datepattern_set:
@ -483,8 +476,12 @@ class Fail2banRegex(object):
out = []
for template in self._filter.dateDetector.templates:
if self._verbose or template.hits:
out.append("[%d] %s" % (
template.hits, template.name))
out.append("[%d] %s" % (template.hits, template.name))
if self._verbose_date:
out.append(" # weight: %.3f (%.3f), pattern: %s" % (
template.weight, template.template.weight,
getattr(template, 'pattern', ''),))
out.append(" # regex: %s" % (getattr(template, 'regex', ''),))
pprint_list(out, "[# of hits] date format")
output( "\nLines: %s" % self._line_stats, )
@ -522,7 +519,7 @@ class Fail2banRegex(object):
try:
hdlr = open(cmd_log, 'rb')
output( "Use log file : %s" % cmd_log )
output( "Use encoding : %s" % self.encoding )
output( "Use encoding : %s" % self._encoding )
test_lines = self.file_lines_gen(hdlr)
except IOError as e:
output( e )

View File

@ -40,6 +40,9 @@ class FilterReader(DefinitionInitConfigReader):
_configOpts = {
"ignoreregex": ["string", None],
"failregex": ["string", ""],
"maxlines": ["int", None],
"datepattern": ["string", None],
"journalmatch": ["string", None],
}
def setFile(self, fileName):
@ -76,16 +79,16 @@ class FilterReader(DefinitionInitConfigReader):
stream.append(["multi-set", self._jailName, "add" + opt, multi])
elif len(multi):
stream.append(["set", self._jailName, "add" + opt, multi[0]])
if self._initOpts:
if 'maxlines' in self._initOpts:
elif opt == 'maxlines':
# We warn when multiline regex is used without maxlines > 1
# therefore keep sure we set this option first.
stream.insert(0, ["set", self._jailName, "maxlines", self._initOpts["maxlines"]])
if 'datepattern' in self._initOpts:
stream.append(["set", self._jailName, "datepattern", self._initOpts["datepattern"]])
stream.insert(0, ["set", self._jailName, "maxlines", value])
elif opt == 'datepattern':
stream.append(["set", self._jailName, "datepattern", value])
# Do not send a command if the match is empty.
if self._initOpts.get("journalmatch", '') != '':
for match in self._initOpts["journalmatch"].split("\n"):
elif opt == 'journalmatch':
for match in value.split("\n"):
if match == '': continue
stream.append(
["set", self._jailName, "addjournalmatch"] +
shlex.split(match))

View File

@ -112,6 +112,7 @@ class JailReader(ConfigReader):
["string", "ignorecommand", None],
["string", "ignoreip", None],
["string", "filter", ""],
["string", "datepattern", None],
["string", "action", ""]]
# Before interpolation (substitution) add static options always available as default:
@ -124,70 +125,72 @@ class JailReader(ConfigReader):
self.__opts = ConfigReader.getOptions(self, self.__name, opts1st, shouldExist=True)
if not self.__opts: # pragma: no cover
raise JailDefError("Init jail options failed")
if self.isEnabled():
# Read filter
flt = self.__opts["filter"]
if flt:
filterName, filterOpt = JailReader.extractOptions(flt)
if not filterName:
raise JailDefError("Invalid filter definition %r" % flt)
self.__filter = FilterReader(
filterName, self.__name, filterOpt, share_config=self.share_config, basedir=self.getBaseDir())
ret = self.__filter.read()
# merge options from filter as 'known/...':
self.__filter.getOptions(self.__opts)
ConfigReader.merge_section(self, self.__name, self.__filter.getCombined(), 'known/')
if not ret:
raise JailDefError("Unable to read the filter %r" % filterName)
else:
self.__filter = None
logSys.warning("No filter set for jail %s" % self.__name)
if not self.isEnabled():
return True
# Read filter
flt = self.__opts["filter"]
if flt:
filterName, filterOpt = JailReader.extractOptions(flt)
if not filterName:
raise JailDefError("Invalid filter definition %r" % flt)
self.__filter = FilterReader(
filterName, self.__name, filterOpt, share_config=self.share_config, basedir=self.getBaseDir())
ret = self.__filter.read()
# merge options from filter as 'known/...':
self.__filter.getOptions(self.__opts)
ConfigReader.merge_section(self, self.__name, self.__filter.getCombined(), 'known/')
if not ret:
raise JailDefError("Unable to read the filter %r" % filterName)
else:
self.__filter = None
logSys.warning("No filter set for jail %s" % self.__name)
# Read second all options (so variables like %(known/param) can be interpolated):
self.__opts = ConfigReader.getOptions(self, self.__name, opts)
if not self.__opts: # pragma: no cover
raise JailDefError("Read jail options failed")
# cumulate filter options again (ignore given in jail):
if self.__filter:
self.__filter.getOptions(self.__opts)
# Read action
for act in self.__opts["action"].split('\n'):
try:
if not act: # skip empty actions
continue
actName, actOpt = JailReader.extractOptions(act)
if not actName:
raise JailDefError("Invalid action definition %r" % act)
if actName.endswith(".py"):
self.__actions.append([
"set",
self.__name,
"addaction",
actOpt.pop("actname", os.path.splitext(actName)[0]),
os.path.join(
self.getBaseDir(), "action.d", actName),
json.dumps(actOpt),
])
# Read second all options (so variables like %(known/param) can be interpolated):
self.__opts = ConfigReader.getOptions(self, self.__name, opts)
if not self.__opts: # pragma: no cover
raise JailDefError("Read jail options failed")
# cumulate filter options again (ignore given in jail):
if self.__filter:
self.__filter.getOptions(self.__opts)
# Read action
for act in self.__opts["action"].split('\n'):
try:
if not act: # skip empty actions
continue
actName, actOpt = JailReader.extractOptions(act)
if not actName:
raise JailDefError("Invalid action definition %r" % act)
if actName.endswith(".py"):
self.__actions.append([
"set",
self.__name,
"addaction",
actOpt.pop("actname", os.path.splitext(actName)[0]),
os.path.join(
self.getBaseDir(), "action.d", actName),
json.dumps(actOpt),
])
else:
action = ActionReader(
actName, self.__name, actOpt,
share_config=self.share_config, basedir=self.getBaseDir())
ret = action.read()
if ret:
action.getOptions(self.__opts)
self.__actions.append(action)
else:
action = ActionReader(
actName, self.__name, actOpt,
share_config=self.share_config, basedir=self.getBaseDir())
ret = action.read()
if ret:
action.getOptions(self.__opts)
self.__actions.append(action)
else:
raise JailDefError("Unable to read action %r" % actName)
except JailDefError:
raise
except Exception as e:
logSys.debug("Caught exception: %s", e, exc_info=True)
raise ValueError("Error in action definition %r: %r" % (act, e))
if not len(self.__actions):
logSys.warning("No actions were defined for %s" % self.__name)
raise JailDefError("Unable to read action %r" % actName)
except JailDefError:
raise
except Exception as e:
logSys.debug("Caught exception: %s", e, exc_info=True)
raise ValueError("Error in action definition %r: %r" % (act, e))
if not len(self.__actions):
logSys.warning("No actions were defined for %s" % self.__name)
except JailDefError as e:
e = str(e)
@ -213,6 +216,8 @@ class JailReader(ConfigReader):
if e:
stream.extend([['config-error', "Jail '%s' skipped, because of wrong configuration: %s" % (self.__name, e)]])
return stream
if self.__filter:
stream.extend(self.__filter.convert())
for opt, value in self.__opts.iteritems():
if opt == "logpath" and \
not self.__opts.get('backend', None).startswith("systemd"):
@ -234,17 +239,9 @@ class JailReader(ConfigReader):
stream.append(["set", self.__name, "logencoding", value])
elif opt == "backend":
backend = value
elif opt == "maxretry":
stream.append(["set", self.__name, "maxretry", value])
elif opt == "ignoreip":
for ip in splitwords(value):
stream.append(["set", self.__name, "addignoreip", ip])
elif opt == "findtime":
stream.append(["set", self.__name, "findtime", value])
elif opt == "bantime":
stream.append(["set", self.__name, "bantime", value])
elif opt == "usedns":
stream.append(["set", self.__name, "usedns", value])
elif opt in ("failregex", "ignoreregex"):
multi = []
for regex in value.split('\n'):
@ -255,10 +252,8 @@ class JailReader(ConfigReader):
stream.append(["multi-set", self.__name, "add" + opt, multi])
elif len(multi):
stream.append(["set", self.__name, "add" + opt, multi[0]])
elif opt == "ignorecommand":
stream.append(["set", self.__name, "ignorecommand", value])
if self.__filter:
stream.extend(self.__filter.convert())
elif opt not in ('action', 'filter', 'enabled'):
stream.append(["set", self.__name, opt, value])
for action in self.__actions:
if isinstance(action, (ConfigReaderUnshared, ConfigReader)):
stream.extend(action.convert())

View File

@ -21,11 +21,13 @@ __author__ = "Cyril Jaquier and Fail2Ban Contributors"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import copy
import time
from threading import Lock
from .datetemplate import DatePatternRegex, DateTai64n, DateEpoch
from .datetemplate import re, DateTemplate, DatePatternRegex, DateTai64n, DateEpoch
from .utils import Utils
from ..helpers import getLogger
# Gets the instance of the logger.
@ -33,8 +35,52 @@ logSys = getLogger(__name__)
logLevel = 6
RE_DATE_PREMATCH = re.compile("\{DATE\}", re.IGNORECASE)
DD_patternCache = Utils.Cache(maxCount=1000, maxTime=60*60)
def _getPatternTemplate(pattern, key=None):
if key is None:
key = pattern
if '%' not in pattern:
key = pattern.upper()
template = DD_patternCache.get(key)
if not template:
if key in ("EPOCH", "{^LN-BEG}EPOCH", "^EPOCH"):
template = DateEpoch(lineBeginOnly=(key != "EPOCH"))
elif key in ("TAI64N", "{^LN-BEG}TAI64N", "^TAI64N"):
template = DateTai64n(wordBegin=('start' if key != "TAI64N" else False))
else:
template = DatePatternRegex(pattern)
DD_patternCache.set(key, template)
return template
def _getAnchoredTemplate(template, wrap=lambda s: '{^LN-BEG}' + s):
# wrap name:
name = wrap(template.name)
# try to find in cache (by name):
template2 = DD_patternCache.get(name)
if not template2:
# wrap pattern (or regexp if not pattern template):
regex = wrap(getattr(template, 'pattern', template.regex))
if hasattr(template, 'pattern'):
# try to find in cache (by pattern):
template2 = DD_patternCache.get(regex)
# make duplicate and set new anchored regex:
if not template2:
if not hasattr(template, 'pattern'):
template2 = _getPatternTemplate(name)
else:
template2 = _getPatternTemplate(regex)
return template2
class DateDetectorCache(object):
"""Implements the caching of the default templates list.
"""
def __init__(self):
self.__lock = Lock()
self.__templates = list()
@ -43,71 +89,115 @@ class DateDetectorCache(object):
def templates(self):
"""List of template instances managed by the detector.
"""
if self.__templates:
return self.__templates
with self.__lock:
if self.__templates:
if self.__templates: # pragma: no cover - race-condition + multi-threaded environment only
return self.__templates
self._addDefaultTemplate()
return self.__templates
def _cacheTemplate(self, template):
"""Cache Fail2Ban's default template.
"""
if isinstance(template, str):
template = DatePatternRegex(template)
self.__templates.append(template)
# exact given template with word begin-end boundary:
template = _getPatternTemplate(template)
# if not already line-begin anchored, additional template, that prefers datetime
# at start of a line (safety+performance feature):
name = template.name
if not name.startswith('{^LN-BEG}') and not name.startswith('^') and hasattr(template, 'regex'):
template2 = _getAnchoredTemplate(template)
# prevent to add duplicates:
if template2.name != name:
# increase weight of such templates, because they should be always
# preferred in template sorting process (bubble up):
template2.weight = 100.0
self.__tmpcache[0].append(template2)
# add template:
self.__tmpcache[1].append(template)
def _addDefaultTemplate(self):
"""Add resp. cache Fail2Ban's default set of date templates.
"""
self.__tmpcache = [], []
# ISO 8601, simple date, optional subsecond and timezone:
# 2005-01-23T21:59:59.981746, 2005-01-23 21:59:59
# simple date: 2005/01/23 21:59:59
# custom for syslog-ng 2006.12.21 06:43:20
self._cacheTemplate("%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?")
# asctime with optional day, subsecond and/or year:
# Sun Jan 23 21:59:59.011 2005
self._cacheTemplate("(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %Y)?")
self._cacheTemplate("(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?")
# asctime with optional day, subsecond and/or year coming after day
# http://bugs.debian.org/798923
# Sun Jan 23 2005 21:59:59.011
self._cacheTemplate("(?:%a )?%b %d %Y %H:%M:%S(?:\.%f)?")
# simple date, optional subsecond (proftpd):
# 2005-01-23 21:59:59
# simple date: 2005/01/23 21:59:59
# custom for syslog-ng 2006.12.21 06:43:20
self._cacheTemplate("%Y(?P<_sep>[-/.])%m(?P=_sep)%d %H:%M:%S(?:,%f)?")
self._cacheTemplate("(?:%a )?%b %d %ExY %H:%M:%S(?:\.%f)?")
# simple date too (from x11vnc): 23/01/2005 21:59:59
# and with optional year given by 2 digits: 23/01/05 21:59:59
# (See http://bugs.debian.org/537610)
# 17-07-2008 17:23:25
self._cacheTemplate("%d(?P<_sep>[-/])%m(?P=_sep)(?:%Y|%y) %H:%M:%S")
self._cacheTemplate("%d(?P<_sep>[-/])%m(?P=_sep)(?:%ExY|%Exy) %H:%M:%S")
# Apache format optional time zone:
# [31/Oct/2006:09:22:55 -0000]
# 26-Jul-2007 15:20:52
self._cacheTemplate("%d(?P<_sep>[-/])%b(?P=_sep)%Y[ :]?%H:%M:%S(?:\.%f)?(?: %z)?")
# CPanel 05/20/2008:01:57:39
self._cacheTemplate("%m/%d/%Y:%H:%M:%S")
# named 26-Jul-2007 15:20:52.252
# named 26-Jul-2007 15:20:52.252
# roundcube 26-Jul-2007 15:20:52 +0200
self._cacheTemplate("%d(?P<_sep>[-/])%b(?P=_sep)%ExY[ :]?%H:%M:%S(?:\.%f)?(?: %z)?")
# CPanel 05/20/2008:01:57:39
self._cacheTemplate("%m/%d/%ExY:%H:%M:%S")
# 01-27-2012 16:22:44.252
# subseconds explicit to avoid possible %m<->%d confusion
# with previous
self._cacheTemplate("%m-%d-%Y %H:%M:%S\.%f")
# TAI64N
template = DateTai64n()
template.name = "TAI64N"
self._cacheTemplate(template)
# with previous ("%d-%m-%ExY %H:%M:%S" by "%d(?P<_sep>[-/])%m(?P=_sep)(?:%ExY|%Exy) %H:%M:%S")
self._cacheTemplate("%m-%d-%ExY %H:%M:%S(?:\.%f)?")
# Epoch
template = DateEpoch()
template.name = "Epoch"
self._cacheTemplate(template)
# ISO 8601
self._cacheTemplate("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?(?:%z)?")
self._cacheTemplate('EPOCH')
# Only time information in the log
self._cacheTemplate("^%H:%M:%S")
self._cacheTemplate("{^LN-BEG}%H:%M:%S")
# <09/16/08@05:03:30>
self._cacheTemplate("^<%m/%d/%y@%H:%M:%S>")
self._cacheTemplate("^<%m/%d/%Exy@%H:%M:%S>")
# MySQL: 130322 11:46:11
self._cacheTemplate("^%y%m%d ?%H:%M:%S")
self._cacheTemplate("%Exy%Exm%Exd ?%H:%M:%S")
# Apache Tomcat
self._cacheTemplate("%b %d, %Y %I:%M:%S %p")
self._cacheTemplate("%b %d, %ExY %I:%M:%S %p")
# ASSP: Apr-27-13 02:33:06
self._cacheTemplate("^%b-%d-%y %H:%M:%S")
self._cacheTemplate("^%b-%d-%Exy %H:%M:%S")
# 20050123T215959, 20050123 215959
self._cacheTemplate("%ExY%Exm%Exd[T ]%ExH%ExM%ExS(?:[.,]%f)?(?:\s*%z)?")
# prefixed with optional named time zone (monit):
# PDT Apr 16 21:05:29
self._cacheTemplate("(?:%Z )?(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?")
# +00:00 Jan 23 21:59:59.011 2005
self._cacheTemplate("(?:%z )?(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?")
# TAI64N
self._cacheTemplate("TAI64N")
#
self.__templates = self.__tmpcache[0] + self.__tmpcache[1]
del self.__tmpcache
class DateDetectorTemplate(object):
"""Used for "shallow copy" of the template object.
Prevents collectively usage of hits/lastUsed in cached templates
"""
__slots__ = ('template', 'hits', 'lastUsed', 'distance')
def __init__(self, template):
self.template = template
self.hits = 0
self.lastUsed = 0
# the last distance to date-match within the log file:
self.distance = 0x7fffffff
@property
def weight(self):
return self.hits * self.template.weight / max(1, self.distance)
def __getattr__(self, name):
""" Returns attribute of template (called for parameters not in slots)
"""
return getattr(self.template, name)
class DateDetector(object):
@ -120,19 +210,27 @@ class DateDetector(object):
_defCache = DateDetectorCache()
def __init__(self):
self.__lock = Lock()
self.__templates = list()
self.__known_names = set()
# time the template was long unused (currently 300 == 5m):
self.__unusedTime = 300
# last known distance (bypass one char collision) and end position:
self.__lastPos = 1, None
self.__lastEndPos = 0x7fffffff, None
self.__lastTemplIdx = 0x7fffffff
# first free place:
self.__firstUnused = 0
# pre-match pattern:
self.__preMatch = None
def _appendTemplate(self, template):
def _appendTemplate(self, template, ignoreDup=False):
name = template.name
if name in self.__known_names:
if ignoreDup: return
raise ValueError(
"There is already a template with name %s" % name)
self.__known_names.add(name)
self.__templates.append(template)
self.__templates.append(DateDetectorTemplate(template))
def appendTemplate(self, template):
"""Add a date template to manage and use in search of dates.
@ -150,15 +248,45 @@ class DateDetector(object):
If a template already exists with the same name.
"""
if isinstance(template, str):
template = DatePatternRegex(template)
self._appendTemplate(template)
key = pattern = template
if '%' not in pattern:
key = pattern.upper()
template = DD_patternCache.get(key)
if not template:
if key in ("{^LN-BEG}", "{DEFAULT}"):
flt = \
lambda template: template.flags & DateTemplate.LINE_BEGIN if key == "{^LN-BEG}" else None
self.addDefaultTemplate(flt)
return
elif "{DATE}" in key:
self.addDefaultTemplate(
lambda template: not template.flags & DateTemplate.LINE_BEGIN, pattern)
return
else:
template = _getPatternTemplate(pattern, key)
def addDefaultTemplate(self):
DD_patternCache.set(key, template)
self._appendTemplate(template)
logSys.info(" date pattern `%r`: `%s`",
getattr(template, 'pattern', ''), template.name)
logSys.debug(" date pattern regex for %r: %s",
getattr(template, 'pattern', ''), template.regex)
def addDefaultTemplate(self, filterTemplate=None, preMatch=None):
"""Add Fail2Ban's default set of date templates.
"""
with self.__lock:
for template in DateDetector._defCache.templates:
self._appendTemplate(template)
ignoreDup = len(self.__templates) > 0
for template in DateDetector._defCache.templates:
# filter if specified:
if filterTemplate is not None and not filterTemplate(template): continue
# if exact pattern available - create copy of template, contains replaced {DATE} with default regex:
if preMatch is not None:
# get cached or create a copy with modified name/pattern, using preMatch replacement for {DATE}:
template = _getAnchoredTemplate(template,
wrap=lambda s: RE_DATE_PREMATCH.sub(s, preMatch))
# append date detector template (ignore duplicate if some was added before default):
self._appendTemplate(template, ignoreDup=ignoreDup)
@property
def templates(self):
@ -184,22 +312,115 @@ class DateDetector(object):
The regex match returned from the first successfully matched
template.
"""
i = 0
with self.__lock:
for template in self.__templates:
# if no templates specified - default templates should be used:
if not len(self.__templates):
self.addDefaultTemplate()
logSys.log(logLevel-1, "try to match time for line: %.120s", line)
match = None
# first try to use last template with same start/end position:
ignoreBySearch = 0x7fffffff
i = self.__lastTemplIdx
if i < len(self.__templates):
ddtempl = self.__templates[i]
template = ddtempl.template
if template.flags & (DateTemplate.LINE_BEGIN|DateTemplate.LINE_END):
if logSys.getEffectiveLevel() <= logLevel-1: # pragma: no cover - very-heavy debug
logSys.log(logLevel-1, " try to match last anchored template #%02i ...", i)
match = template.matchDate(line)
if not match is None:
ignoreBySearch = i
else:
distance, endpos = self.__lastPos[0], self.__lastEndPos[0]
if logSys.getEffectiveLevel() <= logLevel-1:
logSys.log(logLevel-1, " try to match last template #%02i (from %r to %r): ...%r==%r %s %r==%r...",
i, distance, endpos,
line[distance-1:distance], self.__lastPos[1],
line[distance:endpos],
line[endpos:endpos+1], self.__lastEndPos[1])
# check same boundaries left/right, otherwise possible collision/pattern switch:
if (line[distance-1:distance] == self.__lastPos[1] and
line[endpos:endpos+1] == self.__lastEndPos[1]
):
match = template.matchDate(line, distance, endpos)
if match:
distance = match.start()
endpos = match.end()
# if different position, possible collision/pattern switch:
if (
template.flags & (DateTemplate.LINE_BEGIN|DateTemplate.LINE_END) or
(distance == self.__lastPos[0] and endpos == self.__lastEndPos[0])
):
logSys.log(logLevel, " matched last time template #%02i", i)
else:
logSys.log(logLevel, " ** last pattern collision - pattern change, search ...")
match = None
else:
logSys.log(logLevel, " ** last pattern not found - pattern change, search ...")
# search template and better match:
if not match:
logSys.log(logLevel, " search template (%i) ...", len(self.__templates))
found = None, 0x7fffffff, 0x7fffffff, -1
i = 0
for ddtempl in self.__templates:
if logSys.getEffectiveLevel() <= logLevel-1:
logSys.log(logLevel-1, " try template #%02i: %s", i, ddtempl.name)
if i == ignoreBySearch:
i += 1
continue
template = ddtempl.template
match = template.matchDate(line)
if match:
distance = match.start()
endpos = match.end()
if logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, "Matched time template %s", template.name)
template.hits += 1
template.lastUsed = time.time()
# if not first - try to reorder current template (bubble up), they will be not sorted anymore:
if i:
self._reorderTemplate(i)
# return tuple with match and template reference used for parsing:
return (match, template)
logSys.log(logLevel, " matched time template #%02i (at %r <= %r, %r) %s",
i, distance, ddtempl.distance, self.__lastPos[0], template.name)
## last (or single) template - fast stop:
if i+1 >= len(self.__templates):
break
## if line-begin/end anchored - stop searching:
if template.flags & (DateTemplate.LINE_BEGIN|DateTemplate.LINE_END):
break
## stop searching if next template still unused, but we had already hits:
if (distance == 0 and ddtempl.hits) and not self.__templates[i+1].template.hits:
break
## [grave] if distance changed, possible date-match was found somewhere
## in body of message, so save this template, and search further:
if distance > ddtempl.distance or distance > self.__lastPos[0]:
logSys.log(logLevel, " ** distance collision - pattern change, reserve")
## shortest of both:
if distance < found[1]:
found = match, distance, endpos, i
## search further:
match = None
i += 1
continue
## winner - stop search:
break
i += 1
# check other template was found (use this one with shortest distance):
if not match and found[0]:
match, distance, endpos, i = found
logSys.log(logLevel, " use best time template #%02i", i)
ddtempl = self.__templates[i]
template = ddtempl.template
# we've winner, incr hits, set distance, usage, reorder, etc:
if match:
ddtempl.hits += 1
ddtempl.lastUsed = time.time()
ddtempl.distance = distance
if self.__firstUnused == i:
self.__firstUnused += 1
self.__lastPos = distance, line[distance-1:distance]
self.__lastEndPos = endpos, line[endpos:endpos+1]
# if not first - try to reorder current template (bubble up), they will be not sorted anymore:
if i and i != self.__lastTemplIdx:
i = self._reorderTemplate(i)
self.__lastTemplIdx = i
# return tuple with match and template reference used for parsing:
return (match, template)
# not found:
logSys.log(logLevel, " no template.")
return (None, None)
def getTime(self, line, timeMatch=None):
@ -221,31 +442,22 @@ class DateDetector(object):
The Unix timestamp returned from the first successfully matched
template or None if not found.
"""
if timeMatch:
template = timeMatch[1]
if template is not None:
try:
date = template.getDate(line, timeMatch[0])
if date is not None:
if logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, "Got time %f for %r using template %s",
date[0], date[1].group(), template.name)
return date
except ValueError:
return None
with self.__lock:
for template in self.__templates:
try:
date = template.getDate(line)
if date is None:
continue
if logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, "Got time %f for %r using template %s",
date[0], date[1].group(), template.name)
# search match for all specified templates:
if timeMatch is None:
timeMatch = self.matchTime(line)
# convert:
template = timeMatch[1]
if template is not None:
try:
date = template.getDate(line, timeMatch[0])
if date is not None:
if logSys.getEffectiveLevel() <= logLevel: # pragma: no cover - heavy debug
logSys.log(logLevel, " got time %f for %r using template %s",
date[0], date[1].group(1), template.name)
return date
except ValueError: # pragma: no cover
pass
return None
except ValueError:
pass
return None
def _reorderTemplate(self, num):
"""Reorder template (bubble up) in template list if hits grows enough.
@ -257,18 +469,39 @@ class DateDetector(object):
"""
if num:
templates = self.__templates
template = templates[num]
ddtempl = templates[num]
if logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, " -> reorder template #%02i, hits: %r", num, ddtempl.hits)
## current hits and time the template was long unused:
untime = template.lastUsed - self.__unusedTime
hits = template.hits
untime = ddtempl.lastUsed - self.__unusedTime
weight = ddtempl.weight
## try to move faster (first if unused available, or half of part to current template position):
pos = self.__firstUnused if self.__firstUnused < num else num // 2
## don't move too often (multiline logs resp. log's with different date patterns),
## if template not used too long, replace it also :
if hits > templates[num-1].hits + 5 or templates[num-1].lastUsed < untime:
## try to move faster (half of part to current template):
pos = num // 2
## if not larger - move slow (exact 1 position):
if hits <= templates[pos].hits or templates[pos].lastUsed < untime:
pos = num-1
templates[pos], templates[num] = template, templates[pos]
def _moveable():
pweight = templates[pos].weight
if logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, " -> compare template #%02i & #%02i, weight %.3f > %.3f, hits %r > %r",
num, pos, weight, pweight, ddtempl.hits, templates[pos].hits)
return weight > pweight or untime > templates[pos].lastUsed
##
## if not moveable (smaller weight or target position recently used):
if not _moveable():
## try to move slow (exact 1 position):
if pos == num-1:
return num
pos = num-1
## if still smaller and template at position used, don't move:
if not _moveable():
return num
## move:
del templates[num]
templates[pos:0] = [ddtempl]
## correct first unused:
while self.__firstUnused < len(templates) and templates[self.__firstUnused].hits:
self.__firstUnused += 1
if logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, " -> moved template #%02i -> #%02i", num, pos)
return pos
return num

View File

@ -24,14 +24,28 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import re
import re, time
from abc import abstractmethod
from .strptime import reGroupDictStrptime, timeRE
from .strptime import reGroupDictStrptime, timeRE, getTimePatternRE
from ..helpers import getLogger
logSys = getLogger(__name__)
# check already grouped contains "(", but ignores char "\(" and conditional "(?(id)...)":
RE_GROUPED = re.compile(r'(?<!(?:\(\?))(?<!\\)\((?!\?)')
RE_GROUP = ( re.compile(r'^((?:\(\?\w+\))?\^?(?:\(\?\w+\))?)(.*?)(\$?)$'), r"\1(\2)\3" )
RE_EXLINE_BOUND_BEG = re.compile(r'^\{\^LN-BEG\}')
RE_NO_WRD_BOUND_BEG = re.compile(r'^\(*(?:\(\?\w+\))?(?:\^|\(*\*\*|\(\?:\^)')
RE_NO_WRD_BOUND_END = re.compile(r'(?<!\\)(?:\$\)?|\*\*\)*)$')
RE_DEL_WRD_BOUNDS = ( re.compile(r'^\(*(?:\(\?\w+\))?\(*\*\*|(?<!\\)\*\*\)*$'),
lambda m: m.group().replace('**', '') )
RE_LINE_BOUND_BEG = re.compile(r'^(?:\(\?\w+\))?(?:\^|\(\?:\^(?!\|))')
RE_LINE_BOUND_END = re.compile(r'(?<![\\\|])(?:\$\)?)$')
RE_ALPHA_PATTERN = re.compile(r'(?<!\%)\%[aAbBpc]')
class DateTemplate(object):
"""A template which searches for and returns a date from a log line.
@ -45,22 +59,19 @@ class DateTemplate(object):
regex
"""
LINE_BEGIN = 8
LINE_END = 4
WORD_BEGIN = 2
WORD_END = 1
def __init__(self):
self._name = ""
self.name = ""
self.weight = 1.0
self.flags = 0
self.hits = 0
self.time = 0
self._regex = ""
self._cRegex = None
self.hits = 0
self.lastUsed = 0
@property
def name(self):
"""Name assigned to template.
"""
return self._name
@name.setter
def name(self, name):
self._name = name
def getRegex(self):
return self._regex
@ -75,10 +86,12 @@ class DateTemplate(object):
wordBegin : bool
Defines whether the regex should be modified to search at beginning of a
word, by adding special boundary r'(?=^|\b|\W)' to start of regex.
Can be disabled with specifying of ** at front of regex.
Default True.
wordEnd : bool
Defines whether the regex should be modified to search at end of a word,
by adding special boundary r'(?=\b|\W|$)' to end of regex.
Can be disabled with specifying of ** at end of regex.
Default True.
Raises
@ -86,23 +99,62 @@ class DateTemplate(object):
re.error
If regular expression fails to compile
"""
# Warning: don't use lookahead for line-begin boundary,
# (e. g. r"^(?:\W{0,2})?" is much faster as r"(?:^|(?<=^\W)|(?<=^\W{2}))")
# because it may be very slow in negative case (by long log-lines not matching pattern)
regex = regex.strip()
if wordBegin and not re.search(r'^\^', regex):
regex = r'(?=^|\b|\W)' + regex
if wordEnd and not re.search(r'\$$', regex):
boundBegin = wordBegin and not RE_NO_WRD_BOUND_BEG.search(regex)
boundEnd = wordEnd and not RE_NO_WRD_BOUND_END.search(regex)
# if no group add it now, should always have a group(1):
if not RE_GROUPED.search(regex):
regex = RE_GROUP[0].sub(RE_GROUP[1], regex)
self.flags = 0
# if word or line start boundary:
if boundBegin:
self.flags |= DateTemplate.WORD_BEGIN if wordBegin != 'start' else DateTemplate.LINE_BEGIN
if wordBegin != 'start':
regex = r'(?:^|\b|\W)' + regex
else:
regex = r"^(?:\W{0,2})?" + regex
if not self.name.startswith('{^LN-BEG}'):
self.name = '{^LN-BEG}' + self.name
# if word end boundary:
if boundEnd:
self.flags |= DateTemplate.WORD_END
regex += r'(?=\b|\W|$)'
if RE_LINE_BOUND_BEG.search(regex): self.flags |= DateTemplate.LINE_BEGIN
if RE_LINE_BOUND_END.search(regex): self.flags |= DateTemplate.LINE_END
# remove possible special pattern "**" in front and end of regex:
regex = RE_DEL_WRD_BOUNDS[0].sub(RE_DEL_WRD_BOUNDS[1], regex)
self._regex = regex
logSys.debug(' constructed regex %s', regex)
self._cRegex = None
regex = property(getRegex, setRegex, doc=
"""Regex used to search for date.
""")
def matchDate(self, line):
def _compileRegex(self):
"""Compile regex by first usage.
"""
if not self._cRegex:
try:
# print('*'*10 + (' compile - %-30.30s -- %s' % (getattr(self, 'pattern', self.regex), self.name)))
self._cRegex = re.compile(self.regex)
except Exception as e:
logSys.error('Compile %r failed, expression %r', self.name, self.regex)
raise e
def matchDate(self, line, *args):
"""Check if regex for date matches on a log line.
"""
if not self._cRegex:
self._cRegex = re.compile(self.regex, re.UNICODE | re.IGNORECASE)
dateMatch = self._cRegex.search(line)
self._compileRegex()
dateMatch = self._cRegex.search(line, *args); # pos, endpos
if dateMatch:
self.hits += 1
# print('*'*10 + ('[%s] - %-30.30s -- %s' % ('*' if dateMatch else ' ', getattr(self, 'pattern', self.regex), self.name)))
return dateMatch
@abstractmethod
@ -138,9 +190,15 @@ class DateEpoch(DateTemplate):
regex
"""
def __init__(self):
def __init__(self, lineBeginOnly=False):
DateTemplate.__init__(self)
self.regex = r"(?:^|(?P<square>(?<=^\[))|(?P<selinux>(?<=audit\()))\d{10,11}\b(?:\.\d{3,6})?(?:(?(selinux)(?=:\d+\)))|(?(square)(?=\])))"
self.name = "Epoch"
if not lineBeginOnly:
regex = r"((?:^|(?P<square>(?<=^\[))|(?P<selinux>(?<=\baudit\()))\d{10,11}\b(?:\.\d{3,6})?)(?:(?(selinux)(?=:\d+\)))|(?(square)(?=\])))"
self.setRegex(regex, wordBegin=False) ;# already line begin resp. word begin anchored
else:
regex = r"((?P<square>(?<=^\[))?\d{10,11}\b(?:\.\d{3,6})?)(?(square)(?=\]))"
self.setRegex(regex, wordBegin='start', wordEnd=True)
def getDate(self, line, dateMatch=None):
"""Method to return the date for a log line.
@ -160,8 +218,7 @@ class DateEpoch(DateTemplate):
dateMatch = self.matchDate(line)
if dateMatch:
# extract part of format which represents seconds since epoch
return (float(dateMatch.group()), dateMatch)
return None
return (float(dateMatch.group(1)), dateMatch)
class DatePatternRegex(DateTemplate):
@ -178,21 +235,15 @@ class DatePatternRegex(DateTemplate):
regex
pattern
"""
_patternRE = re.compile(r"%%(%%|[%s])" % "".join(timeRE.keys()))
_patternName = {
'a': "DAY", 'A': "DAYNAME", 'b': "MON", 'B': "MONTH", 'd': "Day",
'H': "24hour", 'I': "12hour", 'j': "Yearday", 'm': "Month",
'M': "Minute", 'p': "AMPM", 'S': "Second", 'U': "Yearweek",
'w': "Weekday", 'W': "Yearweek", 'y': 'Year2', 'Y': "Year", '%': "%",
'z': "Zone offset", 'f': "Microseconds", 'Z': "Zone name"}
for _key in set(timeRE) - set(_patternName): # may not have them all...
_patternName[_key] = "%%%s" % _key
_patternRE, _patternName = getTimePatternRE()
_patternRE = re.compile(_patternRE)
def __init__(self, pattern=None):
def __init__(self, pattern=None, **kwargs):
super(DatePatternRegex, self).__init__()
self._pattern = None
if pattern is not None:
self.pattern = pattern
self.setRegex(pattern, **kwargs)
@property
def pattern(self):
@ -208,17 +259,23 @@ class DatePatternRegex(DateTemplate):
@pattern.setter
def pattern(self, pattern):
self.setRegex(pattern)
def setRegex(self, pattern, wordBegin=True, wordEnd=True):
# original pattern:
self._pattern = pattern
# if explicit given {^LN-BEG} - remove it from pattern and set 'start' in wordBegin:
if wordBegin and RE_EXLINE_BOUND_BEG.search(pattern):
pattern = RE_EXLINE_BOUND_BEG.sub('', pattern)
wordBegin = 'start'
# wrap to regex:
fmt = self._patternRE.sub(r'%(\1)s', pattern)
self._name = fmt % self._patternName
super(DatePatternRegex, self).setRegex(fmt % timeRE)
def setRegex(self, value):
raise NotImplementedError("Regex derived from pattern")
@DateTemplate.name.setter
def name(self, value):
raise NotImplementedError("Name derived from pattern")
self.name = fmt % self._patternName
regex = fmt % timeRE
# if expected add (?iu) for "ignore case" and "unicode":
if RE_ALPHA_PATTERN.search(pattern):
regex = r'(?iu)' + regex
super(DatePatternRegex, self).setRegex(regex, wordBegin, wordEnd)
def getDate(self, line, dateMatch=None):
"""Method to return the date for a log line.
@ -240,11 +297,7 @@ class DatePatternRegex(DateTemplate):
if not dateMatch:
dateMatch = self.matchDate(line)
if dateMatch:
groupdict = dict(
(key, value)
for key, value in dateMatch.groupdict().iteritems()
if value is not None)
return reGroupDictStrptime(groupdict), dateMatch
return reGroupDictStrptime(dateMatch.groupdict()), dateMatch
class DateTai64n(DateTemplate):
@ -256,11 +309,11 @@ class DateTai64n(DateTemplate):
regex
"""
def __init__(self):
def __init__(self, wordBegin=False):
DateTemplate.__init__(self)
self.name = "TAI64N"
# We already know the format for TAI64N
# yoh: we should not add an additional front anchor
self.setRegex("@[0-9a-f]{24}", wordBegin=False)
self.setRegex("@[0-9a-f]{24}", wordBegin=wordBegin)
def getDate(self, line, dateMatch=None):
"""Method to return the date for a log line.
@ -280,8 +333,7 @@ class DateTai64n(DateTemplate):
dateMatch = self.matchDate(line)
if dateMatch:
# extract part of format which represents seconds since epoch
value = dateMatch.group()
value = dateMatch.group(1)
seconds_since_epoch = value[2:17]
# convert seconds from HEX into local time stamp
return (int(seconds_since_epoch, 16), dateMatch)
return None

View File

@ -35,7 +35,6 @@ from .ipdns import DNSUtils, IPAddr
from .ticket import FailTicket
from .jailthread import JailThread
from .datedetector import DateDetector
from .datetemplate import DatePatternRegex, DateEpoch, DateTai64n
from .mytime import MyTime
from .failregex import FailRegex, Regex, RegexException
from .action import CommandAction
@ -100,7 +99,6 @@ class Filter(JailThread):
self.ticks = 0
self.dateDetector = DateDetector()
self.dateDetector.addDefaultTemplate()
logSys.debug("Created %s" % self)
def __repr__(self):
@ -263,20 +261,13 @@ class Filter(JailThread):
if pattern is None:
self.dateDetector = None
return
elif pattern.upper() == "EPOCH":
template = DateEpoch()
template.name = "Epoch"
elif pattern.upper() == "TAI64N":
template = DateTai64n()
template.name = "TAI64N"
else:
template = DatePatternRegex(pattern)
self.dateDetector = DateDetector()
self.dateDetector.appendTemplate(template)
logSys.info(" date pattern `%r`: `%s`",
pattern, template.name)
logSys.debug(" date pattern regex for %r: %s",
pattern, template.regex)
dd = DateDetector()
if not isinstance(pattern, (list, tuple)):
pattern = filter(bool, map(str.strip, re.split('\n+', pattern)))
for pattern in pattern:
dd.appendTemplate(pattern)
self.dateDetector = dd
##
# Get the date detector pattern, or Default Detectors if not changed
@ -286,14 +277,16 @@ class Filter(JailThread):
def getDatePattern(self):
if self.dateDetector is not None:
templates = self.dateDetector.templates
if len(templates) > 1:
# lazy template init, by first match
if not len(templates) or len(templates) > 2:
return None, "Default Detectors"
elif len(templates) == 1:
elif len(templates):
if hasattr(templates[0], "pattern"):
pattern = templates[0].pattern
else:
pattern = None
return pattern, templates[0].name
return None
##
# Set the maximum retry value.
@ -474,9 +467,9 @@ class Filter(JailThread):
(timeMatch, template) = self.dateDetector.matchTime(l)
if timeMatch:
tupleLine = (
l[:timeMatch.start()],
l[timeMatch.start():timeMatch.end()],
l[timeMatch.end():],
l[:timeMatch.start(1)],
l[timeMatch.start(1):timeMatch.end(1)],
l[timeMatch.end(1):],
(timeMatch, template)
)
else:

View File

@ -41,6 +41,21 @@ class MyTime:
"""
myTime = None
alternateNowTime = None
alternateNow = None
@staticmethod
def setAlternateNow(t):
"""Set current time.
Use None in order to always get the real current time.
@param t the time to set or None
"""
MyTime.alternateNowTime = t
MyTime.alternateNow = \
datetime.datetime.fromtimestamp(t) if t is not None else None
@staticmethod
def setTime(t):
@ -84,8 +99,9 @@ class MyTime:
"""
if MyTime.myTime is None:
return datetime.datetime.now()
else:
return datetime.datetime.fromtimestamp(MyTime.myTime)
if MyTime.myTime == MyTime.alternateNowTime:
return MyTime.alternateNow
return datetime.datetime.fromtimestamp(MyTime.myTime)
@staticmethod
def localtime(x=None):

View File

@ -26,10 +26,59 @@ from .mytime import MyTime
locale_time = LocaleTime()
timeRE = TimeRE()
timeRE['z'] = r"(?P<z>Z|[+-]\d{2}(?::?[0-5]\d)?)"
def _getYearCentRE(cent=(0,3), distance=3, now=(MyTime.now(), MyTime.alternateNow)):
""" Build century regex for last year and the next years (distance).
Thereby respect possible run in the test-cases (alternate date used there)
"""
cent = lambda year, f=cent[0], t=cent[1]: str(year)[f:t]
exprset = set( cent(now[0].year + i) for i in (-1, distance) )
if len(now) and now[1]:
exprset |= set( cent(now[1].year + i) for i in (-1, distance) )
return "(?:%s)" % "|".join(exprset) if len(exprset) > 1 else "".join(exprset)
def reGroupDictStrptime(found_dict):
#todo: implement literal time zone support like CET, PST, PDT, etc (via pytz):
#timeRE['z'] = r"%s?(?P<z>Z|[+-]\d{2}(?::?[0-5]\d)?|[A-Z]{3})?" % timeRE['Z']
timeRE['Z'] = r"(?P<Z>[A-Z]{3,5})"
timeRE['z'] = r"(?P<z>Z|UTC|GMT|[+-]\d{2}(?::?[0-5]\d)?)"
# Extend build-in TimeRE with some exact patterns
# exact two-digit patterns:
timeRE['Exd'] = r"(?P<d>3[0-1]|[1-2]\d|0[1-9])"
timeRE['Exm'] = r"(?P<m>1[0-2]|0[1-9])"
timeRE['ExH'] = r"(?P<H>2[0-3]|[0-1]\d)"
timeRE['ExM'] = r"(?P<M>[0-5]\d)"
timeRE['ExS'] = r"(?P<S>6[0-1]|[0-5]\d)"
# more precise year patterns, within same century of last year and
# the next 3 years (for possible long uptime of fail2ban); thereby
# respect possible run in the test-cases (alternate date used there):
timeRE['ExY'] = r"(?P<Y>%s\d)" % _getYearCentRE(cent=(0,3), distance=3)
timeRE['Exy'] = r"(?P<y>%s\d)" % _getYearCentRE(cent=(2,3), distance=3)
def getTimePatternRE():
keys = timeRE.keys()
patt = (r"%%(%%|%s|[%s])" % (
"|".join([k for k in keys if len(k) > 1]),
"".join([k for k in keys if len(k) == 1]),
))
names = {
'a': "DAY", 'A': "DAYNAME", 'b': "MON", 'B': "MONTH", 'd': "Day",
'H': "24hour", 'I': "12hour", 'j': "Yearday", 'm': "Month",
'M': "Minute", 'p': "AMPM", 'S': "Second", 'U': "Yearweek",
'w': "Weekday", 'W': "Yearweek", 'y': 'Year2', 'Y': "Year", '%': "%",
'z': "Zone offset", 'f': "Microseconds", 'Z': "Zone name",
}
for key in set(keys) - set(names): # may not have them all...
if key.startswith('Ex'):
kn = names.get(key[2:])
if kn:
names[key] = "Ex" + kn
continue
names[key] = "%%%s" % key
return (patt, names)
def reGroupDictStrptime(found_dict, msec=False):
"""Return time from dictionary of strptime fields
This is tweaked from python built-in _strptime.
@ -58,14 +107,15 @@ def reGroupDictStrptime(found_dict):
# weekday and julian defaulted to -1 so as to signal need to calculate
# values
weekday = julian = -1
for group_key in found_dict.keys():
for key, val in found_dict.iteritems():
if val is None: continue
# Directives not explicitly handled below:
# c, x, X
# handled by making out of other directives
# U, W
# worthless without day of the week
if group_key == 'y':
year = int(found_dict['y'])
if key == 'y':
year = int(val)
# Open Group specification for strptime() states that a %y
#value in the range of [00, 68] is in the century 2000, while
#[69,99] is in the century 1900
@ -73,20 +123,20 @@ def reGroupDictStrptime(found_dict):
year += 2000
else:
year += 1900
elif group_key == 'Y':
year = int(found_dict['Y'])
elif group_key == 'm':
month = int(found_dict['m'])
elif group_key == 'B':
month = locale_time.f_month.index(found_dict['B'].lower())
elif group_key == 'b':
month = locale_time.a_month.index(found_dict['b'].lower())
elif group_key == 'd':
day = int(found_dict['d'])
elif group_key == 'H':
hour = int(found_dict['H'])
elif group_key == 'I':
hour = int(found_dict['I'])
elif key == 'Y':
year = int(val)
elif key == 'm':
month = int(val)
elif key == 'B':
month = locale_time.f_month.index(val.lower())
elif key == 'b':
month = locale_time.a_month.index(val.lower())
elif key == 'd':
day = int(val)
elif key == 'H':
hour = int(val)
elif key == 'I':
hour = int(val)
ampm = found_dict.get('p', '').lower()
# If there was no AM/PM indicator, we'll treat this like AM
if ampm in ('', locale_time.am_pm[0]):
@ -101,38 +151,39 @@ def reGroupDictStrptime(found_dict):
# 12 noon == 12 PM == hour 12
if hour != 12:
hour += 12
elif group_key == 'M':
minute = int(found_dict['M'])
elif group_key == 'S':
second = int(found_dict['S'])
elif group_key == 'f':
s = found_dict['f']
# Pad to always return microseconds.
s += "0" * (6 - len(s))
fraction = int(s)
elif group_key == 'A':
weekday = locale_time.f_weekday.index(found_dict['A'].lower())
elif group_key == 'a':
weekday = locale_time.a_weekday.index(found_dict['a'].lower())
elif group_key == 'w':
weekday = int(found_dict['w'])
elif key == 'M':
minute = int(val)
elif key == 'S':
second = int(val)
elif key == 'f':
if msec:
s = val
# Pad to always return microseconds.
s += "0" * (6 - len(s))
fraction = int(s)
elif key == 'A':
weekday = locale_time.f_weekday.index(val.lower())
elif key == 'a':
weekday = locale_time.a_weekday.index(val.lower())
elif key == 'w':
weekday = int(val)
if weekday == 0:
weekday = 6
else:
weekday -= 1
elif group_key == 'j':
julian = int(found_dict['j'])
elif group_key in ('U', 'W'):
week_of_year = int(found_dict[group_key])
if group_key == 'U':
elif key == 'j':
julian = int(val)
elif key in ('U', 'W'):
week_of_year = int(val)
if key == 'U':
# U starts week on Sunday.
week_of_year_start = 6
else:
# W starts week on Monday.
week_of_year_start = 0
elif group_key == 'z':
z = found_dict['z']
if z == "Z":
elif key == 'z':
z = val
if z in ("Z", "UTC", "GMT"):
tzoffset = 0
else:
tzoffset = int(z[1:3]) * 60 # Hours...
@ -140,6 +191,10 @@ def reGroupDictStrptime(found_dict):
tzoffset += int(z[-2:]) # ...and minutes
if z.startswith("-"):
tzoffset = -tzoffset
elif key == 'Z':
z = val
if z in ("UTC", "GMT"):
tzoffset = 0
# Fail2Ban will assume it's this year
assume_year = False
@ -176,7 +231,7 @@ def reGroupDictStrptime(found_dict):
# Actully create date
date_result = datetime.datetime(
year, month, day, hour, minute, second, fraction)
if gmtoff:
if gmtoff is not None:
date_result = date_result - datetime.timedelta(seconds=gmtoff)
if date_result > now and assume_today:
@ -189,7 +244,9 @@ def reGroupDictStrptime(found_dict):
year=year-1, month=month, day=day)
if gmtoff is not None:
return calendar.timegm(date_result.utctimetuple())
tm = calendar.timegm(date_result.utctimetuple())
else:
return time.mktime(date_result.timetuple())
tm = time.mktime(date_result.timetuple())
if msec:
tm += fraction/1000000.0
return tm

View File

@ -306,7 +306,7 @@ class Transmitter:
actionvalue = command[4]
setattr(action, actionkey, actionvalue)
return getattr(action, actionkey)
raise Exception("Invalid command (no set action or not yet implemented)")
raise Exception("Invalid command %r (no set action or not yet implemented)" % (command[1],))
def __commandGet(self, command):
name = command[0]

View File

@ -32,6 +32,7 @@ if sys.version_info >= (2,7): # pragma: no cover - may be unavailable
def setUp(self):
"""Call before every test case."""
super(BadIPsActionTest, self).setUp()
unittest.F2B.SkipIfNoNetwork()
self.jail = DummyJail()

View File

@ -48,6 +48,7 @@ class SMTPActionTest(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(SMTPActionTest, self).setUp()
self.jail = DummyJail()
pythonModule = os.path.join(CONFIG_DIR, "action.d", "smtp.py")
pythonModuleName = os.path.basename(pythonModule.rstrip(".py"))

View File

@ -32,6 +32,7 @@ from ..server.ticket import BanTicket
class AddFailure(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(AddFailure, self).setUp()
self.__ticket = BanTicket('193.168.0.128', 1167605999.0)
self.__banManager = BanManager()
@ -134,6 +135,7 @@ class AddFailure(unittest.TestCase):
class StatusExtendedCymruInfo(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(StatusExtendedCymruInfo, self).setUp()
unittest.F2B.SkipIfNoNetwork()
self.__ban_ip = "93.184.216.34"
self.__asn = "15133"

View File

@ -32,6 +32,7 @@ class BeautifierTest(unittest.TestCase):
def setUp(self):
""" Call before every test case """
super(BeautifierTest, self).setUp()
self.b = Beautifier()
def tearDown(self):

View File

@ -55,6 +55,7 @@ class ConfigReaderTest(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(ConfigReaderTest, self).setUp()
self.d = tempfile.mkdtemp(prefix="f2b-temp")
self.c = ConfigReaderUnshared(basedir=self.d)

View File

@ -20,3 +20,8 @@ failregex = ^%(__prefix_line)sF2B: failure from <HOST>$
# just to test multiple ignoreregex:
ignoreregex = ^%(__prefix_line)sF2B: error from 192.0.2.251$
^%(__prefix_line)sF2B: error from 192.0.2.252$
# specify only exact date patterns, +1 with %%Y to test usage of last known date by wrong dates like 0000-00-00...
datepattern = {^LN-BEG}%%ExY(?P<_sep>[-/.])%%m(?P=_sep)%%d[T ]%%H:%%M:%%S(?:[.,]%%f)?(?:\s*%%z)?
{^LN-BEG}(?:%%a )?%%b %%d %%H:%%M:%%S(?:\.%%f)?(?: %%ExY)?
{^LN-BEG}%%Y(?P<_sep>[-/.])%%m(?P=_sep)%%d[T ]%%H:%%M:%%S(?:[.,]%%f)?(?:\s*%%z)?

View File

@ -30,7 +30,7 @@ import datetime
from ..server.datedetector import DateDetector
from ..server import datedetector
from ..server.datetemplate import DateTemplate
from ..server.datetemplate import DatePatternRegex, DateTemplate
from .utils import setUpMyTime, tearDownMyTime, LogCaptureTestCase
from ..helpers import getLogger
@ -42,35 +42,40 @@ class DateDetectorTest(LogCaptureTestCase):
def setUp(self):
"""Call before every test case."""
LogCaptureTestCase.setUp(self)
self.__old_eff_level = datedetector.logLevel
datedetector.logLevel = logSys.getEffectiveLevel()
setUpMyTime()
self.__datedetector = DateDetector()
self.__datedetector.addDefaultTemplate()
self.__datedetector = None
def tearDown(self):
"""Call after every test case."""
LogCaptureTestCase.tearDown(self)
datedetector.logLevel = self.__old_eff_level
tearDownMyTime()
@property
def datedetector(self):
if self.__datedetector is None:
self.__datedetector = DateDetector()
self.__datedetector.addDefaultTemplate()
return self.__datedetector
def testGetEpochTime(self):
self.__datedetector = DateDetector()
self.__datedetector.appendTemplate('EPOCH')
# correct epoch time, using all variants:
for dateUnix in (1138049999, 32535244799):
for date in ("%s", "[%s]", "[%s.555]", "audit(%s.555:101)"):
date = date % dateUnix
log = date + " [sshd] error: PAM: Authentication failure"
datelog = self.__datedetector.getTime(log)
datelog = self.datedetector.getTime(log)
self.assertTrue(datelog, "Parse epoch time for %s failed" % (date,))
( datelog, matchlog ) = datelog
self.assertEqual(int(datelog), dateUnix)
self.assertIn(matchlog.group(), (str(dateUnix), str(dateUnix)+'.555'))
self.assertIn(matchlog.group(1), (str(dateUnix), str(dateUnix)+'.555'))
# wrong, no epoch time (< 10 digits, more as 11 digits, begin/end of word) :
for dateUnix in ('123456789', '9999999999999999', '1138049999A', 'A1138049999'):
for date in ("%s", "[%s]", "[%s.555]", "audit(%s.555:101)"):
date = date % dateUnix
log = date + " [sshd] error: PAM: Authentication failure"
datelog = self.__datedetector.getTime(log)
datelog = self.datedetector.getTime(log)
self.assertFalse(datelog)
def testGetTime(self):
@ -80,109 +85,331 @@ class DateDetectorTest(LogCaptureTestCase):
# is not correctly determined atm, since year is not present
# in the log entry. Since this doesn't effect the operation
# of fail2ban -- we just ignore incorrect day of the week
( datelog, matchlog ) = self.__datedetector.getTime(log)
( datelog, matchlog ) = self.datedetector.getTime(log)
self.assertEqual(datelog, dateUnix)
self.assertEqual(matchlog.group(), 'Jan 23 21:59:59')
self.assertEqual(matchlog.group(1), 'Jan 23 21:59:59')
def testVariousTimes(self):
"""Test detection of various common date/time formats f2b should understand
"""
dateUnix = 1106513999.0
for anchored, sdate in (
(False, "Jan 23 21:59:59"),
(False, "Sun Jan 23 21:59:59 2005"),
(False, "Sun Jan 23 21:59:59"),
(False, "Sun Jan 23 2005 21:59:59"),
(False, "2005/01/23 21:59:59"),
(False, "2005.01.23 21:59:59"),
(False, "23/01/2005 21:59:59"),
(False, "23/01/05 21:59:59"),
(False, "23/Jan/2005:21:59:59"),
(False, "23/Jan/2005:21:59:59 +0100"),
(False, "01/23/2005:21:59:59"),
(False, "2005-01-23 21:59:59"),
(False, "2005-01-23 21:59:59,000"), # proftpd
(False, "23-Jan-2005 21:59:59"),
(False, "23-Jan-2005 21:59:59.02"),
(False, "23-Jan-2005 21:59:59 +0100"),
(False, "23-01-2005 21:59:59"),
(True, "1106513999"), # Portsetry
(False, "01-23-2005 21:59:59.252"), # reported on f2b, causes Feb29 fix to break
(False, "@4000000041f4104f00000000"), # TAI64N
(False, "2005-01-23T20:59:59.252Z"), #ISO 8601 (UTC)
(False, "2005-01-23T15:59:59-05:00"), #ISO 8601 with TZ
(False, "2005-01-23T21:59:59"), #ISO 8601 no TZ, assume local
(True, "<01/23/05@21:59:59>"),
(True, "050123 21:59:59"), # MySQL
(True, "Jan-23-05 21:59:59"), # ASSP like
(False, "Jan 23, 2005 9:59:59 PM"), # Apache Tomcat
(True, "1106513999"), # Regular epoch
(True, "1106513999.000"), # Regular epoch with millisec
(False, "audit(1106513999.000:987)"), # SELinux
# anchored - matching expression (pattern) is anchored
# bound - pattern can be tested using word boundary (e.g. False if contains in front some optional part)
# sdate - date string used in test log-line
# rdate - if specified, the result match, which differs from sdate
for anchored, bound, sdate, rdate in (
(False, True, "Jan 23 21:59:59", None),
(False, False, "Sun Jan 23 21:59:59 2005", None),
(False, False, "Sun Jan 23 21:59:59", None),
(False, False, "Sun Jan 23 2005 21:59:59", None),
(False, True, "2005/01/23 21:59:59", None),
(False, True, "2005.01.23 21:59:59", None),
(False, True, "23/01/2005 21:59:59", None),
(False, True, "23/01/05 21:59:59", None),
(False, True, "23/Jan/2005:21:59:59", None),
(False, True, "23/Jan/2005:21:59:59 +0100", None),
(False, True, "01/23/2005:21:59:59", None),
(False, True, "2005-01-23 21:59:59", None),
(False, True, "2005-01-23 21:59:59,000", None), # proftpd
(False, True, "23-Jan-2005 21:59:59", None),
(False, True, "23-Jan-2005 21:59:59.02", None),
(False, True, "23-Jan-2005 21:59:59 +0100", None),
(False, True, "23-01-2005 21:59:59", None),
(True, True, "1106513999", None), # Portsetry
(False, True, "01-23-2005 21:59:59.252", None), # reported on f2b, causes Feb29 fix to break
(False, False, "@4000000041f4104f00000000", None), # TAI64N
(False, True, "2005-01-23T20:59:59.252Z", None), #ISO 8601 (UTC)
(False, True, "2005-01-23T15:59:59-05:00", None), #ISO 8601 with TZ
(False, True, "2005-01-23 21:59:59", None), #ISO 8601 no TZ, assume local
(False, True, "20050123T215959", None), #Short ISO with T
(False, True, "20050123 215959", None), #Short ISO with space
(True, True, "<01/23/05@21:59:59>", None),
(False, True, "050123 21:59:59", None), # MySQL
(True, True, "Jan-23-05 21:59:59", None), # ASSP like
(False, True, "Jan 23, 2005 9:59:59 PM", None), # Apache Tomcat
(True, True, "1106513999", None), # Regular epoch
(True, True, "1106513999.000", None), # Regular epoch with millisec
(True, True, "[1106513999.000]", "1106513999.000"), # epoch squared (brackets are not in match)
(False, True, "audit(1106513999.000:987)", "1106513999.000"), # SELinux
(True, True, "no date line", None), # no date in string
):
if rdate is None and sdate != "no date line": rdate = sdate
logSys.debug('== test %r', (anchored, bound, sdate, rdate))
for should_match, prefix in (
(rdate is not None, ""),
(not anchored, "bogus-prefix "),
(False, "word-boundary")
):
for should_match, prefix in ((True, ""),
(not anchored, "bogus-prefix ")):
log = prefix + sdate + "[sshd] error: PAM: Authentication failure"
# if not allowed boundary test:
if not bound and prefix == "word-boundary": continue
logSys.debug(' -- test %-5s for %r', should_match, log)
# with getTime:
logtime = self.__datedetector.getTime(log)
logtime = self.datedetector.getTime(log)
if should_match:
self.assertNotEqual(logtime, None, "getTime retrieved nothing: failure for %s, anchored: %r, log: %s" % ( sdate, anchored, log))
self.assertNotEqual(logtime, None,
"getTime retrieved nothing: failure for %s by prefix %r, anchored: %r, log: %s" % ( sdate, prefix, anchored, log))
( logUnix, logMatch ) = logtime
self.assertEqual(logUnix, dateUnix, "getTime comparison failure for %s: \"%s\" is not \"%s\"" % (sdate, logUnix, dateUnix))
if sdate.startswith('audit('):
# yes, special case, the group only matches the number
self.assertEqual(logMatch.group(), '1106513999.000')
else:
self.assertEqual(logMatch.group(), sdate)
self.assertEqual(logUnix, dateUnix,
"getTime comparison failure for %s: by prefix %r \"%s\" is not \"%s\"" % (sdate, prefix, logUnix, dateUnix))
self.assertEqual(logMatch.group(1), rdate)
else:
self.assertEqual(logtime, None, "getTime should have not matched for %r Got: %s" % (sdate, logtime))
self.assertEqual(logtime, None,
"getTime should have not matched for %r by prefix %r Got: %s" % (sdate, prefix, logtime))
# with getTime(matchTime) - this combination used in filter:
matchTime = self.__datedetector.matchTime(log)
logtime = self.__datedetector.getTime(log, matchTime)
(timeMatch, template) = matchTime = self.datedetector.matchTime(log)
logtime = self.datedetector.getTime(log, matchTime)
logSys.debug(' -- found - %r', template.name if timeMatch else False)
if should_match:
self.assertNotEqual(logtime, None, "getTime retrieved nothing: failure for %s, anchored: %r, log: %s" % ( sdate, anchored, log))
self.assertNotEqual(logtime, None,
"getTime retrieved nothing: failure for %s by prefix %r, anchored: %r, log: %s" % ( sdate, prefix, anchored, log))
( logUnix, logMatch ) = logtime
self.assertEqual(logUnix, dateUnix, "getTime comparison failure for %s: \"%s\" is not \"%s\"" % (sdate, logUnix, dateUnix))
if sdate.startswith('audit('):
# yes, special case, the group only matches the number
self.assertEqual(logMatch.group(), '1106513999.000')
else:
self.assertEqual(logMatch.group(), sdate)
self.assertEqual(logUnix, dateUnix,
"getTime comparison failure for %s by prefix %r: \"%s\" is not \"%s\"" % (sdate, prefix, logUnix, dateUnix))
self.assertEqual(logMatch.group(1), rdate)
else:
self.assertEqual(logtime, None, "getTime should have not matched for %r Got: %s" % (sdate, logtime))
self.assertEqual(logtime, None,
"getTime should have not matched for %r by prefix %r Got: %s" % (sdate, prefix, logtime))
logSys.debug(' -- OK')
def testAllUniqueTemplateNames(self):
self.assertRaises(ValueError, self.__datedetector.appendTemplate,
self.__datedetector.templates[0])
self.assertRaises(ValueError, self.datedetector.appendTemplate,
self.datedetector.templates[0])
def testFullYearMatch_gh130(self):
# see https://github.com/fail2ban/fail2ban/pull/130
# yoh: unfortunately this test is not really effective to reproduce the
# situation but left in place to assure consistent behavior
mu = time.mktime(datetime.datetime(2012, 10, 11, 2, 37, 17).timetuple())
logdate = self.__datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')
logdate = self.datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')
self.assertNotEqual(logdate, None)
( logTime, logMatch ) = logdate
self.assertEqual(logTime, mu)
self.assertEqual(logMatch.group(), '2012/10/11 02:37:17')
self.assertEqual(logMatch.group(1), '2012/10/11 02:37:17')
# confuse it with year being at the end
for i in xrange(10):
( logTime, logMatch ) = self.__datedetector.getTime('11/10/2012 02:37:17 [error] 18434#0')
( logTime, logMatch ) = self.datedetector.getTime('11/10/2012 02:37:17 [error] 18434#0')
self.assertEqual(logTime, mu)
self.assertEqual(logMatch.group(), '11/10/2012 02:37:17')
self.assertEqual(logMatch.group(1), '11/10/2012 02:37:17')
# and now back to the original
( logTime, logMatch ) = self.__datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')
( logTime, logMatch ) = self.datedetector.getTime('2012/10/11 02:37:17 [error] 18434#0')
self.assertEqual(logTime, mu)
self.assertEqual(logMatch.group(), '2012/10/11 02:37:17')
self.assertEqual(logMatch.group(1), '2012/10/11 02:37:17')
def testDateTemplate(self):
t = DateTemplate()
t.setRegex('^a{3,5}b?c*$')
self.assertEqual(t.getRegex(), '^a{3,5}b?c*$')
self.assertRaises(Exception, t.getDate, '')
self.assertEqual(t.matchDate('aaaac').group(), 'aaaac')
t = DateTemplate()
t.setRegex('^a{3,5}b?c*$')
self.assertEqual(t.regex, '^(a{3,5}b?c*)$')
self.assertRaises(Exception, t.getDate, '')
self.assertEqual(t.matchDate('aaaac').group(1), 'aaaac')
## no word boundaries left and right:
t = DatePatternRegex()
t.pattern = '(?iu)**time:%ExY%Exm%ExdT%ExH%ExM%ExS**'
# ** was removed from end-regex:
self.assertFalse('**' in t.regex)
# match date:
dt = 'TIME:20050102T010203'
self.assertEqual(t.matchDate('X' + dt + 'X').group(1), dt)
self.assertEqual(t.matchDate(dt).group(1), dt)
# wrong year (for exact %ExY):
dt = 'TIME:50050102T010203'
self.assertFalse(t.matchDate(dt))
## start boundary left and word boundary right (automatically if not **):
t = DatePatternRegex()
t.pattern = '{^LN-BEG}time:%ExY%Exm%ExdT%ExH%ExM%ExS'
self.assertTrue('^' in t.regex)
# try match date:
dt = 'time:20050102T010203'
self.assertFalse(t.matchDate('X' + dt))
self.assertFalse(t.matchDate(dt + 'X'))
self.assertEqual(t.matchDate('##' + dt + '...').group(1), dt)
self.assertEqual(t.matchDate(dt).group(1), dt)
# case sensitive:
dt = 'TIME:20050102T010203'
self.assertFalse(t.matchDate(dt))
## auto-switching "ignore case" and "unicode"
t = DatePatternRegex()
t.pattern = '^%Y %b %d'
self.assertTrue('(?iu)' in t.regex)
dt = '2005 jun 03'; self.assertEqual(t.matchDate(dt).group(1), dt)
dt = '2005 Jun 03'; self.assertEqual(t.matchDate(dt).group(1), dt)
dt = '2005 JUN 03'; self.assertEqual(t.matchDate(dt).group(1), dt)
def testAmbiguousInOrderedTemplates(self):
dd = self.datedetector
for (debit, line, cnt) in (
# shortest distance to datetime should win:
("030324 0:03:59", "some free text 030324 0:03:59 -- 2003-03-07 17:05:01 ...", 1),
# some free text with datetime:
("2003-03-07 17:05:01", "some free text 2003-03-07 17:05:01 test ...", 15),
# distance collision detection (date from foreign input should not be found):
("030324 0:04:00", "server mysqld[1000]: 030324 0:04:00 [Warning] Access denied ..."
" foreign-input just some free text 2003-03-07 17:05:01 test", 10),
# distance collision detection (first date should be found):
("Sep 16 21:30:26", "server mysqld[1020]: Sep 16 21:30:26 server mysqld: 030916 21:30:26 [Warning] Access denied", 15),
# just to test sorting:
("2005-10-07 06:09:42", "server mysqld[5906]: 2005-10-07 06:09:42 5907 [Warning] Access denied", 20),
("2005-10-08T15:26:18.237955", "server mysqld[5906]: 2005-10-08T15:26:18.237955 6 [Note] Access denied", 20),
# date format changed again:
("051009 10:05:30", "server mysqld[1000]: 051009 10:05:30 [Warning] Access denied ...", 50),
):
logSys.debug('== test: %r', (debit, line, cnt))
for i in range(cnt):
logSys.debug('Line: %s', line)
match, template = dd.matchTime(line)
self.assertTrue(match)
self.assertEqual(match.group(1), debit)
def testLowLevelLogging(self):
# test coverage for the deep (heavy) debug messages:
try:
self.__old_eff_level = datedetector.logLevel
if datedetector.logLevel < logSys.getEffectiveLevel()+1:
datedetector.logLevel = logSys.getEffectiveLevel()+1
dd = self.datedetector
i = 0
for (line, cnt) in (
("server mysqld[5906]: 2005-10-07 06:09:%02i 5907 [Warning] Access denied", 2),
("server mysqld[5906]: 051007 06:10:%02i 5907 [Warning] Access denied", 5),
("server mysqld[5906]: 2005-10-07 06:09:%02i 5907 [Warning] Access denied", 10),
):
for i in range(i, i+cnt+1):
logSys.debug('== test: %r', (line % i, cnt))
match, template = dd.matchTime(line % i)
self.assertTrue(match)
finally:
datedetector.logLevel = self.__old_eff_level
def testWrongTemplate(self):
t = DatePatternRegex('(%ExY%Exm%Exd')
# lazy compiling used, so try match:
self.assertRaises(Exception, t.matchDate, '(20050101')
self.assertLogged("Compile %r failed" % t.name)
# abstract:
t = DateTemplate()
self.assertRaises(Exception, t.getDate, 'no date line')
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")
class CustomDateFormatsTest(unittest.TestCase):
def testIso8601(self):
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00Z")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 12, 0))
self.assertRaises(TypeError, iso8601.getDate, None)
self.assertRaises(TypeError, iso8601.getDate, date)
self.assertEqual(iso8601.getDate(""), None)
self.assertEqual(iso8601.getDate("Z"), None)
self.assertEqual(iso8601.getDate("2007-01-01T120:00:00Z"), None)
self.assertEqual(iso8601.getDate("2007-13-01T12:00:00Z"), None)
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00+0400")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 8, 0))
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00+04:00")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 8, 0))
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00-0400")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 16, 0))
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00-04")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 16, 0))
def testAmbiguousDatePattern(self):
defDD = DateDetector()
defDD.addDefaultTemplate()
for (matched, dp, line) in (
# positive case:
('Jan 23 21:59:59', None, 'Test failure Jan 23 21:59:59 for 192.0.2.1'),
# ambiguous "unbound" patterns (missed):
(False, None, 'Test failure TestJan 23 21:59:59.011 2015 for 192.0.2.1'),
(False, None, 'Test failure Jan 23 21:59:59123456789 for 192.0.2.1'),
# ambiguous "no optional year" patterns (matched):
('Aug 8 11:25:50', None, 'Aug 8 11:25:50 20030f2329b8 Authentication failed from 192.0.2.1'),
('Aug 8 11:25:50', None, '[Aug 8 11:25:50] 20030f2329b8 Authentication failed from 192.0.2.1'),
('Aug 8 11:25:50 2014', None, 'Aug 8 11:25:50 2014 20030f2329b8 Authentication failed from 192.0.2.1'),
# direct specified patterns:
('20:00:00 01.02.2003', r'%H:%M:%S %d.%m.%Y$', '192.0.2.1 at 20:00:00 01.02.2003'),
('[20:00:00 01.02.2003]', r'\[%H:%M:%S %d.%m.%Y\]', '192.0.2.1[20:00:00 01.02.2003]'),
('[20:00:00 01.02.2003]', r'\[%H:%M:%S %d.%m.%Y\]', '[20:00:00 01.02.2003]192.0.2.1'),
('[20:00:00 01.02.2003]', r'\[%H:%M:%S %d.%m.%Y\]$', '192.0.2.1[20:00:00 01.02.2003]'),
('[20:00:00 01.02.2003]', r'^\[%H:%M:%S %d.%m.%Y\]', '[20:00:00 01.02.2003]192.0.2.1'),
('[17/Jun/2011 17:00:45]', r'^\[%d/%b/%Y %H:%M:%S\]', '[17/Jun/2011 17:00:45] Attempt, IP address 192.0.2.1'),
('[17/Jun/2011 17:00:45]', r'\[%d/%b/%Y %H:%M:%S\]', 'Attempt [17/Jun/2011 17:00:45] IP address 192.0.2.1'),
('[17/Jun/2011 17:00:45]', r'\[%d/%b/%Y %H:%M:%S\]', 'Attempt IP address 192.0.2.1, date: [17/Jun/2011 17:00:45]'),
# direct specified patterns (begin/end, missed):
(False, r'%H:%M:%S %d.%m.%Y', '192.0.2.1x20:00:00 01.02.2003'),
(False, r'%H:%M:%S %d.%m.%Y', '20:00:00 01.02.2003x192.0.2.1'),
# direct specified unbound patterns (no begin/end boundary):
('20:00:00 01.02.2003', r'**%H:%M:%S %d.%m.%Y**', '192.0.2.1x20:00:00 01.02.2003'),
('20:00:00 01.02.2003', r'**%H:%M:%S %d.%m.%Y**', '20:00:00 01.02.2003x192.0.2.1'),
# pattern enclosed with stars (in comparison to example above):
('*20:00:00 01.02.2003*', r'\**%H:%M:%S %d.%m.%Y\**', 'test*20:00:00 01.02.2003*test'),
# direct specified patterns (begin/end, matched):
('20:00:00 01.02.2003', r'%H:%M:%S %d.%m.%Y', '192.0.2.1 20:00:00 01.02.2003'),
('20:00:00 01.02.2003', r'%H:%M:%S %d.%m.%Y', '20:00:00 01.02.2003 192.0.2.1'),
# wrong year in 1st date, so failed by convert using not precise year (filter used last known date),
# in the 2nd and 3th tests (with precise year) it should find correct the 2nd date:
(None, r'%Y-%Exm-%Exd %ExH:%ExM:%ExS', "0000-12-30 00:00:00 - 2003-12-30 00:00:00"),
('2003-12-30 00:00:00', r'%ExY-%Exm-%Exd %ExH:%ExM:%ExS', "0000-12-30 00:00:00 - 2003-12-30 00:00:00"),
('2003-12-30 00:00:00', None, "0000-12-30 00:00:00 - 2003-12-30 00:00:00"),
# wrong date recognized short month/day (unbounded date pattern without separator between parts),
# in the 2nd and 3th tests (with precise month and day) it should find correct the 2nd date:
('200333 010203', r'%Y%m%d %H%M%S', "text:200333 010203 | date:20031230 010203"),
('20031230 010203', r'%ExY%Exm%Exd %ExH%ExM%ExS', "text:200333 010203 | date:20031230 010203"),
('20031230 010203', None, "text:200333 010203 | date:20031230 010203"),
# Explicit bound in start of the line using {^LN-BEG} key,
# (negative) in the 1st case without line begin boundary - wrong date may be found,
# (positive) in the 2nd case with line begin boundary - unexpected date / log line (not found)
# (positive) and in 3th case with line begin boundary - find the correct date
("20030101 000000", "%ExY%Exm%Exd %ExH%ExM%ExS", "00001230 010203 - 20030101 000000"),
(None, "{^LN-BEG}%ExY%Exm%Exd %ExH%ExM%ExS", "00001230 010203 - 20030101 000000"),
("20031230 010203", "{^LN-BEG}%ExY%Exm%Exd %ExH%ExM%ExS", "20031230 010203 - 20030101 000000"),
# Explicit bound in start of the line using {^LN-BEG} key,
# up to 2 non-alphanumeric chars front, ** - no word boundary on the right
("20031230010203", "{^LN-BEG}%ExY%Exm%Exd%ExH%ExM%ExS**", "2003123001020320030101000000"),
("20031230010203", "{^LN-BEG}%ExY%Exm%Exd%ExH%ExM%ExS**", "#2003123001020320030101000000"),
("20031230010203", "{^LN-BEG}%ExY%Exm%Exd%ExH%ExM%ExS**", "##2003123001020320030101000000"),
("20031230010203", "{^LN-BEG}%ExY%Exm%Exd%ExH%ExM%ExS", "[20031230010203]20030101000000"),
# UTC/GMT time zone offset (with %z and %Z):
(1072746123.0 - 3600, "{^LN-BEG}%ExY-%Exm-%Exd %ExH:%ExM:%ExS(?: %z)?", "[2003-12-30 01:02:03] server ..."),
(1072746123.0 - 3600, "{^LN-BEG}%ExY-%Exm-%Exd %ExH:%ExM:%ExS(?: %Z)?", "[2003-12-30 01:02:03] server ..."),
(1072746123.0, "{^LN-BEG}%ExY-%Exm-%Exd %ExH:%ExM:%ExS(?: %z)?", "[2003-12-30 01:02:03 UTC] server ..."),
(1072746123.0, "{^LN-BEG}%ExY-%Exm-%Exd %ExH:%ExM:%ExS(?: %Z)?", "[2003-12-30 01:02:03 UTC] server ..."),
):
logSys.debug('== test: %r', (matched, dp, line))
if dp is None:
dd = defDD
else:
dd = DateDetector()
dd.appendTemplate(dp)
date = dd.getTime(line)
if matched:
self.assertTrue(date)
if isinstance(matched, basestring):
self.assertEqual(matched, date[1].group(1))
else:
self.assertEqual(matched, date[0])
else:
self.assertEqual(date, None)
# def testDefaultTempate(self):

View File

@ -138,8 +138,8 @@ def _start_params(tmp, use_stock=False, logtarget="/dev/null", db=":memory:"):
"""Filters list of 'files' to contain only directories (under dir)"""
return [f for f in files if isdir(pjoin(dir, f))]
shutil.copytree(STOCK_CONF_DIR, cfg, ignore=ig_dirs)
os.symlink(pjoin(STOCK_CONF_DIR, "action.d"), pjoin(cfg, "action.d"))
os.symlink(pjoin(STOCK_CONF_DIR, "filter.d"), pjoin(cfg, "filter.d"))
os.symlink(os.path.abspath(pjoin(STOCK_CONF_DIR, "action.d")), pjoin(cfg, "action.d"))
os.symlink(os.path.abspath(pjoin(STOCK_CONF_DIR, "filter.d")), pjoin(cfg, "filter.d"))
# replace fail2ban params (database with memory):
r = re.compile(r'^dbfile\s*=')
for line in fileinput.input(pjoin(cfg, "fail2ban.conf"), inplace=True):
@ -424,7 +424,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
self.execSuccess(startparams, "-vvd")
self.assertLogged("Loading files")
self.assertLogged("logtarget")
@with_tmpdir
@with_kill_srv
def testClientStartBackgroundInside(self, tmp):
@ -773,6 +773,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
"maxretry = 3",
"findtime = 10m",
"failregex = ^\s*failure (401|403) from <HOST>",
"datepattern = {^LN-BEG}EPOCH",
"",
"[test-jail1]", "backend = " + backend, "filter =",
"action = ",

View File

@ -101,6 +101,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWrongIngnoreRE(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "{^LN-BEG}EPOCH",
"test", r".*? from <HOST>$", r".**"
)
self.assertFalse(fail2banRegex.start(opts, args))
@ -108,6 +109,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectFound(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--print-no-missed",
"Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
r"Authentication failure for .*? from <HOST>$"
@ -136,6 +138,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_1(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched",
Fail2banRegexTest.FILENAME_01,
Fail2banRegexTest.RE_00
@ -151,6 +154,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_1raw(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw",
Fail2banRegexTest.FILENAME_01,
Fail2banRegexTest.RE_00
@ -160,6 +164,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_1raw_noDns(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched", "--raw", "--usedns=no",
Fail2banRegexTest.FILENAME_01,
Fail2banRegexTest.RE_00
@ -169,6 +174,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testDirectRE_2(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--print-all-matched",
Fail2banRegexTest.FILENAME_02,
Fail2banRegexTest.RE_00
@ -178,7 +184,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testVerbose(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--verbose", "--print-no-missed",
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--verbose", "--verbose-date", "--print-no-missed",
Fail2banRegexTest.FILENAME_02,
Fail2banRegexTest.RE_00
)
@ -190,6 +197,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronChar(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
)
self.assertTrue(fail2banRegex.start(opts, args))
@ -203,6 +211,7 @@ class Fail2banRegexTest(LogCaptureTestCase):
def testWronCharDebuggex(self):
(opts, args, fail2banRegex) = _Fail2banRegex(
"--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
"--debuggex", "--print-all-matched",
Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
)

View File

@ -36,6 +36,7 @@ class AddFailure(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(AddFailure, self).setUp()
self.__items = None
self.__failManager = FailManager()

View File

@ -33,7 +33,6 @@ failregex = ^%(__prefix_line)s(?:error: PAM: )?Authentication failure for .* fro
#
ignoreregex = ^.+ john from host 192.168.1.1\s*$
[Init]
# "maxlines" is number of log lines to buffer for multi-line regex searches
maxlines = 1

View File

@ -9,3 +9,8 @@
2013-12-31 17:39:54.767815 [WARNING] sofia_reg.c:2531 Can't find user [1001@192.168.2.51] from 5.11.47.236
# failJSON: { "time": "2013-12-31T17:39:54", "match": true, "host": "185.24.234.141" }
2013-12-31 17:39:54.767815 [WARNING] sofia_reg.c:2531 Can't find user [100@192.168.2.51] from 185.24.234.141
# failJSON: { "time": "2016-09-25T18:57:58", "match": true, "host": "192.0.2.1", "desc": "Systemd dual time with prefix - 1st expr" }
2016-09-25T18:57:58.150982 www.srv.tld freeswitch[122921]: 2016-09-25 18:57:58.150982 [WARNING] sofia_reg.c:2889 Can't find user [201@::1] from 192.0.2.1
# failJSON: { "time": "2016-09-25T18:57:58", "match": true, "host": "192.0.2.2", "desc": "Systemd dual time with prefix - 2nd expr" }
2016-09-25T18:57:58.150982 www.srv.tld freeswitch[122921]: 2016-09-25 18:57:58.150982 [WARNING] sofia_reg.c:1720 SIP auth failure (INVITE) on sofia profile 'sipinterface_1' for [9810972597751739@::1] from ip 192.0.2.2

View File

@ -46,6 +46,14 @@ Jun 22 20:37:04 server test-demo[402]: writeToStorage plist={
# failJSON: { "time": "2005-06-22T20:37:04", "match": true , "host": "192.0.2.2" }
0000-12-30 00:00:00 server test-demo[47831]: F2B: failure from 192.0.2.2
# -- test no zone and UTC/GMT named zone "2005-06-21T14:55:10 UTC" == "2005-06-21T16:55:10 CEST" (diff +2h in CEST):
# failJSON: { "time": "2005-06-21T16:55:09", "match": true , "host": "192.0.2.09" }
2005-06-21 16:55:09 machine test-demo(pam_unix)[13709] F2B: error from 192.0.2.09
# failJSON: { "time": "2005-06-21T16:55:10", "match": true , "host": "192.0.2.10" }
2005-06-21 14:55:10 UTC machine test-demo(pam_unix)[13709] F2B: error from 192.0.2.10
# failJSON: { "time": "2005-06-21T16:55:11", "match": true , "host": "192.0.2.11" }
2005-06-21 14:55:11 GMT machine test-demo(pam_unix)[13709] F2B: error from 192.0.2.11
# failJSON: { "time": "2005-06-21T16:56:02", "match": true , "host": "192.0.2.250" }
[Jun 21 16:56:02] machine test-demo(pam_unix)[13709] F2B: error from 192.0.2.250
# failJSON: { "match": false, "desc": "test 1st ignoreregex" }

View File

@ -270,6 +270,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
class BasicFilter(unittest.TestCase):
def setUp(self):
super(BasicFilter, self).setUp()
self.filter = Filter('name')
def testGetSetUseDNS(self):
@ -283,10 +284,10 @@ class BasicFilter(unittest.TestCase):
def testGetSetDatePattern(self):
self.assertEqual(self.filter.getDatePattern(),
(None, "Default Detectors"))
self.filter.setDatePattern("^%Y-%m-%d-%H%M%S.%f %z")
self.filter.setDatePattern("^%Y-%m-%d-%H%M%S.%f %z **")
self.assertEqual(self.filter.getDatePattern(),
("^%Y-%m-%d-%H%M%S.%f %z",
"^Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset"))
("^%Y-%m-%d-%H%M%S.%f %z **",
"^Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset **"))
def testAssertWrongTime(self):
self.assertRaises(AssertionError,
@ -363,6 +364,7 @@ class IgnoreIP(LogCaptureTestCase):
setUpMyTime()
self.filter.addIgnoreIP('192.168.1.0/25')
self.filter.addFailRegex('<HOST>')
self.filter.setDatePattern('{^LN-BEG}EPOCH')
self.filter.processLineAndAdd('1387203300.222 192.168.1.32')
self.assertLogged('Ignore 192.168.1.32')
tearDownMyTime()
@ -465,6 +467,7 @@ class LogFileFilterPoll(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(LogFileFilterPoll, self).setUp()
self.filter = FilterPoll(DummyJail())
self.filter.addLogPath(LogFileFilterPoll.FILENAME)
@ -480,6 +483,8 @@ class LogFileFilterPoll(unittest.TestCase):
self.assertFalse(self.filter.isModified(LogFileFilterPoll.FILENAME))
def testSeekToTimeSmallFile(self):
# speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
time = 1417512352
f = open(fname, 'w')
@ -564,6 +569,8 @@ class LogFileFilterPoll(unittest.TestCase):
_killfile(f, fname)
def testSeekToTimeLargeFile(self):
# speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
time = 1417512352
f = open(fname, 'w')
@ -653,6 +660,8 @@ class LogFileMonitor(LogCaptureTestCase):
self.assertLogged('Unable to open %s' % self.name)
def testErrorProcessLine(self):
# speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
self.filter.sleeptime /= 1000.0
## produce error with not callable processLine:
_org_processLine = self.filter.processLine
@ -715,6 +724,8 @@ class LogFileMonitor(LogCaptureTestCase):
pass
def testNewChangeViaGetFailures_simple(self):
# speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
# suck in lines from this sample log file
self.filter.getFailures(self.name)
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
@ -730,6 +741,8 @@ class LogFileMonitor(LogCaptureTestCase):
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
def testNewChangeViaGetFailures_rewrite(self):
# speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
#
# if we rewrite the file at once
self.file.close()
@ -748,6 +761,8 @@ class LogFileMonitor(LogCaptureTestCase):
_assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
def testNewChangeViaGetFailures_move(self):
# speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
#
# if we move file into a new location while it has been open already
self.file.close()
@ -769,6 +784,7 @@ class CommonMonitorTestCase(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(CommonMonitorTestCase, self).setUp()
self._failTotal = 0
def waitFailTotal(self, count, delay=1.):
@ -819,6 +835,8 @@ def get_monitor_failures_testcase(Filter_):
self.jail = DummyJail()
self.filter = Filter_(self.jail)
self.filter.addLogPath(self.name, autoSeek=False)
# speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
self.filter.active = True
self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.filter.start()
@ -1223,6 +1241,8 @@ class GetFailures(LogCaptureTestCase):
self.jail = DummyJail()
self.filter = FileFilter(self.jail)
self.filter.active = True
# speedup search using exact date pattern:
self.filter.setDatePattern('^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
# TODO Test this
#self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
#self.filter.setTimePattern("%b %d %H:%M:%S")
@ -1329,6 +1349,11 @@ class GetFailures(LogCaptureTestCase):
output = (('212.41.96.186', 4, 1124013600.0),
('212.41.96.185', 2, 1124013598.0))
# speedup search using exact date pattern:
self.filter.setDatePattern(('^%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?',
'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?',
'^EPOCH'
))
self.filter.setMaxRetry(2)
self.filter.addLogPath(GetFailures.FILENAME_04, autoSeek=0)
self.filter.addFailRegex("Invalid user .* <HOST>")
@ -1358,6 +1383,8 @@ class GetFailures(LogCaptureTestCase):
if enc is not None:
self.tearDown();self.setUp();
self.filter.setLogEncoding(enc);
# speedup search using exact date pattern:
self.filter.setDatePattern('^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
self.assertNotLogged('Error decoding line');
self.filter.addLogPath(fname)
self.filter.addFailRegex(failregex)
@ -1533,6 +1560,7 @@ class DNSUtilsNetworkTests(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(DNSUtilsNetworkTests, self).setUp()
unittest.F2B.SkipIfNoNetwork()
def test_IPAddr(self):

View File

@ -23,13 +23,11 @@ __license__ = "GPL"
import logging
import os
import re
import sys
import unittest
import tempfile
import shutil
import fnmatch
import datetime
from glob import glob
from StringIO import StringIO
@ -37,8 +35,6 @@ from utils import LogCaptureTestCase, logSys as DefLogSys
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, uni_decode
from ..helpers import splitwords
from ..server.datedetector import DateDetector
from ..server.datetemplate import DatePatternRegex
from ..server.mytime import MyTime
@ -90,6 +86,7 @@ def _getSysPythonVersion():
class SetupTest(unittest.TestCase):
def setUp(self):
super(SetupTest, self).setUp()
unittest.F2B.SkipIfFast()
setup = os.path.join(os.path.dirname(__file__), '..', '..', 'setup.py')
self.setup = os.path.exists(setup) and setup or None
@ -320,91 +317,6 @@ class TestsUtilsTest(LogCaptureTestCase):
self.assertRaisesRegexp(Exception, 'not all arguments converted', lambda: logSys.debug('test', 1, 2, 3))
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")
class CustomDateFormatsTest(unittest.TestCase):
def testIso8601(self):
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00Z")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 12, 0))
self.assertRaises(TypeError, iso8601.getDate, None)
self.assertRaises(TypeError, iso8601.getDate, date)
self.assertEqual(iso8601.getDate(""), None)
self.assertEqual(iso8601.getDate("Z"), None)
self.assertEqual(iso8601.getDate("2007-01-01T120:00:00Z"), None)
self.assertEqual(iso8601.getDate("2007-13-01T12:00:00Z"), None)
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00+0400")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 8, 0))
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00+04:00")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 8, 0))
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00-0400")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 16, 0))
date = datetime.datetime.utcfromtimestamp(
iso8601.getDate("2007-01-25T12:00:00-04")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 16, 0))
def testAmbiguousDatePattern(self):
defDD = DateDetector()
defDD.addDefaultTemplate()
logSys = DefLogSys
for (matched, dp, line) in (
# positive case:
('Jan 23 21:59:59', None, 'Test failure Jan 23 21:59:59 for 192.0.2.1'),
# ambiguous "unbound" patterns (missed):
(False, None, 'Test failure TestJan 23 21:59:59.011 2015 for 192.0.2.1'),
(False, None, 'Test failure Jan 23 21:59:59123456789 for 192.0.2.1'),
# ambiguous "no optional year" patterns (matched):
('Aug 8 11:25:50', None, 'Aug 8 11:25:50 14430f2329b8 Authentication failed from 192.0.2.1'),
('Aug 8 11:25:50', None, '[Aug 8 11:25:50] 14430f2329b8 Authentication failed from 192.0.2.1'),
('Aug 8 11:25:50 2014', None, 'Aug 8 11:25:50 2014 14430f2329b8 Authentication failed from 192.0.2.1'),
# direct specified patterns:
('20:00:00 01.02.2003', r'%H:%M:%S %d.%m.%Y$', '192.0.2.1 at 20:00:00 01.02.2003'),
('[20:00:00 01.02.2003]', r'\[%H:%M:%S %d.%m.%Y\]', '192.0.2.1[20:00:00 01.02.2003]'),
('[20:00:00 01.02.2003]', r'\[%H:%M:%S %d.%m.%Y\]', '[20:00:00 01.02.2003]192.0.2.1'),
('[20:00:00 01.02.2003]', r'\[%H:%M:%S %d.%m.%Y\]$', '192.0.2.1[20:00:00 01.02.2003]'),
('[20:00:00 01.02.2003]', r'^\[%H:%M:%S %d.%m.%Y\]', '[20:00:00 01.02.2003]192.0.2.1'),
('[17/Jun/2011 17:00:45]', r'^\[%d/%b/%Y %H:%M:%S\]', '[17/Jun/2011 17:00:45] Attempt, IP address 192.0.2.1'),
('[17/Jun/2011 17:00:45]', r'\[%d/%b/%Y %H:%M:%S\]', 'Attempt [17/Jun/2011 17:00:45] IP address 192.0.2.1'),
('[17/Jun/2011 17:00:45]', r'\[%d/%b/%Y %H:%M:%S\]', 'Attempt IP address 192.0.2.1, date: [17/Jun/2011 17:00:45]'),
# direct specified patterns (begin/end, missed):
(False, r'%H:%M:%S %d.%m.%Y', '192.0.2.1x20:00:00 01.02.2003'),
(False, r'%H:%M:%S %d.%m.%Y', '20:00:00 01.02.2003x192.0.2.1'),
# direct specified patterns (begin/end, matched):
('20:00:00 01.02.2003', r'%H:%M:%S %d.%m.%Y', '192.0.2.1 20:00:00 01.02.2003'),
('20:00:00 01.02.2003', r'%H:%M:%S %d.%m.%Y', '20:00:00 01.02.2003 192.0.2.1'),
):
logSys.debug('== test: %r', (matched, dp, line))
if dp is None:
dd = defDD
else:
dp = DatePatternRegex(dp)
dd = DateDetector()
dd.appendTemplate(dp)
date = dd.getTime(line)
if matched:
self.assertTrue(date)
self.assertEqual(matched, date[1].group())
else:
self.assertEqual(date, None)
class MyTimeTest(unittest.TestCase):
def testStr2Seconds(self):

View File

@ -43,6 +43,7 @@ class FilterSamplesRegex(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(FilterSamplesRegex, self).setUp()
self.filter = Filter(None)
self.filter.returnRawHost = True
self.filter.checkAllRegex = True

View File

@ -66,7 +66,7 @@ class TransmitterBase(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
#super(TransmitterBase, self).setUp()
super(TransmitterBase, self).setUp()
self.transm = self.server._Server__transm
# To test thransmitter we don't need to start server...
#self.server.start('/dev/null', '/dev/null', force=False)
@ -303,6 +303,8 @@ class Transmitter(TransmitterBase):
jail=self.jailName)
self.setGetTest(
"datepattern", "Epoch", (None, "Epoch"), jail=self.jailName)
self.setGetTest(
"datepattern", "^Epoch", (None, "{^LN-BEG}Epoch"), jail=self.jailName)
self.setGetTest(
"datepattern", "TAI64N", (None, "TAI64N"), jail=self.jailName)
self.setGetTestNOK("datepattern", "%Cat%a%%%g", jail=self.jailName)
@ -1108,7 +1110,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# (we don't use it in this test at all):
elif unittest.F2B.fast and (
len(cmd) > 3 and cmd[0] in ('set', 'multi-set') and cmd[2] == 'addfailregex'
):
): # pragma: no cover
cmd[0] = "set"
cmd[3] = "DUMMY-REGEX <HOST>"
# command to server, use cmdHandler direct instead of `transm.proceed(cmd)`:

View File

@ -41,6 +41,7 @@ class Socket(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
super(Socket, self).setUp()
self.server = AsyncServer(self)
sock_fd, sock_name = tempfile.mkstemp('fail2ban.sock', 'socket')
os.close(sock_fd)

View File

@ -48,6 +48,8 @@ from ..version import version
logSys = getLogger(__name__)
TEST_NOW = 1124013600
CONFIG_DIR = os.environ.get('FAIL2BAN_CONFIG_DIR', None)
if not CONFIG_DIR:
@ -257,6 +259,14 @@ def initTests(opts):
def F2B_SkipIfNoNetwork():
raise unittest.SkipTest('Skip test because of "--no-network"')
unittest.F2B.SkipIfNoNetwork = F2B_SkipIfNoNetwork
# persistently set time zone to CET (used in zone-related test-cases),
# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
os.environ['TZ'] = 'Europe/Zurich'
time.tzset()
# set alternate now for time related test cases:
MyTime.setAlternateNow(TEST_NOW)
# precache all invalid ip's (TEST-NET-1, ..., TEST-NET-3 according to RFC 5737):
c = DNSUtils.CACHE_ipToName
for i in xrange(255):
@ -286,17 +296,10 @@ old_TZ = os.environ.get('TZ', None)
def setUpMyTime():
# Set the time to a fixed, known value
# Sun Aug 14 12:00:00 CEST 2005
# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
os.environ['TZ'] = 'Europe/Zurich'
time.tzset()
MyTime.setTime(1124013600)
MyTime.setTime(TEST_NOW)
def tearDownMyTime():
os.environ.pop('TZ')
if old_TZ: # pragma: no cover
os.environ['TZ'] = old_TZ
time.tzset()
MyTime.myTime = None
@ -384,7 +387,6 @@ def gatherTests(regexps=None, opts=None):
tests.addTest(unittest.makeSuite(misctestcase.HelpersTest))
tests.addTest(unittest.makeSuite(misctestcase.SetupTest))
tests.addTest(unittest.makeSuite(misctestcase.TestsUtilsTest))
tests.addTest(unittest.makeSuite(misctestcase.CustomDateFormatsTest))
tests.addTest(unittest.makeSuite(misctestcase.MyTimeTest))
# Database
tests.addTest(unittest.makeSuite(databasetestcase.DatabaseTest))
@ -404,6 +406,7 @@ def gatherTests(regexps=None, opts=None):
# DateDetector
tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
tests.addTest(unittest.makeSuite(datedetectortestcase.CustomDateFormatsTest))
# Filter Regex tests with sample logs
tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
@ -520,6 +523,16 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
self.fail(msg)
unittest.TestCase.assertNotIn = assertNotIn
_org_setUp = unittest.TestCase.setUp
def _customSetUp(self):
# print('=='*10, self)
if unittest.F2B.log_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)!
print("")
logSys.debug('='*10 + ' %s ' + '='*20, self.id())
_org_setUp(self)
unittest.TestCase.setUp = _customSetUp
class LogCaptureTestCase(unittest.TestCase):
@ -595,12 +608,11 @@ class LogCaptureTestCase(unittest.TestCase):
# Let's log everything into a string
self._log = LogCaptureTestCase._MemHandler(unittest.F2B.log_lazy)
logSys.handlers = [self._log]
if self._old_level <= logging.DEBUG: # so if DEBUG etc -- show them (and log it in travis)!
print("")
if self._old_level <= logging.DEBUG:
logSys.handlers += self._old_handlers
logSys.debug('='*10 + ' %s ' + '='*20, self.id())
else:
else: # lowest log level to capture messages
logSys.setLevel(logging.DEBUG)
super(LogCaptureTestCase, self).setUp()
def tearDown(self):
"""Call after every test case."""
@ -609,6 +621,7 @@ class LogCaptureTestCase(unittest.TestCase):
logSys = getLogger("fail2ban")
logSys.handlers = self._old_handlers
logSys.level = self._old_level
super(LogCaptureTestCase, self).tearDown()
def _is_logged(self, *s, **kwargs):
logged = self._log.getvalue()