diff --git a/fail2ban/client/configreader.py b/fail2ban/client/configreader.py
index b7da271b..04502504 100644
--- a/fail2ban/client/configreader.py
+++ b/fail2ban/client/configreader.py
@@ -29,24 +29,12 @@ import os
from ConfigParser import NoOptionError, NoSectionError
from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel
-from ..helpers import getLogger, substituteRecursiveTags
+from ..helpers import getLogger, _merge_dicts, substituteRecursiveTags
# Gets the instance of the logger.
logSys = getLogger(__name__)
-# if sys.version_info >= (3,5):
-# def _merge_dicts(x, y):
-# return {**x, **y}
-# else:
-def _merge_dicts(x, y):
- r = x
- if y:
- r = x.copy()
- r.update(y)
- return r
-
-
class ConfigReader():
"""Generic config reader class.
diff --git a/fail2ban/helpers.py b/fail2ban/helpers.py
index de26dbcd..556ca173 100644
--- a/fail2ban/helpers.py
+++ b/fail2ban/helpers.py
@@ -169,6 +169,36 @@ def splitwords(s):
return []
return filter(bool, map(str.strip, re.split('[ ,\n]+', s)))
+if sys.version_info >= (3,5):
+ eval(compile(r'''if 1:
+ def _merge_dicts(x, y):
+ """Helper to merge dicts.
+ """
+ if y:
+ return {**x, **y}
+ return x
+
+ def _merge_copy_dicts(x, y):
+ """Helper to merge dicts to guarantee a copy result (r is never x).
+ """
+ return {**x, **y}
+ ''', __file__, 'exec'))
+else:
+ def _merge_dicts(x, y):
+ """Helper to merge dicts.
+ """
+ r = x
+ if y:
+ r = x.copy()
+ r.update(y)
+ return r
+ def _merge_copy_dicts(x, y):
+ """Helper to merge dicts to guarantee a copy result (r is never x).
+ """
+ r = x.copy()
+ if y:
+ r.update(y)
+ return r
#
# Following "uni_decode" function unified python independent any to string converting
diff --git a/fail2ban/server/action.py b/fail2ban/server/action.py
index 31a1a3dc..2a773638 100644
--- a/fail2ban/server/action.py
+++ b/fail2ban/server/action.py
@@ -36,7 +36,7 @@ from .failregex import mapTag2Opt
from .ipdns import asip
from .mytime import MyTime
from .utils import Utils
-from ..helpers import getLogger, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
+from ..helpers import getLogger, _merge_copy_dicts, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
# Gets the instance of the logger.
logSys = getLogger(__name__)
@@ -148,7 +148,7 @@ class CallingMap(MutableMapping, object):
return len(self.data)
def copy(self): # pargma: no cover
- return self.__class__(self.data.copy())
+ return self.__class__(_merge_copy_dicts(self.data, self.storage))
class ActionBase(object):
diff --git a/fail2ban/tests/actiontestcase.py b/fail2ban/tests/actiontestcase.py
index 610857d6..70562baf 100644
--- a/fail2ban/tests/actiontestcase.py
+++ b/fail2ban/tests/actiontestcase.py
@@ -158,18 +158,35 @@ class CommandActionTest(LogCaptureTestCase):
{'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
def testSubstRec_DontTouchUnusedCallable(self):
- cm = CallingMap(
- A=0,
- B=lambda self: '',
- C=lambda self,i=0: 5 // int(self['A']) # raise error by access
- )
+ cm = CallingMap({
+ 'A':0,
+ 'B':lambda self: '',
+ 'C':'',
+ 'D':''
+ })
+ #
+ # should raise no exceptions:
+ substituteRecursiveTags(cm)
+ # add exception tag:
+ cm['C'] = lambda self,i=0: 5 // int(self['A']) # raise error by access
+ # test direct get of callable (should raise an error):
+ self.assertRaises(ZeroDivisionError, lambda: cm['C'])
+ # should raise no exceptions (tag "C" still unused):
+ substituteRecursiveTags(cm)
+ # add reference to "broken" tag:
+ cm['D'] = 'test='
+ # should raise an exception (BOOM by replacement of tag "D" recursive):
+ self.assertRaises(ZeroDivisionError, lambda: substituteRecursiveTags(cm))
+ #
# should raise no exceptions:
self.assertEqual(self.__action.replaceTag('test=', cm), "test=0")
# **Important**: recursive replacement of dynamic data from calling map should be prohibited,
# otherwise may be vulnerable on foreign user-input:
self.assertEqual(self.__action.replaceTag('test=----', cm), "test=0----0")
- # should raise an exception:
+ # should raise an exception (BOOM by replacement of tag "C"):
self.assertRaises(ZeroDivisionError, lambda: self.__action.replaceTag('test=', cm))
+ # should raise no exceptions (replaces tag "D" only):
+ self.assertEqual(self.__action.replaceTag('', cm), "test=")
def testReplaceTag(self):
aInfo = {