partially cherry pick from branch 'multi-set', prepare for conditional config parameters logic:

- new readers logic (group some by multiple parameters 'set' -> 'multi-set';
- prevent to add 'known/' parameters twice (by merge section etc);
- test cases fixed;

# Conflicts:
#	fail2ban/client/actionreader.py
pull/1414/head
sebres 9 years ago
parent 43c0f3cdc4
commit 1a6450643d

@ -35,13 +35,13 @@ logSys = getLogger(__name__)
class ActionReader(DefinitionInitConfigReader):
_configOpts = [
["string", "actionstart", None],
["string", "actionstop", None],
["string", "actioncheck", None],
["string", "actionban", None],
["string", "actionunban", None],
]
_configOpts = {
"actionstart": ["string", None],
"actionstop": ["string", None],
"actioncheck": ["string", None],
"actionban": ["string", None],
"actionunban": ["string", None],
}
def __init__(self, file_, jailName, initOpts, **kwargs):
self._name = initOpts.get("actname", file_)
@ -65,20 +65,16 @@ class ActionReader(DefinitionInitConfigReader):
head = ["set", self._jailName]
stream = list()
stream.append(head + ["addaction", self._name])
head.extend(["action", self._name])
for opt in self._opts:
if opt == "actionstart":
stream.append(head + ["actionstart", self._opts[opt]])
elif opt == "actionstop":
stream.append(head + ["actionstop", self._opts[opt]])
elif opt == "actioncheck":
stream.append(head + ["actioncheck", self._opts[opt]])
elif opt == "actionban":
stream.append(head + ["actionban", self._opts[opt]])
elif opt == "actionunban":
stream.append(head + ["actionunban", self._opts[opt]])
multi = []
for opt, optval in self._opts.iteritems():
if opt in self._configOpts:
multi.append([opt, optval])
if self._initOpts:
for p in self._initOpts:
stream.append(head + [p, self._initOpts[p]])
for opt, optval in self._initOpts.iteritems():
multi.append([opt, optval])
if len(multi) > 1:
stream.append(["multi-set", self._jailName, "action", self._name, multi])
elif len(multi):
stream.append(["set", self._jailName, "action", self._name] + multi[0])
return stream

@ -231,7 +231,7 @@ after = 1.conf
# save previous known values, for possible using in local interpolations later:
sk = {}
for k, v in s2.iteritems():
if not k.startswith('known/'):
if not k.startswith('known/') and k != '__name__':
sk['known/'+k] = v
s2.update(sk)
# merge section
@ -256,7 +256,7 @@ after = 1.conf
alls = self.get_sections()
sk = {}
for k, v in options.iteritems():
if pref == '' or not k.startswith(pref):
if pref == '' or (not k.startswith(pref) and k != '__name__'):
sk[pref+k] = v
alls[section].update(sk)

@ -203,40 +203,47 @@ class ConfigReaderUnshared(SafeConfigParserWithIncludes):
#
# Read the given option in the configuration file. Default values
# are used...
# Each optionValues entry is composed of an array with:
# 0 -> the type of the option
# 1 -> the name of the option
# 2 -> the default value for the option
# Each options entry is composed of an array with:
# [[type, name, default], ...]
# Or it is a dict:
# {name: [type, default], ...}
def getOptions(self, sec, options, pOptions=None):
values = dict()
for option in options:
for optname in options:
if isinstance(options, (list,tuple)):
if len(optname) > 2:
opttype, optname, optvalue = optname
else:
(opttype, optname), optvalue = optname, None
else:
opttype, optvalue = options[optname]
try:
if option[0] == "bool":
v = self.getboolean(sec, option[1])
elif option[0] == "int":
v = self.getint(sec, option[1])
if opttype == "bool":
v = self.getboolean(sec, optname)
elif opttype == "int":
v = self.getint(sec, optname)
else:
v = self.get(sec, option[1])
if not pOptions is None and option[1] in pOptions:
v = self.get(sec, optname)
if not pOptions is None and optname in pOptions:
continue
values[option[1]] = v
values[optname] = v
except NoSectionError, e:
# No "Definition" section or wrong basedir
logSys.error(e)
values[option[1]] = option[2]
values[optname] = optvalue
# TODO: validate error handling here.
except NoOptionError:
if not option[2] is None:
if not optvalue is None:
logSys.warning("'%s' not defined in '%s'. Using default one: %r"
% (option[1], sec, option[2]))
values[option[1]] = option[2]
% (optname, sec, optvalue))
values[optname] = optvalue
elif logSys.getEffectiveLevel() <= logLevel:
logSys.log(logLevel, "Non essential option '%s' not defined in '%s'.", option[1], sec)
logSys.log(logLevel, "Non essential option '%s' not defined in '%s'.", optname, sec)
except ValueError:
logSys.warning("Wrong value for '" + option[1] + "' in '" + sec +
"'. Using default one: '" + repr(option[2]) + "'")
values[option[1]] = option[2]
logSys.warning("Wrong value for '" + optname + "' in '" + sec +
"'. Using default one: '" + repr(optvalue) + "'")
values[optname] = optvalue
return values
@ -286,7 +293,8 @@ class DefinitionInitConfigReader(ConfigReader):
if self.has_section("Init"):
for opt in self.options("Init"):
v = self.get("Init", opt)
self._initOpts['known/'+opt] = v
if not opt.startswith('known/') and opt != '__name__':
self._initOpts['known/'+opt] = v
if not opt in self._initOpts:
self._initOpts[opt] = v

@ -291,7 +291,14 @@ class Fail2banRegex(object):
RegexStat(m[3])
for m in filter(
lambda x: x[0] == 'set' and x[2] == "add%sregex" % regextype,
readercommands)]
readercommands)
] + [
RegexStat(m)
for mm in filter(
lambda x: x[0] == 'multi-set' and x[2] == "add%sregex" % regextype,
readercommands)
for m in mm[3]
]
# Read out and set possible value of maxlines
for command in readercommands:
if command[2] == "maxlines":

@ -37,10 +37,10 @@ logSys = getLogger(__name__)
class FilterReader(DefinitionInitConfigReader):
_configOpts = [
["string", "ignoreregex", None],
["string", "failregex", ""],
]
_configOpts = {
"ignoreregex": ["string", None],
"failregex": ["string", ""],
}
def setFile(self, fileName):
self.__file = fileName
@ -64,16 +64,16 @@ class FilterReader(DefinitionInitConfigReader):
if not len(opts):
return stream
for opt, value in opts.iteritems():
if opt == "failregex":
if opt in ("failregex", "ignoreregex"):
multi = []
for regex in value.split('\n'):
# Do not send a command if the rule is empty.
if regex != '':
stream.append(["set", self._jailName, "addfailregex", regex])
elif opt == "ignoreregex":
for regex in value.split('\n'):
# Do not send a command if the rule is empty.
if regex != '':
stream.append(["set", self._jailName, "addignoreregex", regex])
multi.append(regex)
if len(multi) > 1:
stream.append(["multi-set", self._jailName, "add" + opt, multi])
elif len(multi):
stream.append(["set", self._jailName, "add" + opt, multi[0]])
if self._initOpts:
if 'maxlines' in self._initOpts:
# We warn when multiline regex is used without maxlines > 1

@ -190,11 +190,11 @@ class JailReader(ConfigReader):
"""
stream = []
for opt in self.__opts:
for opt, value in self.__opts.iteritems():
if opt == "logpath" and \
self.__opts.get('backend', None) != "systemd":
found_files = 0
for path in self.__opts[opt].split("\n"):
for path in value.split("\n"):
path = path.rsplit(" ", 1)
path, tail = path if len(path) > 1 else (path[0], "head")
pathList = JailReader._glob(path)
@ -208,32 +208,32 @@ class JailReader(ConfigReader):
raise ValueError(
"Have not found any log file for %s jail" % self.__name)
elif opt == "logencoding":
stream.append(["set", self.__name, "logencoding", self.__opts[opt]])
stream.append(["set", self.__name, "logencoding", value])
elif opt == "backend":
backend = self.__opts[opt]
backend = value
elif opt == "maxretry":
stream.append(["set", self.__name, "maxretry", self.__opts[opt]])
stream.append(["set", self.__name, "maxretry", value])
elif opt == "ignoreip":
for ip in splitcommaspace(self.__opts[opt]):
for ip in splitcommaspace(value):
stream.append(["set", self.__name, "addignoreip", ip])
elif opt == "findtime":
stream.append(["set", self.__name, "findtime", self.__opts[opt]])
stream.append(["set", self.__name, "findtime", value])
elif opt == "bantime":
stream.append(["set", self.__name, "bantime", self.__opts[opt]])
stream.append(["set", self.__name, "bantime", value])
elif opt == "usedns":
stream.append(["set", self.__name, "usedns", self.__opts[opt]])
elif opt == "failregex":
for regex in self.__opts[opt].split('\n'):
stream.append(["set", self.__name, "usedns", value])
elif opt in ("failregex", "ignoreregex"):
multi = []
for regex in value.split('\n'):
# Do not send a command if the rule is empty.
if regex != '':
stream.append(["set", self.__name, "addfailregex", regex])
multi.append(regex)
if len(multi) > 1:
stream.append(["multi-set", self.__name, "add" + opt, multi])
elif len(multi):
stream.append(["set", self.__name, "add" + opt, multi[0]])
elif opt == "ignorecommand":
stream.append(["set", self.__name, "ignorecommand", self.__opts[opt]])
elif opt == "ignoreregex":
for regex in self.__opts[opt].split('\n'):
# Do not send a command if the rule is empty.
if regex != '':
stream.append(["set", self.__name, "addignoreregex", regex])
stream.append(["set", self.__name, "ignorecommand", value])
if self.__filter:
stream.extend(self.__filter.convert())
for action in self.__actions:

@ -262,8 +262,13 @@ class Server:
def getIgnoreCommand(self, name):
return self.__jails[name].filter.getIgnoreCommand()
def addFailRegex(self, name, value):
self.__jails[name].filter.addFailRegex(value)
def addFailRegex(self, name, value, multiple=False):
flt = self.__jails[name].filter
if multiple:
for value in value:
flt.addFailRegex(value)
else:
flt.addFailRegex(value)
def delFailRegex(self, name, index):
self.__jails[name].filter.delFailRegex(index)
@ -271,8 +276,13 @@ class Server:
def getFailRegex(self, name):
return self.__jails[name].filter.getFailRegex()
def addIgnoreRegex(self, name, value):
self.__jails[name].filter.addIgnoreRegex(value)
def addIgnoreRegex(self, name, value, multiple=False):
flt = self.__jails[name].filter
if multiple:
for value in value:
flt.addIgnoreRegex(value)
else:
flt.addIgnoreRegex(value)
def delIgnoreRegex(self, name, index):
self.__jails[name].filter.delIgnoreRegex(index)

@ -99,6 +99,8 @@ class Transmitter:
return None
elif command[0] == "flushlogs":
return self.__server.flushLogs()
elif command[0] == "multi-set":
return self.__commandSet(command[1:], True)
elif command[0] == "set":
return self.__commandSet(command[1:])
elif command[0] == "get":
@ -109,7 +111,7 @@ class Transmitter:
return version.version
raise Exception("Invalid command")
def __commandSet(self, command):
def __commandSet(self, command, multiple=False):
name = command[0]
# Logging
if name == "loglevel":
@ -196,7 +198,9 @@ class Transmitter:
return self.__server.getJournalMatch(name)
elif command[1] == "addfailregex":
value = command[2]
self.__server.addFailRegex(name, value)
self.__server.addFailRegex(name, value, multiple=multiple)
if multiple:
return True
return self.__server.getFailRegex(name)
elif command[1] == "delfailregex":
value = int(command[2])
@ -204,7 +208,9 @@ class Transmitter:
return self.__server.getFailRegex(name)
elif command[1] == "addignoreregex":
value = command[2]
self.__server.addIgnoreRegex(name, value)
self.__server.addIgnoreRegex(name, value, multiple=multiple)
if multiple:
return True
return self.__server.getIgnoreRegex(name)
elif command[1] == "delignoreregex":
value = int(command[2])
@ -254,15 +260,26 @@ class Transmitter:
return None
elif command[1] == "action":
actionname = command[2]
actionkey = command[3]
action = self.__server.getAction(name, actionname)
if callable(getattr(action, actionkey, None)):
actionvalue = json.loads(command[4]) if len(command)>4 else {}
return getattr(action, actionkey)(**actionvalue)
if multiple:
for cmd in command[3]:
actionkey = cmd[0]
if callable(getattr(action, actionkey, None)):
actionvalue = json.loads(cmd[1]) if len(cmd)>1 else {}
getattr(action, actionkey)(**actionvalue)
else:
actionvalue = cmd[1]
setattr(action, actionkey, actionvalue)
return True
else:
actionvalue = command[4]
setattr(action, actionkey, actionvalue)
return getattr(action, actionkey)
actionkey = command[3]
if callable(getattr(action, actionkey, None)):
actionvalue = json.loads(command[4]) if len(command)>4 else {}
return getattr(action, actionkey)(**actionvalue)
else:
actionvalue = command[4]
setattr(action, actionkey, actionvalue)
return getattr(action, actionkey)
raise Exception("Invalid command (no set action or not yet implemented)")
def __commandGet(self, command):

@ -275,7 +275,15 @@ class JailReaderTest(LogCaptureTestCase):
# convert and get stream
stream = jail.convert()
# get action and retrieve agent from it, compare with agent saved in version:
act = [o for o in stream if len(o) > 4 and (o[4] == 'agent' or o[4].endswith('badips.py'))]
act = []
for cmd in stream:
if len(cmd) <= 4:
continue
# differentiate between set and multi-set (wrop it here to single set):
if cmd[0] == 'set' and (cmd[4] == 'agent' or cmd[4].endswith('badips.py')):
act.append(cmd)
elif cmd[0] == 'multi-set':
act.extend([['set'] + cmd[1:4] + o for o in cmd[4] if o[0] == 'agent'])
useragent = 'Fail2Ban/%s' % version
self.assertEqual(len(act), 4)
self.assertEqual(act[0], ['set', 'blocklisttest', 'action', 'blocklist_de', 'agent', useragent])
@ -311,23 +319,21 @@ class FilterReaderTest(unittest.TestCase):
self.__share_cfg = {}
def testConvert(self):
output = [['set', 'testcase01', 'addfailregex',
output = [['multi-set', 'testcase01', 'addfailregex', [
"^\\s*(?:\\S+ )?(?:kernel: \\[\\d+\\.\\d+\\] )?(?:@vserver_\\S+ )"
"?(?:(?:\\[\\d+\\])?:\\s+[\\[\\(]?sshd(?:\\(\\S+\\))?[\\]\\)]?:?|"
"[\\[\\(]?sshd(?:\\(\\S+\\))?[\\]\\)]?:?(?:\\[\\d+\\])?:)?\\s*(?:"
"error: PAM: )?Authentication failure for .* from <HOST>\\s*$"],
['set', 'testcase01', 'addfailregex',
"error: PAM: )?Authentication failure for .* from <HOST>\\s*$",
"^\\s*(?:\\S+ )?(?:kernel: \\[\\d+\\.\\d+\\] )?(?:@vserver_\\S+ )"
"?(?:(?:\\[\\d+\\])?:\\s+[\\[\\(]?sshd(?:\\(\\S+\\))?[\\]\\)]?:?|"
"[\\[\\(]?sshd(?:\\(\\S+\\))?[\\]\\)]?:?(?:\\[\\d+\\])?:)?\\s*(?:"
"error: PAM: )?User not known to the underlying authentication mo"
"dule for .* from <HOST>\\s*$"],
['set', 'testcase01', 'addfailregex',
"dule for .* from <HOST>\\s*$",
"^\\s*(?:\\S+ )?(?:kernel: \\[\\d+\\.\\d+\\] )?(?:@vserver_\\S+ )"
"?(?:(?:\\[\\d+\\])?:\\s+[\\[\\(]?sshd(?:\\(\\S+\\))?[\\]\\)]?:?|"
"[\\[\\(]?sshd(?:\\(\\S+\\))?[\\]\\)]?:?(?:\\[\\d+\\])?:)?\\s*(?:"
"error: PAM: )?User not known to the\\nunderlying authentication."
"+$<SKIPLINES>^.+ module for .* from <HOST>\\s*$"],
"+$<SKIPLINES>^.+ module for .* from <HOST>\\s*$"]],
['set', 'testcase01', 'addignoreregex',
"^.+ john from host 192.168.1.1\\s*$"],
['set', 'testcase01', 'addjournalmatch',
@ -495,9 +501,11 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertEqual(sorted(comm_commands),
sorted([['add', 'emptyaction', 'auto'],
['add', 'test-known-interp', 'auto'],
['set', 'test-known-interp', 'addfailregex', 'failure test 1 (filter.d/test.conf) <HOST>'],
['set', 'test-known-interp', 'addfailregex', 'failure test 2 (filter.d/test.local) <HOST>'],
['set', 'test-known-interp', 'addfailregex', 'failure test 3 (jail.local) <HOST>'],
['multi-set', 'test-known-interp', 'addfailregex', [
'failure test 1 (filter.d/test.conf) <HOST>',
'failure test 2 (filter.d/test.local) <HOST>',
'failure test 3 (jail.local) <HOST>'
]],
['start', 'test-known-interp'],
['add', 'missinglogfiles', 'auto'],
['set', 'missinglogfiles', 'addfailregex', '<IP>'],
@ -660,12 +668,16 @@ class JailsReaderTest(LogCaptureTestCase):
self.assertTrue('blocktype' in action._initOpts)
# Verify that we have a call to set it up
blocktype_present = False
target_command = ['set', jail_name, 'action', action_name, 'blocktype']
target_command = [jail_name, 'action', action_name]
for command in commands:
if (len(command) > 5 and
command[:5] == target_command):
blocktype_present = True
continue
if (len(command) > 4 and command[0] == 'multi-set' and
command[1:4] == target_command):
blocktype_present = ('blocktype' in [cmd[0] for cmd in command[4]])
elif (len(command) > 5 and command[0] == 'set' and
command[1:4] == target_command and command[4] == 'blocktype'): # pragma: no cover - because of multi-set
blocktype_present = True
if blocktype_present:
break
self.assertTrue(
blocktype_present,
msg="Found no %s command among %s"

@ -72,14 +72,21 @@ def testSampleRegexsFactory(name):
filterConf.getOptions({})
for opt in filterConf.convert():
if opt[2] == "addfailregex":
self.filter.addFailRegex(opt[3])
elif opt[2] == "maxlines":
self.filter.setMaxLines(opt[3])
elif opt[2] == "addignoreregex":
self.filter.addIgnoreRegex(opt[3])
elif opt[2] == "datepattern":
self.filter.setDatePattern(opt[3])
if opt[0] == 'multi-set':
optval = opt[3]
elif opt[0] == 'set':
optval = [opt[3]]
else:
continue
for optval in optval:
if opt[2] == "addfailregex":
self.filter.addFailRegex(optval)
elif opt[2] == "addignoreregex":
self.filter.addIgnoreRegex(optval)
elif opt[2] == "maxlines":
self.filter.setMaxLines(optval)
elif opt[2] == "datepattern":
self.filter.setDatePattern(optval)
self.assertTrue(
os.path.isfile(os.path.join(TEST_FILES_DIR, "logs", name)),

@ -1024,7 +1024,10 @@ class ServerConfigReaderTests(LogCaptureTestCase):
cmd[3] = os.path.join(TEST_FILES_DIR, 'logs', cmd[1])
# add dummy regex to prevent too long compile of all regexp (we don't use it in this test at all):
# [todo sebres] remove `not hasattr(unittest, 'F2B') or `, after merge with "f2b-perfom-prepare-716" ...
elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and len(cmd) > 3 and cmd[0] == 'set' and cmd[2] == 'addfailregex':
elif (not hasattr(unittest, 'F2B') or unittest.F2B.fast) and (
len(cmd) > 3 and cmd[0] in ('set', 'multi-set') and cmd[2] == 'addfailregex'
):
cmd[0] = "set"
cmd[3] = "DUMMY-REGEX <HOST>"
# command to server, use cmdHandler direct instead of `transm.proceed(cmd)`:
try:
@ -1058,7 +1061,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
testJailsActions = (
('j-w-iptables-mp', 'iptables-multiport[name=%(__name__)s, bantime="600", port="http,https", protocol="tcp", chain="INPUT"]', {
'ip4': '`iptables ', 'ip6': '`ip6tables ',
'ip4': ('`iptables ',), 'ip6': ('`ip6tables ',),
'start': (
"`iptables -w -N f2b-j-w-iptables-mp`",
"`iptables -w -A f2b-j-w-iptables-mp -j RETURN`",
@ -1095,7 +1098,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
),
}),
('j-w-iptables-ap', 'iptables-allports[name=%(__name__)s, bantime="600", protocol="tcp", chain="INPUT"]', {
'ip4': '`iptables ', 'ip6': '`ip6tables ',
'ip4': ('`iptables ',), 'ip6': ('`ip6tables ',),
'start': (
"`iptables -w -N f2b-j-w-iptables-ap`",
"`iptables -w -A f2b-j-w-iptables-ap -j RETURN`",
@ -1132,7 +1135,7 @@ class ServerConfigReaderTests(LogCaptureTestCase):
),
}),
('j-w-iptables-ipset', 'iptables-ipset-proto6[name=%(__name__)s, bantime="600", port="http", protocol="tcp", chain="INPUT"]', {
'ip4': ' f2b-j-w-iptables-ipset ', 'ip6': ' f2b-j-w-iptables-ipset6 ',
'ip4': (' f2b-j-w-iptables-ipset ',), 'ip6': (' f2b-j-w-iptables-ipset6 ',),
'start': (
"`ipset create f2b-j-w-iptables-ipset hash:ip timeout 600`",
"`iptables -w -I INPUT -p tcp -m multiport --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject-with icmp-port-unreachable`",
@ -1196,22 +1199,22 @@ class ServerConfigReaderTests(LogCaptureTestCase):
logSys.debug('# === ban-ipv4 ==='); self.pruneLog()
action.ban({'ip': IPAddr('192.0.2.1')})
self.assertLogged(*tests['ip4-check']+tests['ip4-ban'], all=True)
self.assertNotLogged(tests['ip6'])
self.assertNotLogged(*tests['ip6'], all=True)
# test unban ip4 :
logSys.debug('# === unban ipv4 ==='); self.pruneLog()
action.unban({'ip': IPAddr('192.0.2.1')})
self.assertLogged(*tests['ip4-check']+tests['ip4-unban'], all=True)
self.assertNotLogged(tests['ip6'])
self.assertNotLogged(*tests['ip6'], all=True)
# test ban ip6 :
logSys.debug('# === ban ipv6 ==='); self.pruneLog()
action.ban({'ip': IPAddr('2001:DB8::')})
self.assertLogged(*tests['ip6-check']+tests['ip6-ban'], all=True)
self.assertNotLogged(tests['ip4'])
self.assertNotLogged(*tests['ip4'], all=True)
# test unban ip6 :
logSys.debug('# === unban ipv6 ==='); self.pruneLog()
action.unban({'ip': IPAddr('2001:DB8::')})
self.assertLogged(*tests['ip6-check']+tests['ip6-unban'], all=True)
self.assertNotLogged(tests['ip4'])
self.assertNotLogged(*tests['ip4'], all=True)
# test stop :
logSys.debug('# === stop ==='); self.pruneLog()
action.stop()

@ -335,7 +335,7 @@ class LogCaptureTestCase(unittest.TestCase):
----------
s : string or list/set/tuple of strings
Test should succeed if string (or any of the listed) is present in the log
all : boolean, should find all in s
all : boolean (default False) if True should fail if any of s not logged
"""
logged = self._log.getvalue()
if not kwargs.get('all', False):
@ -343,16 +343,15 @@ class LogCaptureTestCase(unittest.TestCase):
for s_ in s:
if s_ in logged:
return
# pragma: no cover
self.fail("None among %r was found in the log: ===\n%s===" % (s, logged))
if True: # pragma: no cover
self.fail("None among %r was found in the log: ===\n%s===" % (s, logged))
else:
# each entry should be found:
for s_ in s:
if s_ not in logged:
# pragma: no cover
if s_ not in logged: # pragma: no cover
self.fail("%r was not found in the log: ===\n%s===" % (s_, logged))
def assertNotLogged(self, *s):
def assertNotLogged(self, *s, **kwargs):
"""Assert that strings were not logged
Parameters
@ -360,13 +359,19 @@ class LogCaptureTestCase(unittest.TestCase):
s : string or list/set/tuple of strings
Test should succeed if the string (or at least one of the listed) is not
present in the log
all : boolean (default False) if True should fail if any of s logged
"""
logged = self._log.getvalue()
for s_ in s:
if s_ not in logged:
return
# pragma: no cover
self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
if not kwargs.get('all', False):
for s_ in s:
if s_ not in logged:
return
if True: # pragma: no cover
self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
else:
for s_ in s:
if s_ in logged: # pragma: no cover
self.fail("%r was found in the log: ===\n%s===" % (s_, logged))
def pruneLog(self):
self._log.truncate(0)

Loading…
Cancel
Save