RF: Refactor date detector and date template elements

Changes include to use Python class properties, merge some date
patterns, and change ISO8601 date template to DatePatternRegex class.
pull/598/head
Steven Hiscocks 2014-01-26 22:03:55 +00:00
parent 1e1261ccb4
commit f2ddb3e3d0
8 changed files with 332 additions and 443 deletions

View File

@ -24,82 +24,94 @@ __license__ = "GPL"
import sys, time, logging
from threading import Lock
from .datetemplate import DatePatternRegex, DateTai64n, DateEpoch, DateISO8601
from .datetemplate import DatePatternRegex, DateTai64n, DateEpoch
# Gets the instance of the logger.
logSys = logging.getLogger(__name__)
class DateDetector:
class DateDetector(object):
"""Manages one or more date templates to find a date within a log line.
"""
def __init__(self):
"""Initialise the date detector.
"""
self.__lock = Lock()
self.__templates = list()
self.__known_names = set()
def _appendTemplate(self, template):
name = template.getName()
name = template.name
if name in self.__known_names:
raise ValueError("There is already a template with name %s" % name)
raise ValueError(
"There is already a template with name %s" % name)
self.__known_names.add(name)
self.__templates.append(template)
def appendTemplate(self, template):
"""Add a date template to manage and use in search of dates.
Parameters
----------
template : DateTemplate or str
Can be either a `DateTemplate` instance, or a string which will
be used as the pattern for the `DatePatternRegex` template. The
template will then be added to the detector.
Raises
------
ValueError
If a template already exists with the same name.
"""
if isinstance(template, str):
template = DatePatternRegex(template)
DateDetector._appendTemplate(self, template)
self._appendTemplate(template)
def addDefaultTemplate(self):
"""Add Fail2Ban's default set of date templates.
"""
self.__lock.acquire()
try:
# asctime with subsecond: Sun Jan 23 21:59:59.011 2005
self.appendTemplate("%a %b %d %H:%M:%S\.%f %Y")
# asctime: Sun Jan 23 21:59:59 2005
self.appendTemplate("%a %b %d %H:%M:%S %Y")
# asctime without year: Sun Jan 23 21:59:59
self.appendTemplate("%a %b %d %H:%M:%S")
# standard: Jan 23 21:59:59
self.appendTemplate("%b %d %H:%M:%S")
# proftpd date: 2005-01-23 21:59:59,333
self.appendTemplate("%Y-%m-%d %H:%M:%S,%f")
# simple date: 2005-01-23 21:59:59
self.appendTemplate("%Y-%m-%d %H:%M:%S")
# asctime with optional day, subsecond and/or year:
# Sun Jan 23 21:59:59.011 2005
self.appendTemplate("(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %Y)?")
# simple date, optional subsecond (proftpd):
# 2005-01-23 21:59:59
self.appendTemplate("%Y-%m-%d %H:%M:%S(?:,%f)?")
# simple date: 2005/01/23 21:59:59
self.appendTemplate("%Y/%m/%d %H:%M:%S")
# simple date too (from x11vnc): 23/01/2005 21:59:59
self.appendTemplate("%d/%m/%Y %H:%M:%S")
# previous one but with year given by 2 digits: 23/01/05 21:59:59
# and with optional year given by 2 digits: 23/01/05 21:59:59
# (See http://bugs.debian.org/537610)
self.appendTemplate("%d/%m/%y %H:%M:%S")
# Apache format [31/Oct/2006:09:22:55 -0000]
self.appendTemplate("%d/%b/%Y:%H:%M:%S %z")
# [31/Oct/2006:09:22:55]
self.appendTemplate("%d/%b/%Y:%H:%M:%S")
self.appendTemplate("%d/%m/(?:%Y|%y) %H:%M:%S")
# Apache format optional time zone:
# [31/Oct/2006:09:22:55 -0000]
self.appendTemplate("%d/%b/%Y:%H:%M:%S(?: %z)?")
# CPanel 05/20/2008:01:57:39
self.appendTemplate("%m/%d/%Y:%H:%M:%S")
# custom for syslog-ng 2006.12.21 06:43:20
self.appendTemplate("%Y\.%m\.%d %H:%M:%S")
# named 26-Jul-2007 15:20:52.252
self.appendTemplate("%d-%b-%Y %H:%M:%S\.%f")
# roundcube 26-Jul-2007 15:20:52 +0200
self.appendTemplate("%d-%b-%Y %H:%M:%S %z")
self.appendTemplate("%d-%b-%Y %H:%M:%S(?:\.%f)?(?: %z)?")
# 26-Jul-2007 15:20:52
self.appendTemplate("%d-%b-%Y %H:%M:%S")
# 17-07-2008 17:23:25
self.appendTemplate("%d-%m-%Y %H:%M:%S")
# 01-27-2012 16:22:44.252
# subseconds explicit to avoid possible %m<->%d confusion
# with previous
self.appendTemplate("%m-%d-%Y %H:%M:%S\.%f")
# TAI64N
template = DateTai64n()
template.setName("TAI64N")
template.name = "TAI64N"
self.appendTemplate(template)
# Epoch
template = DateEpoch()
template.setName("Epoch")
template.name = "Epoch"
self.appendTemplate(template)
# ISO 8601
template = DateISO8601()
template.setName("ISO 8601")
self.appendTemplate(template)
self.appendTemplate("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?(?:%z)?")
# Only time information in the log
self.appendTemplate("^%H:%M:%S")
# <09/16/08@05:03:30>
@ -112,25 +124,60 @@ class DateDetector:
self.appendTemplate("^%b-%d-%y %H:%M:%S")
finally:
self.__lock.release()
def getTemplates(self):
@property
def templates(self):
"""List of template instances managed by the detector.
"""
return self.__templates
def matchTime(self, line, incHits=True):
def matchTime(self, line):
"""Attempts to find date on a log line using templates.
This uses the templates' `matchDate` method in an attempt to find
a date. It also increments the match hit count for the winning
template.
Parameters
----------
line : str
Line which is searched by the date templates.
Returns
-------
re.MatchObject
The regex match returned from the first successfully matched
template.
"""
self.__lock.acquire()
try:
for template in self.__templates:
match = template.matchDate(line)
if not match is None:
logSys.debug("Matched time template %s" % template.getName())
if incHits:
template.incHits()
logSys.debug("Matched time template %s" % template.name)
template.hits += 1
return match
return None
finally:
self.__lock.release()
def getTime(self, line):
"""Attempts to return the date on a log line using templates.
This uses the templates' `getDate` method in an attempt to find
a date.
Parameters
----------
line : str
Line which is searched by the date templates.
Returns
-------
float
The Unix timestamp returned from the first successfully matched
template.
"""
self.__lock.acquire()
try:
for template in self.__templates:
@ -138,7 +185,8 @@ class DateDetector:
date = template.getDate(line)
if date is None:
continue
logSys.debug("Got time %f for \"%r\" using template %s" % (date[0], date[1].group(), template.getName()))
logSys.debug("Got time %f for \"%r\" using template %s" %
(date[0], date[1].group(), template.name))
return date
except ValueError:
pass
@ -146,16 +194,19 @@ class DateDetector:
finally:
self.__lock.release()
##
# Sort the template lists using the hits score. This method is not called
# in this object and thus should be called from time to time.
def sortTemplate(self):
"""Sort the date templates by number of hits
Sort the template lists using the hits score. This method is not
called in this object and thus should be called from time to time.
This ensures the most commonly matched templates are checked first,
improving performance of matchTime and getTime.
"""
self.__lock.acquire()
try:
logSys.debug("Sorting the template list")
self.__templates.sort(key=lambda x: x.getHits(), reverse=True)
self.__templates.sort(key=lambda x: x.hits, reverse=True)
t = self.__templates[0]
logSys.debug("Winning template: %s with %d hits" % (t.getName(), t.getHits()))
logSys.debug("Winning template: %s with %d hits" % (t.name, t.hits))
finally:
self.__lock.release()

View File

@ -26,211 +26,132 @@ __license__ = "GPL"
import re, time, calendar
import logging
from abc import abstractmethod
from datetime import datetime
from datetime import timedelta
from .mytime import MyTime
from . import iso8601
from .strptime import reGroupDictStrptime, timeRE
logSys = logging.getLogger(__name__)
class DateTemplate(object):
"""A template which searches for and returns a date from a log line.
This is an not functional abstract class which other templates should
inherit from.
"""
def __init__(self):
self.__name = ""
self.__regex = ""
self.__cRegex = None
self.__hits = 0
def setName(self, name):
self.__name = name
def getName(self):
return self.__name
"""Initialise the date template.
"""
self._name = ""
self._regex = ""
self._cRegex = None
self.hits = 0
@property
def name(self):
"""Name assigned to template.
"""
return self._name
@name.setter
def name(self, name):
self._name = name
def getRegex(self):
return self._regex
def setRegex(self, regex, wordBegin=True):
#logSys.debug(u"setRegex for %s is %r" % (self.__name, regex))
"""Sets regex to use for searching for date in log line.
Parameters
----------
regex : str
The regex the template will use for searching for a date.
wordBegin : bool
Defines whether the regex should be modified to search at
begining of a word, by adding "\\b" to start of regex.
Default True.
Raises
------
re.error
If regular expression fails to compile
"""
regex = regex.strip()
if (wordBegin and not re.search(r'^\^', regex)):
regex = r'\b' + regex
self.__regex = regex
self.__cRegex = re.compile(regex, re.UNICODE | re.IGNORECASE)
def getRegex(self):
return self.__regex
def getHits(self):
return self.__hits
self._regex = regex
self._cRegex = re.compile(regex, re.UNICODE | re.IGNORECASE)
def incHits(self):
self.__hits += 1
regex = property(getRegex, setRegex, doc=
"""Regex used to search for date.
""")
def resetHits(self):
self.__hits = 0
def matchDate(self, line):
dateMatch = self.__cRegex.search(line)
"""Check if regex for date matches on a log line.
"""
dateMatch = self._cRegex.search(line)
return dateMatch
@abstractmethod
def getDate(self, line):
raise Exception("matchDate() is abstract")
"""Abstract method, which should return the date for a log line
This should return the date for a log line, typically taking the
date from the part of the line which matched the templates regex.
This requires abstraction, therefore just raises exception.
Parameters
----------
line : str
Log line, of which the date should be extracted from.
Raises
------
NotImplementedError
Abstract method, therefore always returns this.
"""
raise NotImplementedError("getDate() is abstract")
class DateEpoch(DateTemplate):
"""A date template which searches for Unix timestamps.
This includes Unix timestamps which appear at start of a line, optionally
within square braces (nsd), or on SELinux audit log lines.
"""
def __init__(self):
"""Initialise the date template.
"""
DateTemplate.__init__(self)
self.setRegex("(?:^|(?P<square>(?<=^\[))|(?P<selinux>(?<=audit\()))\d{10}(?:\.\d{3,6})?(?(selinux)(?=:\d+\))(?(square)(?=\])))")
self.regex = "(?:^|(?P<square>(?<=^\[))|(?P<selinux>(?<=audit\()))\d{10}(?:\.\d{3,6})?(?(selinux)(?=:\d+\))(?(square)(?=\])))"
def getDate(self, line):
"""Method to return the date for a log line.
Parameters
----------
line : str
Log line, of which the date should be extracted from.
Returns
-------
(float, str)
Tuple containing a Unix timestamp, and the string of the date
which was matched and in turned used to calculated the timestamp.
"""
dateMatch = self.matchDate(line)
if dateMatch:
# extract part of format which represents seconds since epoch
return (float(dateMatch.group()), dateMatch)
return None
##
# Use strptime() to parse a date. Our current locale is the 'C'
# one because we do not set the locale explicitly. This is POSIX
# standard.
class DateStrptime(DateTemplate):
TABLE = dict()
TABLE["Jan"] = ["Sty"]
TABLE["Feb"] = [u"Fév", "Lut"]
TABLE["Mar"] = [u"Mär", "Mar"]
TABLE["Apr"] = ["Avr", "Kwi"]
TABLE["May"] = ["Mai", "Maj"]
TABLE["Jun"] = ["Lip"]
TABLE["Jul"] = ["Sie"]
TABLE["Aug"] = ["Aou", "Wrz"]
TABLE["Sep"] = ["Sie"]
TABLE["Oct"] = [u"Paź"]
TABLE["Nov"] = ["Lis"]
TABLE["Dec"] = [u"Déc", "Dez", "Gru"]
def __init__(self):
DateTemplate.__init__(self)
self._pattern = ""
self._unsupportedStrptimeBits = False
def setPattern(self, pattern):
self._unsupported_f = not DateStrptime._f and re.search('%f', pattern)
self._unsupported_z = not DateStrptime._z and re.search('%z', pattern)
self._pattern = pattern
def getPattern(self):
return self._pattern
#@staticmethod
def convertLocale(date):
for t in DateStrptime.TABLE:
for m in DateStrptime.TABLE[t]:
if date.find(m) >= 0:
logSys.debug(u"Replacing %r with %r in %r" %
(m, t, date))
return date.replace(m, t)
return date
convertLocale = staticmethod(convertLocale)
def getDate(self, line):
dateMatch = self.matchDate(line)
if dateMatch:
datePattern = self.getPattern()
if self._unsupported_f:
if dateMatch.group('_f'):
datePattern = re.sub(r'%f', dateMatch.group('_f'), datePattern)
logSys.debug(u"Replacing %%f with %r now %r" % (dateMatch.group('_f'), datePattern))
if self._unsupported_z:
if dateMatch.group('_z'):
datePattern = re.sub(r'%z', dateMatch.group('_z'), datePattern)
logSys.debug(u"Replacing %%z with %r now %r" % (dateMatch.group('_z'), datePattern))
try:
# Try first with 'C' locale
date = datetime.strptime(dateMatch.group(), datePattern)
except ValueError:
# Try to convert date string to 'C' locale
conv = self.convertLocale(dateMatch.group())
try:
date = datetime.strptime(conv, self.getPattern())
except (ValueError, re.error), e:
# Try to add the current year to the pattern. Should fix
# the "Feb 29" issue.
opattern = self.getPattern()
# makes sense only if %Y is not in already:
if not '%Y' in opattern:
pattern = "%s %%Y" % opattern
conv += " %s" % MyTime.gmtime()[0]
date = datetime.strptime(conv, pattern)
else:
# we are helpless here
raise ValueError(
"Given pattern %r does not match. Original "
"exception was %r and Feb 29 workaround could not "
"be tested due to already present year mark in the "
"pattern" % (opattern, e))
if self._unsupported_z:
z = dateMatch.group('_z')
if z:
delta = timedelta(hours=int(z[1:3]),minutes=int(z[3:]))
direction = z[0]
logSys.debug(u"Altering %r by removing time zone offset (%s)%s" % (date, direction, delta))
# here we reverse the effect of the timezone and force it to UTC
if direction == '+':
date -= delta
else:
date += delta
date = date.replace(tzinfo=iso8601.Utc())
else:
logSys.warning("No _z group captured and %%z is not supported on current platform"
" - timezone ignored and assumed to be localtime. date: %s on line: %s"
% (date, line))
if date.year < 2000:
# There is probably no year field in the logs
# NOTE: Possibly makes week/year day incorrect
date = date.replace(year=MyTime.gmtime()[0])
# Bug fix for #1241756
# If the date is greater than the current time, we suppose
# that the log is not from this year but from the year before
if date > MyTime.now():
logSys.debug(
u"Correcting deduced year by one since %s > now (%s)" %
(date, MyTime.time()))
date = date.replace(year=date.year-1)
elif date.month == 1 and date.day == 1:
# If it is Jan 1st, it is either really Jan 1st or there
# is neither month nor day in the log.
# NOTE: Possibly makes week/year day incorrect
date = date.replace(
month=MyTime.gmtime()[1], day=MyTime.gmtime()[2])
if date.tzinfo:
return ( calendar.timegm(date.utctimetuple()), dateMatch )
else:
return ( time.mktime(date.utctimetuple()), dateMatch )
return None
try:
time.strptime("26-Jul-2007 15:20:52.252","%d-%b-%Y %H:%M:%S.%f")
DateStrptime._f = True
except (ValueError, KeyError):
DateTemplate._f = False
try:
time.strptime("24/Mar/2013:08:58:32 -0500","%d/%b/%Y:%H:%M:%S %z")
DateStrptime._z = True
except ValueError:
DateStrptime._z = False
class DatePatternRegex(DateStrptime):
class DatePatternRegex(DateTemplate):
_patternRE = r"%%(%%|[%s])" % "".join(timeRE.keys())
_patternName = {
'a': "DAY", 'A': "DAYNAME", 'b': "MON", 'B': "MONTH", 'd': "Day",
@ -241,39 +162,97 @@ class DatePatternRegex(DateStrptime):
for key in set(timeRE) - set(_patternName): # may not have them all...
_patternName[key] = "%%%s" % key
def __init__(self, pattern=None, **kwargs):
super(DatePatternRegex, self).__init__()
if pattern:
self.setPattern(pattern, **kwargs)
def __init__(self, pattern=None):
"""Initialise date template, with optional regex/pattern
def setPattern(self, pattern):
super(DatePatternRegex, self).setPattern(pattern)
super(DatePatternRegex, self).setName(
re.sub(self._patternRE, r'%(\1)s', pattern) % self._patternName)
Parameters
----------
pattern : str
Sets the date templates pattern.
"""
super(DatePatternRegex, self).__init__()
self._pattern = None
if pattern is not None:
self.pattern = pattern
@property
def pattern(self):
"""The pattern used for regex with strptime "%" time fields.
This should be a valid regular expression, of which matching string
will be extracted from the log line. strptime style "%" fields will
be replaced by appropriate regular expressions, or custom regex
groups with names as per the strptime fields can also be used
instead.
"""
return self._pattern
@pattern.setter
def pattern(self, pattern):
self._pattern = pattern
self._name = re.sub(
self._patternRE, r'%(\1)s', pattern) % self._patternName
super(DatePatternRegex, self).setRegex(
re.sub(self._patternRE, r'%(\1)s', pattern) % timeRE)
def getDate(self, line):
dateMatch = self.matchDate(line)
if dateMatch:
return reGroupDictStrptime(dateMatch.groupdict()), dateMatch
def setRegex(self, line):
def setRegex(self, value):
raise NotImplementedError("Regex derived from pattern")
def setName(self, line):
@DateTemplate.name.setter
def name(self, value):
raise NotImplementedError("Name derived from pattern")
def getDate(self, line):
"""Method to return the date for a log line.
This uses a custom version of strptime, using the named groups
from the instances `pattern` property.
Parameters
----------
line : str
Log line, of which the date should be extracted from.
Returns
-------
(float, str)
Tuple containing a Unix timestamp, and the string of the date
which was matched and in turned used to calculated the timestamp.
"""
dateMatch = self.matchDate(line)
if dateMatch:
groupdict = dict(
(key, value)
for key, value in dateMatch.groupdict().iteritems()
if value is not None)
return reGroupDictStrptime(groupdict), dateMatch
class DateTai64n(DateTemplate):
"""A date template which matches TAI64N formate timestamps.
"""
def __init__(self):
"""Initialise the date template.
"""
DateTemplate.__init__(self)
# We already know the format for TAI64N
# yoh: we should not add an additional front anchor
self.setRegex("@[0-9a-f]{24}", wordBegin=False)
def getDate(self, line):
"""Method to return the date for a log line.
Parameters
----------
line : str
Log line, of which the date should be extracted from.
Returns
-------
(float, str)
Tuple containing a Unix timestamp, and the string of the date
which was matched and in turned used to calculated the timestamp.
"""
dateMatch = self.matchDate(line)
if dateMatch:
# extract part of format which represents seconds since epoch
@ -282,19 +261,3 @@ class DateTai64n(DateTemplate):
# convert seconds from HEX into local time stamp
return (int(seconds_since_epoch, 16), dateMatch)
return None
class DateISO8601(DateTemplate):
def __init__(self):
DateTemplate.__init__(self)
self.setRegex(iso8601.ISO8601_REGEX_RAW)
def getDate(self, line):
dateMatch = self.matchDate(line)
if dateMatch:
# Parses the date.
value = dateMatch.group()
return (calendar.timegm(iso8601.parse_date(value).utctimetuple()), dateMatch)
return None

View File

@ -27,7 +27,7 @@ from .failmanager import FailManagerEmpty, FailManager
from .ticket import FailTicket
from .jailthread import JailThread
from .datedetector import DateDetector
from .datetemplate import DatePatternRegex, DateISO8601, DateEpoch, DateTai64n
from .datetemplate import DatePatternRegex, DateEpoch, DateTai64n
from .mytime import MyTime
from .failregex import FailRegex, Regex, RegexException
from .action import CommandAction
@ -202,24 +202,20 @@ class Filter(JailThread):
if pattern is None:
self.dateDetector = None
return
elif pattern.upper() == "ISO8601":
template = DateISO8601()
template.setName("ISO8601")
elif pattern.upper() == "EPOCH":
template = DateEpoch()
template.setName("Epoch")
template.name = "Epoch"
elif pattern.upper() == "TAI64N":
template = DateTai64n()
template.setName("TAI64N")
template.name = "TAI64N"
else:
template = DatePatternRegex()
template.setPattern(pattern)
template = DatePatternRegex(pattern)
self.dateDetector = DateDetector()
self.dateDetector.appendTemplate(template)
logSys.info("Date pattern set to `%r`: `%s`" %
(pattern, template.getName()))
(pattern, template.name))
logSys.debug("Date pattern regex for %r: %s" %
(pattern, template.getRegex()))
(pattern, template.regex))
##
# Get the date detector pattern, or Default Detectors if not changed
@ -228,15 +224,15 @@ class Filter(JailThread):
def getDatePattern(self):
if self.dateDetector is not None:
templates = self.dateDetector.getTemplates()
templates = self.dateDetector.templates
if len(templates) > 1:
return None, "Default Detectors"
elif len(templates) == 1:
if hasattr(templates[0], "getPattern"):
pattern = templates[0].getPattern()
if hasattr(templates[0], "pattern"):
pattern = templates[0].pattern
else:
pattern = None
return pattern, templates[0].getName()
return pattern, templates[0].name
##
# Set the maximum retry value.

View File

@ -1,136 +0,0 @@
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
# Copyright (c) 2007 Michael Twomey
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""ISO 8601 date time string parsing
Basic usage:
>>> import iso8601
>>> iso8601.parse_date("2007-01-25T12:00:00Z")
datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>)
>>>
"""
from datetime import datetime, timedelta, tzinfo
import time
import re
__all__ = ["parse_date", "ParseError"]
# Adapted from http://delete.me.uk/2005/03/iso8601.html
ISO8601_REGEX_RAW = "(?P<year>[0-9]{4})-(?P<month>[0-9]{1,2})-(?P<day>[0-9]{1,2})" \
"T(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?" \
"(?P<timezone>Z|[-+][0-9]{2}(:?[0-9]{2})?)?"
ISO8601_REGEX = re.compile(ISO8601_REGEX_RAW)
TIMEZONE_REGEX = re.compile("(?P<prefix>[+-])(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})?")
class ParseError(Exception):
"""Raised when there is a problem parsing a date string"""
# Yoinked from python docs
ZERO = timedelta(0)
class Utc(tzinfo):
"""UTC
"""
def utcoffset(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
UTC = Utc()
class FixedOffset(tzinfo):
"""Fixed offset in hours and minutes from UTC
"""
def __init__(self, name, offset_hours, offset_minutes, offset_seconds=0):
self.__offset = timedelta(hours=offset_hours, minutes=offset_minutes, seconds=offset_seconds)
self.__name = name
def utcoffset(self, dt):
return self.__offset
def tzname(self, dt):
return self.__name
def dst(self, dt):
return ZERO
def __repr__(self):
return "<FixedOffset %r>" % self.__name
def parse_timezone(tzstring):
"""Parses ISO 8601 time zone specs into tzinfo offsets
"""
if tzstring == "Z":
return UTC
if tzstring is None:
zone_sec = -time.timezone
return FixedOffset(name=time.tzname[0],offset_hours=(zone_sec / 3600), offset_minutes=(zone_sec % 3600)/60, offset_seconds=zone_sec % 60)
m = TIMEZONE_REGEX.match(tzstring)
prefix, hours, minutes = m.groups()
if minutes is None:
minutes = 0
else:
minutes = int(minutes)
hours = int(hours)
if prefix == "-":
hours = -hours
minutes = -minutes
return FixedOffset(tzstring, hours, minutes)
def parse_date(datestring):
"""Parses ISO 8601 dates into datetime objects
The timezone is parsed from the date string. However it is quite common to
have dates without a timezone (not strictly correct). In this case the
default timezone specified in default_timezone is used. This is UTC by
default.
"""
if not isinstance(datestring, basestring):
raise ValueError("Expecting a string %r" % datestring)
m = ISO8601_REGEX.match(datestring)
if not m:
raise ParseError("Unable to parse date string %r" % datestring)
groups = m.groupdict()
tz = parse_timezone(groups["timezone"])
if groups["fraction"] is None:
groups["fraction"] = 0
else:
groups["fraction"] = int(float("0.%s" % groups["fraction"]) * 1e6)
try:
return datetime(int(groups["year"]), int(groups["month"]), int(groups["day"]),
int(groups["hour"]), int(groups["minute"]), int(groups["second"]),
int(groups["fraction"]), tz)
except Exception, e:
raise ParseError("Failed to create a valid datetime record due to: %s"
% e)

View File

@ -26,11 +26,24 @@ from .mytime import MyTime
locale_time = LocaleTime()
timeRE = TimeRE()
if 'z' not in timeRE: # python2.6 not present
timeRE['z'] = r"(?P<z>[+-]\d{2}[0-5]\d)"
timeRE['z'] = r"(?P<z>Z|[+-]\d{2}(?::?[0-5]\d)?)"
def reGroupDictStrptime(found_dict):
"""This is tweaked from python built-in _strptime"""
"""Return time from dictionary of strptime fields
This is tweaked from python built-in _strptime.
Parameters
----------
found_dict : dict
Dictionary where keys represent the strptime fields, and values the
respective value.
Returns
-------
float
Unix time stamp.
"""
now = MyTime.now()
year = month = day = hour = minute = None
@ -119,9 +132,14 @@ def reGroupDictStrptime(found_dict):
week_of_year_start = 0
elif group_key == 'z':
z = found_dict['z']
tzoffset = int(z[1:3]) * 60 + int(z[3:5])
if z.startswith("-"):
tzoffset = -tzoffset
if z == "Z":
tzoffset = 0
else:
tzoffset = int(z[1:3]) * 60 # Hours...
if len(z)>3:
tzoffset += int(z[-2:]) # ...and minutes
if z.startswith("-"):
tzoffset = -tzoffset
elif group_key == 'Z':
# Since -1 is default value only need to worry about setting tz if
# it can be something other than -1.
@ -158,7 +176,6 @@ def reGroupDictStrptime(found_dict):
month = datetime_result.month
day = datetime_result.day
# Add timezone info
tzname = found_dict.get("Z")
if tzoffset is not None:
gmtoff = tzoffset * 60
else:

View File

@ -27,7 +27,6 @@ __license__ = "GPL"
import unittest, calendar, time, datetime, re, pprint
from ..server.datedetector import DateDetector
from ..server.datetemplate import DateTemplate
from ..server.iso8601 import Utc
from .utils import setUpMyTime, tearDownMyTime
class DateDetectorTest(unittest.TestCase):
@ -88,8 +87,9 @@ class DateDetectorTest(unittest.TestCase):
(False, "23-01-2005 21:59:59"),
(False, "01-23-2005 21:59:59.252"), # reported on f2b, causes Feb29 fix to break
(False, "@4000000041f4104f00000000"), # TAI64N
(False, "2005-01-23T20:59:59.252Z"), #ISO 8601
(False, "2005-01-23T20:59:59.252Z"), #ISO 8601 (UTC)
(False, "2005-01-23T15:59:59-05:00"), #ISO 8601 with TZ
(False, "2005-01-23T21:59:59"), #ISO 8601 no TZ, assume local
(True, "<01/23/05@21:59:59>"),
(True, "050123 21:59:59"), # MySQL
(True, "Jan-23-05 21:59:59"), # ASSP like
@ -116,15 +116,15 @@ class DateDetectorTest(unittest.TestCase):
self.assertEqual(logtime, None, "getTime should have not matched for %r Got: %s" % (sdate, logtime))
def testStableSortTemplate(self):
old_names = [x.getName() for x in self.__datedetector.getTemplates()]
old_names = [x.name for x in self.__datedetector.templates]
self.__datedetector.sortTemplate()
# If there were no hits -- sorting should not change the order
for old_name, n in zip(old_names, self.__datedetector.getTemplates()):
self.assertEqual(old_name, n.getName()) # "Sort must be stable"
for old_name, n in zip(old_names, self.__datedetector.templates):
self.assertEqual(old_name, n.name) # "Sort must be stable"
def testAllUniqueTemplateNames(self):
self.assertRaises(ValueError, self.__datedetector.appendTemplate,
self.__datedetector.getTemplates()[0])
self.__datedetector.templates[0])
def testFullYearMatch_gh130(self):
# see https://github.com/fail2ban/fail2ban/pull/130

View File

@ -170,46 +170,46 @@ class TestsUtilsTest(unittest.TestCase):
self.assertTrue(pindex > 10) # we should have some traceback
self.assertEqual(s[:pindex], s[pindex+1:pindex*2 + 1])
from ..server import iso8601
import datetime
import time
from ..server.datetemplate import DatePatternRegex
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")
class CustomDateFormatsTest(unittest.TestCase):
def testIso8601(self):
date = iso8601.parse_date("2007-01-25T12:00:00Z")
date = datetime.datetime.fromtimestamp(
iso8601.getDate("2007-01-25T12:00:00Z")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 12, 0, tzinfo=iso8601.Utc()))
self.assertRaises(ValueError, iso8601.parse_date, None)
self.assertRaises(ValueError, iso8601.parse_date, date)
datetime.datetime(2007, 1, 25, 12, 0))
self.assertRaises(TypeError, iso8601.getDate, None)
self.assertRaises(TypeError, iso8601.getDate, date)
self.assertRaises(iso8601.ParseError, iso8601.parse_date, "")
self.assertRaises(iso8601.ParseError, iso8601.parse_date, "Z")
self.assertEqual(iso8601.getDate(""), None)
self.assertEqual(iso8601.getDate("Z"), None)
self.assertRaises(iso8601.ParseError,
iso8601.parse_date, "2007-01-01T120:00:00Z")
self.assertRaises(iso8601.ParseError,
iso8601.parse_date, "2007-13-01T12:00:00Z")
date = iso8601.parse_date("2007-01-25T12:00:00+0400")
self.assertEqual(iso8601.getDate("2007-01-01T120:00:00Z"), None)
self.assertEqual(iso8601.getDate("2007-13-01T12:00:00Z"), None)
date = datetime.datetime.fromtimestamp(
iso8601.getDate("2007-01-25T12:00:00+0400")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 8, 0, tzinfo=iso8601.Utc()))
date = iso8601.parse_date("2007-01-25T12:00:00+04:00")
datetime.datetime(2007, 1, 25, 8, 0))
date = datetime.datetime.fromtimestamp(
iso8601.getDate("2007-01-25T12:00:00+04:00")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 8, 0, tzinfo=iso8601.Utc()))
date = iso8601.parse_date("2007-01-25T12:00:00-0400")
datetime.datetime(2007, 1, 25, 8, 0))
date = datetime.datetime.fromtimestamp(
iso8601.getDate("2007-01-25T12:00:00-0400")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 16, 0, tzinfo=iso8601.Utc()))
date = iso8601.parse_date("2007-01-25T12:00:00-04")
datetime.datetime(2007, 1, 25, 16, 0))
date = datetime.datetime.fromtimestamp(
iso8601.getDate("2007-01-25T12:00:00-04")[0])
self.assertEqual(
date,
datetime.datetime(2007, 1, 25, 16, 0, tzinfo=iso8601.Utc()))
def testTimeZone(self):
# Just verify consistent operation and improve coverage ;)
self.assertEqual((iso8601.parse_timezone(None).tzname(False), iso8601.parse_timezone(None).tzname(True)), time.tzname)
self.assertEqual(iso8601.parse_timezone('Z').tzname(True), "UTC")
self.assertEqual(iso8601.parse_timezone('Z').dst(True), datetime.timedelta(0))
datetime.datetime(2007, 1, 25, 16, 0))

View File

@ -278,8 +278,6 @@ class Transmitter(TransmitterBase):
"datepattern", "Epoch", (None, "Epoch"), jail=self.jailName)
self.setGetTest(
"datepattern", "TAI64N", (None, "TAI64N"), jail=self.jailName)
self.setGetTest(
"datepattern", "ISO8601", (None, "ISO8601"), jail=self.jailName)
self.setGetTestNOK("datepattern", "%Cat%a%%%g", jail=self.jailName)
def testJailUseDNS(self):