Merge pull request #1716 from sebres/fix-stop-replace-in-callable

Prohibit recursive replacement of action info (calling map)
pull/1717/head
Serg G. Brester 8 years ago committed by GitHub
commit 77229a65b5

@ -29,24 +29,12 @@ import os
from ConfigParser import NoOptionError, NoSectionError
from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel
from ..helpers import getLogger, substituteRecursiveTags
from ..helpers import getLogger, _merge_dicts, substituteRecursiveTags
# Gets the instance of the logger.
logSys = getLogger(__name__)
# if sys.version_info >= (3,5):
# def _merge_dicts(x, y):
# return {**x, **y}
# else:
def _merge_dicts(x, y):
r = x
if y:
r = x.copy()
r.update(y)
return r
class ConfigReader():
"""Generic config reader class.

@ -169,6 +169,36 @@ def splitwords(s):
return []
return filter(bool, map(str.strip, re.split('[ ,\n]+', s)))
if sys.version_info >= (3,5):
eval(compile(r'''if 1:
def _merge_dicts(x, y):
"""Helper to merge dicts.
"""
if y:
return {**x, **y}
return x
def _merge_copy_dicts(x, y):
"""Helper to merge dicts to guarantee a copy result (r is never x).
"""
return {**x, **y}
''', __file__, 'exec'))
else:
def _merge_dicts(x, y):
"""Helper to merge dicts.
"""
r = x
if y:
r = x.copy()
r.update(y)
return r
def _merge_copy_dicts(x, y):
"""Helper to merge dicts to guarantee a copy result (r is never x).
"""
r = x.copy()
if y:
r.update(y)
return r
#
# Following "uni_decode" function unified python independent any to string converting
@ -240,6 +270,7 @@ def substituteRecursiveTags(inptags, conditional='',
# init:
ignore = set(ignore)
done = set()
noRecRepl = hasattr(tags, "getRawItem")
# repeat substitution while embedded-recursive (repFlag is True)
while True:
repFlag = False
@ -247,6 +278,8 @@ def substituteRecursiveTags(inptags, conditional='',
for tag in tags.iterkeys():
# ignore escaped or already done (or in ignore list):
if tag in ignore or tag in done: continue
# ignore replacing callable items from calling map - should be converted on demand only (by get):
if noRecRepl and callable(tags.getRawItem(tag)): continue
value = orgval = str(tags[tag])
# search and replace all tags within value, that can be interpolated using other tags:
m = tre_search(value)
@ -281,6 +314,8 @@ def substituteRecursiveTags(inptags, conditional='',
# constructs like <STDIN>.
m = tre_search(value, m.end())
continue
# if calling map - be sure we've string:
if noRecRepl: repl = str(repl)
value = value.replace('<%s>' % rtag, repl)
#logSys.log(5, 'value now: %s' % value)
# increment reference count:

@ -36,7 +36,7 @@ from .failregex import mapTag2Opt
from .ipdns import asip
from .mytime import MyTime
from .utils import Utils
from ..helpers import getLogger, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
from ..helpers import getLogger, _merge_copy_dicts, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
# Gets the instance of the logger.
logSys = getLogger(__name__)
@ -98,6 +98,13 @@ class CallingMap(MutableMapping, object):
except:
return dict(self.data, **self.storage)
def getRawItem(self, key):
try:
value = self.storage[key]
except KeyError:
value = self.data[key]
return value
def __getitem__(self, key):
try:
value = self.storage[key]
@ -141,7 +148,7 @@ class CallingMap(MutableMapping, object):
return len(self.data)
def copy(self): # pargma: no cover
return self.__class__(self.data.copy())
return self.__class__(_merge_copy_dicts(self.data, self.storage))
class ActionBase(object):
@ -446,7 +453,7 @@ class CommandAction(ActionBase):
return value
@classmethod
def replaceTag(cls, query, aInfo, conditional='', cache=None):
def replaceTag(cls, query, aInfo, conditional='', cache=None, substRec=True):
"""Replaces tags in `query` with property values.
Parameters
@ -471,23 +478,29 @@ class CommandAction(ActionBase):
except KeyError:
pass
# first try get cached tags dictionary:
subInfo = csubkey = None
if cache is not None:
csubkey = ('subst-tags', id(aInfo), conditional)
try:
subInfo = cache[csubkey]
except KeyError:
pass
# interpolation of dictionary:
if subInfo is None:
subInfo = substituteRecursiveTags(aInfo, conditional, ignore=cls._escapedTags)
# cache if possible:
if csubkey is not None:
cache[csubkey] = subInfo
# **Important**: don't replace if calling map - contains dynamic values only,
# no recursive tags, otherwise may be vulnerable on foreign user-input:
noRecRepl = isinstance(aInfo, CallingMap)
if noRecRepl:
subInfo = aInfo
else:
# substitute tags recursive (and cache if possible),
# first try get cached tags dictionary:
subInfo = csubkey = None
if cache is not None:
csubkey = ('subst-tags', id(aInfo), conditional)
try:
subInfo = cache[csubkey]
except KeyError:
pass
# interpolation of dictionary:
if subInfo is None:
subInfo = substituteRecursiveTags(aInfo, conditional, ignore=cls._escapedTags)
# cache if possible:
if csubkey is not None:
cache[csubkey] = subInfo
# substitution callable, used by interpolation of each tag
repeatSubst = {0: 0}
def substVal(m):
tag = m.group(1) # tagname from match
value = None
@ -503,18 +516,17 @@ class CommandAction(ActionBase):
# That one needs to be escaped since its content is
# out of our control
value = cls.escapeTag(value)
# possible contains tags:
if '<' in value:
repeatSubst[0] = 1
# replacement for tag:
return value
# interpolation of query:
count = MAX_TAG_REPLACE_COUNT + 1
while True:
repeatSubst[0] = 0
value = TAG_CRE.sub(substVal, query)
# **Important**: no recursive replacement for tags from calling map (properties only):
if noRecRepl: break
# possible recursion ?
if not repeatSubst or value == query: break
if value == query or '<' not in value: break
query = value
count -= 1
if count <= 0:

@ -157,6 +157,37 @@ class CommandActionTest(LogCaptureTestCase):
self.assertEqual(substituteRecursiveTags({'A': 'A <IP<PREF>HOST> B IP<PREF> C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}),
{'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
def testSubstRec_DontTouchUnusedCallable(self):
cm = CallingMap({
'A':0,
'B':lambda self: '<A><A>',
'C':'',
'D':''
})
#
# should raise no exceptions:
substituteRecursiveTags(cm)
# add exception tag:
cm['C'] = lambda self,i=0: 5 // int(self['A']) # raise error by access
# test direct get of callable (should raise an error):
self.assertRaises(ZeroDivisionError, lambda: cm['C'])
# should raise no exceptions (tag "C" still unused):
substituteRecursiveTags(cm)
# add reference to "broken" tag:
cm['D'] = 'test=<C>'
# should raise an exception (BOOM by replacement of tag "D" recursive):
self.assertRaises(ZeroDivisionError, lambda: substituteRecursiveTags(cm))
#
# should raise no exceptions:
self.assertEqual(self.__action.replaceTag('test=<A>', cm), "test=0")
# **Important**: recursive replacement of dynamic data from calling map should be prohibited,
# otherwise may be vulnerable on foreign user-input:
self.assertEqual(self.__action.replaceTag('test=<A>--<B>--<A>', cm), "test=0--<A><A>--0")
# should raise an exception (BOOM by replacement of tag "C"):
self.assertRaises(ZeroDivisionError, lambda: self.__action.replaceTag('test=<C>', cm))
# should raise no exceptions (replaces tag "D" only):
self.assertEqual(self.__action.replaceTag('<D>', cm), "test=<C>")
def testReplaceTag(self):
aInfo = {
'HOST': "192.0.2.0",

Loading…
Cancel
Save