pull/3539/head
sebres 2023-06-16 15:23:41 +02:00
parent e2c4982417
commit 03d7c92ae8
49 changed files with 285 additions and 287 deletions

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
#

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :
"""Script to run Fail2Ban tests battery

View File

@ -89,11 +89,11 @@ class ActionReader(DefinitionInitConfigReader):
stream = list()
stream.append(head + ["addaction", self._name])
multi = []
for opt, optval in opts.iteritems():
for opt, optval in opts.items():
if opt in self._configOpts and not opt.startswith('known/'):
multi.append([opt, optval])
if self._initOpts:
for opt, optval in self._initOpts.iteritems():
for opt, optval in self._initOpts.items():
if opt not in self._configOpts and not opt.startswith('known/'):
multi.append([opt, optval])
if len(multi) > 1:

View File

@ -62,7 +62,7 @@ if sys.version_info >= (3,): # pragma: 2.x no cover
parser, option, accum, rest, section, map, *args, **kwargs)
else: # pragma: 3.x no cover
from ConfigParser import SafeConfigParser, \
from configparser import SafeConfigParser, \
InterpolationMissingOptionError, NoOptionError, NoSectionError
# Interpolate missing known/option as option from default section
@ -327,7 +327,7 @@ after = 1.conf
# mix it with defaults:
return set(opts.keys()) | set(self._defaults)
# only own option names:
return opts.keys()
return list(opts.keys())
def read(self, filenames, get_includes=True):
if not isinstance(filenames, list):
@ -356,7 +356,7 @@ after = 1.conf
ret += i
# merge defaults and all sections to self:
alld.update(cfg.get_defaults())
for n, s in cfg.get_sections().iteritems():
for n, s in cfg.get_sections().items():
# conditional sections
cond = SafeConfigParserWithIncludes.CONDITIONAL_RE.match(n)
if cond:
@ -366,14 +366,14 @@ after = 1.conf
del(s['__name__'])
except KeyError:
pass
for k in s.keys():
for k in list(s.keys()):
v = s.pop(k)
s[k + cond] = v
s2 = alls.get(n)
if isinstance(s2, dict):
# save previous known values, for possible using in local interpolations later:
self.merge_section('KNOWN/'+n,
dict(filter(lambda i: i[0] in s, s2.iteritems())), '')
dict([i for i in iter(s2.items()) if i[0] in s]), '')
# merge section
s2.update(s)
else:
@ -400,7 +400,7 @@ after = 1.conf
sec.update(options)
return
sk = {}
for k, v in options.iteritems():
for k, v in options.items():
if not k.startswith(pref) and k != '__name__':
sk[pref+k] = v
sec.update(sk)

View File

@ -26,7 +26,7 @@ __license__ = "GPL"
import glob
import os
from ConfigParser import NoOptionError, NoSectionError
from configparser import NoOptionError, NoSectionError
from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel
from ..helpers import getLogger, _as_bool, _merge_dicts, substituteRecursiveTags
@ -221,7 +221,7 @@ class ConfigReaderUnshared(SafeConfigParserWithIncludes):
config_files += sorted(glob.glob('%s/*.local' % config_dir))
# choose only existing ones
config_files = filter(os.path.exists, config_files)
config_files = list(filter(os.path.exists, config_files))
if len(config_files):
# at least one config exists and accessible

View File

@ -47,7 +47,7 @@ class CSocket:
def send(self, msg, nonblocking=False, timeout=None):
# Convert every list member to string
obj = dumps(map(CSocket.convert, msg), HIGHEST_PROTOCOL)
obj = dumps(list(map(CSocket.convert, msg)), HIGHEST_PROTOCOL)
self.__csock.send(obj)
self.__csock.send(CSPROTO.END)
return self.receive(self.__csock, nonblocking, timeout)
@ -72,7 +72,7 @@ class CSocket:
@staticmethod
def convert(m):
"""Convert every "unexpected" member of message to string"""
if isinstance(m, (basestring, bool, int, float, list, dict, set)):
if isinstance(m, (str, bool, int, float, list, dict, set)):
return m
else: # pragma: no cover
return str(m)

View File

@ -45,7 +45,7 @@ def _thread_name():
return threading.current_thread().__class__.__name__
def input_command(): # pragma: no cover
return raw_input(PROMPT)
return input(PROMPT)
##
#
@ -456,7 +456,7 @@ class Fail2banClient(Fail2banCmdLine, Thread):
return False
finally:
self._alive = False
for s, sh in _prev_signals.iteritems():
for s, sh in _prev_signals.items():
signal.signal(s, sh)

View File

@ -40,10 +40,10 @@ import os
import shlex
import sys
import time
import urllib
import urllib.request, urllib.parse, urllib.error
from optparse import OptionParser, Option
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
from configparser import NoOptionError, NoSectionError, MissingSectionHeaderError
try: # pragma: no cover
from ..server.filtersystemd import FilterSystemd
@ -67,7 +67,7 @@ def debuggexURL(sample, regex, multiline=False, useDns="yes"):
'flavor': 'python'
}
if multiline: args['flags'] = 'm'
return 'https://www.debuggex.com/?' + urllib.urlencode(args)
return 'https://www.debuggex.com/?' + urllib.parse.urlencode(args)
def output(args): # pragma: no cover (overriden in test-cases)
print(args)
@ -246,7 +246,7 @@ class Fail2banRegex(object):
def __init__(self, opts):
# set local protected members from given options:
self.__dict__.update(dict(('_'+o,v) for o,v in opts.__dict__.iteritems()))
self.__dict__.update(dict(('_'+o,v) for o,v in opts.__dict__.items()))
self._opts = opts
self._maxlines_set = False # so we allow to override maxlines in cmdline
self._datepattern_set = False
@ -313,7 +313,7 @@ class Fail2banRegex(object):
realopts = {}
combopts = reader.getCombined()
# output all options that are specified in filter-argument as well as some special (mostly interested):
for k in ['logtype', 'datepattern'] + fltOpt.keys():
for k in ['logtype', 'datepattern'] + list(fltOpt.keys()):
# combined options win, but they contain only a sub-set in filter expected keys,
# so get the rest from definition section:
try:
@ -440,7 +440,7 @@ class Fail2banRegex(object):
self.output( "Use %11s line : %s" % (regex, shortstr(value)) )
regex_values = {regextype: [RegexStat(value)]}
for regextype, regex_values in regex_values.iteritems():
for regextype, regex_values in regex_values.items():
regex = regextype + 'regex'
setattr(self, "_" + regex, regex_values)
for regex in regex_values:
@ -532,13 +532,13 @@ class Fail2banRegex(object):
def _out(ret):
for r in ret:
for r in r[3].get('matches'):
if not isinstance(r, basestring):
if not isinstance(r, str):
r = ''.join(r for r in r)
output(r)
elif ofmt == 'row':
def _out(ret):
for r in ret:
output('[%r,\t%r,\t%r],' % (r[1],r[2],dict((k,v) for k, v in r[3].iteritems() if k != 'matches')))
output('[%r,\t%r,\t%r],' % (r[1],r[2],dict((k,v) for k, v in r[3].items() if k != 'matches')))
elif '<' not in ofmt:
def _out(ret):
for r in ret:
@ -573,7 +573,7 @@ class Fail2banRegex(object):
# wrap multiline tag (msg) interpolations to single line:
for r, v in rows:
for r in r[3].get('matches'):
if not isinstance(r, basestring):
if not isinstance(r, str):
r = ''.join(r for r in r)
r = v.replace("\x00msg\x00", r)
output(r)
@ -639,9 +639,9 @@ class Fail2banRegex(object):
ans = [[]]
for arg in [l, regexlist]:
ans = [ x + [y] for x in ans for y in arg ]
b = map(lambda a: a[0] + ' | ' + a[1].getFailRegex() + ' | ' +
b = [a[0] + ' | ' + a[1].getFailRegex() + ' | ' +
debuggexURL(self.encode_line(a[0]), a[1].getFailRegex(),
multiline, self._opts.usedns), ans)
multiline, self._opts.usedns) for a in ans]
pprint_list([x.rstrip() for x in b], header)
else:
output( "%s too many to print. Use --print-all-%s " \

View File

@ -71,7 +71,7 @@ class FilterReader(DefinitionInitConfigReader):
@staticmethod
def _fillStream(stream, opts, jailName):
prio0idx = 0
for opt, value in opts.iteritems():
for opt, value in opts.items():
# Do not send a command if the value is not set (empty).
if value is None: continue
if opt in ("failregex", "ignoreregex"):

View File

@ -117,7 +117,7 @@ class JailReader(ConfigReader):
}
_configOpts.update(FilterReader._configOpts)
_ignoreOpts = set(['action', 'filter', 'enabled'] + FilterReader._configOpts.keys())
_ignoreOpts = set(['action', 'filter', 'enabled'] + list(FilterReader._configOpts.keys()))
def getOptions(self):
@ -240,7 +240,7 @@ class JailReader(ConfigReader):
stream.extend(self.__filter.convert())
# and using options from jail:
FilterReader._fillStream(stream, self.__opts, self.__name)
for opt, value in self.__opts.iteritems():
for opt, value in self.__opts.items():
if opt == "logpath":
if self.__opts.get('backend', '').startswith("systemd"): continue
found_files = 0

View File

@ -31,6 +31,7 @@ import traceback
from threading import Lock
from .server.mytime import MyTime
import importlib
try:
import ctypes
@ -63,7 +64,7 @@ if sys.version_info < (3,): # pragma: 3.x no cover
from imp import load_dynamic as __ldm
_sys = __ldm('_sys', 'sys')
except ImportError: # pragma: no cover - only if load_dynamic fails
reload(sys)
importlib.reload(sys)
_sys = sys
if hasattr(_sys, "setdefaultencoding"):
_sys.setdefaultencoding(encoding)
@ -103,7 +104,7 @@ if sys.version_info >= (3,): # pragma: 2.x no cover
else: # pragma: 3.x no cover
def uni_decode(x, enc=PREFER_ENC, errors='strict'):
try:
if isinstance(x, unicode):
if isinstance(x, str):
return x.encode(enc, errors)
return x
except (UnicodeDecodeError, UnicodeEncodeError): # pragma: no cover - unsure if reachable
@ -112,7 +113,7 @@ else: # pragma: 3.x no cover
return x.encode(enc, 'replace')
if sys.getdefaultencoding().upper() != 'UTF-8': # pragma: no cover - utf-8 is default encoding now
def uni_string(x):
if not isinstance(x, unicode):
if not isinstance(x, str):
return str(x)
return x.encode(PREFER_ENC, 'replace')
else:
@ -121,7 +122,7 @@ else: # pragma: 3.x no cover
def _as_bool(val):
return bool(val) if not isinstance(val, basestring) \
return bool(val) if not isinstance(val, str) \
else val.lower() in ('1', 'on', 'true', 'yes')
@ -330,7 +331,7 @@ def splitwords(s):
"""
if not s:
return []
return filter(bool, map(lambda v: v.strip(), re.split('[ ,\n]+', s)))
return list(filter(bool, [v.strip() for v in re.split('[ ,\n]+', s)]))
if sys.version_info >= (3,5):
eval(compile(r'''if 1:
@ -447,7 +448,7 @@ def substituteRecursiveTags(inptags, conditional='',
while True:
repFlag = False
# substitute each value:
for tag in tags.iterkeys():
for tag in tags.keys():
# ignore escaped or already done (or in ignore list):
if tag in ignore or tag in done: continue
# ignore replacing callable items from calling map - should be converted on demand only (by get):
@ -487,7 +488,7 @@ def substituteRecursiveTags(inptags, conditional='',
m = tre_search(value, m.end())
continue
# if calling map - be sure we've string:
if not isinstance(repl, basestring): repl = uni_string(repl)
if not isinstance(repl, str): repl = uni_string(repl)
value = value.replace('<%s>' % rtag, repl)
#logSys.log(5, 'value now: %s' % value)
# increment reference count:

View File

@ -114,9 +114,9 @@ class CallingMap(MutableMapping, object):
def _asdict(self, calculated=False, checker=None):
d = dict(self.data, **self.storage)
if not calculated:
return dict((n,v) for n,v in d.iteritems() \
return dict((n,v) for n,v in d.items() \
if not callable(v) or n in self.CM_REPR_ITEMS)
for n,v in d.items():
for n,v in list(d.items()):
if callable(v):
try:
# calculate:
@ -182,7 +182,7 @@ class CallingMap(MutableMapping, object):
return self.__class__(_merge_copy_dicts(self.data, self.storage))
class ActionBase(object):
class ActionBase(object, metaclass=ABCMeta):
"""An abstract base class for actions in Fail2Ban.
Action Base is a base definition of what methods need to be in
@ -212,7 +212,6 @@ class ActionBase(object):
Any additional arguments specified in `jail.conf` or passed
via `fail2ban-client` will be passed as keyword arguments.
"""
__metaclass__ = ABCMeta
@classmethod
def __subclasshook__(cls, C):
@ -423,7 +422,7 @@ class CommandAction(ActionBase):
if not callable(family): # pragma: no cover
return self.__substCache.get(key, {}).get(family)
# family as expression - use it to filter values:
return [v for f, v in self.__substCache.get(key, {}).iteritems() if family(f)]
return [v for f, v in self.__substCache.get(key, {}).items() if family(f)]
cmd = args[0]
if cmd: # set:
try:
@ -435,7 +434,7 @@ class CommandAction(ActionBase):
try:
famd = self.__substCache[key]
cmd = famd.pop(family)
for family, v in famd.items():
for family, v in list(famd.items()):
if v == cmd:
del famd[family]
except KeyError: # pragma: no cover
@ -451,7 +450,7 @@ class CommandAction(ActionBase):
res = True
err = 'Script error'
if not family: # all started:
family = [famoper for (famoper,v) in self.__started.iteritems() if v]
family = [famoper for (famoper,v) in self.__started.items() if v]
for famoper in family:
try:
cmd = self._getOperation(tag, famoper)
@ -631,7 +630,7 @@ class CommandAction(ActionBase):
and executes the resulting command.
"""
# collect started families, may be started on demand (conditional):
family = [f for (f,v) in self.__started.iteritems() if v & 3 == 3]; # started and contains items
family = [f for (f,v) in self.__started.items() if v & 3 == 3]; # started and contains items
# if nothing contains items:
if not family: return True
# flush:
@ -656,7 +655,7 @@ class CommandAction(ActionBase):
"""
# collect started families, if started on demand (conditional):
if family is None:
family = [f for (f,v) in self.__started.iteritems() if v]
family = [f for (f,v) in self.__started.items() if v]
# if no started (on demand) actions:
if not family: return True
self.__started = {}
@ -690,7 +689,7 @@ class CommandAction(ActionBase):
ret = True
# for each started family:
if self.actioncheck:
for (family, started) in self.__started.items():
for (family, started) in list(self.__started.items()):
if started and not self._invariantCheck(family, beforeRepair):
# reset started flag and command of executed operation:
self.__started[family] = 0

View File

@ -156,11 +156,11 @@ class Actions(JailThread, Mapping):
else:
if hasattr(self, '_reload_actions'):
# reload actions after all parameters set via stream:
for name, initOpts in self._reload_actions.iteritems():
for name, initOpts in self._reload_actions.items():
if name in self._actions:
self._actions[name].reload(**(initOpts if initOpts else {}))
# remove obsolete actions (untouched by reload process):
delacts = OrderedDict((name, action) for name, action in self._actions.iteritems()
delacts = OrderedDict((name, action) for name, action in self._actions.items()
if name not in self._reload_actions)
if len(delacts):
# unban all tickets using removed actions only:
@ -217,7 +217,7 @@ class Actions(JailThread, Mapping):
return lst
if len(ids) == 1:
return 1 if ids[0] in lst else 0
return map(lambda ip: 1 if ip in lst else 0, ids)
return [1 if ip in lst else 0 for ip in ids]
def getBanList(self, withTime=False):
"""Returns the list of banned IP addresses.
@ -288,7 +288,7 @@ class Actions(JailThread, Mapping):
if not isinstance(ip, IPAddr):
ipa = IPAddr(ip)
if not ipa.isSingle: # subnet (mask/cidr) or raw (may be dns/hostname):
ips = filter(ipa.contains, self.banManager.getBanList())
ips = list(filter(ipa.contains, self.banManager.getBanList()))
if ips:
return self.removeBannedIP(ips, db, ifexists)
# not found:
@ -305,7 +305,7 @@ class Actions(JailThread, Mapping):
"""
if actions is None:
actions = self._actions
for name, action in reversed(actions.items()):
for name, action in reversed(list(actions.items())):
try:
action.stop()
except Exception as e:
@ -328,7 +328,7 @@ class Actions(JailThread, Mapping):
True when the thread exits nicely.
"""
cnt = 0
for name, action in self._actions.iteritems():
for name, action in self._actions.items():
try:
action.start()
except Exception as e:
@ -505,7 +505,7 @@ class Actions(JailThread, Mapping):
Observers.Main.add('banFound', bTicket, self._jail, btime)
logSys.notice("[%s] %sBan %s", self._jail.name, ('' if not bTicket.restored else 'Restore '), ip)
# do actions :
for name, action in self._actions.iteritems():
for name, action in self._actions.items():
try:
if bTicket.restored and getattr(action, 'norestored', False):
continue
@ -543,13 +543,13 @@ class Actions(JailThread, Mapping):
# avoid too often checks:
if not rebanacts and MyTime.time() > self.__lastConsistencyCheckTM + 3:
self.__lastConsistencyCheckTM = MyTime.time()
for action in self._actions.itervalues():
for action in self._actions.values():
if hasattr(action, 'consistencyCheck'):
action.consistencyCheck()
# check epoch in order to reban it:
if bTicket.banEpoch < self.banEpoch:
if not rebanacts: rebanacts = dict(
(name, action) for name, action in self._actions.iteritems()
(name, action) for name, action in self._actions.items()
if action.banEpoch > bTicket.banEpoch)
cnt += self.__reBan(bTicket, actions=rebanacts)
else: # pragma: no cover - unexpected: ticket is not banned for some reasons - reban using all actions:
@ -576,8 +576,8 @@ class Actions(JailThread, Mapping):
ip = ticket.getID()
aInfo = self._getActionInfo(ticket)
if log:
logSys.notice("[%s] Reban %s%s", self._jail.name, ip, (', action %r' % actions.keys()[0] if len(actions) == 1 else ''))
for name, action in actions.iteritems():
logSys.notice("[%s] Reban %s%s", self._jail.name, ip, (', action %r' % list(actions.keys())[0] if len(actions) == 1 else ''))
for name, action in actions.items():
try:
logSys.debug("[%s] action %r: reban %s", self._jail.name, name, ip)
if not aInfo.immutable: aInfo.reset()
@ -601,7 +601,7 @@ class Actions(JailThread, Mapping):
if not self.banManager._inBanList(ticket): return
# do actions :
aInfo = None
for name, action in self._actions.iteritems():
for name, action in self._actions.items():
try:
if ticket.restored and getattr(action, 'norestored', False):
continue
@ -650,7 +650,7 @@ class Actions(JailThread, Mapping):
cnt = 0
# first we'll execute flush for actions supporting this operation:
unbactions = {}
for name, action in (actions if actions is not None else self._actions).iteritems():
for name, action in (actions if actions is not None else self._actions).items():
try:
if hasattr(action, 'flush') and (not isinstance(action, CommandAction) or action.actionflush):
logSys.notice("[%s] Flush ticket(s) with %s", self._jail.name, name)
@ -705,7 +705,7 @@ class Actions(JailThread, Mapping):
aInfo = self._getActionInfo(ticket)
if log:
logSys.notice("[%s] Unban %s", self._jail.name, ip)
for name, action in unbactions.iteritems():
for name, action in unbactions.items():
try:
logSys.debug("[%s] action %r: unban %s", self._jail.name, name, ip)
if not aInfo.immutable: aInfo.reset()

View File

@ -178,7 +178,7 @@ def loop(active, timeout=None, use_poll=False, err_count=None):
elif err_count['listen'] > 100: # pragma: no cover - normally unreachable
if (
e.args[0] == errno.EMFILE # [Errno 24] Too many open files
or sum(err_count.itervalues()) > 1000
or sum(err_count.values()) > 1000
):
logSys.critical("Too many errors - critical count reached %r", err_count)
break
@ -220,7 +220,7 @@ class AsyncServer(asyncore.dispatcher):
elif self.__errCount['accept'] > 100:
if (
(isinstance(e, socket.error) and e.args[0] == errno.EMFILE) # [Errno 24] Too many open files
or sum(self.__errCount.itervalues()) > 1000
or sum(self.__errCount.values()) > 1000
):
logSys.critical("Too many errors - critical count reached %r", self.__errCount)
self.stop()

View File

@ -103,7 +103,7 @@ class BanManager:
return list(self.__banList.keys())
with self.__lock:
lst = []
for ticket in self.__banList.itervalues():
for ticket in self.__banList.values():
eob = ticket.getEndOfBanTime(self.__banTime)
lst.append((ticket,eob))
lst.sort(key=lambda t: t[1])
@ -161,7 +161,7 @@ class BanManager:
return return_dict
# get ips in lock:
with self.__lock:
banIPs = [banData.getIP() for banData in self.__banList.values()]
banIPs = [banData.getIP() for banData in list(self.__banList.values())]
# get cymru info:
try:
for ip in banIPs:
@ -333,7 +333,7 @@ class BanManager:
# Gets the list of ticket to remove (thereby correct next unban time).
unBanList = {}
nextUnbanTime = BanTicket.MAX_TIME
for fid,ticket in self.__banList.iteritems():
for fid,ticket in self.__banList.items():
# current time greater as end of ban - timed out:
eob = ticket.getEndOfBanTime(self.__banTime)
if time > eob:
@ -349,15 +349,15 @@ class BanManager:
if len(unBanList):
if len(unBanList) / 2.0 <= len(self.__banList) / 3.0:
# few as 2/3 should be removed - remove particular items:
for fid in unBanList.iterkeys():
for fid in unBanList.keys():
del self.__banList[fid]
else:
# create new dictionary without items to be deleted:
self.__banList = dict((fid,ticket) for fid,ticket in self.__banList.iteritems() \
self.__banList = dict((fid,ticket) for fid,ticket in self.__banList.items() \
if fid not in unBanList)
# return list of tickets:
return unBanList.values()
return list(unBanList.values())
##
# Flush the ban list.
@ -367,7 +367,7 @@ class BanManager:
def flushBanList(self):
with self.__lock:
uBList = self.__banList.values()
uBList = list(self.__banList.values())
self.__banList = dict()
return uBList

View File

@ -67,13 +67,13 @@ if sys.version_info >= (3,): # pragma: 2.x no cover
else: # pragma: 3.x no cover
def _normalize(x):
if isinstance(x, dict):
return dict((_normalize(k), _normalize(v)) for k, v in x.iteritems())
return dict((_normalize(k), _normalize(v)) for k, v in x.items())
elif isinstance(x, (list, set)):
return [_normalize(element) for element in x]
elif isinstance(x, unicode):
elif isinstance(x, str):
# in 2.x default text_factory is unicode - so return proper unicode here:
return x.encode(PREFER_ENC, 'replace').decode(PREFER_ENC)
elif isinstance(x, basestring):
elif isinstance(x, str):
return x.decode(PREFER_ENC, 'replace')
return x

View File

@ -55,7 +55,7 @@ class FailManager:
def getFailCount(self):
# may be slow on large list of failures, should be used for test purposes only...
with self.__lock:
return len(self.__failList), sum([f.getRetry() for f in self.__failList.values()])
return len(self.__failList), sum([f.getRetry() for f in list(self.__failList.values())])
def setMaxRetry(self, value):
self.__maxRetry = value
@ -116,7 +116,7 @@ class FailManager:
# in case of having many active failures, it should be ran only
# if debug level is "low" enough
failures_summary = ', '.join(['%s:%d' % (k, v.getRetry())
for k,v in self.__failList.iteritems()])
for k,v in self.__failList.items()])
logSys.log(logLevel, "Total # of detected failures: %d. Current failures from %d IPs (IP:count): %s"
% (self.__failTotal, len(self.__failList), failures_summary))
@ -129,7 +129,7 @@ class FailManager:
def cleanup(self, time):
time -= self.__maxTime
with self.__lock:
todelete = [fid for fid,item in self.__failList.iteritems() \
todelete = [fid for fid,item in self.__failList.items() \
if item.getTime() <= time]
if len(todelete) == len(self.__failList):
# remove all:
@ -143,7 +143,7 @@ class FailManager:
del self.__failList[fid]
else:
# create new dictionary without items to be deleted:
self.__failList = dict((fid,item) for fid,item in self.__failList.iteritems() \
self.__failList = dict((fid,item) for fid,item in self.__failList.items() \
if item.getTime() > time)
self.__bgSvc.service()

View File

@ -142,9 +142,7 @@ class Regex:
self._regex = regex
self._altValues = []
self._tupleValues = []
for k in filter(
lambda k: len(k) > len(COMPLNAME_PRE[0]), self._regexObj.groupindex
):
for k in [k for k in self._regexObj.groupindex if len(k) > len(COMPLNAME_PRE[0])]:
n = COMPLNAME_CRE.match(k)
if n:
g, n = n.group(1), mapTag2Opt(n.group(2))
@ -234,7 +232,7 @@ class Regex:
#
@staticmethod
def _tupleLinesBuf(tupleLines):
return "\n".join(map(lambda v: "".join(v[::2]), tupleLines)) + "\n"
return "\n".join(["".join(v[::2]) for v in tupleLines]) + "\n"
##
# Searches the regular expression.
@ -246,7 +244,7 @@ class Regex:
def search(self, tupleLines, orgLines=None):
buf = tupleLines
if not isinstance(tupleLines, basestring):
if not isinstance(tupleLines, str):
buf = Regex._tupleLinesBuf(tupleLines)
self._matchCache = self._regexObj.search(buf)
if self._matchCache:

View File

@ -307,7 +307,7 @@ class Filter(JailThread):
dd = DateDetector()
dd.default_tz = self.__logtimezone
if not isinstance(pattern, (list, tuple)):
pattern = filter(bool, map(str.strip, re.split('\n+', pattern)))
pattern = list(filter(bool, list(map(str.strip, re.split('\n+', pattern)))))
for pattern in pattern:
dd.appendTemplate(pattern)
self.dateDetector = dd
@ -800,7 +800,7 @@ class Filter(JailThread):
if (nfflgs & 4) == 0 and not mlfidGroups.get('mlfpending', 0):
mlfidGroups.pop("matches", None)
# overwrite multi-line failure with all values, available in fail:
mlfidGroups.update(((k,v) for k,v in fail.iteritems() if v is not None))
mlfidGroups.update(((k,v) for k,v in fail.items() if v is not None))
# new merged failure data:
fail = mlfidGroups
# if forget (disconnect/reset) - remove cached entry:
@ -1045,7 +1045,7 @@ class FileFilter(Filter):
# @return log paths
def getLogPaths(self):
return self.__logs.keys()
return list(self.__logs.keys())
##
# Get the log containers
@ -1053,7 +1053,7 @@ class FileFilter(Filter):
# @return log containers
def getLogs(self):
return self.__logs.values()
return list(self.__logs.values())
##
# Get the count of log containers
@ -1079,7 +1079,7 @@ class FileFilter(Filter):
def setLogEncoding(self, encoding):
encoding = super(FileFilter, self).setLogEncoding(encoding)
for log in self.__logs.itervalues():
for log in self.__logs.values():
log.setEncoding(encoding)
def getLog(self, path):
@ -1255,7 +1255,7 @@ class FileFilter(Filter):
"""Status of Filter plus files being monitored.
"""
ret = super(FileFilter, self).status(flavor=flavor)
path = self.__logs.keys()
path = list(self.__logs.keys())
ret.append(("File list", path))
return ret
@ -1277,7 +1277,7 @@ class FileFilter(Filter):
if self._pendDBUpdates and self.jail.database:
self._updateDBPending()
# stop files monitoring:
for path in self.__logs.keys():
for path in list(self.__logs.keys()):
self.delLogPath(path)
def stop(self):
@ -1530,7 +1530,7 @@ class FileContainer:
def __iter__(self):
return self
def next(self):
def __next__(self):
line = self.readline()
if line is None:
self.close()

View File

@ -173,4 +173,4 @@ class FilterPoll(FileFilter):
return False
def getPendingPaths(self):
return self.__file404Cnt.keys()
return list(self.__file404Cnt.keys())

View File

@ -155,7 +155,7 @@ class FilterPyinotify(FileFilter):
except KeyError: pass
def getPendingPaths(self):
return self.__pending.keys()
return list(self.__pending.keys())
def _checkPending(self):
if not self.__pending:
@ -181,7 +181,7 @@ class FilterPyinotify(FileFilter):
self.__pendingChkTime = time.time()
self.__pendingMinTime = minTime
# process now because we've missed it in monitoring:
for path, isDir in found.iteritems():
for path, isDir in found.items():
self._delPending(path)
# refresh monitoring of this:
if isDir is not None:

View File

@ -253,7 +253,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
return ((logline[:0], date[0] + ' ', logline.replace('\n', '\\n')), date[1])
def seekToTime(self, date):
if isinstance(date, (int, long)):
if isinstance(date, int):
date = float(date)
self.__journal.seek_realtime(date)

View File

@ -370,7 +370,7 @@ class IPAddr(object):
s[1] = IPAddr.masktoplen(s[2])
del s[2]
try:
s[1] = long(s[1])
s[1] = int(s[1])
except ValueError:
return ipstr, IPAddr.CIDR_UNSPEC
return s
@ -406,7 +406,7 @@ class IPAddr(object):
# mask out host portion if prefix length is supplied
if cidr is not None and cidr >= 0:
mask = ~(0xFFFFFFFFL >> cidr)
mask = ~(0xFFFFFFFF >> cidr)
self._addr &= mask
self._plen = cidr
@ -418,13 +418,13 @@ class IPAddr(object):
# mask out host portion if prefix length is supplied
if cidr is not None and cidr >= 0:
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> cidr)
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF >> cidr)
self._addr &= mask
self._plen = cidr
# if IPv6 address is a IPv4-compatible, make instance a IPv4
elif self.isInNet(IPAddr.IP6_4COMPAT):
self._addr = lo & 0xFFFFFFFFL
self._addr = lo & 0xFFFFFFFF
self._family = socket.AF_INET
self._plen = 32
else:
@ -434,7 +434,7 @@ class IPAddr(object):
return repr(self.ntoa)
def __str__(self):
return self.ntoa if isinstance(self.ntoa, basestring) else str(self.ntoa)
return self.ntoa if isinstance(self.ntoa, str) else str(self.ntoa)
def __reduce__(self):
"""IPAddr pickle-handler, that simply wraps IPAddr to the str
@ -548,7 +548,7 @@ class IPAddr(object):
elif self.isIPv6:
# convert network to host byte order
hi = self._addr >> 64
lo = self._addr & 0xFFFFFFFFFFFFFFFFL
lo = self._addr & 0xFFFFFFFFFFFFFFFF
binary = struct.pack("!QQ", hi, lo)
if self._plen and self._plen < 128:
add = "/%d" % self._plen
@ -606,9 +606,9 @@ class IPAddr(object):
if self.family != net.family:
return False
if self.isIPv4:
mask = ~(0xFFFFFFFFL >> net.plen)
mask = ~(0xFFFFFFFF >> net.plen)
elif self.isIPv6:
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL >> net.plen)
mask = ~(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF >> net.plen)
else:
return False
@ -628,7 +628,7 @@ class IPAddr(object):
m4 = (1 << 32)-1
mmap = {m6: 128, m4: 32, 0: 0}
m = 0
for i in xrange(0, 128):
for i in range(0, 128):
m |= 1 << i
if i < 32:
mmap[m ^ m4] = 32-1-i

View File

@ -26,7 +26,7 @@ __license__ = "GPL"
import logging
import math
import random
import Queue
import queue
from .actions import Actions
from ..helpers import getLogger, _as_bool, extractOptions, MyTime
@ -76,7 +76,7 @@ class Jail(object):
"might not function correctly. Please shorten"
% name)
self.__name = name
self.__queue = Queue.Queue()
self.__queue = queue.Queue()
self.__filter = None
# Extra parameters for increase ban time
self._banExtra = {};
@ -127,25 +127,25 @@ class Jail(object):
"Failed to initialize any backend for Jail %r" % self.name)
def _initPolling(self, **kwargs):
from filterpoll import FilterPoll
from .filterpoll import FilterPoll
logSys.info("Jail '%s' uses poller %r" % (self.name, kwargs))
self.__filter = FilterPoll(self, **kwargs)
def _initGamin(self, **kwargs):
# Try to import gamin
from filtergamin import FilterGamin
from .filtergamin import FilterGamin
logSys.info("Jail '%s' uses Gamin %r" % (self.name, kwargs))
self.__filter = FilterGamin(self, **kwargs)
def _initPyinotify(self, **kwargs):
# Try to import pyinotify
from filterpyinotify import FilterPyinotify
from .filterpyinotify import FilterPyinotify
logSys.info("Jail '%s' uses pyinotify %r" % (self.name, kwargs))
self.__filter = FilterPyinotify(self, **kwargs)
def _initSystemd(self, **kwargs): # pragma: systemd no cover
# Try to import systemd
from filtersystemd import FilterSystemd
from .filtersystemd import FilterSystemd
logSys.info("Jail '%s' uses systemd %r" % (self.name, kwargs))
self.__filter = FilterSystemd(self, **kwargs)
@ -219,7 +219,7 @@ class Jail(object):
try:
ticket = self.__queue.get(False)
return ticket
except Queue.Empty:
except queue.Empty:
return False
def setBanTimeExtra(self, opt, value):

View File

@ -165,7 +165,7 @@ class MyTime:
@returns number (calculated seconds from expression "val")
"""
if isinstance(val, (int, long, float, complex)):
if isinstance(val, (int, float, complex)):
return val
# replace together standing abbreviations, example '1d12h' -> '1d 12h':
val = MyTime._str2sec_prep.sub(r" \1", val)

View File

@ -209,7 +209,7 @@ class Server:
# Restore default signal handlers:
if _thread_name() == '_MainThread':
for s, sh in self.__prev_signals.iteritems():
for s, sh in self.__prev_signals.items():
signal.signal(s, sh)
# Give observer a small chance to complete its work before exit
@ -287,10 +287,10 @@ class Server:
logSys.info("Stopping all jails")
with self.__lock:
# 1st stop all jails (signal and stop actions/filter thread):
for name in self.__jails.keys():
for name in list(self.__jails.keys()):
self.delJail(name, stop=True, join=False)
# 2nd wait for end and delete jails:
for name in self.__jails.keys():
for name in list(self.__jails.keys()):
self.delJail(name, stop=False, join=True)
def clearCaches(self):
@ -328,7 +328,7 @@ class Server:
if "--restart" in opts:
self.stopAllJail()
# first set all affected jail(s) to idle and reset filter regex and other lists/dicts:
for jn, jail in self.__jails.iteritems():
for jn, jail in self.__jails.items():
if name == '--all' or jn == name:
jail.idle = True
self.__reload_state[jn] = jail
@ -339,7 +339,7 @@ class Server:
# end reload, all affected (or new) jails have already all new parameters (via stream) and (re)started:
with self.__lock:
deljails = []
for jn, jail in self.__jails.iteritems():
for jn, jail in self.__jails.items():
# still in reload state:
if jn in self.__reload_state:
# remove jails that are not reloaded (untouched, so not in new configuration)
@ -539,7 +539,7 @@ class Server:
jails = [self.__jails[name]]
else:
# in all jails:
jails = self.__jails.values()
jails = list(self.__jails.values())
# unban given or all (if value is None):
cnt = 0
ifexists |= (name is None)
@ -553,7 +553,7 @@ class Server:
jails = [self.__jails[name]]
else:
# in all jails:
jails = self.__jails.values()
jails = list(self.__jails.values())
# check banned ids:
res = []
if name is None and ids:
@ -603,7 +603,7 @@ class Server:
def isAlive(self, jailnum=None):
if jailnum is not None and len(self.__jails) != jailnum:
return 0
for jail in self.__jails.values():
for jail in list(self.__jails.values()):
if not jail.isAlive():
return 0
return 1
@ -818,7 +818,7 @@ class Server:
return DNSUtils.setIPv6IsAllowed(value)
def setThreadOptions(self, value):
for o, v in value.iteritems():
for o, v in value.items():
if o == 'stacksize':
threading.stack_size(int(v)*1024)
else: # pragma: no cover
@ -942,7 +942,7 @@ class Server:
maxfd = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError):
maxfd = 256 # default maximum
fdlist = xrange(maxfd+1)
fdlist = range(maxfd+1)
# urandom should not be closed in Python 3.4.0. Fixed in 3.4.1
# http://bugs.python.org/issue21207

View File

@ -99,7 +99,7 @@ def _updateTimeRE():
if len(exprset) > 1 else "".join(exprset)
exprset = set( cent(now[0].year + i) for i in (-1, distance) )
if len(now) > 1 and now[1]:
exprset |= set( cent(now[1].year + i) for i in xrange(-1, now[0].year-now[1].year+1, distance) )
exprset |= set( cent(now[1].year + i) for i in range(-1, now[0].year-now[1].year+1, distance) )
return grp(sorted(list(exprset)))
# more precise year patterns, within same century of last year and
@ -116,7 +116,7 @@ def _updateTimeRE():
_updateTimeRE()
def getTimePatternRE():
keys = timeRE.keys()
keys = list(timeRE.keys())
patt = (r"%%(%%|%s|[%s])" % (
"|".join([k for k in keys if len(k) > 1]),
"".join([k for k in keys if len(k) == 1]),
@ -171,7 +171,7 @@ def zone2offset(tz, dt):
"""
if isinstance(tz, int):
return tz
if isinstance(tz, basestring):
if isinstance(tz, str):
return validateTimeZone(tz)
tz, tzo = tz
if tzo is None or tzo == '': # without offset
@ -208,7 +208,7 @@ def reGroupDictStrptime(found_dict, msec=False, default_tz=None):
year = month = day = tzoffset = \
weekday = julian = week_of_year = None
hour = minute = second = fraction = 0
for key, val in found_dict.iteritems():
for key, val in found_dict.items():
if val is None: continue
# Directives not explicitly handled below:
# c, x, X

View File

@ -55,7 +55,7 @@ class Ticket(object):
self._time = time if time is not None else MyTime.time()
self._data = {'matches': matches or [], 'failures': 0}
if data is not None:
for k,v in data.iteritems():
for k,v in data.items():
if v is not None:
self._data[k] = v
if ticket:
@ -88,7 +88,7 @@ class Ticket(object):
def setID(self, value):
# guarantee using IPAddr instead of unicode, str for the IP
if isinstance(value, basestring):
if isinstance(value, str):
value = IPAddr(value)
self._id = value
@ -180,7 +180,7 @@ class Ticket(object):
if len(args) == 1:
# todo: if support >= 2.7 only:
# self._data = {k:v for k,v in args[0].iteritems() if v is not None}
self._data = dict([(k,v) for k,v in args[0].iteritems() if v is not None])
self._data = dict([(k,v) for k,v in args[0].items() if v is not None])
# add k,v list or dict (merge):
elif len(args) == 2:
self._data.update((args,))
@ -191,7 +191,7 @@ class Ticket(object):
# filter (delete) None values:
# todo: if support >= 2.7 only:
# self._data = {k:v for k,v in self._data.iteritems() if v is not None}
self._data = dict([(k,v) for k,v in self._data.iteritems() if v is not None])
self._data = dict([(k,v) for k,v in self._data.items() if v is not None])
def getData(self, key=None, default=None):
# return whole data dict:
@ -200,17 +200,17 @@ class Ticket(object):
# return default if not exists:
if not self._data:
return default
if not isinstance(key,(str,unicode,type(None),int,float,bool,complex)):
if not isinstance(key,(str,type(None),int,float,bool,complex)):
# return filtered by lambda/function:
if callable(key):
# todo: if support >= 2.7 only:
# return {k:v for k,v in self._data.iteritems() if key(k)}
return dict([(k,v) for k,v in self._data.iteritems() if key(k)])
return dict([(k,v) for k,v in self._data.items() if key(k)])
# return filtered by keys:
if hasattr(key, '__iter__'):
# todo: if support >= 2.7 only:
# return {k:v for k,v in self._data.iteritems() if k in key}
return dict([(k,v) for k,v in self._data.iteritems() if k in key])
return dict([(k,v) for k,v in self._data.items() if k in key])
# return single value of data:
return self._data.get(key, default)

View File

@ -488,7 +488,7 @@ class Transmitter:
opt = command[1][len("bantime."):]
return self.__server.getBanTimeExtra(name, opt)
elif command[1] == "actions":
return self.__server.getActions(name).keys()
return list(self.__server.getActions(name).keys())
elif command[1] == "action":
actionname = command[2]
actionvalue = command[3]

View File

@ -53,7 +53,7 @@ _RETCODE_HINTS = {
# Dictionary to lookup signal name from number
signame = dict((num, name)
for name, num in signal.__dict__.iteritems() if name.startswith("SIG"))
for name, num in signal.__dict__.items() if name.startswith("SIG"))
class Utils():
"""Utilities provide diverse static methods like executes OS shell commands, etc.
@ -140,7 +140,7 @@ class Utils():
if not isinstance(realCmd, list):
realCmd = [realCmd]
i = len(realCmd)-1
for k, v in varsDict.iteritems():
for k, v in varsDict.items():
varsStat += "%s=$%s " % (k, i)
realCmd.append(v)
i += 1

View File

@ -242,14 +242,14 @@ class CommandActionTest(LogCaptureTestCase):
setattr(self.__action, 'ab', "<ac>")
setattr(self.__action, 'x?family=inet6', "")
# produce self-referencing properties except:
self.assertRaisesRegexp(ValueError, r"properties contain self referencing definitions",
self.assertRaisesRegex(ValueError, r"properties contain self referencing definitions",
lambda: self.__action.replaceTag("<a><b>",
self.__action._properties, conditional="family=inet4")
)
# remore self-referencing in props:
delattr(self.__action, 'ac')
# produce self-referencing query except:
self.assertRaisesRegexp(ValueError, r"possible self referencing definitions in query",
self.assertRaisesRegex(ValueError, r"possible self referencing definitions in query",
lambda: self.__action.replaceTag("<x"*30+">"*30,
self.__action._properties, conditional="family=inet6")
)

View File

@ -177,7 +177,7 @@ class StatusExtendedCymruInfo(unittest.TestCase):
super(StatusExtendedCymruInfo, self).setUp()
unittest.F2B.SkipIfNoNetwork()
setUpMyTime()
self.__ban_ip = iter(DNSUtils.dnsToIp("resolver1.opendns.com")).next()
self.__ban_ip = next(iter(DNSUtils.dnsToIp("resolver1.opendns.com")))
self.__asn = "36692"
self.__country = "US"
self.__rir = "arin"

View File

@ -419,7 +419,7 @@ class JailReaderTest(LogCaptureTestCase):
# And multiple groups (`][` instead of `,`)
result = extractOptions(option.replace(',', ']['))
expected2 = (expected[0],
dict((k, v.replace(',', '][')) for k, v in expected[1].iteritems())
dict((k, v.replace(',', '][')) for k, v in expected[1].items())
)
self.assertEqual(expected2, result)
@ -1018,7 +1018,7 @@ filter = testfilter1
self.assertEqual(add_actions[-1][-1], "{}")
def testLogPathFileFilterBackend(self):
self.assertRaisesRegexp(ValueError, r"Have not found any log file for .* jail",
self.assertRaisesRegex(ValueError, r"Have not found any log file for .* jail",
self._testLogPath, backend='polling')
def testLogPathSystemdBackend(self):

View File

@ -67,7 +67,7 @@ class DatabaseTest(LogCaptureTestCase):
@property
def db(self):
if isinstance(self._db, basestring) and self._db == ':auto-create-in-memory:':
if isinstance(self._db, str) and self._db == ':auto-create-in-memory:':
self._db = getFail2BanDb(self.dbFilename)
return self._db
@db.setter
@ -159,7 +159,7 @@ class DatabaseTest(LogCaptureTestCase):
self.db = Fail2BanDb(self.dbFilename)
self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets']))
self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2BanDb_pUlZJh.log']))
ticket = FailTicket("127.0.0.1", 1388009242.26, [u"abc\n"])
ticket = FailTicket("127.0.0.1", 1388009242.26, ["abc\n"])
self.assertEqual(self.db.getBans()[0], ticket)
self.assertEqual(self.db.updateDb(Fail2BanDb.__version__), Fail2BanDb.__version__)
@ -185,9 +185,9 @@ class DatabaseTest(LogCaptureTestCase):
self.assertEqual(len(bans), 2)
# compare first ticket completely:
ticket = FailTicket("1.2.3.7", 1417595494, [
u'Dec 3 09:31:08 f2btest test:auth[27658]: pam_unix(test:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.3.7',
u'Dec 3 09:31:32 f2btest test:auth[27671]: pam_unix(test:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.3.7',
u'Dec 3 09:31:34 f2btest test:auth[27673]: pam_unix(test:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.3.7'
'Dec 3 09:31:08 f2btest test:auth[27658]: pam_unix(test:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.3.7',
'Dec 3 09:31:32 f2btest test:auth[27671]: pam_unix(test:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.3.7',
'Dec 3 09:31:34 f2btest test:auth[27673]: pam_unix(test:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.3.7'
])
ticket.setAttempt(3)
self.assertEqual(bans[0], ticket)
@ -287,11 +287,11 @@ class DatabaseTest(LogCaptureTestCase):
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
tickets = [
FailTicket("127.0.0.1", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.2", 0, ['user "test"', u'user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.2", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.3", 0, ['user "test"', b'user "\xd1\xe2\xe5\xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.4", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', u'user "\xe4\xf6\xfc\xdf"']),
FailTicket("127.0.0.4", 0, ['user "test"', 'user "\xd1\xe2\xe5\xf2\xe0"', 'user "\xe4\xf6\xfc\xdf"']),
FailTicket("127.0.0.5", 0, ['user "test"', 'unterminated \xcf']),
FailTicket("127.0.0.6", 0, ['user "test"', u'unterminated \xcf']),
FailTicket("127.0.0.6", 0, ['user "test"', 'unterminated \xcf']),
FailTicket("127.0.0.7", 0, ['user "test"', b'unterminated \xcf'])
]
for ticket in tickets:

View File

@ -288,7 +288,7 @@ class DateDetectorTest(LogCaptureTestCase):
self.assertEqual(logTime, mu)
self.assertEqual(logMatch.group(1), '2012/10/11 02:37:17')
# confuse it with year being at the end
for i in xrange(10):
for i in range(10):
( logTime, logMatch ) = self.datedetector.getTime('11/10/2012 02:37:17 [error] 18434#0')
self.assertEqual(logTime, mu)
self.assertEqual(logMatch.group(1), '11/10/2012 02:37:17')
@ -538,7 +538,7 @@ class CustomDateFormatsTest(unittest.TestCase):
date = dd.getTime(line)
if matched:
self.assertTrue(date)
if isinstance(matched, basestring):
if isinstance(matched, str):
self.assertEqual(matched, date[1].group(1))
else:
self.assertEqual(matched, date[0])
@ -573,7 +573,7 @@ class CustomDateFormatsTest(unittest.TestCase):
date = dd.getTime(line)
if matched:
self.assertTrue(date)
if isinstance(matched, basestring): # pragma: no cover
if isinstance(matched, str): # pragma: no cover
self.assertEqual(matched, date[1].group(1))
else:
self.assertEqual(matched, date[0])

View File

@ -367,10 +367,10 @@ def with_foreground_server_thread(startextra={}):
# several commands to server in body of decorated function:
return f(self, tmp, startparams, *args, **kwargs)
except Exception as e: # pragma: no cover
print('=== Catch an exception: %s' % e)
print(('=== Catch an exception: %s' % e))
log = self.getLog()
if log:
print('=== Error of server, log: ===\n%s===' % log)
print(('=== Error of server, log: ===\n%s===' % log))
self.pruneLog()
raise
finally:
@ -440,7 +440,7 @@ class Fail2banClientServerBase(LogCaptureTestCase):
)
except: # pragma: no cover
if _inherited_log(startparams):
print('=== Error by wait fot server, log: ===\n%s===' % self.getLog())
print(('=== Error by wait fot server, log: ===\n%s===' % self.getLog()))
self.pruneLog()
log = pjoin(tmp, "f2b.log")
if isfile(log):
@ -1702,6 +1702,6 @@ class Fail2banServerTest(Fail2banClientServerBase):
self.stopAndWaitForServerEnd(SUCCESS)
def testServerStartStop(self):
for i in xrange(2000):
for i in range(2000):
self._testServerStartStop()

View File

@ -596,8 +596,8 @@ class Fail2banRegexTest(LogCaptureTestCase):
# test on unicode string containing \x0A as part of uni-char,
# it must produce exactly 2 lines (both are failures):
for l in (
u'1490349000 \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n',
u'1490349000 \u20AC Failed auth: invalid user TestI from 192.0.2.2\n'
'1490349000 \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n',
'1490349000 \u20AC Failed auth: invalid user TestI from 192.0.2.2\n'
):
fout.write(l.encode(enc))
fout.close()

View File

@ -45,11 +45,11 @@ class AddFailure(unittest.TestCase):
super(AddFailure, self).tearDown()
def _addDefItems(self):
self.__items = [[u'193.168.0.128', 1167605999.0],
[u'193.168.0.128', 1167605999.0],
[u'193.168.0.128', 1167605999.0],
[u'193.168.0.128', 1167605999.0],
[u'193.168.0.128', 1167605999.0],
self.__items = [['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['87.142.124.10', 1167605999.0],
['87.142.124.10', 1167605999.0],
['87.142.124.10', 1167605999.0],

View File

@ -41,7 +41,7 @@ def auth(v):
response="%s"
""" % ( username, algorithm, realm, url, nonce, qop, response )
# opaque="%s",
print(p.method, p.url, p.headers)
print((p.method, p.url, p.headers))
s = requests.Session()
return s.send(p)
@ -76,18 +76,18 @@ r = auth(v)
# [Sun Jul 28 21:41:20 2013] [error] [client 127.0.0.1] Digest: unknown algorithm `super funky chicken' received: /digest/
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
v['algorithm'] = algorithm
r = auth(v)
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
nonce = v['nonce']
v['nonce']=v['nonce'][5:-5]
r = auth(v)
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
# [Sun Jul 28 21:05:31.178340 2013] [auth_digest:error] [pid 24224:tid 139895539455744] [client 127.0.0.1:56906] AH01793: invalid qop `auth' received: /digest/qop_none/
@ -95,7 +95,7 @@ print(r.status_code,r.headers, r.text)
v['nonce']=nonce[0:11] + 'ZZZ' + nonce[14:]
r = auth(v)
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
#[Sun Jul 28 21:18:11.769228 2013] [auth_digest:error] [pid 24752:tid 139895505884928] [client 127.0.0.1:56964] AH01776: invalid nonce b9YAiJDiBAZZZ1b1abe02d20063ea3b16b544ea1b0d981c1bafe received - hash is not d42d824dee7aaf50c3ba0a7c6290bd453e3dd35b
@ -107,7 +107,7 @@ import time
time.sleep(1)
r = auth(v)
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
# Obtained by putting the following code in modules/aaa/mod_auth_digest.c
# in the function initialize_secret
@ -137,7 +137,7 @@ s = sha.sha(apachesecret)
v=preauth()
print(v['nonce'])
print((v['nonce']))
realm = v['Digest realm'][1:-1]
(t,) = struct.unpack('l',base64.b64decode(v['nonce'][1:13]))
@ -156,13 +156,13 @@ print(v)
r = auth(v)
#[Mon Jul 29 02:12:55.539813 2013] [auth_digest:error] [pid 9647:tid 139895522670336] [client 127.0.0.1:58474] AH01777: invalid nonce 59QJppTiBAA=b08983fd166ade9840407df1b0f75b9e6e07d88d received - user attempted time travel
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
url='/digest_onetime/'
v=preauth()
# Need opaque header handling in auth
r = auth(v)
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))
r = auth(v)
print(r.status_code,r.headers, r.text)
print((r.status_code,r.headers, r.text))

View File

@ -22,7 +22,7 @@
__copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
__license__ = "GPL"
from __builtin__ import open as fopen
from builtins import open as fopen
import unittest
import os
import re
@ -213,12 +213,12 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
else:
fin = in_
# Skip
for i in xrange(skip):
for i in range(skip):
fin.readline()
# Read
i = 0
if lines:
lines = map(uni_bytes, lines)
lines = list(map(uni_bytes, lines))
else:
lines = []
while n is None or i < n:
@ -257,7 +257,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
# Required for filtering
fields.update(TEST_JOURNAL_FIELDS)
# Skip
for i in xrange(skip):
for i in range(skip):
fin.readline()
# Read/Write
i = 0
@ -319,18 +319,18 @@ class BasicFilter(unittest.TestCase):
def testTest_tm(self):
unittest.F2B.SkipIfFast()
## test function "_tm" works correct (returns the same as slow strftime):
for i in xrange(1417512352, (1417512352 // 3600 + 3) * 3600):
for i in range(1417512352, (1417512352 // 3600 + 3) * 3600):
tm = MyTime.time2str(i)
if _tm(i) != tm: # pragma: no cover - never reachable
self.assertEqual((_tm(i), i), (tm, i))
def testWrongCharInTupleLine(self):
## line tuple has different types (ascii after ascii / unicode):
for a1 in ('', u'', b''):
for a2 in ('2016-09-05T20:18:56', u'2016-09-05T20:18:56', b'2016-09-05T20:18:56'):
for a1 in ('', '', b''):
for a2 in ('2016-09-05T20:18:56', '2016-09-05T20:18:56', b'2016-09-05T20:18:56'):
for a3 in (
'Fail for "g\xc3\xb6ran" from 192.0.2.1',
u'Fail for "g\xc3\xb6ran" from 192.0.2.1',
'Fail for "g\xc3\xb6ran" from 192.0.2.1',
b'Fail for "g\xc3\xb6ran" from 192.0.2.1'
):
# join should work if all arguments have the same type:
@ -517,7 +517,7 @@ class IgnoreIP(LogCaptureTestCase):
def testAddAttempt(self):
self.filter.setMaxRetry(3)
for i in xrange(1, 1+3):
for i in range(1, 1+3):
self.filter.addAttempt('192.0.2.1')
self.assertLogged('Attempt 192.0.2.1', '192.0.2.1:%d' % i, all=True, wait=True)
self.jail.actions._Actions__checkBan()
@ -554,7 +554,7 @@ class IgnoreIP(LogCaptureTestCase):
# like both test-cases above, just cached (so once per key)...
self.filter.ignoreCache = {"key":"<ip>"}
self.filter.ignoreCommand = 'if [ "<ip>" = "10.0.0.1" ]; then exit 0; fi; exit 1'
for i in xrange(5):
for i in range(5):
self.pruneLog()
self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
@ -565,7 +565,7 @@ class IgnoreIP(LogCaptureTestCase):
# by host of IP:
self.filter.ignoreCache = {"key":"<ip-host>"}
self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1'
for i in xrange(5):
for i in range(5):
self.pruneLog()
self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1")))
self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff")))
@ -577,7 +577,7 @@ class IgnoreIP(LogCaptureTestCase):
self.filter.ignoreCache = {"key":"<F-USER>", "max-count":"10", "max-time":"1h"}
self.assertEqual(self.filter.ignoreCache, ["<F-USER>", 10, 60*60])
self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1'
for i in xrange(5):
for i in range(5):
self.pruneLog()
self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'})))
self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'})))
@ -680,7 +680,7 @@ class LogFile(LogCaptureTestCase):
def testDecodeLineWarn(self):
# incomplete line (missing byte at end), warning is suppressed:
l = u"correct line\n"
l = "correct line\n"
r = l.encode('utf-16le')
self.assertEqual(FileContainer.decode_line('TESTFILE', 'utf-16le', r), l)
self.assertEqual(FileContainer.decode_line('TESTFILE', 'utf-16le', r[0:-1]), l[0:-1])
@ -740,7 +740,7 @@ class LogFileFilterPoll(unittest.TestCase):
fc = FileContainer(fname, self.filter.getLogEncoding())
fc.open()
# no time - nothing should be found :
for i in xrange(10):
for i in range(10):
f.write(b"[sshd] error: PAM: failure len 1\n")
f.flush()
fc.setPos(0); self.filter.seekToTime(fc, time)
@ -814,14 +814,14 @@ class LogFileFilterPoll(unittest.TestCase):
# variable length of file (ca 45K or 450K before and hereafter):
# write lines with smaller as search time:
t = time - count - 1
for i in xrange(count):
for i in range(count):
f.write(b"%s [sshd] error: PAM: failure\n" % _tmb(t))
t += 1
f.flush()
fc.setPos(0); self.filter.seekToTime(fc, time)
self.assertEqual(fc.getPos(), 47*count)
# write lines with exact search time:
for i in xrange(10):
for i in range(10):
f.write(b"%s [sshd] error: PAM: failure\n" % _tmb(time))
f.flush()
fc.setPos(0); self.filter.seekToTime(fc, time)
@ -830,8 +830,8 @@ class LogFileFilterPoll(unittest.TestCase):
self.assertEqual(fc.getPos(), 47*count)
# write lines with greater as search time:
t = time+1
for i in xrange(count//500):
for j in xrange(500):
for i in range(count//500):
for j in range(500):
f.write(b"%s [sshd] error: PAM: failure\n" % _tmb(t))
t += 1
f.flush()
@ -1641,10 +1641,10 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
# Add direct utf, unicode, blob:
for l in (
"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
u"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
b"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1".decode('utf-8', 'replace'),
"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
u"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
b"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2".decode('utf-8', 'replace')
):
fields = self.journal_fields
@ -1673,7 +1673,7 @@ class GetFailures(LogCaptureTestCase):
# so that they could be reused by other tests
FAILURES_01 = ('193.168.0.128', 3, 1124013599.0,
[u'Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128']*3)
['Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128']*3)
def setUp(self):
"""Call before every test case."""
@ -1759,8 +1759,8 @@ class GetFailures(LogCaptureTestCase):
# test on unicode string containing \x0A as part of uni-char,
# it must produce exactly 2 lines (both are failures):
for l in (
u'%s \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n' % tm,
u'%s \u20AC Failed auth: invalid user TestI from 192.0.2.2\n' % tm
'%s \u20AC Failed auth: invalid user Test\u020A from 192.0.2.1\n' % tm,
'%s \u20AC Failed auth: invalid user TestI from 192.0.2.2\n' % tm
):
fout.write(l.encode(enc))
fout.close()
@ -1781,8 +1781,8 @@ class GetFailures(LogCaptureTestCase):
def testGetFailures02(self):
output = ('141.3.81.106', 4, 1124013539.0,
[u'Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2'
% m for m in 53, 54, 57, 58])
['Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2'
% m for m in (53, 54, 57, 58)])
self.filter.setMaxRetry(4)
self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=0)
@ -1893,19 +1893,19 @@ class GetFailures(LogCaptureTestCase):
# We should still catch failures with usedns = no ;-)
output_yes = (
('93.184.216.34', 1, 1124013299.0,
[u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']
['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']
),
('93.184.216.34', 1, 1124013539.0,
[u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
),
('2606:2800:220:1:248:1893:25c8:1946', 1, 1124013299.0,
[u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']
['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']
),
)
output_no = (
('93.184.216.34', 1, 1124013539.0,
[u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
)
)
@ -2011,9 +2011,9 @@ class DNSUtilsTests(unittest.TestCase):
self.assertTrue(c.get('a') is None)
self.assertEqual(c.get('a', 'test'), 'test')
# exact 5 elements :
for i in xrange(5):
for i in range(5):
c.set(i, i)
for i in xrange(5):
for i in range(5):
self.assertEqual(c.get(i), i)
# remove unavailable key:
c.unset('a'); c.unset('a')
@ -2021,30 +2021,30 @@ class DNSUtilsTests(unittest.TestCase):
def testCacheMaxSize(self):
c = Utils.Cache(maxCount=5, maxTime=60)
# exact 5 elements :
for i in xrange(5):
for i in range(5):
c.set(i, i)
self.assertEqual([c.get(i) for i in xrange(5)], [i for i in xrange(5)])
self.assertNotIn(-1, (c.get(i, -1) for i in xrange(5)))
self.assertEqual([c.get(i) for i in range(5)], [i for i in range(5)])
self.assertNotIn(-1, (c.get(i, -1) for i in range(5)))
# add one - too many:
c.set(10, i)
# one element should be removed :
self.assertIn(-1, (c.get(i, -1) for i in xrange(5)))
self.assertIn(-1, (c.get(i, -1) for i in range(5)))
# test max size (not expired):
for i in xrange(10):
for i in range(10):
c.set(i, 1)
self.assertEqual(len(c), 5)
def testCacheMaxTime(self):
# test max time (expired, timeout reached) :
c = Utils.Cache(maxCount=5, maxTime=0.0005)
for i in xrange(10):
for i in range(10):
c.set(i, 1)
st = time.time()
self.assertTrue(Utils.wait_for(lambda: time.time() >= st + 0.0005, 1))
# we have still 5 elements (or fewer if too slow test mashine):
self.assertTrue(len(c) <= 5)
# but all that are expiered also:
for i in xrange(10):
for i in range(10):
self.assertTrue(c.get(i) is None)
# here the whole cache should be empty:
self.assertEqual(len(c), 0)
@ -2065,7 +2065,7 @@ class DNSUtilsTests(unittest.TestCase):
c = count
while c:
c -= 1
s = xrange(0, 256, 1) if forw else xrange(255, -1, -1)
s = range(0, 256, 1) if forw else range(255, -1, -1)
if random: shuffle([i for i in s])
for i in s:
IPAddr('192.0.2.'+str(i), IPAddr.FAM_IPv4)
@ -2205,16 +2205,16 @@ class DNSUtilsNetworkTests(unittest.TestCase):
def testAddr2bin(self):
res = IPAddr('10.0.0.0')
self.assertEqual(res.addr, 167772160L)
self.assertEqual(res.addr, 167772160)
res = IPAddr('10.0.0.0', cidr=None)
self.assertEqual(res.addr, 167772160L)
res = IPAddr('10.0.0.0', cidr=32L)
self.assertEqual(res.addr, 167772160L)
res = IPAddr('10.0.0.1', cidr=32L)
self.assertEqual(res.addr, 167772161L)
self.assertEqual(res.addr, 167772160)
res = IPAddr('10.0.0.0', cidr=32)
self.assertEqual(res.addr, 167772160)
res = IPAddr('10.0.0.1', cidr=32)
self.assertEqual(res.addr, 167772161)
self.assertTrue(res.isSingle)
res = IPAddr('10.0.0.1', cidr=31L)
self.assertEqual(res.addr, 167772160L)
res = IPAddr('10.0.0.1', cidr=31)
self.assertEqual(res.addr, 167772160)
self.assertFalse(res.isSingle)
self.assertEqual(IPAddr('10.0.0.0').hexdump, '0a000000')
@ -2305,9 +2305,9 @@ class DNSUtilsNetworkTests(unittest.TestCase):
'93.184.216.34': 'ip4-test',
'2606:2800:220:1:248:1893:25c8:1946': 'ip6-test'
}
d2 = dict([(IPAddr(k), v) for k, v in d.iteritems()])
self.assertTrue(isinstance(d.keys()[0], basestring))
self.assertTrue(isinstance(d2.keys()[0], IPAddr))
d2 = dict([(IPAddr(k), v) for k, v in d.items()])
self.assertTrue(isinstance(list(d.keys())[0], str))
self.assertTrue(isinstance(list(d2.keys())[0], IPAddr))
self.assertEqual(d.get(ip4[2], ''), 'ip4-test')
self.assertEqual(d.get(ip6[2], ''), 'ip6-test')
self.assertEqual(d2.get(str(ip4[2]), ''), 'ip4-test')

View File

@ -29,9 +29,9 @@ import tempfile
import shutil
import fnmatch
from glob import glob
from StringIO import StringIO
from io import StringIO
from utils import LogCaptureTestCase, logSys as DefLogSys
from .utils import LogCaptureTestCase, logSys as DefLogSys
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, \
getVerbosityFormat, splitwords, uni_decode, uni_string
@ -67,7 +67,7 @@ class HelpersTest(unittest.TestCase):
self.assertEqual(splitwords(' 1\n 2'), ['1', '2'])
self.assertEqual(splitwords(' 1\n 2, 3'), ['1', '2', '3'])
# string as unicode:
self.assertEqual(splitwords(u' 1\n 2, 3'), ['1', '2', '3'])
self.assertEqual(splitwords(' 1\n 2, 3'), ['1', '2', '3'])
def _sh_call(cmd):
@ -191,12 +191,12 @@ class TestsUtilsTest(LogCaptureTestCase):
def testUniConverters(self):
self.assertRaises(Exception, uni_decode,
(b'test' if sys.version_info >= (3,) else u'test'), 'f2b-test::non-existing-encoding')
uni_decode((b'test\xcf' if sys.version_info >= (3,) else u'test\xcf'))
(b'test' if sys.version_info >= (3,) else 'test'), 'f2b-test::non-existing-encoding')
uni_decode((b'test\xcf' if sys.version_info >= (3,) else 'test\xcf'))
uni_string(b'test\xcf')
uni_string('test\xcf')
if sys.version_info < (3,) and 'PyPy' not in sys.version:
uni_string(u'test\xcf')
uni_string('test\xcf')
def testSafeLogging(self):
# logging should be exception-safe, to avoid possible errors (concat, str. conversion, representation failures, etc)
@ -208,7 +208,7 @@ class TestsUtilsTest(LogCaptureTestCase):
if self.err:
raise Exception('no represenation for test!')
else:
return u'conv-error (\xf2\xf0\xe5\xf2\xe8\xe9), unterminated utf \xcf'
return 'conv-error (\xf2\xf0\xe5\xf2\xe8\xe9), unterminated utf \xcf'
test = Test()
logSys.log(logging.NOTICE, "test 1a: %r", test)
self.assertLogged("Traceback", "no represenation for test!")
@ -256,7 +256,7 @@ class TestsUtilsTest(LogCaptureTestCase):
func_raise()
try:
print deep_function(3)
print(deep_function(3))
except ValueError:
s = tb()
@ -273,7 +273,7 @@ class TestsUtilsTest(LogCaptureTestCase):
self.assertIn(':', s)
def _testAssertionErrorRE(self, regexp, fun, *args, **kwargs):
self.assertRaisesRegexp(AssertionError, regexp, fun, *args, **kwargs)
self.assertRaisesRegex(AssertionError, regexp, fun, *args, **kwargs)
def testExtendedAssertRaisesRE(self):
## test _testAssertionErrorRE several fail cases:
@ -311,13 +311,13 @@ class TestsUtilsTest(LogCaptureTestCase):
self._testAssertionErrorRE(r"'a' unexpectedly found in 'cba'",
self.assertNotIn, 'a', 'cba')
self._testAssertionErrorRE(r"1 unexpectedly found in \[0, 1, 2\]",
self.assertNotIn, 1, xrange(3))
self.assertNotIn, 1, range(3))
self._testAssertionErrorRE(r"'A' unexpectedly found in \['C', 'A'\]",
self.assertNotIn, 'A', (c.upper() for c in 'cba' if c != 'b'))
self._testAssertionErrorRE(r"'a' was not found in 'xyz'",
self.assertIn, 'a', 'xyz')
self._testAssertionErrorRE(r"5 was not found in \[0, 1, 2\]",
self.assertIn, 5, xrange(3))
self.assertIn, 5, range(3))
self._testAssertionErrorRE(r"'A' was not found in \['C', 'B'\]",
self.assertIn, 'A', (c.upper() for c in 'cba' if c != 'a'))
## assertLogged, assertNotLogged positive case:

View File

@ -68,7 +68,7 @@ class BanTimeIncr(LogCaptureTestCase):
a.setBanTimeExtra('multipliers', multipliers)
# test algorithm and max time 24 hours :
self.assertEqual(
[a.calcBanTime(600, i) for i in xrange(1, 11)],
[a.calcBanTime(600, i) for i in range(1, 11)],
[1200, 2400, 4800, 9600, 19200, 38400, 76800, 86400, 86400, 86400]
)
# with extra large max time (30 days):
@ -80,38 +80,38 @@ class BanTimeIncr(LogCaptureTestCase):
if multcnt < 11:
arr = arr[0:multcnt-1] + ([arr[multcnt-2]] * (11-multcnt))
self.assertEqual(
[a.calcBanTime(600, i) for i in xrange(1, 11)],
[a.calcBanTime(600, i) for i in range(1, 11)],
arr
)
a.setBanTimeExtra('maxtime', '1d')
# change factor :
a.setBanTimeExtra('factor', '2');
self.assertEqual(
[a.calcBanTime(600, i) for i in xrange(1, 11)],
[a.calcBanTime(600, i) for i in range(1, 11)],
[2400, 4800, 9600, 19200, 38400, 76800, 86400, 86400, 86400, 86400]
)
# factor is float :
a.setBanTimeExtra('factor', '1.33');
self.assertEqual(
[int(a.calcBanTime(600, i)) for i in xrange(1, 11)],
[int(a.calcBanTime(600, i)) for i in range(1, 11)],
[1596, 3192, 6384, 12768, 25536, 51072, 86400, 86400, 86400, 86400]
)
a.setBanTimeExtra('factor', None);
# change max time :
a.setBanTimeExtra('maxtime', '12h')
self.assertEqual(
[a.calcBanTime(600, i) for i in xrange(1, 11)],
[a.calcBanTime(600, i) for i in range(1, 11)],
[1200, 2400, 4800, 9600, 19200, 38400, 43200, 43200, 43200, 43200]
)
a.setBanTimeExtra('maxtime', '24h')
## test randomization - not possibe all 10 times we have random = 0:
a.setBanTimeExtra('rndtime', '5m')
self.assertTrue(
False in [1200 in [a.calcBanTime(600, 1) for i in xrange(10)] for c in xrange(10)]
False in [1200 in [a.calcBanTime(600, 1) for i in range(10)] for c in range(10)]
)
a.setBanTimeExtra('rndtime', None)
self.assertFalse(
False in [1200 in [a.calcBanTime(600, 1) for i in xrange(10)] for c in xrange(10)]
False in [1200 in [a.calcBanTime(600, 1) for i in range(10)] for c in range(10)]
)
# restore default:
a.setBanTimeExtra('multipliers', None)
@ -123,7 +123,7 @@ class BanTimeIncr(LogCaptureTestCase):
# this multipliers has the same values as default formula, we test stop growing after count 9:
self.testDefault('1 2 4 8 16 32 64 128 256')
# this multipliers has exactly the same values as default formula, test endless growing (stops by count 31 only):
self.testDefault(' '.join([str(1<<i) for i in xrange(31)]))
self.testDefault(' '.join([str(1<<i) for i in range(31)]))
def testFormula(self):
a = self.__jail;
@ -135,38 +135,38 @@ class BanTimeIncr(LogCaptureTestCase):
a.setBanTimeExtra('multipliers', None)
# test algorithm and max time 24 hours :
self.assertEqual(
[int(a.calcBanTime(600, i)) for i in xrange(1, 11)],
[int(a.calcBanTime(600, i)) for i in range(1, 11)],
[1200, 2400, 4800, 9600, 19200, 38400, 76800, 86400, 86400, 86400]
)
# with extra large max time (30 days):
a.setBanTimeExtra('maxtime', '30d')
self.assertEqual(
[int(a.calcBanTime(600, i)) for i in xrange(1, 11)],
[int(a.calcBanTime(600, i)) for i in range(1, 11)],
[1200, 2400, 4800, 9600, 19200, 38400, 76800, 153601, 307203, 614407]
)
a.setBanTimeExtra('maxtime', '24h')
# change factor :
a.setBanTimeExtra('factor', '1');
self.assertEqual(
[int(a.calcBanTime(600, i)) for i in xrange(1, 11)],
[int(a.calcBanTime(600, i)) for i in range(1, 11)],
[1630, 4433, 12051, 32758, 86400, 86400, 86400, 86400, 86400, 86400]
)
a.setBanTimeExtra('factor', '2.0 / 2.885385')
# change max time :
a.setBanTimeExtra('maxtime', '12h')
self.assertEqual(
[int(a.calcBanTime(600, i)) for i in xrange(1, 11)],
[int(a.calcBanTime(600, i)) for i in range(1, 11)],
[1200, 2400, 4800, 9600, 19200, 38400, 43200, 43200, 43200, 43200]
)
a.setBanTimeExtra('maxtime', '24h')
## test randomization - not possibe all 10 times we have random = 0:
a.setBanTimeExtra('rndtime', '5m')
self.assertTrue(
False in [1200 in [int(a.calcBanTime(600, 1)) for i in xrange(10)] for c in xrange(10)]
False in [1200 in [int(a.calcBanTime(600, 1)) for i in range(10)] for c in range(10)]
)
a.setBanTimeExtra('rndtime', None)
self.assertFalse(
False in [1200 in [int(a.calcBanTime(600, 1)) for i in xrange(10)] for c in xrange(10)]
False in [1200 in [int(a.calcBanTime(600, 1)) for i in range(10)] for c in range(10)]
)
# restore default:
a.setBanTimeExtra('factor', None);
@ -229,7 +229,7 @@ class BanTimeIncrDB(LogCaptureTestCase):
ticket = FailTicket(ip, stime, [])
# test ticket not yet found
self.assertEqual(
[self.incrBanTime(ticket, 10) for i in xrange(3)],
[self.incrBanTime(ticket, 10) for i in range(3)],
[10, 10, 10]
)
# add a ticket banned
@ -284,7 +284,7 @@ class BanTimeIncrDB(LogCaptureTestCase):
)
# increase ban multiple times:
lastBanTime = 20
for i in xrange(10):
for i in range(10):
ticket.setTime(stime + lastBanTime + 5)
banTime = self.incrBanTime(ticket, 10)
self.assertEqual(banTime, lastBanTime * 2)
@ -483,7 +483,7 @@ class BanTimeIncrDB(LogCaptureTestCase):
ticket = FailTicket(ip, stime-120, [])
failManager = jail.filter.failManager = FailManager()
failManager.setMaxRetry(3)
for i in xrange(3):
for i in range(3):
failManager.addFailure(ticket)
obs.add('failureFound', jail, ticket)
obs.wait_empty(5)

View File

@ -137,7 +137,7 @@ class FilterSamplesRegex(unittest.TestCase):
@staticmethod
def _filterOptions(opts):
return dict((k, v) for k, v in opts.iteritems() if not k.startswith('test.'))
return dict((k, v) for k, v in opts.items() if not k.startswith('test.'))
def testSampleRegexsFactory(name, basedir):
def testFilter(self):
@ -258,12 +258,12 @@ def testSampleRegexsFactory(name, basedir):
self.assertTrue(faildata.get('match', False),
"Line matched when shouldn't have")
self.assertEqual(len(ret), 1,
"Multiple regexs matched %r" % (map(lambda x: x[0], ret)))
"Multiple regexs matched %r" % ([x[0] for x in ret]))
for ret in ret:
failregex, fid, fail2banTime, fail = ret
# Verify match captures (at least fid/host) and timestamp as expected
for k, v in faildata.iteritems():
for k, v in faildata.items():
if k not in ("time", "match", "desc", "constraint"):
fv = fail.get(k, None)
if fv is None:
@ -305,7 +305,7 @@ def testSampleRegexsFactory(name, basedir):
'\n'.join(pprint.pformat(fail).splitlines())))
# check missing samples for regex using each filter-options combination:
for fltName, flt in self._filters.iteritems():
for fltName, flt in self._filters.items():
flt, regexsUsedIdx = flt
regexList = flt.getFailRegex()
for failRegexIndex, failRegex in enumerate(regexList):

View File

@ -127,14 +127,14 @@ class TransmitterBase(LogCaptureTestCase):
self.transm.proceed(["get", jail, cmd]), (0, []))
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdAdd, value])
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[:n+1])), level=2)
self.assertSortedEqual((ret[0], list(map(str, ret[1]))), (0, list(map(str, values[:n+1]))), level=2)
ret = self.transm.proceed(["get", jail, cmd])
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[:n+1])), level=2)
self.assertSortedEqual((ret[0], list(map(str, ret[1]))), (0, list(map(str, values[:n+1]))), level=2)
for n, value in enumerate(values):
ret = self.transm.proceed(["set", jail, cmdDel, value])
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[n+1:])), level=2)
self.assertSortedEqual((ret[0], list(map(str, ret[1]))), (0, list(map(str, values[n+1:]))), level=2)
ret = self.transm.proceed(["get", jail, cmd])
self.assertSortedEqual((ret[0], map(str, ret[1])), (0, map(str, values[n+1:])), level=2)
self.assertSortedEqual((ret[0], list(map(str, ret[1]))), (0, list(map(str, values[n+1:]))), level=2)
def jailAddDelRegexTest(self, cmd, inValues, outValues, jail):
cmdAdd = "add" + cmd
@ -930,7 +930,7 @@ class TransmitterLogging(TransmitterBase):
def testLogTarget(self):
logTargets = []
for _ in xrange(3):
for _ in range(3):
tmpFile = tempfile.mkstemp("fail2ban", "transmitter")
logTargets.append(tmpFile[1])
os.close(tmpFile[0])
@ -1003,26 +1003,26 @@ class TransmitterLogging(TransmitterBase):
self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "rolled over"))
l.warning("After flushlogs")
with open(fn2,'r') as f:
line1 = f.next()
line1 = next(f)
if line1.find('Changed logging target to') >= 0:
line1 = f.next()
line1 = next(f)
self.assertTrue(line1.endswith("Before file moved\n"))
line2 = f.next()
line2 = next(f)
self.assertTrue(line2.endswith("After file moved\n"))
try:
n = f.next()
n = next(f)
if n.find("Command: ['flushlogs']") >=0:
self.assertRaises(StopIteration, f.next)
self.assertRaises(StopIteration, f.__next__)
else:
self.fail("Exception StopIteration or Command: ['flushlogs'] expected. Got: %s" % n)
except StopIteration:
pass # on higher debugging levels this is expected
with open(fn,'r') as f:
line1 = f.next()
line1 = next(f)
if line1.find('rollover performed on') >= 0:
line1 = f.next()
line1 = next(f)
self.assertTrue(line1.endswith("After flushlogs\n"))
self.assertRaises(StopIteration, f.next)
self.assertRaises(StopIteration, f.__next__)
f.close()
finally:
os.remove(fn2)
@ -1185,7 +1185,7 @@ class LoggingTests(LogCaptureTestCase):
os.remove(f)
from clientreadertestcase import ActionReader, JailsReader, CONFIG_DIR
from .clientreadertestcase import ActionReader, JailsReader, CONFIG_DIR
class ServerConfigReaderTests(LogCaptureTestCase):

View File

@ -153,7 +153,7 @@ class Socket(LogCaptureTestCase):
org_handler = RequestHandler.found_terminator
try:
RequestHandler.found_terminator = lambda self: self.close()
self.assertRaisesRegexp(Exception, r"reset by peer|Broken pipe",
self.assertRaisesRegex(Exception, r"reset by peer|Broken pipe",
lambda: client.send(testMessage, timeout=unittest.F2B.maxWaitTime(10)))
finally:
RequestHandler.found_terminator = org_handler

View File

@ -35,7 +35,7 @@ import time
import threading
import unittest
from cStringIO import StringIO
from io import StringIO
from functools import wraps
from ..helpers import getLogger, str2LogLevel, getVerbosityFormat, uni_decode
@ -171,8 +171,8 @@ def initProcess(opts):
# Let know the version
if opts.verbosity != 0:
print("Fail2ban %s test suite. Python %s. Please wait..." \
% (version, str(sys.version).replace('\n', '')))
print(("Fail2ban %s test suite. Python %s. Please wait..." \
% (version, str(sys.version).replace('\n', ''))))
return opts;
@ -303,7 +303,7 @@ def initTests(opts):
c.clear = lambda: logSys.warn('clear CACHE_ipToName is disabled in test suite')
# increase max count and max time (too many entries, long time testing):
c.setOptions(maxCount=10000, maxTime=5*60)
for i in xrange(256):
for i in range(256):
c.set('192.0.2.%s' % i, None)
c.set('198.51.100.%s' % i, None)
c.set('203.0.113.%s' % i, None)
@ -531,8 +531,8 @@ def gatherTests(regexps=None, opts=None):
import difflib, pprint
if not hasattr(unittest.TestCase, 'assertDictEqual'):
def assertDictEqual(self, d1, d2, msg=None):
self.assert_(isinstance(d1, dict), 'First argument is not a dictionary')
self.assert_(isinstance(d2, dict), 'Second argument is not a dictionary')
self.assertTrue(isinstance(d1, dict), 'First argument is not a dictionary')
self.assertTrue(isinstance(d2, dict), 'Second argument is not a dictionary')
if d1 != d2:
standardMsg = '%r != %r' % (d1, d2)
diff = ('\n' + '\n'.join(difflib.ndiff(
@ -550,7 +550,7 @@ def assertSortedEqual(self, a, b, level=1, nestedOnly=False, key=repr, msg=None)
# used to recognize having element as nested dict, list or tuple:
def _is_nested(v):
if isinstance(v, dict):
return any(isinstance(v, (dict, list, tuple)) for v in v.itervalues())
return any(isinstance(v, (dict, list, tuple)) for v in v.values())
return any(isinstance(v, (dict, list, tuple)) for v in v)
if nestedOnly:
_nest_sorted = sorted
@ -570,7 +570,7 @@ def assertSortedEqual(self, a, b, level=1, nestedOnly=False, key=repr, msg=None)
return
raise ValueError('%r != %r' % (a, b))
if isinstance(a, dict) and isinstance(b, dict): # compare dict's:
for k, v1 in a.iteritems():
for k, v1 in a.items():
v2 = b[k]
if isinstance(v1, (dict, list, tuple)) and isinstance(v2, (dict, list, tuple)):
_assertSortedEqual(v1, v2, level-1 if level != 0 else 0, nestedOnly, key)
@ -605,14 +605,14 @@ if not hasattr(unittest.TestCase, 'assertRaisesRegexp'):
self.fail('\"%s\" does not match \"%s\"' % (regexp, e))
else:
self.fail('%s not raised' % getattr(exccls, '__name__'))
unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp
unittest.TestCase.assertRaisesRegex = assertRaisesRegexp
# always custom following methods, because we use atm better version of both (support generators)
if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
def assertIn(self, a, b, msg=None):
bb = b
wrap = False
if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
if msg is None and hasattr(b, '__iter__') and not isinstance(b, str):
b, bb = itertools.tee(b)
wrap = True
if a not in b:
@ -623,7 +623,7 @@ if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
def assertNotIn(self, a, b, msg=None):
bb = b
wrap = False
if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
if msg is None and hasattr(b, '__iter__') and not isinstance(b, str):
b, bb = itertools.tee(b)
wrap = True
if a in b:

View File

@ -68,15 +68,15 @@ class install_scripts_f2b(install_scripts):
if dry_run:
#bindir = self.install_dir
bindir = self.build_dir
print('creating fail2ban-python binding -> %s (dry-run, real path can be different)' % (bindir,))
print('Copying content of %s to %s' % (self.build_dir, self.install_dir));
print(('creating fail2ban-python binding -> %s (dry-run, real path can be different)' % (bindir,)))
print(('Copying content of %s to %s' % (self.build_dir, self.install_dir)));
return outputs
fn = None
for fn in outputs:
if os.path.basename(fn) == 'fail2ban-server':
break
bindir = os.path.dirname(fn)
print('creating fail2ban-python binding -> %s' % (bindir,))
print(('creating fail2ban-python binding -> %s' % (bindir,)))
updatePyExec(bindir)
return outputs
@ -93,7 +93,7 @@ class install_scripts_f2b(install_scripts):
scripts = ['fail2ban.service', 'fail2ban-openrc.init']
for script in scripts:
print('Creating %s/%s (from %s.in): @BINDIR@ -> %s' % (buildroot, script, script, install_dir))
print(('Creating %s/%s (from %s.in): @BINDIR@ -> %s' % (buildroot, script, script, install_dir)))
with open(os.path.join(source_dir, 'files/%s.in' % script), 'r') as fn:
lines = fn.readlines()
fn = None
@ -296,7 +296,7 @@ if obsoleteFiles:
print("Please delete them:")
print("")
for f in obsoleteFiles:
print("\t" + f)
print(("\t" + f))
print("")
if isdir("/usr/lib/fail2ban"):