code review, increase coverage, etc.

pull/1716/head
sebres 2017-03-13 21:56:06 +01:00
parent 5030e3a122
commit ccfd1ccb2d
4 changed files with 56 additions and 21 deletions

View File

@ -29,24 +29,12 @@ import os
from ConfigParser import NoOptionError, NoSectionError from ConfigParser import NoOptionError, NoSectionError
from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel
from ..helpers import getLogger, substituteRecursiveTags from ..helpers import getLogger, _merge_dicts, substituteRecursiveTags
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
# if sys.version_info >= (3,5):
# def _merge_dicts(x, y):
# return {**x, **y}
# else:
def _merge_dicts(x, y):
r = x
if y:
r = x.copy()
r.update(y)
return r
class ConfigReader(): class ConfigReader():
"""Generic config reader class. """Generic config reader class.

View File

@ -169,6 +169,36 @@ def splitwords(s):
return [] return []
return filter(bool, map(str.strip, re.split('[ ,\n]+', s))) return filter(bool, map(str.strip, re.split('[ ,\n]+', s)))
if sys.version_info >= (3,5):
eval(compile(r'''if 1:
def _merge_dicts(x, y):
"""Helper to merge dicts.
"""
if y:
return {**x, **y}
return x
def _merge_copy_dicts(x, y):
"""Helper to merge dicts to guarantee a copy result (r is never x).
"""
return {**x, **y}
''', __file__, 'exec'))
else:
def _merge_dicts(x, y):
"""Helper to merge dicts.
"""
r = x
if y:
r = x.copy()
r.update(y)
return r
def _merge_copy_dicts(x, y):
"""Helper to merge dicts to guarantee a copy result (r is never x).
"""
r = x.copy()
if y:
r.update(y)
return r
# #
# Following "uni_decode" function unified python independent any to string converting # Following "uni_decode" function unified python independent any to string converting

View File

@ -36,7 +36,7 @@ from .failregex import mapTag2Opt
from .ipdns import asip from .ipdns import asip
from .mytime import MyTime from .mytime import MyTime
from .utils import Utils from .utils import Utils
from ..helpers import getLogger, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT from ..helpers import getLogger, _merge_copy_dicts, substituteRecursiveTags, TAG_CRE, MAX_TAG_REPLACE_COUNT
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
@ -148,7 +148,7 @@ class CallingMap(MutableMapping, object):
return len(self.data) return len(self.data)
def copy(self): # pargma: no cover def copy(self): # pargma: no cover
return self.__class__(self.data.copy()) return self.__class__(_merge_copy_dicts(self.data, self.storage))
class ActionBase(object): class ActionBase(object):

View File

@ -158,18 +158,35 @@ class CommandActionTest(LogCaptureTestCase):
{'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}) {'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
def testSubstRec_DontTouchUnusedCallable(self): def testSubstRec_DontTouchUnusedCallable(self):
cm = CallingMap( cm = CallingMap({
A=0, 'A':0,
B=lambda self: '<A><A>', 'B':lambda self: '<A><A>',
C=lambda self,i=0: 5 // int(self['A']) # raise error by access 'C':'',
) 'D':''
})
#
# should raise no exceptions:
substituteRecursiveTags(cm)
# add exception tag:
cm['C'] = lambda self,i=0: 5 // int(self['A']) # raise error by access
# test direct get of callable (should raise an error):
self.assertRaises(ZeroDivisionError, lambda: cm['C'])
# should raise no exceptions (tag "C" still unused):
substituteRecursiveTags(cm)
# add reference to "broken" tag:
cm['D'] = 'test=<C>'
# should raise an exception (BOOM by replacement of tag "D" recursive):
self.assertRaises(ZeroDivisionError, lambda: substituteRecursiveTags(cm))
#
# should raise no exceptions: # should raise no exceptions:
self.assertEqual(self.__action.replaceTag('test=<A>', cm), "test=0") self.assertEqual(self.__action.replaceTag('test=<A>', cm), "test=0")
# **Important**: recursive replacement of dynamic data from calling map should be prohibited, # **Important**: recursive replacement of dynamic data from calling map should be prohibited,
# otherwise may be vulnerable on foreign user-input: # otherwise may be vulnerable on foreign user-input:
self.assertEqual(self.__action.replaceTag('test=<A>--<B>--<A>', cm), "test=0--<A><A>--0") self.assertEqual(self.__action.replaceTag('test=<A>--<B>--<A>', cm), "test=0--<A><A>--0")
# should raise an exception: # should raise an exception (BOOM by replacement of tag "C"):
self.assertRaises(ZeroDivisionError, lambda: self.__action.replaceTag('test=<C>', cm)) self.assertRaises(ZeroDivisionError, lambda: self.__action.replaceTag('test=<C>', cm))
# should raise no exceptions (replaces tag "D" only):
self.assertEqual(self.__action.replaceTag('<D>', cm), "test=<C>")
def testReplaceTag(self): def testReplaceTag(self):
aInfo = { aInfo = {