Merge pull request #1094 from leeclemens/pep8-e3

Fix PEP8 E301, E302 and E303
pull/1096/head
Yaroslav Halchenko 2015-07-05 00:03:19 -04:00
commit 7fc93cee37
59 changed files with 124 additions and 48 deletions

View File

@ -35,6 +35,7 @@ else:
from fail2ban.server.actions import ActionBase
from fail2ban.version import version as f2bVersion
class BadIPsAction(ActionBase):
"""Fail2Ban action which reports bans to badips.com, and also
blacklist bad IPs listed on badips.com by using another action's

View File

@ -68,6 +68,7 @@ Matches for %(ip)s for jail %(jailname)s:
%(ipjailmatches)s
"""
class SMTPAction(ActionBase):
"""Fail2Ban action which sends emails to inform on jail starting,
stopping and bans.

View File

@ -37,6 +37,7 @@ Below derived from:
logging.NOTICE = logging.INFO + 5
logging.addLevelName(logging.NOTICE, 'NOTICE')
# define a new logger function for notice
# this is exactly like existing info, critical, debug...etc
def _Logger_notice(self, msg, *args, **kwargs):
@ -53,6 +54,7 @@ def _Logger_notice(self, msg, *args, **kwargs):
logging.Logger.notice = _Logger_notice
# define a new root level notice function
# this is exactly like existing info, critical, debug...etc
def _root_notice(msg, *args, **kwargs):

View File

@ -32,6 +32,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class ActionReader(DefinitionInitConfigReader):
_configOpts = [

View File

@ -27,6 +27,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Beautify the output of the client.
#

View File

@ -67,6 +67,7 @@ logLevel = 7
__all__ = ['SafeConfigParserWithIncludes']
class SafeConfigParserWithIncludes(SafeConfigParser):
"""
Class adds functionality to SafeConfigParser to handle included

View File

@ -34,6 +34,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class ConfigReader():
"""Generic config reader class.
@ -136,6 +137,7 @@ class ConfigReader():
return self._cfg.getOptions(*args, **kwargs)
return {}
class ConfigReaderUnshared(SafeConfigParserWithIncludes):
"""Unshared config reader (previously ConfigReader).
@ -237,6 +239,7 @@ class ConfigReaderUnshared(SafeConfigParserWithIncludes):
values[option[1]] = option[2]
return values
class DefinitionInitConfigReader(ConfigReader):
"""Config reader for files with options grouped in [Definition] and
[Init] sections.

View File

@ -31,6 +31,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class Configurator:
def __init__(self, force_enable=False, share_config=None):

View File

@ -36,6 +36,7 @@ else:
# python 2.x, string type is equivalent to bytes.
EMPTY_BYTES = ""
class CSocket:
if sys.version_info >= (3,):

View File

@ -30,6 +30,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class Fail2banReader(ConfigReader):
def __init__(self, **kwargs):

View File

@ -34,6 +34,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class FilterReader(DefinitionInitConfigReader):
_configOpts = [

View File

@ -37,6 +37,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class JailReader(ConfigReader):
optionCRE = re.compile("^((?:\w|-|_|\.)+)(?:\[(.*)\])?$")

View File

@ -31,6 +31,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class JailsReader(ConfigReader):
def __init__(self, force_enable=False, **kwargs):

View File

@ -23,12 +23,14 @@ __author__ = "Cyril Jaquier, Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2011-2012 Yaroslav Halchenko"
__license__ = "GPL"
#
# Jails
#
class DuplicateJailException(Exception):
pass
class UnknownJailException(KeyError):
pass

View File

@ -26,11 +26,13 @@ import traceback
import re
import logging
def formatExceptionInfo():
""" Consistently format exception information """
cla, exc = sys.exc_info()[:2]
return (cla.__name__, str(exc))
#
# Following "traceback" functions are adopted from PyMVPA distributed
# under MIT/Expat and copyright by PyMVPA developers (i.e. me and
@ -49,6 +51,7 @@ def mbasename(s):
base = os.path.basename(os.path.dirname(s)) + '.' + base
return base
class TraceBack(object):
"""Customized traceback to be included in debug messages
"""
@ -94,6 +97,7 @@ class TraceBack(object):
return sftb
class FormatterWithTraceBack(logging.Formatter):
"""Custom formatter which expands %(tb) and %(tbc) with tracebacks
@ -108,6 +112,7 @@ class FormatterWithTraceBack(logging.Formatter):
record.tbc = record.tb = self._tb()
return logging.Formatter.format(self, record)
def getLogger(name):
"""Get logging.Logger instance with Fail2Ban logger name convention
"""
@ -115,6 +120,7 @@ def getLogger(name):
name = "fail2ban.%s" % name.rpartition(".")[-1]
return logging.getLogger(name)
def excepthook(exctype, value, traceback):
"""Except hook used to log unhandled exceptions to Fail2Ban log
"""

View File

@ -119,6 +119,7 @@ protocol = [
["get <JAIL> action <ACT> <PROPERTY>", "gets the value of <PROPERTY> for the action <ACT> for <JAIL>"],
]
##
# Prints the protocol in a "man" format. This is used for the
# "-h" output of fail2ban-client.
@ -143,6 +144,7 @@ def printFormatted():
line = ' ' * (INDENT + MARGIN) + n.strip()
print line
##
# Prints the protocol in a "mediawiki" format.
@ -159,6 +161,7 @@ def printWiki():
print "| <span style=\"white-space:nowrap;\"><tt>" + m[0] + "</tt></span> || || " + m[1]
print "|}"
def __printWikiHeader(section, desc):
print
print "=== " + section + " ==="

View File

@ -55,6 +55,7 @@ _RETCODE_HINTS = {
signame = dict((num, name)
for name, num in signal.__dict__.iteritems() if name.startswith("SIG"))
class CallingMap(MutableMapping):
"""A Mapping type which returns the result of callable values.
@ -100,6 +101,7 @@ class CallingMap(MutableMapping):
def copy(self):
return self.__class__(self.data.copy())
class ActionBase(object):
"""An abstract base class for actions in Fail2Ban.
@ -182,6 +184,7 @@ class ActionBase(object):
"""
pass
class CommandAction(ActionBase):
"""A action which executes OS shell commands.

View File

@ -47,6 +47,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class Actions(JailThread, Mapping):
"""Handles jail actions.

View File

@ -45,6 +45,7 @@ else:
# python 2.x, string type is equivalent to bytes.
EMPTY_BYTES = ""
##
# Request handler class.
#
@ -92,6 +93,7 @@ class RequestHandler(asynchat.async_chat):
logSys.error(traceback.format_exc().splitlines())
self.close()
##
# Asynchronous server class.
#
@ -187,6 +189,7 @@ class AsyncServer(asyncore.dispatcher):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags|fcntl.FD_CLOEXEC)
##
# AsyncServerException is used to wrap communication exceptions.

View File

@ -33,6 +33,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Banning Manager.
#
@ -271,7 +272,6 @@ class BanManager:
finally:
self.__lock.release()
##
# Get the size of the ban list.
#

View File

@ -46,6 +46,7 @@ if sys.version_info >= (3,):
logSys.error('json dumps failed: %s', e)
x = '{}'
return x
def _json_loads_safe(x):
try:
x = json.loads(x.decode(
@ -64,6 +65,7 @@ else:
return x.encode(locale.getpreferredencoding())
else:
return x
def _json_dumps_safe(x):
try:
x = json.dumps(_normalize(x), ensure_ascii=False).decode(
@ -72,6 +74,7 @@ else:
logSys.error('json dumps failed: %s', e)
x = '{}'
return x
def _json_loads_safe(x):
try:
x = _normalize(json.loads(x.decode(
@ -84,6 +87,7 @@ else:
sqlite3.register_adapter(dict, _json_dumps_safe)
sqlite3.register_converter("JSON", _json_loads_safe)
def commitandrollback(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
@ -92,6 +96,7 @@ def commitandrollback(f):
return f(self, self._db.cursor(), *args, **kwargs)
return wrapper
class Fail2BanDb(object):
"""Fail2Ban database for storing persistent data.
@ -156,6 +161,7 @@ class Fail2BanDb(object):
"CREATE INDEX bans_jail_ip ON bans(jail, ip);" \
"CREATE INDEX bans_ip ON bans(ip);" \
def __init__(self, filename, purgeAge=24*60*60):
try:
self._lock = RLock()

View File

@ -29,6 +29,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class DateDetector(object):
"""Manages one or more date templates to find a date within a log line.

View File

@ -154,6 +154,7 @@ class DateEpoch(DateTemplate):
return (float(dateMatch.group()), dateMatch)
return None
class DatePatternRegex(DateTemplate):
"""Date template, with regex/pattern
@ -236,6 +237,7 @@ class DatePatternRegex(DateTemplate):
if value is not None)
return reGroupDictStrptime(groupdict), dateMatch
class DateTai64n(DateTemplate):
"""A date template which matches TAI64N formate timestamps.

View File

@ -29,6 +29,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class FailData:
def __init__(self):

View File

@ -34,6 +34,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class FailManager:
def __init__(self):
@ -154,5 +155,6 @@ class FailManager:
finally:
self.__lock.release()
class FailManagerEmpty(Exception):
pass

View File

@ -25,6 +25,7 @@ import re
import sre_constants
import sys
##
# Regular expression class.
#
@ -57,6 +58,7 @@ class Regex:
except sre_constants.error:
raise RegexException("Unable to compile regular expression '%s'" %
regex)
def __str__(self):
return "%s(%r)" % (self.__class__.__name__, self._regex)
##
@ -93,7 +95,6 @@ class Regex:
except ValueError:
self._matchLineEnd = len(self._matchCache.string)
lineCount1 = self._matchCache.string.count(
"\n", 0, self._matchLineStart)
lineCount2 = self._matchCache.string.count(
@ -184,6 +185,7 @@ class Regex:
else:
return ["".join(line) for line in self._matchedTupleLines]
##
# Exception dedicated to the class Regex.

View File

@ -48,6 +48,7 @@ logSys = getLogger(__name__)
# that matches a given regular expression. This class is instantiated by
# a Jail object.
class Filter(JailThread):
##
@ -86,7 +87,6 @@ class Filter(JailThread):
self.dateDetector.addDefaultTemplate()
logSys.debug("Created %s" % self)
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, self.jail)
@ -109,7 +109,6 @@ class Filter(JailThread):
logSys.error(e)
raise e
def delFailRegex(self, index):
try:
del self.__failRegex[index]
@ -395,7 +394,6 @@ class Filter(JailThread):
return False
def processLine(self, line, date=None, returnRawHost=False,
checkAllRegex=False):
"""Split the time portion from log msg and return findFailures on them
@ -581,7 +579,6 @@ class FileFilter(Filter):
# to be overridden by backends
pass
##
# Delete a log path
#
@ -721,6 +718,7 @@ except ImportError: # pragma: no cover
import md5
md5sum = md5.new
class FileContainer:
def __init__(self, filename, encoding, tail = False):
@ -847,6 +845,7 @@ class JournalFilter(Filter): # pragma: systemd no cover
import socket
import struct
class DNSUtils:
IP_CRE = re.compile("^(?:\d{1,3}\.){3}\d{1,3}$")

View File

@ -36,6 +36,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Log reader class.
#
@ -61,7 +62,6 @@ class FilterGamin(FileFilter):
fcntl.fcntl(fd, fcntl.F_SETFD, flags|fcntl.FD_CLOEXEC)
logSys.debug("Created FilterGamin")
def callback(self, path, event):
logSys.debug("Got event: " + repr(event) + " for " + path)
if event in (gamin.GAMCreated, gamin.GAMChanged, gamin.GAMExists):
@ -70,7 +70,6 @@ class FilterGamin(FileFilter):
self._process_file(path)
def _process_file(self, path):
"""Process a given file
@ -122,7 +121,6 @@ class FilterGamin(FileFilter):
logSys.debug(self.jail.name + ": filter terminated")
return True
def stop(self):
super(FilterGamin, self).stop()
self.__cleanup()

View File

@ -35,6 +35,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Log reader class.
#

View File

@ -51,6 +51,7 @@ except Exception, e:
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Log reader class.
#
@ -73,7 +74,6 @@ class FilterPyinotify(FileFilter):
self.__watches = dict()
logSys.debug("Created FilterPyinotify")
def callback(self, event, origin=''):
logSys.debug("%sCallback for Event: %s", origin, event)
path = event.pathname
@ -95,7 +95,6 @@ class FilterPyinotify(FileFilter):
self._process_file(path)
def _process_file(self, path):
"""Process a given file
@ -112,7 +111,6 @@ class FilterPyinotify(FileFilter):
self.dateDetector.sortTemplate()
self.__modified = False
def _addFileWatcher(self, path):
wd = self.__monitor.add_watch(path, pyinotify.IN_MODIFY)
self.__watches.update(wd)
@ -144,7 +142,6 @@ class FilterPyinotify(FileFilter):
self._addFileWatcher(path)
self._process_file(path)
##
# Delete a log path
#
@ -163,7 +160,6 @@ class FilterPyinotify(FileFilter):
self.__monitor.rm_watch(wdInt)
logSys.debug("Removed monitor for the parent directory %s", path_dir)
##
# Main loop.
#

View File

@ -38,6 +38,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
##
# Journal reader class.
#
@ -61,7 +62,6 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
self.setDatePattern(None)
logSys.debug("Created FilterSystemd")
##
# Add a journal match filters from list structure
#

View File

@ -32,6 +32,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class Jail:
"""Fail2Ban jail, which manages a filter and associated actions.
@ -116,7 +117,6 @@ class Jail:
raise RuntimeError(
"Failed to initialize any backend for Jail %r" % self.name)
def _initPolling(self):
from filterpoll import FilterPoll
logSys.info("Jail '%s' uses poller" % self.name)

View File

@ -30,6 +30,7 @@ from abc import abstractmethod
from ..helpers import excepthook
class JailThread(Thread):
"""Abstract class for threading elements in Fail2Ban.
@ -59,6 +60,7 @@ class JailThread(Thread):
# excepthook workaround for threads, derived from:
# http://bugs.python.org/issue1230540#msg91244
run = self.run
def run_with_except_hook(*args, **kwargs):
try:
run(*args, **kwargs)

View File

@ -24,6 +24,7 @@ __license__ = "GPL"
import datetime
import time
##
# MyTime class.
#

View File

@ -48,6 +48,7 @@ except ImportError:
# Dont print error here, as database may not even be used
Fail2BanDb = None
class Server:
def __init__(self, daemon = False):
@ -71,7 +72,6 @@ class Server:
self.setLogTarget("STDOUT")
self.setSyslogSocket("auto")
def __sigTERMhandler(self, signum, frame):
logSys.debug("Caught signal %d. Exiting" % signum)
self.quit()
@ -144,7 +144,6 @@ class Server:
finally:
self.__loggingLock.release()
def addJail(self, name, backend):
self.__jails.add(name, backend, self.__db)
if self.__db is not None:
@ -521,7 +520,6 @@ class Server:
def getDatabase(self):
return self.__db
def __createDaemon(self): # pragma: no cover
""" Detach a process from the controlling terminal and run it in the
background as a daemon.

View File

@ -28,6 +28,7 @@ locale_time = LocaleTime()
timeRE = TimeRE()
timeRE['z'] = r"(?P<z>Z|[+-]\d{2}(?::?[0-5]\d)?)"
def reGroupDictStrptime(found_dict):
"""Return time from dictionary of strptime fields

View File

@ -29,6 +29,7 @@ from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
class Ticket:
def __init__(self, ip, time, matches=None):

View File

@ -33,6 +33,7 @@ from .. import version
# Gets the instance of the logger.
logSys = getLogger(__name__)
class Transmitter:
##

View File

@ -32,6 +32,7 @@ from ..dummyjail import DummyJail
from ..utils import CONFIG_DIR
class TestSMTPServer(smtpd.SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
@ -40,6 +41,7 @@ class TestSMTPServer(smtpd.SMTPServer):
self.rcpttos = rcpttos
self.data = data
class SMTPActionTest(unittest.TestCase):
def setUp(self):

View File

@ -35,6 +35,7 @@ from .utils import LogCaptureTestCase
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
class ExecuteActions(LogCaptureTestCase):
def setUp(self):
@ -76,7 +77,6 @@ class ExecuteActions(LogCaptureTestCase):
self.assertEqual(self.__actions.getBanTime(),127)
self.assertRaises(ValueError, self.__actions.removeBannedIP, '127.0.0.1')
def testActionsOutput(self):
self.defaultActions()
self.__actions.start()
@ -89,7 +89,6 @@ class ExecuteActions(LogCaptureTestCase):
self.assertEqual(self.__actions.status(),[("Currently banned", 0 ),
("Total banned", 0 ), ("Banned IP list", [] )])
def testAddActionPython(self):
self.__actions.add(
"Action", os.path.join(TEST_FILES_DIR, "action.d/action.py"),

View File

@ -30,6 +30,7 @@ from ..server.action import CommandAction, CallingMap
from .utils import LogCaptureTestCase
class CommandActionTest(LogCaptureTestCase):
def setUp(self):
@ -110,7 +111,6 @@ class CommandActionTest(LogCaptureTestCase):
{'ipjailmatches': "some >char< should \< be[ escap}ed&\n"}),
"some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
# Recursive
aInfo["ABC"] = "<xyz>"
self.assertEqual(

View File

@ -29,6 +29,7 @@ import unittest
from ..server.banmanager import BanManager
from ..server.ticket import BanTicket
class AddFailure(unittest.TestCase):
def setUp(self):
"""Call before every test case."""

View File

@ -45,6 +45,7 @@ STOCK = os.path.exists(os.path.join('config','fail2ban.conf'))
IMPERFECT_CONFIG = os.path.join(os.path.dirname(__file__), 'config')
class ConfigReaderTest(unittest.TestCase):
def setUp(self):
@ -77,12 +78,10 @@ option = %s
os.unlink("%s/%s" % (self.d, fname))
self.assertTrue(self.c.read('c')) # we still should have some
def _getoption(self, f='c'):
self.assertTrue(self.c.read(f)) # we got some now
return self.c.getOptions('section', [("int", 'option')])['option']
def testInaccessibleFile(self):
f = os.path.join(self.d, "d.conf") # inaccessible file
self._write('d.conf', 0)
@ -97,7 +96,6 @@ option = %s
# raise unittest.SkipTest("Skipping on %s -- access rights are not enforced" % platform)
pass
def testOptionalDotDDir(self):
self.assertFalse(self.c.read('c')) # nothing is there yet
self._write("c.conf", "1")
@ -159,6 +157,7 @@ c = d ;in line comment
self.assertEqual(self.c.get('DEFAULT', 'b'), 'a')
self.assertEqual(self.c.get('DEFAULT', 'c'), 'd')
class JailReaderTest(LogCaptureTestCase):
def __init__(self, *args, **kwargs):
@ -194,7 +193,6 @@ class JailReaderTest(LogCaptureTestCase):
self.assertTrue(self._is_logged('Error in action definition joho[foo'))
self.assertTrue(self._is_logged('Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0'))
if STOCK:
def testStockSSHJail(self):
jail = JailReader('sshd', basedir=CONFIG_DIR, share_config = self.__share_cfg) # we are running tests from root project dir atm
@ -224,7 +222,6 @@ class JailReaderTest(LogCaptureTestCase):
#self.assertRaises(ValueError, JailReader.extractOptions ,'mail-how[')
# Empty option
option = "abc[]"
expected = ('abc', {})
@ -318,7 +315,6 @@ class FilterReaderTest(unittest.TestCase):
output[-1][-1] = "5"
self.assertEqual(sorted(filterReader.convert()), sorted(output))
def testFilterReaderSubstitionDefault(self):
output = [['set', 'jailname', 'addfailregex', 'to=sweet@example.com fromip=<IP>']]
filterReader = FilterReader('substition', "jailname", {})
@ -361,6 +357,7 @@ class FilterReaderTest(unittest.TestCase):
except Exception, e: # pragma: no cover - failed if reachable
self.fail('unexpected options after readexplicit: %s' % (e))
class JailsReaderTestCache(LogCaptureTestCase):
def _readWholeConf(self, basedir, force_enable=False, share_config=None):
@ -480,7 +477,6 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertTrue('Init' in actionReader.sections(),
msg="Action file %r is lacking [Init] section" % actionConfig)
def testReadStockJailConf(self):
jails = JailsReader(basedir=CONFIG_DIR, share_config=self.__share_cfg) # we are running tests from root project dir atm
self.assertTrue(jails.read()) # opens fine
@ -607,7 +603,6 @@ class JailsReaderTest(LogCaptureTestCase):
msg="Found no %s command among %s"
% (target_command, str(commands)) )
def testStockConfigurator(self):
configurator = Configurator()
configurator.setBaseDir(CONFIG_DIR)

View File

@ -42,6 +42,7 @@ from .utils import LogCaptureTestCase
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
class DatabaseTest(LogCaptureTestCase):
def setUp(self):
@ -317,7 +318,6 @@ class DatabaseTest(LogCaptureTestCase):
actions._Actions__checkBan()
self.assertTrue(self._is_logged("ban ainfo %s, %s, %s, %s" % (True, True, True, True)))
def testPurge(self):
if Fail2BanDb is None: # pragma: no cover
return

View File

@ -32,6 +32,7 @@ from ..server.datedetector import DateDetector
from ..server.datetemplate import DateTemplate
from .utils import setUpMyTime, tearDownMyTime
class DateDetectorTest(unittest.TestCase):
def setUp(self):

View File

@ -26,6 +26,7 @@ from threading import Lock
from ..server.actions import Actions
class DummyJail(object):
"""A simple 'jail' to suck in all the tickets generated by Filter's
"""

View File

@ -29,6 +29,7 @@ import unittest
from ..server.failmanager import FailManager, FailManagerEmpty
from ..server.ticket import FailTicket
class AddFailure(unittest.TestCase):
def setUp(self):

View File

@ -1,6 +1,7 @@
from fail2ban.server.action import ActionBase
class TestAction(ActionBase):
def __init__(self, jail, name, opt1, opt2=None):

View File

@ -1,6 +1,7 @@
from fail2ban.server.action import ActionBase
class TestAction(ActionBase):
def ban(self, aInfo):

View File

@ -1,6 +1,7 @@
from fail2ban.server.action import ActionBase
class TestAction(ActionBase):
def __init__(self, jail, name):

View File

@ -1,6 +1,7 @@
from fail2ban.server.action import ActionBase
class TestAction(ActionBase):
def ban(self, aInfo):

View File

@ -1,5 +1,6 @@
from fail2ban.server.action import ActionBase
class TestAction(ActionBase):
pass

View File

@ -10,6 +10,7 @@ except ImportError: # pragma: no cover
import md5
md5sum = md5.new
def auth(v):
ha1 = md5sum(username + ':' + realm + ':' + password).hexdigest()
@ -44,6 +45,7 @@ def auth(v):
s = requests.Session()
return s.send(p)
def preauth():
r = requests.get(host + url)
print(r)

View File

@ -46,6 +46,7 @@ from .dummyjail import DummyJail
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
# yoh: per Steven Hiscocks's insight while troubleshooting
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
# adding a sufficiently large buffer might help to guarantee that
@ -63,6 +64,7 @@ def open(*args):
else:
return fopen(*args)
def _killfile(f, name):
try:
f.close()
@ -98,6 +100,7 @@ def _assert_equal_entries(utest, found, output, count=None):
srepr = repr
utest.assertEqual(srepr(found[3]), srepr(output[3]))
def _ticket_tuple(ticket):
"""Create a tuple for easy comparison from fail ticket
"""
@ -107,6 +110,7 @@ def _ticket_tuple(ticket):
matches = ticket.getMatches()
return (ip, attempts, date, matches)
def _assert_correct_last_attempt(utest, filter_, output, count=None):
"""Additional helper to wrap most common test case
@ -120,6 +124,7 @@ def _assert_correct_last_attempt(utest, filter_, output, count=None):
_assert_equal_entries(utest, found, output, count)
def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""):
"""Copy lines from one file to another (which might be already open)
@ -156,6 +161,7 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
time.sleep(0.1)
return fout
def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # pragma: systemd no cover
"""Copy lines from one file to systemd journal
@ -184,6 +190,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
# Opened earlier, therefore must close it
fin.close()
#
# Actual tests
#
@ -209,6 +216,7 @@ class BasicFilter(unittest.TestCase):
("^%Y-%m-%d-%H%M%S.%f %z",
"^Year-Month-Day-24hourMinuteSecond.Microseconds Zone offset"))
class IgnoreIP(LogCaptureTestCase):
def setUp(self):
@ -276,6 +284,7 @@ class IgnoreIP(LogCaptureTestCase):
self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
self.assertFalse(self._is_logged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED")))
class IgnoreIPDNS(IgnoreIP):
def testIgnoreIPDNSOK(self):
@ -289,6 +298,7 @@ class IgnoreIPDNS(IgnoreIP):
self.assertFalse(self.filter.inIgnoreIPList("128.178.50.11"))
self.assertFalse(self.filter.inIgnoreIPList("128.178.50.13"))
class LogFile(LogCaptureTestCase):
MISSING = 'testcases/missingLogFile'
@ -303,6 +313,7 @@ class LogFile(LogCaptureTestCase):
self.filter = FilterPoll(None)
self.assertRaises(IOError, self.filter.addLogPath, LogFile.MISSING)
class LogFileFilterPoll(unittest.TestCase):
FILENAME = os.path.join(TEST_FILES_DIR, "testcase01.log")
@ -475,6 +486,7 @@ def get_monitor_failures_testcase(Filter_):
class MonitorFailures(unittest.TestCase):
count = 0
def setUp(self):
"""Call before every test case."""
setUpMyTime()
@ -493,7 +505,6 @@ def get_monitor_failures_testcase(Filter_):
self._sleep_4_poll()
#print "D: started filter %s" % self.filter
def tearDown(self):
tearDownMyTime()
#print "D: SLEEPING A BIT"
@ -532,7 +543,6 @@ def get_monitor_failures_testcase(Filter_):
self.assertTrue(self.isFilled(20)) # give Filter a chance to react
_assert_correct_last_attempt(self, self.jail, failures, count=count)
def test_grow_file(self):
# suck in lines from this sample log file
self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
@ -576,7 +586,6 @@ def get_monitor_failures_testcase(Filter_):
skip=3, mode='w')
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
def test_move_file(self):
# if we move file into a new location while it has been open already
self.file.close()
@ -601,7 +610,6 @@ def get_monitor_failures_testcase(Filter_):
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
def _test_move_into_file(self, interim_kill=False):
# if we move a new file into the location of an old (monitored) file
_copy_lines_between_files(GetFailures.FILENAME_01, self.name,
@ -627,7 +635,6 @@ def get_monitor_failures_testcase(Filter_):
self.assert_correct_last_attempt(GetFailures.FAILURES_01)
self.assertEqual(self.filter.failManager.getFailTotal(), 9)
def test_move_into_file(self):
self._test_move_into_file(interim_kill=False)
@ -636,7 +643,6 @@ def get_monitor_failures_testcase(Filter_):
# to test against possible drop-out of the file from monitoring
self._test_move_into_file(interim_kill=True)
def test_new_bogus_file(self):
# to make sure that watching whole directory does not effect
_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
@ -649,7 +655,6 @@ def get_monitor_failures_testcase(Filter_):
self.assertEqual(self.filter.failManager.getFailTotal(), 6)
_killfile(None, self.name + '.bak2')
def test_delLogPath(self):
# Smoke test for removing of the path from being watched
@ -681,6 +686,7 @@ def get_monitor_failures_testcase(Filter_):
% (Filter_.__name__, testclass_name) # 'tempfile')
return MonitorFailures
def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
"""Generator of TestCase's for journal based filters/backends
"""
@ -804,6 +810,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
return MonitorJournalFailures
class GetFailures(unittest.TestCase):
FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
@ -862,7 +869,6 @@ class GetFailures(unittest.TestCase):
self.testGetFailures01(filename=fname)
_killfile(fout, fname)
def testGetFailures02(self):
output = ('141.3.81.106', 4, 1124013539.0,
[u'Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2'
@ -921,8 +927,6 @@ class GetFailures(unittest.TestCase):
filter_.getFailures(GetFailures.FILENAME_USEDNS)
_assert_correct_last_attempt(self, filter_, output)
def testGetFailuresMultiRegex(self):
output = ('141.3.81.106', 8, 1124013541.0)
@ -996,6 +1000,7 @@ class GetFailures(unittest.TestCase):
break
self.assertEqual(sorted(foundList), sorted(output))
class DNSUtilsTests(unittest.TestCase):
def testUseDns(self):
@ -1043,6 +1048,7 @@ class DNSUtilsTests(unittest.TestCase):
res = DNSUtils.bin2addr(167772160L)
self.assertEqual(res, '10.0.0.0')
class JailTests(unittest.TestCase):
def testSetBackend_gh83(self):

View File

@ -55,6 +55,7 @@ class HelpersTest(unittest.TestCase):
# might be fragile due to ' vs "
self.assertEqual(args, "('Very bad', None)")
class SetupTest(unittest.TestCase):
def setUp(self):
@ -116,6 +117,7 @@ class SetupTest(unittest.TestCase):
os.system("%s %s clean --all >/dev/null 2>&1"
% (sys.executable, self.setup))
class TestsUtilsTest(unittest.TestCase):
def testmbasename(self):
@ -154,7 +156,6 @@ class TestsUtilsTest(unittest.TestCase):
self.assertFalse('>' in s, msg="'>' present in %r" % s) # There is only "fail2ban-testcases" in this case, no true traceback
self.assertTrue(':' in s, msg="no ':' in %r" % s)
def testFormatterWithTraceBack(self):
strout = StringIO()
Formatter = FormatterWithTraceBack
@ -177,6 +178,7 @@ class TestsUtilsTest(unittest.TestCase):
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")
class CustomDateFormatsTest(unittest.TestCase):
def testIso8601(self):

View File

@ -37,6 +37,7 @@ from .utils import setUpMyTime, tearDownMyTime, CONFIG_DIR
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
class FilterSamplesRegex(unittest.TestCase):
def setUp(self):
@ -58,6 +59,7 @@ class FilterSamplesRegex(unittest.TestCase):
>= 10,
"Expected more FilterSampleRegexs tests")
def testSampleRegexsFactory(name):
def testFilter(self):
@ -125,7 +127,6 @@ def testSampleRegexsFactory(name):
except ValueError:
jsonTimeLocal = datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%f")
jsonTime = time.mktime(jsonTimeLocal.timetuple())
jsonTime += jsonTimeLocal.microsecond / 1000000

View File

@ -47,12 +47,15 @@ except ImportError: # pragma: no cover
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
class TestServer(Server):
def setLogLevel(self, *args, **kwargs):
pass
def setLogTarget(self, *args, **kwargs):
pass
class TransmitterBase(unittest.TestCase):
def setUp(self):
@ -145,6 +148,7 @@ class TransmitterBase(unittest.TestCase):
self.transm.proceed(["get", jail, cmd]),
(0, outValues[n+1:]))
class Transmitter(TransmitterBase):
def setUp(self):
@ -558,7 +562,6 @@ class Transmitter(TransmitterBase):
)
)
def testAction(self):
action = "TestCaseAction"
cmdList = [
@ -760,6 +763,7 @@ class Transmitter(TransmitterBase):
["set", jailName, "deljournalmatch", value])
self.assertTrue(isinstance(result[1], ValueError))
class TransmitterLogging(TransmitterBase):
def setUp(self):
@ -884,6 +888,7 @@ class JailTests(unittest.TestCase):
jail = Jail(longname)
self.assertEqual(jail.name, longname)
class RegexTests(unittest.TestCase):
def testInit(self):
@ -908,10 +913,12 @@ class RegexTests(unittest.TestCase):
self.assertTrue(fr.hasMatched())
self.assertRaises(RegexException, fr.getHost)
class _BadThread(JailThread):
def run(self):
raise RuntimeError('run bad thread exception')
class LoggingTests(LogCaptureTestCase):
def testGetF2BLogger(self):

View File

@ -33,6 +33,7 @@ import unittest
from ..server.asyncserver import AsyncServer, AsyncServerException
from ..client.csocket import CSocket
class Socket(unittest.TestCase):
def setUp(self):

View File

@ -44,12 +44,15 @@ if not CONFIG_DIR:
else:
CONFIG_DIR = '/etc/fail2ban'
def mtimesleep():
# no sleep now should be necessary since polling tracks now not only
# mtime but also ino and size
pass
old_TZ = os.environ.get('TZ', None)
def setUpMyTime():
# Set the time to a fixed, known value
# Sun Aug 14 12:00:00 CEST 2005
@ -58,6 +61,7 @@ def setUpMyTime():
time.tzset()
MyTime.setTime(1124013600)
def tearDownMyTime():
os.environ.pop('TZ')
if old_TZ:
@ -65,6 +69,7 @@ def tearDownMyTime():
time.tzset()
MyTime.myTime = None
def gatherTests(regexps=None, no_network=False):
# Import all the test cases here instead of a module level to
# avoid circular imports
@ -86,6 +91,7 @@ def gatherTests(regexps=None, no_network=False):
else: # pragma: no cover
class FilteredTestSuite(unittest.TestSuite):
_regexps = [re.compile(r) for r in regexps]
def addTest(self, suite):
suite_str = str(suite)
for r in self._regexps:
@ -190,13 +196,13 @@ def gatherTests(regexps=None, no_network=False):
except Exception, e: # pragma: no cover
logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e)
# Server test for logging elements which break logging used to support
# testcases analysis
tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
return tests
class LogCaptureTestCase(unittest.TestCase):
def setUp(self):