Merge pull request #1726 from sebres/0.10-grave-fix-escape-tags-1st

0.10 fix escape tags
pull/1410/head
Serg G. Brester 8 years ago committed by GitHub
commit 1e6787877a

@ -32,7 +32,7 @@ from ..helpers import getLogger
if sys.version_info >= (3,2):
# SafeConfigParser deprecated from Python 3.2 (renamed to ConfigParser)
from configparser import ConfigParser as SafeConfigParser, \
from configparser import ConfigParser as SafeConfigParser, NoSectionError, \
BasicInterpolation
# And interpolation of __name__ was simply removed, thus we need to
@ -60,7 +60,7 @@ if sys.version_info >= (3,2):
parser, option, accum, rest, section, map, depth)
else: # pragma: no cover
from ConfigParser import SafeConfigParser
from ConfigParser import SafeConfigParser, NoSectionError
# Gets the instance of the logger.
logSys = getLogger(__name__)
@ -200,6 +200,21 @@ after = 1.conf
def get_sections(self):
return self._sections
def options(self, section, withDefault=True):
"""Return a list of option names for the given section name.
Parameter `withDefault` controls the include of names from section `[DEFAULT]`
"""
try:
opts = self._sections[section]
except KeyError:
raise NoSectionError(section)
if withDefault:
# mix it with defaults:
return set(opts.keys()) | set(self._defaults)
# only own option names:
return opts.keys()
def read(self, filenames, get_includes=True):
if not isinstance(filenames, list):
filenames = [ filenames ]

@ -109,33 +109,44 @@ class ConfigReader():
self._cfg = ConfigReaderUnshared(**self._cfg_share_kwargs)
def sections(self):
if self._cfg is not None:
try:
return self._cfg.sections()
return []
except AttributeError:
return []
def has_section(self, sec):
if self._cfg is not None:
try:
return self._cfg.has_section(sec)
return False
except AttributeError:
return False
def merge_section(self, *args, **kwargs):
if self._cfg is not None:
return self._cfg.merge_section(*args, **kwargs)
def merge_section(self, section, *args, **kwargs):
try:
return self._cfg.merge_section(section, *args, **kwargs)
except AttributeError:
raise NoSectionError(section)
def options(self, section, withDefault=False):
"""Return a list of option names for the given section name.
def options(self, *args):
if self._cfg is not None:
return self._cfg.options(*args)
return {}
Parameter `withDefault` controls the include of names from section `[DEFAULT]`
"""
try:
return self._cfg.options(section, withDefault)
except AttributeError:
raise NoSectionError(section)
def get(self, sec, opt, raw=False, vars={}):
if self._cfg is not None:
try:
return self._cfg.get(sec, opt, raw=raw, vars=vars)
return None
except AttributeError:
raise NoSectionError(sec)
def getOptions(self, *args, **kwargs):
if self._cfg is not None:
return self._cfg.getOptions(*args, **kwargs)
return {}
def getOptions(self, section, *args, **kwargs):
try:
return self._cfg.getOptions(section, *args, **kwargs)
except AttributeError:
raise NoSectionError(section)
class ConfigReaderUnshared(SafeConfigParserWithIncludes):
@ -297,23 +308,35 @@ class DefinitionInitConfigReader(ConfigReader):
self._create_unshared(self._file)
return SafeConfigParserWithIncludes.read(self._cfg, self._file)
def getOptions(self, pOpts):
def getOptions(self, pOpts, all=False):
# overwrite static definition options with init values, supplied as
# direct parameters from jail-config via action[xtra1="...", xtra2=...]:
if not pOpts:
pOpts = dict()
if self._initOpts:
if not pOpts:
pOpts = dict()
pOpts = _merge_dicts(pOpts, self._initOpts)
self._opts = ConfigReader.getOptions(
self, "Definition", self._configOpts, pOpts)
self._pOpts = pOpts
if self.has_section("Init"):
for opt in self.options("Init"):
v = self.get("Init", opt)
if not opt.startswith('known/') and opt != '__name__':
# get only own options (without options from default):
getopt = lambda opt: self.get("Init", opt)
for opt in self.options("Init", withDefault=False):
if opt == '__name__': continue
v = None
if not opt.startswith('known/'):
if v is None: v = getopt(opt)
self._initOpts['known/'+opt] = v
if not opt in self._initOpts:
if opt not in self._initOpts:
if v is None: v = getopt(opt)
self._initOpts[opt] = v
if all and self.has_section("Definition"):
# merge with all definition options (and options from default),
# bypass already converted option (so merge only new options):
for opt in self.options("Definition"):
if opt == '__name__' or opt in self._opts: continue
self._opts[opt] = self.get("Definition", opt)
def _convert_to_boolean(self, value):
return value.lower() in ("1", "yes", "true", "on")
@ -336,12 +359,12 @@ class DefinitionInitConfigReader(ConfigReader):
def getCombined(self, ignore=()):
combinedopts = self._opts
ignore = set(ignore).copy()
if self._initOpts:
combinedopts = _merge_dicts(self._opts, self._initOpts)
combinedopts = _merge_dicts(combinedopts, self._initOpts)
if not len(combinedopts):
return {}
# ignore conditional options:
ignore = set(ignore).copy()
for n in combinedopts:
cond = SafeConfigParserWithIncludes.CONDITIONAL_RE.match(n)
if cond:

@ -139,11 +139,11 @@ class JailReader(ConfigReader):
filterName, self.__name, filterOpt,
share_config=self.share_config, basedir=self.getBaseDir())
ret = self.__filter.read()
# merge options from filter as 'known/...':
self.__filter.getOptions(self.__opts)
ConfigReader.merge_section(self, self.__name, self.__filter.getCombined(), 'known/')
if not ret:
raise JailDefError("Unable to read the filter %r" % filterName)
# merge options from filter as 'known/...' (all options unfiltered):
self.__filter.getOptions(self.__opts, all=True)
ConfigReader.merge_section(self, self.__name, self.__filter.getCombined(), 'known/')
else:
self.__filter = None
logSys.warning("No filter set for jail %s" % self.__name)

@ -453,7 +453,7 @@ class CommandAction(ActionBase):
return value
@classmethod
def replaceTag(cls, query, aInfo, conditional='', cache=None, substRec=True):
def replaceTag(cls, query, aInfo, conditional='', cache=None):
"""Replaces tags in `query` with property values.
Parameters
@ -481,9 +481,8 @@ class CommandAction(ActionBase):
# **Important**: don't replace if calling map - contains dynamic values only,
# no recursive tags, otherwise may be vulnerable on foreign user-input:
noRecRepl = isinstance(aInfo, CallingMap)
if noRecRepl:
subInfo = aInfo
else:
subInfo = aInfo
if not noRecRepl:
# substitute tags recursive (and cache if possible),
# first try get cached tags dictionary:
subInfo = csubkey = None
@ -534,13 +533,86 @@ class CommandAction(ActionBase):
"unexpected too long replacement interpolation, "
"possible self referencing definitions in query: %s" % (query,))
# cache if possible:
if cache is not None:
cache[ckey] = value
#
return value
ESCAPE_CRE = re.compile(r"""[\\#&;`|*?~<>\^\(\)\[\]{}$'"\n\r]""")
ESCAPE_VN_CRE = re.compile(r"\W")
@classmethod
def replaceDynamicTags(cls, realCmd, aInfo):
"""Replaces dynamical tags in `query` with property values.
**Important**
-------------
Because this tags are dynamic resp. foreign (user) input:
- values should be escaped (using "escape" as shell variable)
- no recursive substitution (no interpolation for <a<b>>)
- don't use cache
Parameters
----------
query : str
String with tags.
aInfo : dict
Tags(keys) and associated values for substitution in query.
Returns
-------
str
shell script as string or array with tags replaced (direct or as variables).
"""
# array for escaped vars:
varsDict = dict()
def escapeVal(tag, value):
# if the value should be escaped:
if cls.ESCAPE_CRE.search(value):
# That one needs to be escaped since its content is
# out of our control
tag = 'f2bV_%s' % cls.ESCAPE_VN_CRE.sub('_', tag)
varsDict[tag] = value # add variable
value = '$'+tag # replacement as variable
# replacement for tag:
return value
# substitution callable, used by interpolation of each tag
def substVal(m):
tag = m.group(1) # tagname from match
try:
value = aInfo[tag]
except KeyError:
# fallback (no or default replacement)
return ADD_REPL_TAGS.get(tag, m.group())
value = str(value) # assure string
# replacement for tag:
return escapeVal(tag, value)
# Replace normally properties of aInfo non-recursive:
realCmd = TAG_CRE.sub(substVal, realCmd)
# Replace ticket options (filter capture groups) non-recursive:
if '<' in realCmd:
tickData = aInfo.get("F-*")
if not tickData: tickData = {}
def substTag(m):
tag = mapTag2Opt(m.groups()[0])
try:
value = str(tickData[tag])
except KeyError:
return ""
return escapeVal("F_"+tag, value)
realCmd = FCUSTAG_CRE.sub(substTag, realCmd)
# build command corresponding "escaped" variables:
if varsDict:
realCmd = Utils.buildShellCmd(realCmd, varsDict)
return realCmd
def _processCmd(self, cmd, aInfo=None, conditional=''):
"""Executes a command with preliminary checks and substitutions.
@ -605,21 +677,9 @@ class CommandAction(ActionBase):
realCmd = self.replaceTag(cmd, self._properties,
conditional=conditional, cache=self.__substCache)
# Replace dynamical tags (don't use cache here)
# Replace dynamical tags, important - don't cache, no recursion and auto-escape here
if aInfo is not None:
realCmd = self.replaceTag(realCmd, aInfo, conditional=conditional)
# Replace ticket options (filter capture groups) non-recursive:
if '<' in realCmd:
tickData = aInfo.get("F-*")
if not tickData: tickData = {}
def substTag(m):
tn = mapTag2Opt(m.groups()[0])
try:
return str(tickData[tn])
except KeyError:
return ""
realCmd = FCUSTAG_CRE.sub(substTag, realCmd)
realCmd = self.replaceDynamicTags(realCmd, aInfo)
else:
realCmd = cmd
@ -653,8 +713,5 @@ class CommandAction(ActionBase):
logSys.debug("Nothing to do")
return True
_cmd_lock.acquire()
try:
with _cmd_lock:
return Utils.executeCmd(realCmd, timeout, shell=True, output=False, **kwargs)
finally:
_cmd_lock.release()

@ -290,6 +290,7 @@ class Actions(JailThread, Mapping):
AI_DICT = {
"ip": lambda self: self.__ticket.getIP(),
"family": lambda self: self['ip'].familyStr,
"ip-rev": lambda self: self['ip'].getPTR(''),
"ip-host": lambda self: self['ip'].getHost(),
"fid": lambda self: self.__ticket.getID(),

@ -261,6 +261,11 @@ class IPAddr(object):
def family(self):
return self._family
FAM2STR = {socket.AF_INET: 'inet4', socket.AF_INET6: 'inet6'}
@property
def familyStr(self):
return IPAddr.FAM2STR.get(self._family)
@property
def plen(self):
return self._plen

@ -28,7 +28,7 @@ import signal
import subprocess
import sys
import time
from ..helpers import getLogger, uni_decode
from ..helpers import getLogger, _merge_dicts, uni_decode
if sys.version_info >= (3, 3):
import importlib.machinery
@ -60,6 +60,7 @@ class Utils():
DEFAULT_SLEEP_TIME = 2
DEFAULT_SLEEP_INTERVAL = 0.2
DEFAULT_SHORT_INTERVAL = 0.001
DEFAULT_SHORTEST_INTERVAL = DEFAULT_SHORT_INTERVAL / 100
class Cache(object):
@ -116,7 +117,31 @@ class Utils():
return flags
@staticmethod
def executeCmd(realCmd, timeout=60, shell=True, output=False, tout_kill_tree=True, success_codes=(0,)):
def buildShellCmd(realCmd, varsDict):
"""Generates new shell command as array, contains map as variables to
arguments statement (varsStat), the command (realCmd) used this variables and
the list of the arguments, mapped from varsDict
Example:
buildShellCmd('echo "V2: $v2, V1: $v1"', {"v1": "val 1", "v2": "val 2", "vUnused": "unused var"})
returns:
['v1=$0 v2=$1 vUnused=$2 \necho "V2: $v2, V1: $v1"', 'val 1', 'val 2', 'unused var']
"""
# build map as array of vars and command line array:
varsStat = ""
if not isinstance(realCmd, list):
realCmd = [realCmd]
i = len(realCmd)-1
for k, v in varsDict.iteritems():
varsStat += "%s=$%s " % (k, i)
realCmd.append(v)
i += 1
realCmd[0] = varsStat + "\n" + realCmd[0]
return realCmd
@staticmethod
def executeCmd(realCmd, timeout=60, shell=True, output=False, tout_kill_tree=True,
success_codes=(0,), varsDict=None):
"""Executes a command.
Parameters
@ -131,6 +156,8 @@ class Utils():
output : bool
If output is True, the function returns tuple (success, stdoutdata, stderrdata, returncode).
If False, just indication of success is returned
varsDict: dict
variables supplied to the command (or to the shell script)
Returns
-------
@ -146,10 +173,18 @@ class Utils():
"""
stdout = stderr = None
retcode = None
popen = None
popen = env = None
if varsDict:
if shell:
# build map as array of vars and command line array:
realCmd = Utils.buildShellCmd(realCmd, varsDict)
else: # pragma: no cover - currently unused
env = _merge_dicts(os.environ, varsDict)
realCmdId = id(realCmd)
logCmd = lambda level: logSys.log(level, "%x -- exec: %s", realCmdId, realCmd)
try:
popen = subprocess.Popen(
realCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell,
realCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, env=env,
preexec_fn=os.setsid # so that killpg does not kill our process
)
# wait with timeout for process has terminated:
@ -158,13 +193,15 @@ class Utils():
def _popen_wait_end():
retcode = popen.poll()
return (True, retcode) if retcode is not None else None
retcode = Utils.wait_for(_popen_wait_end, timeout, Utils.DEFAULT_SHORT_INTERVAL)
# popen.poll is fast operation so we can use the shortest sleep interval:
retcode = Utils.wait_for(_popen_wait_end, timeout, Utils.DEFAULT_SHORTEST_INTERVAL)
if retcode:
retcode = retcode[1]
# if timeout:
if retcode is None:
logSys.error("%s -- timed out after %s seconds." %
(realCmd, timeout))
if logCmd: logCmd(logging.ERROR); logCmd = None
logSys.error("%x -- timed out after %s seconds." %
(realCmdId, timeout))
pgid = os.getpgid(popen.pid)
# if not tree - first try to terminate and then kill, otherwise - kill (-9) only:
os.killpg(pgid, signal.SIGTERM) # Terminate the process
@ -179,54 +216,56 @@ class Utils():
if retcode is None and not Utils.pid_exists(pgid): # pragma: no cover
retcode = signal.SIGKILL
except OSError as e:
if logCmd: logCmd(logging.ERROR); logCmd = None
stderr = "%s -- failed with %s" % (realCmd, e)
logSys.error(stderr)
if not popen:
return False if not output else (False, stdout, stderr, retcode)
std_level = logging.DEBUG if retcode in success_codes else logging.ERROR
if std_level > logSys.getEffectiveLevel():
if logCmd: logCmd(std_level-1); logCmd = None
# if we need output (to return or to log it):
if output or std_level >= logSys.getEffectiveLevel():
# if was timeouted (killed/terminated) - to prevent waiting, set std handles to non-blocking mode.
if popen.stdout:
try:
if retcode is None or retcode < 0:
Utils.setFBlockMode(popen.stdout, False)
stdout = popen.stdout.read()
except IOError as e:
except IOError as e: # pragma: no cover
logSys.error(" ... -- failed to read stdout %s", e)
if stdout is not None and stdout != '' and std_level >= logSys.getEffectiveLevel():
logSys.log(std_level, "%s -- stdout:", realCmd)
for l in stdout.splitlines():
logSys.log(std_level, " -- stdout: %r", uni_decode(l))
logSys.log(std_level, "%x -- stdout: %r", realCmdId, uni_decode(l))
popen.stdout.close()
if popen.stderr:
try:
if retcode is None or retcode < 0:
Utils.setFBlockMode(popen.stderr, False)
stderr = popen.stderr.read()
except IOError as e:
except IOError as e: # pragma: no cover
logSys.error(" ... -- failed to read stderr %s", e)
if stderr is not None and stderr != '' and std_level >= logSys.getEffectiveLevel():
logSys.log(std_level, "%s -- stderr:", realCmd)
for l in stderr.splitlines():
logSys.log(std_level, " -- stderr: %r", uni_decode(l))
logSys.log(std_level, "%x -- stderr: %r", realCmdId, uni_decode(l))
popen.stderr.close()
success = False
if retcode in success_codes:
logSys.debug("%-.40s -- returned successfully %i", realCmd, retcode)
logSys.debug("%x -- returned successfully %i", realCmdId, retcode)
success = True
elif retcode is None:
logSys.error("%-.40s -- unable to kill PID %i", realCmd, popen.pid)
logSys.error("%x -- unable to kill PID %i", realCmdId, popen.pid)
elif retcode < 0 or retcode > 128:
# dash would return negative while bash 128 + n
sigcode = -retcode if retcode < 0 else retcode - 128
logSys.error("%-.40s -- killed with %s (return code: %s)",
realCmd, signame.get(sigcode, "signal %i" % sigcode), retcode)
logSys.error("%x -- killed with %s (return code: %s)",
realCmdId, signame.get(sigcode, "signal %i" % sigcode), retcode)
else:
msg = _RETCODE_HINTS.get(retcode, None)
logSys.error("%-.40s -- returned %i", realCmd, retcode)
logSys.error("%x -- returned %i", realCmdId, retcode)
if msg:
logSys.info("HINT on %i: %s", retcode, msg % locals())
if output:
@ -290,7 +329,7 @@ class Utils():
return e.errno == errno.EPERM
else:
return True
else:
else: # pragma : no cover (no windows currently supported)
@staticmethod
def pid_exists(pid):
import ctypes

@ -389,6 +389,51 @@ class CommandActionTest(LogCaptureTestCase):
self.assertLogged('Nothing to do')
self.pruneLog()
def testExecuteWithVars(self):
self.assertTrue(self.__action.executeCmd(
r'''printf %b "foreign input:\n'''
r''' -- $f2bV_A --\n'''
r''' -- $f2bV_B --\n'''
r''' -- $(echo -n $f2bV_C) --''' # echo just replaces \n to test it as single line
r'''"''',
varsDict={
'f2bV_A': 'I\'m a hacker; && $(echo $f2bV_B)',
'f2bV_B': 'I"m very bad hacker',
'f2bV_C': '`Very | very\n$(bad & worst hacker)`'
}))
self.assertLogged(r"""foreign input:""",
' -- I\'m a hacker; && $(echo $f2bV_B) --',
' -- I"m very bad hacker --',
' -- `Very | very $(bad & worst hacker)` --', all=True)
def testExecuteReplaceEscapeWithVars(self):
self.__action.actionban = 'echo "** ban <ip>, reason: <reason> ...\\n<matches>"'
self.__action.actionunban = 'echo "** unban <ip>"'
self.__action.actionstop = 'echo "** stop monitoring"'
matches = [
'<actionunban>',
'" Hooray! #',
'`I\'m cool script kiddy',
'`I`m very cool > /here-is-the-path/to/bin/.x-attempt.sh',
'<actionstop>',
]
aInfo = {
'ip': '192.0.2.1',
'reason': 'hacking attempt ( he thought he knows how f2b internally works ;)',
'matches': '\n'.join(matches)
}
self.pruneLog()
self.__action.ban(aInfo)
self.assertLogged(
'** ban %s' % aInfo['ip'], aInfo['reason'], *matches, all=True)
self.assertNotLogged(
'** unban %s' % aInfo['ip'], '** stop monitoring', all=True)
self.pruneLog()
self.__action.unban(aInfo)
self.__action.stop()
self.assertLogged(
'** unban %s' % aInfo['ip'], '** stop monitoring', all=True)
def testExecuteIncorrectCmd(self):
CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null')
self.assertLogged('HINT on 127: "Command not found"')
@ -400,8 +445,9 @@ class CommandActionTest(LogCaptureTestCase):
self.assertFalse(CommandAction.executeCmd('sleep 30', timeout=timeout))
# give a test still 1 second, because system could be too busy
self.assertTrue(time.time() >= stime + timeout and time.time() <= stime + timeout + 1)
self.assertLogged('sleep 30 -- timed out after')
self.assertLogged('sleep 30 -- killed with SIGTERM')
self.assertLogged('sleep 30', ' -- timed out after', all=True)
self.assertLogged(' -- killed with SIGTERM',
' -- killed with SIGKILL')
def testExecuteTimeoutWithNastyChildren(self):
# temporary file for a nasty kid shell script
@ -457,9 +503,9 @@ class CommandActionTest(LogCaptureTestCase):
# Verify that the process itself got killed
self.assertTrue(Utils.wait_for(lambda: not pid_exists(cpid), 3))
self.assertLogged('my pid ', 'Resource temporarily unavailable')
self.assertLogged('timed out')
self.assertLogged('killed with SIGTERM',
'killed with SIGKILL')
self.assertLogged(' -- timed out')
self.assertLogged(' -- killed with SIGTERM',
' -- killed with SIGKILL')
os.unlink(tmpFilename)
os.unlink(tmpFilename + '.pid')

@ -28,7 +28,7 @@ import re
import shutil
import tempfile
import unittest
from ..client.configreader import ConfigReader, ConfigReaderUnshared
from ..client.configreader import ConfigReader, ConfigReaderUnshared, NoSectionError
from ..client import configparserinc
from ..client.jailreader import JailReader
from ..client.filterreader import FilterReader
@ -317,7 +317,17 @@ class JailReaderTest(LogCaptureTestCase):
self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2)
self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), [])
def testCommonFunction(self):
c = ConfigReader(share_config={})
# test common functionalities (no shared, without read of config):
self.assertEqual(c.sections(), [])
self.assertFalse(c.has_section('test'))
self.assertRaises(NoSectionError, c.merge_section, 'test', {})
self.assertRaises(NoSectionError, c.options, 'test')
self.assertRaises(NoSectionError, c.get, 'test', 'any')
self.assertRaises(NoSectionError, c.getOptions, 'test', {})
class FilterReaderTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
@ -712,6 +722,7 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
configurator.readAll()
configurator.getOptions()
configurator.convertToProtocol()
commands = configurator.getConfigStream()

@ -1,6 +1,13 @@
#[INCLUDES]
#before = common.conf
[DEFAULT]
_daemon = default
[Definition]
failregex = failure test 1 (filter.d/test.conf) <HOST>
where = conf
failregex = failure <_daemon> <one> (filter.d/test.%(where)s) <HOST>
[Init]
# test parameter, should be overriden in jail by "filter=test[one=1,...]"
one = *1*

@ -2,6 +2,15 @@
#before = common.conf
[Definition]
# overwrite default daemon, additionally it should be accessible in jail with "%(known/_daemon)s":
_daemon = test
# interpolate previous regex (from test.conf) + new 2nd + dynamical substitution) of "two" an "where":
failregex = %(known/failregex)s
failure test 2 (filter.d/test.local) <HOST>
failure %(_daemon)s <two> (filter.d/test.<where>) <HOST>
# parameter "two" should be specified in jail by "filter=test[..., two=2]"
[Init]
# this parameter can be used in jail with "%(known/three)s":
three = 3
# this parameter "where" does not overwrite "where" in definition of test.conf (dynamical values only):
where = local

@ -15,9 +15,9 @@ ignoreip =
[test-known-interp]
enabled = true
filter = test
filter = test[one=1,two=2]
failregex = %(known/failregex)s
failure test 3 (jail.local) <HOST>
failure %(known/_daemon)s %(known/three)s (jail.local) <HOST>
[missinglogfiles]
enabled = true

@ -1679,7 +1679,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
# complain --
('j-complain-abuse',
'complain['
'name=%(__name__)s, grepopts="-m 1", grepmax=2, mailcmd="mail -s Hostname: <ip-host> - ",' +
'name=%(__name__)s, grepopts="-m 1", grepmax=2, mailcmd="mail -s \'Hostname: <ip-host>, family: <family>\' - ",' +
# test reverse ip:
'debug=1,' +
# 2 logs to test grep from multiple logs:
@ -1694,14 +1694,14 @@ class ServerConfigReaderTests(LogCaptureTestCase):
'testcase01.log:Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10',
'testcase01a.log:Dec 31 11:55:01 [sshd] error: PAM: Authentication failure for test from 87.142.124.10',
# both abuse mails should be separated with space:
'mail -s Hostname: test-host - Abuse from 87.142.124.10 abuse-1@abuse-test-server abuse-2@abuse-test-server',
'mail -s Hostname: test-host, family: inet4 - Abuse from 87.142.124.10 abuse-1@abuse-test-server abuse-2@abuse-test-server',
),
'ip6-ban': (
# test reverse ip:
'try to resolve 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.abuse-contacts.abusix.org',
'Lines containing failures of 2001:db8::1 (max 2)',
# both abuse mails should be separated with space:
'mail -s Hostname: test-host - Abuse from 2001:db8::1 abuse-1@abuse-test-server abuse-2@abuse-test-server',
'mail -s Hostname: test-host, family: inet6 - Abuse from 2001:db8::1 abuse-1@abuse-test-server abuse-2@abuse-test-server',
),
}),
)

Loading…
Cancel
Save