Browse Source

RF(PEP8 etc): Make new fail2banclienttestcase a bit more readable and PEP8 friendly

pull/1483/head
Yaroslav Halchenko 8 years ago
parent
commit
94cada0c6b
  1. 267
      fail2ban/tests/fail2banclienttestcase.py

267
fail2ban/tests/fail2banclienttestcase.py

@ -31,25 +31,29 @@ import time
import signal
import unittest
from os.path import join as pjoin, isdir, isfile, exists, dirname
from functools import wraps
from threading import Thread
from ..client import fail2banclient, fail2banserver, fail2bancmdline
from ..client.fail2banclient import Fail2banClient, exec_command_line as _exec_client, VisualWait
from ..client.fail2banclient import exec_command_line as _exec_client, VisualWait
from ..client.fail2banserver import Fail2banServer, exec_command_line as _exec_server
from .. import protocol
from ..server import server
from ..server.utils import Utils
from .utils import LogCaptureTestCase, logSys, with_tmpdir, shutil, logging
from .utils import LogCaptureTestCase, with_tmpdir, shutil, logging
from ..helpers import getLogger
# Gets the instance of the logger.
logSys = getLogger(__name__)
STOCK_CONF_DIR = "config"
STOCK = os.path.exists(os.path.join(STOCK_CONF_DIR,'fail2ban.conf'))
STOCK = exists(pjoin(STOCK_CONF_DIR, 'fail2ban.conf'))
CLIENT = "fail2ban-client"
SERVER = "fail2ban-server"
BIN = os.path.dirname(Fail2banServer.getServerPath())
BIN = dirname(Fail2banServer.getServerPath())
MAX_WAITTIME = 30 if not unittest.F2B.fast else 5
@ -57,7 +61,7 @@ MAX_WAITTIME = 30 if not unittest.F2B.fast else 5
# Several wrappers and settings for proper testing:
#
fail2bancmdline.MAX_WAITTIME = MAX_WAITTIME-1
fail2bancmdline.MAX_WAITTIME = MAX_WAITTIME - 1
fail2bancmdline.logSys = \
fail2banclient.logSys = \
@ -72,23 +76,43 @@ fail2banclient.output = \
fail2banserver.output = \
protocol.output = _test_output
#
# Mocking .exit so we could test its correct operation.
# Two custom exceptions will be assessed to be raised in the tests
#
class ExitException(fail2bancmdline.ExitException):
"""Exception upon a normal exit"""
pass
class FailExitException(fail2bancmdline.ExitException):
"""Exception upon abnormal exit"""
pass
def _test_exit(code=0):
logSys.debug("Exit with code %s", code)
if code == 0:
raise ExitException()
else:
raise FailExitException()
raise FailExitException()
fail2bancmdline.exit = \
fail2banclient.exit = \
fail2banserver.exit = _test_exit
INTERACT = []
def _test_input_command(*args):
if len(INTERACT):
#logSys.debug('--- interact command: %r', INTERACT[0])
#logSys.debug('interact command: %r', INTERACT[0])
return INTERACT.pop(0)
else:
return "exit"
return "exit"
fail2banclient.input_command = _test_input_command
# prevents change logging params, log capturing, etc:
@ -97,38 +121,35 @@ fail2banclient.PRODUCTION = \
fail2banserver.PRODUCTION = False
class ExitException(fail2bancmdline.ExitException):
pass
class FailExitException(fail2bancmdline.ExitException):
pass
def _out_file(fn): # pragma: no cover
def _out_file(fn):
"""Helper which outputs content of the file at HEAVYDEBUG loglevels"""
logSys.debug('---- ' + fn + ' ----')
for line in fileinput.input(fn):
line = line.rstrip('\n')
logSys.debug(line)
logSys.debug('-'*30)
def _start_params(tmp, use_stock=False, logtarget="/dev/null"):
cfg = os.path.join(tmp,"config")
cfg = pjoin(tmp, "config")
if use_stock and STOCK:
# copy config (sub-directories as alias):
def ig_dirs(dir, files):
return [f for f in files if os.path.isdir(os.path.join(dir, f))]
"""Filters list of 'files' to contain only directories (under dir)"""
return [f for f in files if isdir(pjoin(dir, f))]
shutil.copytree(STOCK_CONF_DIR, cfg, ignore=ig_dirs)
os.symlink(os.path.join(STOCK_CONF_DIR,"action.d"), os.path.join(cfg,"action.d"))
os.symlink(os.path.join(STOCK_CONF_DIR,"filter.d"), os.path.join(cfg,"filter.d"))
os.symlink(pjoin(STOCK_CONF_DIR, "action.d"), pjoin(cfg, "action.d"))
os.symlink(pjoin(STOCK_CONF_DIR, "filter.d"), pjoin(cfg, "filter.d"))
# replace fail2ban params (database with memory):
r = re.compile(r'^dbfile\s*=')
for line in fileinput.input(os.path.join(cfg,"fail2ban.conf"), inplace=True):
for line in fileinput.input(pjoin(cfg, "fail2ban.conf"), inplace=True):
line = line.rstrip('\n')
if r.match(line):
line = "dbfile = :memory:"
print(line)
# replace jail params (polling as backend to be fast in initialize):
r = re.compile(r'^backend\s*=')
for line in fileinput.input(os.path.join(cfg,"jail.conf"), inplace=True):
for line in fileinput.input(pjoin(cfg, "jail.conf"), inplace=True):
line = line.rstrip('\n')
if r.match(line):
line = "backend = polling"
@ -136,64 +157,71 @@ def _start_params(tmp, use_stock=False, logtarget="/dev/null"):
else:
# just empty config directory without anything (only fail2ban.conf/jail.conf):
os.mkdir(cfg)
f = open(os.path.join(cfg,"fail2ban.conf"), "w")
f = open(pjoin(cfg, "fail2ban.conf"), "w")
f.write('\n'.join((
"[Definition]",
"loglevel = INFO",
"logtarget = " + logtarget,
"syslogsocket = auto",
"socket = "+os.path.join(tmp,"f2b.sock"),
"pidfile = "+os.path.join(tmp,"f2b.pid"),
"socket = " + pjoin(tmp, "f2b.sock"),
"pidfile = " + pjoin(tmp, "f2b.pid"),
"backend = polling",
"dbfile = :memory:",
"dbpurgeage = 1d",
"",
)))
f.close()
f = open(os.path.join(cfg,"jail.conf"), "w")
f = open(pjoin(cfg, "jail.conf"), "w")
f.write('\n'.join((
"[INCLUDES]", "",
"[DEFAULT]", "",
"",
)))
f.close()
if logSys.level < logging.DEBUG: # if HEAVYDEBUG
_out_file(os.path.join(cfg,"fail2ban.conf"))
_out_file(os.path.join(cfg,"jail.conf"))
if logSys.level < logging.DEBUG: # if HEAVYDEBUG
_out_file(pjoin(cfg, "fail2ban.conf"))
_out_file(pjoin(cfg, "jail.conf"))
# parameters (sock/pid and config, increase verbosity, set log, etc.):
return ("-c", cfg, "-s", os.path.join(tmp,"f2b.sock"), "-p", os.path.join(tmp,"f2b.pid"),
"-vv", "--logtarget", logtarget, "--loglevel", "DEBUG", "--syslogsocket", "auto",
"--timeout", str(fail2bancmdline.MAX_WAITTIME),
return (
"-c", cfg, "-s", pjoin(tmp, "f2b.sock"), "-p", pjoin(tmp, "f2b.pid"),
"-vv", "--logtarget", logtarget, "--loglevel", "DEBUG", "--syslogsocket", "auto",
"--timeout", str(fail2bancmdline.MAX_WAITTIME),
)
def _kill_srv(pidfile): # pragma: no cover
def _pid_exists(pid):
try:
os.kill(pid, 0)
return True
except OSError:
return False
logSys.debug("-- cleanup: %r", (pidfile, os.path.isdir(pidfile)))
if os.path.isdir(pidfile):
def _pid_exists(pid):
"""Check if PID exists by sending 0 signal to the PID process"""
try:
os.kill(pid, 0)
return True
except OSError:
return False
def _kill_srv(pidfile):
logSys.debug("cleanup: %r", (pidfile, isdir(pidfile)))
if isdir(pidfile):
piddir = pidfile
pidfile = piddir + "/f2b.pid"
if not os.path.isfile(pidfile):
if not isfile(pidfile):
pidfile = piddir + "/fail2ban.pid"
if not os.path.isfile(pidfile):
logSys.debug("--- cleanup: no pidfile for %r", piddir)
if not isfile(pidfile):
logSys.debug("cleanup: no pidfile for %r", piddir)
return True
f = pid = None
try:
logSys.debug("--- cleanup pidfile: %r", pidfile)
logSys.debug("cleanup pidfile: %r", pidfile)
f = open(pidfile)
pid = f.read().split()[1]
pid = int(pid)
logSys.debug("--- cleanup pid: %r", pid)
logSys.debug("cleanup pid: %r", pid)
if pid <= 0:
raise ValueError('pid %s of %s is invalid' % (pid, pidfile))
if not _pid_exists(pid):
return True
## try to preper stop (have signal handler):
## try to prepare stop (have signal handler):
os.kill(pid, signal.SIGTERM)
## check still exists after small timeout:
if not Utils.wait_for(lambda: not _pid_exists(pid), 1):
@ -207,6 +235,7 @@ def _kill_srv(pidfile): # pragma: no cover
f.close()
return True
def with_kill_srv(f):
"""Helper to decorate tests which receive in the last argument tmpdir to pass to kill_srv
@ -234,19 +263,25 @@ class Fail2banClientServerBase(LogCaptureTestCase):
def _wait_for_srv(self, tmp, ready=True, startparams=None):
try:
sock = os.path.join(tmp,"f2b.sock")
sock = pjoin(tmp, "f2b.sock")
# wait for server (socket):
ret = Utils.wait_for(lambda: os.path.exists(sock), MAX_WAITTIME)
ret = Utils.wait_for(lambda: exists(sock), MAX_WAITTIME)
if not ret:
raise Exception('Unexpected: Socket file does not exists.\nStart failed: %r' % (startparams,))
raise Exception(
'Unexpected: Socket file does not exists.\nStart failed: %r'
% (startparams,)
)
if ready:
# wait for communication with worker ready:
ret = Utils.wait_for(lambda: "Server ready" in self.getLog(), MAX_WAITTIME)
if not ret:
raise Exception('Unexpected: Server ready was not found.\nStart failed: %r' % (startparams,))
except: # pragma: no cover
log = os.path.join(tmp,"f2b.log")
if os.path.isfile(log):
raise Exception(
'Unexpected: Server ready was not found.\nStart failed: %r'
% (startparams,)
)
except: # pragma: no cover
log = pjoin(tmp, "f2b.log")
if isfile(log):
_out_file(log)
else:
logSys.debug("No log file %s to examine details of error", log)
@ -256,16 +291,16 @@ class Fail2banClientServerBase(LogCaptureTestCase):
class Fail2banClientTest(Fail2banClientServerBase):
def testConsistency(self):
self.assertTrue(os.path.isfile(os.path.join(os.path.join(BIN), CLIENT)))
self.assertTrue(os.path.isfile(os.path.join(os.path.join(BIN), SERVER)))
self.assertTrue(isfile(pjoin(BIN, CLIENT)))
self.assertTrue(isfile(pjoin(BIN, SERVER)))
def testClientUsage(self):
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT, "-h",))
self.assertLogged("Usage: " + CLIENT)
self.assertLogged("Report bugs to ")
self.pruneLog()
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT, "-vq", "-V",))
self.assertLogged("Fail2Ban v" + fail2bancmdline.version)
@ -273,7 +308,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
def testClientDump(self, tmp):
# use here the stock configuration (if possible)
startparams = _start_params(tmp, True)
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
((CLIENT,) + startparams + ("-vvd",)))
self.assertLogged("Loading files")
self.assertLogged("logtarget")
@ -284,33 +319,33 @@ class Fail2banClientTest(Fail2banClientServerBase):
# use once the stock configuration (to test starting also)
startparams = _start_params(tmp, True)
# start:
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT, "-b") + startparams + ("start",))
# wait for server (socket and ready):
self._wait_for_srv(tmp, True, startparams=startparams)
self.assertLogged("Server ready")
self.assertLogged("Exit with code 0")
try:
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("echo", "TEST-ECHO",))
self.assertRaises(FailExitException, _exec_client,
self.assertRaises(FailExitException, _exec_client,
(CLIENT,) + startparams + ("~~unknown~cmd~failed~~",))
self.pruneLog()
# start again (should fail):
self.assertRaises(FailExitException, _exec_client,
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "-b") + startparams + ("start",))
self.assertLogged("Server already running")
finally:
self.pruneLog()
# stop:
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("stop",))
self.assertLogged("Shutdown successful")
self.assertLogged("Exit with code 0")
self.pruneLog()
# stop again (should fail):
self.assertRaises(FailExitException, _exec_client,
self.assertRaises(FailExitException, _exec_client,
(CLIENT,) + startparams + ("stop",))
self.assertLogged("Failed to access socket path")
self.assertLogged("Is fail2ban running?")
@ -319,9 +354,9 @@ class Fail2banClientTest(Fail2banClientServerBase):
@with_kill_srv
def testClientStartBackgroundCall(self, tmp):
global INTERACT
startparams = _start_params(tmp, logtarget=os.path.join(tmp,"f2b.log"))
startparams = _start_params(tmp, logtarget=pjoin(tmp, "f2b.log"))
# start (in new process, using the same python version):
cmd = (sys.executable, os.path.join(os.path.join(BIN), CLIENT))
cmd = (sys.executable, pjoin(pjoin(BIN), CLIENT))
logSys.debug('Start %s ...', cmd)
cmd = cmd + startparams + ("--async", "start",)
ret = Utils.executeCmd(cmd, timeout=MAX_WAITTIME, shell=False, output=True)
@ -332,7 +367,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
self.pruneLog()
try:
# echo from client (inside):
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("echo", "TEST-ECHO",))
self.assertLogged("TEST-ECHO")
self.assertLogged("Exit with code 0")
@ -343,7 +378,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
"status",
"exit"
]
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("-i",))
self.assertLogged("INTERACT-ECHO")
self.assertLogged("Status", "Number of jail:")
@ -355,7 +390,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
"restart",
"exit"
]
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("-i",))
self.assertLogged("Reading config files:")
self.assertLogged("Shutdown successful")
@ -367,12 +402,12 @@ class Fail2banClientTest(Fail2banClientServerBase):
"reload ~~unknown~jail~fail~~",
"exit"
]
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("-i",))
self.assertLogged("Failed during configuration: No section: '~~unknown~jail~fail~~'")
self.pruneLog()
# test reload missing jail (direct):
self.assertRaises(FailExitException, _exec_client,
self.assertRaises(FailExitException, _exec_client,
(CLIENT,) + startparams + ("reload", "~~unknown~jail~fail~~"))
self.assertLogged("Failed during configuration: No section: '~~unknown~jail~fail~~'")
self.assertLogged("Exit with code -1")
@ -380,20 +415,20 @@ class Fail2banClientTest(Fail2banClientServerBase):
finally:
self.pruneLog()
# stop:
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("stop",))
self.assertLogged("Shutdown successful")
self.assertLogged("Exit with code 0")
def _testClientStartForeground(self, tmp, startparams, phase):
# start and wait to end (foreground):
logSys.debug("-- start of test worker")
logSys.debug("start of test worker")
phase['start'] = True
self.assertRaises(fail2bancmdline.ExitException, _exec_client,
self.assertRaises(fail2bancmdline.ExitException, _exec_client,
(CLIENT, "-f") + startparams + ("start",))
# end :
phase['end'] = True
logSys.debug("-- end of test worker")
logSys.debug("end of test worker")
@with_tmpdir
def testClientStartForeground(self, tmp):
@ -403,7 +438,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
startparams = _start_params(tmp, logtarget="INHERITED")
# because foreground block execution - start it in thread:
phase = dict()
th = Thread(name="_TestCaseWorker",
th = Thread(name="_TestCaseWorker",
target=Fail2banClientTest._testClientStartForeground, args=(self, tmp, startparams, phase))
th.daemon = True
th.start()
@ -415,16 +450,16 @@ class Fail2banClientTest(Fail2banClientServerBase):
self._wait_for_srv(tmp, True, startparams=startparams)
self.pruneLog()
# several commands to server:
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("ping",))
self.assertRaises(FailExitException, _exec_client,
self.assertRaises(FailExitException, _exec_client,
(CLIENT,) + startparams + ("~~unknown~cmd~failed~~",))
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("echo", "TEST-ECHO",))
finally:
self.pruneLog()
# stop:
self.assertRaises(ExitException, _exec_client,
self.assertRaises(ExitException, _exec_client,
(CLIENT,) + startparams + ("stop",))
# wait for end:
Utils.wait_for(lambda: phase.get('end', None) is not None, MAX_WAITTIME)
@ -442,33 +477,33 @@ class Fail2banClientTest(Fail2banClientServerBase):
startparams = _start_params(tmp, logtarget="INHERITED")
## wrong config directory
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "--async", "-c", os.path.join(tmp,"miss"), "start",))
self.assertLogged("Base configuration directory " + os.path.join(tmp,"miss") + " does not exist")
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "--async", "-c", pjoin(tmp, "miss"), "start",))
self.assertLogged("Base configuration directory " + pjoin(tmp, "miss") + " does not exist")
self.pruneLog()
## wrong socket
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "--async", "-c", os.path.join(tmp,"config"), "-s", os.path.join(tmp,"miss/f2b.sock"), "start",))
self.assertLogged("There is no directory " + os.path.join(tmp,"miss") + " to contain the socket file")
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "--async", "-c", pjoin(tmp, "config"), "-s", pjoin(tmp, "miss/f2b.sock"), "start",))
self.assertLogged("There is no directory " + pjoin(tmp, "miss") + " to contain the socket file")
self.pruneLog()
## not running
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "-c", os.path.join(tmp,"config"), "-s", os.path.join(tmp,"f2b.sock"), "reload",))
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "-c", pjoin(tmp, "config"), "-s", pjoin(tmp, "f2b.sock"), "reload",))
self.assertLogged("Could not find server")
self.pruneLog()
## already exists:
open(os.path.join(tmp,"f2b.sock"), 'a').close()
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "--async", "-c", os.path.join(tmp,"config"), "-s", os.path.join(tmp,"f2b.sock"), "start",))
open(pjoin(tmp, "f2b.sock"), 'a').close()
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "--async", "-c", pjoin(tmp, "config"), "-s", pjoin(tmp, "f2b.sock"), "start",))
self.assertLogged("Fail2ban seems to be in unexpected state (not running but the socket exists)")
self.pruneLog()
os.remove(os.path.join(tmp,"f2b.sock"))
os.remove(pjoin(tmp, "f2b.sock"))
## wrong option:
self.assertRaises(FailExitException, _exec_client,
self.assertRaises(FailExitException, _exec_client,
(CLIENT, "-s",))
self.assertLogged("Usage: ")
self.pruneLog()
@ -488,7 +523,7 @@ class Fail2banClientTest(Fail2banClientServerBase):
class Fail2banServerTest(Fail2banClientServerBase):
def testServerUsage(self):
self.assertRaises(ExitException, _exec_server,
self.assertRaises(ExitException, _exec_server,
(SERVER, "-h",))
self.assertLogged("Usage: " + SERVER)
self.assertLogged("Report bugs to ")
@ -497,9 +532,9 @@ class Fail2banServerTest(Fail2banClientServerBase):
@with_kill_srv
def testServerStartBackground(self, tmp):
# to prevent fork of test-cases process, start server in background via command:
startparams = _start_params(tmp, logtarget=os.path.join(tmp,"f2b.log"))
startparams = _start_params(tmp, logtarget=pjoin(tmp, "f2b.log"))
# start (in new process, using the same python version):
cmd = (sys.executable, os.path.join(os.path.join(BIN), SERVER))
cmd = (sys.executable, pjoin(pjoin(BIN), SERVER))
logSys.debug('Start %s ...', cmd)
cmd = cmd + startparams + ("-b",)
ret = Utils.executeCmd(cmd, timeout=MAX_WAITTIME, shell=False, output=True)
@ -509,14 +544,14 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.assertLogged("Server ready")
self.pruneLog()
try:
self.assertRaises(ExitException, _exec_server,
self.assertRaises(ExitException, _exec_server,
(SERVER,) + startparams + ("echo", "TEST-ECHO",))
self.assertRaises(FailExitException, _exec_server,
self.assertRaises(FailExitException, _exec_server,
(SERVER,) + startparams + ("~~unknown~cmd~failed~~",))
finally:
self.pruneLog()
# stop:
self.assertRaises(ExitException, _exec_server,
self.assertRaises(ExitException, _exec_server,
(SERVER,) + startparams + ("stop",))
self.assertLogged("Shutdown successful")
self.assertLogged("Exit with code 0")
@ -525,7 +560,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
# start and wait to end (foreground):
logSys.debug("-- start of test worker")
phase['start'] = True
self.assertRaises(fail2bancmdline.ExitException, _exec_server,
self.assertRaises(fail2bancmdline.ExitException, _exec_server,
(SERVER, "-f") + startparams + ("start",))
# end :
phase['end'] = True
@ -539,7 +574,7 @@ class Fail2banServerTest(Fail2banClientServerBase):
startparams = _start_params(tmp, logtarget="INHERITED")
# because foreground block execution - start it in thread:
phase = dict()
th = Thread(name="_TestCaseWorker",
th = Thread(name="_TestCaseWorker",
target=Fail2banServerTest._testServerStartForeground, args=(self, tmp, startparams, phase))
th.daemon = True
th.start()
@ -551,16 +586,16 @@ class Fail2banServerTest(Fail2banClientServerBase):
self._wait_for_srv(tmp, True, startparams=startparams)
self.pruneLog()
# several commands to server:
self.assertRaises(ExitException, _exec_server,
self.assertRaises(ExitException, _exec_server,
(SERVER,) + startparams + ("ping",))
self.assertRaises(FailExitException, _exec_server,
self.assertRaises(FailExitException, _exec_server,
(SERVER,) + startparams + ("~~unknown~cmd~failed~~",))
self.assertRaises(ExitException, _exec_server,
self.assertRaises(ExitException, _exec_server,
(SERVER,) + startparams + ("echo", "TEST-ECHO",))
finally:
self.pruneLog()
# stop:
self.assertRaises(ExitException, _exec_server,
self.assertRaises(ExitException, _exec_server,
(SERVER,) + startparams + ("stop",))
# wait for end:
Utils.wait_for(lambda: phase.get('end', None) is not None, MAX_WAITTIME)
@ -578,21 +613,21 @@ class Fail2banServerTest(Fail2banClientServerBase):
startparams = _start_params(tmp, logtarget="INHERITED")
## wrong config directory
self.assertRaises(FailExitException, _exec_server,
(SERVER, "-c", os.path.join(tmp,"miss"),))
self.assertLogged("Base configuration directory " + os.path.join(tmp,"miss") + " does not exist")
self.assertRaises(FailExitException, _exec_server,
(SERVER, "-c", pjoin(tmp, "miss"),))
self.assertLogged("Base configuration directory " + pjoin(tmp, "miss") + " does not exist")
self.pruneLog()
## wrong socket
self.assertRaises(FailExitException, _exec_server,
(SERVER, "-c", os.path.join(tmp,"config"), "-x", "-s", os.path.join(tmp,"miss/f2b.sock"),))
self.assertLogged("There is no directory " + os.path.join(tmp,"miss") + " to contain the socket file")
self.assertRaises(FailExitException, _exec_server,
(SERVER, "-c", pjoin(tmp, "config"), "-x", "-s", pjoin(tmp, "miss/f2b.sock"),))
self.assertLogged("There is no directory " + pjoin(tmp, "miss") + " to contain the socket file")
self.pruneLog()
## already exists:
open(os.path.join(tmp,"f2b.sock"), 'a').close()
self.assertRaises(FailExitException, _exec_server,
(SERVER, "-c", os.path.join(tmp,"config"), "-s", os.path.join(tmp,"f2b.sock"),))
open(pjoin(tmp, "f2b.sock"), 'a').close()
self.assertRaises(FailExitException, _exec_server,
(SERVER, "-c", pjoin(tmp, "config"), "-s", pjoin(tmp, "f2b.sock"),))
self.assertLogged("Fail2ban seems to be in unexpected state (not running but the socket exists)")
self.pruneLog()
os.remove(os.path.join(tmp,"f2b.sock"))
os.remove(pjoin(tmp, "f2b.sock"))

Loading…
Cancel
Save