small amend with several fixes and test coverage

pull/1698/head
sebres 8 years ago
parent 9ebf70cd6a
commit a8c0cec4ac

@ -233,9 +233,11 @@ def substituteRecursiveTags(inptags, conditional='',
Dictionary of tags(keys) and their values, with tags
within the values recursively replaced.
"""
#logSys = getLogger("fail2ban")
tre_search = TAG_CRE.search
# copy return tags dict to prevent modifying of inptags:
tags = inptags.copy()
t = TAG_CRE
# init:
ignore = set(ignore)
done = set()
# repeat substitution while embedded-recursive (repFlag is True)
@ -247,48 +249,49 @@ def substituteRecursiveTags(inptags, conditional='',
if tag in ignore or tag in done: continue
value = orgval = str(tags[tag])
# search and replace all tags within value, that can be interpolated using other tags:
m = t.search(value)
m = tre_search(value)
refCounts = {}
#logSys.log(5, 'TAG: %s, value: %s' % (tag, value))
while m:
found_tag = m.group(1)
# found replacement tag:
rtag = m.group(1)
# don't replace tags that should be currently ignored (pre-replacement):
if found_tag in ignore:
m = t.search(value, m.end())
if rtag in ignore:
m = tre_search(value, m.end())
continue
#logSys.log(5, 'found: %s' % found_tag)
if found_tag == tag or refCounts.get(found_tag, 1) > MAX_TAG_REPLACE_COUNT:
#logSys.log(5, 'found: %s' % rtag)
if rtag == tag or refCounts.get(rtag, 1) > MAX_TAG_REPLACE_COUNT:
# recursive definitions are bad
#logSys.log(5, 'recursion fail tag: %s value: %s' % (tag, value) )
raise ValueError(
"properties contain self referencing definitions "
"and cannot be resolved, fail tag: %s, found: %s in %s, value: %s" %
(tag, found_tag, refCounts, value))
(tag, rtag, refCounts, value))
repl = None
if conditional:
repl = tags.get(found_tag + '?' + conditional)
repl = tags.get(rtag + '?' + conditional)
if repl is None:
repl = tags.get(found_tag)
repl = tags.get(rtag)
# try to find tag using additional replacement (callable):
if repl is None and addrepl is not None:
repl = addrepl(found_tag)
repl = addrepl(rtag)
if repl is None:
# Missing tags - just continue on searching after end of match
# Missing tags are ok - cInfo can contain aInfo elements like <HOST> and valid shell
# constructs like <STDIN>.
m = t.search(value, m.end())
m = tre_search(value, m.end())
continue
value = value.replace('<%s>' % found_tag, repl)
value = value.replace('<%s>' % rtag, repl)
#logSys.log(5, 'value now: %s' % value)
# increment reference count:
refCounts[found_tag] = refCounts.get(found_tag, 0) + 1
refCounts[rtag] = refCounts.get(rtag, 0) + 1
# the next match for replace:
m = t.search(value, m.start())
m = tre_search(value, m.start())
#logSys.log(5, 'TAG: %s, newvalue: %s' % (tag, value))
# was substituted?
if orgval != value:
# check still contains any tag - should be repeated (possible embedded-recursive substitution):
if t.search(value):
if tre_search(value):
repFlag = True
tags[tag] = value
# no more sub tags (and no possible composite), add this tag to done set (just to be faster):

@ -252,6 +252,16 @@ class CommandAction(ActionBase):
# set:
self.__dict__[name] = value
def __delattr__(self, name):
if not name.startswith('_'):
# parameters changed - clear properties and substitution cache:
self.__properties = None
self.__substCache.clear()
#self._logSys.debug("Unset action %r %s", self._name, name)
self._logSys.debug(" Unset %s", name)
# del:
del self.__dict__[name]
@property
def _properties(self):
"""A dictionary of the actions properties.
@ -404,15 +414,19 @@ class CommandAction(ActionBase):
# use cache if allowed:
if cache is not None:
ckey = (query, conditional)
value = cache.get(ckey)
if value is not None:
return value
try:
return cache[ckey]
except KeyError:
pass
# first try get cached tags dictionary:
subInfo = csubkey = None
if cache is not None:
csubkey = ('subst-tags', id(aInfo), conditional)
subInfo = cache.get(csubkey)
try:
subInfo = cache[csubkey]
except KeyError:
pass
# interpolation of dictionary:
if subInfo is None:
subInfo = substituteRecursiveTags(aInfo, conditional, ignore=cls._escapedTags)
@ -424,7 +438,7 @@ class CommandAction(ActionBase):
cache[csubkey] = subInfo
# substitution callable, used by interpolation of each tag
repeatSubst = {}
repeatSubst = {0: 0}
def substVal(m):
tag = m.group(1) # tagname from match
value = None
@ -441,19 +455,19 @@ class CommandAction(ActionBase):
value = cls.escapeTag(value)
# possible contains tags:
if '<' in value:
repeatSubst[1] = True
repeatSubst[0] = 1
return value
# interpolation of query:
count = MAX_TAG_REPLACE_COUNT + 1
while True:
repeatSubst = {}
repeatSubst[0] = 0
value = TAG_CRE.sub(substVal, query)
# possible recursion ?
if not repeatSubst or value == query: break
query = value
count -= 1
if count <= 0: # pragma: no cover - almost impossible (because resolved above)
if count <= 0:
raise ValueError(
"unexpected too long replacement interpolation, "
"possible self referencing definitions in query: %s" % (query,))

@ -40,11 +40,19 @@ class CommandActionTest(LogCaptureTestCase):
def setUp(self):
"""Call before every test case."""
self.__action = CommandAction(None, "Test")
LogCaptureTestCase.setUp(self)
self.__action = CommandAction(None, "Test")
# prevent execute stop if start fails (or event not started at all):
self.__action_started = False
orgstart = self.__action.start
def _action_start():
self.__action_started = True
return orgstart()
self.__action.start = _action_start
def tearDown(self):
"""Call after every test case."""
if self.__action_started:
self.__action.stop()
LogCaptureTestCase.tearDown(self)
@ -196,6 +204,26 @@ class CommandActionTest(LogCaptureTestCase):
self.__action.replaceTag("abc",
CallingMap(matches=lambda: int("a"))), "abc")
def testReplaceTagSelfRecursion(self):
setattr(self.__action, 'a', "<a")
setattr(self.__action, 'b', "c>")
setattr(self.__action, 'b?family=inet6', "b>")
setattr(self.__action, 'ac', "<a><b>")
setattr(self.__action, 'ab', "<ac>")
setattr(self.__action, 'x?family=inet6', "")
# produce self-referencing properties except:
self.assertRaisesRegexp(ValueError, r"properties contain self referencing definitions",
lambda: self.__action.replaceTag("<a><b>",
self.__action._properties, conditional="family=inet4")
)
# remore self-referencing in props:
delattr(self.__action, 'ac')
# produce self-referencing query except:
self.assertRaisesRegexp(ValueError, r"possible self referencing definitions in query",
lambda: self.__action.replaceTag("<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x>>>>>>>>>>>>>>>>>>>>>",
self.__action._properties, conditional="family=inet6")
)
def testReplaceTagConditionalCached(self):
setattr(self.__action, 'abc', "123")
setattr(self.__action, 'abc?family=inet4', "345")

Loading…
Cancel
Save