amend for gh-1419: tags substitution bug - wrong recognition of cyclic recursion, new test cases covered this

pull/1452/head
sebres 2016-05-25 21:33:26 +02:00
parent cce63926ce
commit a80043ce80
2 changed files with 69 additions and 15 deletions

View File

@ -55,6 +55,9 @@ _RETCODE_HINTS = {
signame = dict((num, name) signame = dict((num, name)
for name, num in signal.__dict__.iteritems() if name.startswith("SIG")) for name, num in signal.__dict__.iteritems() if name.startswith("SIG"))
# max tag replacement count:
MAX_TAG_REPLACE_COUNT = 10
class CallingMap(MutableMapping): class CallingMap(MutableMapping):
"""A Mapping type which returns the result of callable values. """A Mapping type which returns the result of callable values.
@ -390,26 +393,22 @@ class CommandAction(ActionBase):
""" """
t = re.compile(r'<([^ <>]+)>') t = re.compile(r'<([^ <>]+)>')
# repeat substitution while embedded-recursive (repFlag is True) # repeat substitution while embedded-recursive (repFlag is True)
done = cls._escapedTags.copy()
while True: while True:
repFlag = False repFlag = False
# substitute each value: # substitute each value:
for tag in tags.iterkeys(): for tag in tags.iterkeys():
if tag in cls._escapedTags: # ignore escaped or already done:
# Escaped so won't match if tag in done: continue
continue
value = str(tags[tag]) value = str(tags[tag])
# search and replace all tags within value, that can be interpolated using other tags: # search and replace all tags within value, that can be interpolated using other tags:
m = t.search(value) m = t.search(value)
done = {} refCounts = {}
last_found = tag
#logSys.log(5, 'TAG: %s, value: %s' % (tag, value)) #logSys.log(5, 'TAG: %s, value: %s' % (tag, value))
while m: while m:
found_tag = m.group(1) found_tag = m.group(1)
#logSys.log(5, 'found: %s' % found_tag) #logSys.log(5, 'found: %s' % found_tag)
curdone = done.get(last_found) if found_tag == tag or refCounts.get(found_tag, 1) > MAX_TAG_REPLACE_COUNT:
if curdone is None:
done[last_found] = curdone = []
if found_tag == tag or found_tag in curdone:
# recursive definitions are bad # recursive definitions are bad
#logSys.log(5, 'recursion fail tag: %s value: %s' % (tag, value) ) #logSys.log(5, 'recursion fail tag: %s value: %s' % (tag, value) )
return False return False
@ -421,8 +420,9 @@ class CommandAction(ActionBase):
continue continue
value = value.replace('<%s>' % found_tag , tags[found_tag]) value = value.replace('<%s>' % found_tag , tags[found_tag])
#logSys.log(5, 'value now: %s' % value) #logSys.log(5, 'value now: %s' % value)
curdone.append(found_tag) # increment reference count:
last_found = found_tag refCounts[found_tag] = refCounts.get(found_tag, 0) + 1
# the next match for replace:
m = t.search(value, m.start()) m = t.search(value, m.start())
#logSys.log(5, 'TAG: %s, newvalue: %s' % (tag, value)) #logSys.log(5, 'TAG: %s, newvalue: %s' % (tag, value))
# was substituted? # was substituted?
@ -431,6 +431,9 @@ class CommandAction(ActionBase):
if t.search(value): if t.search(value):
repFlag = True repFlag = True
tags[tag] = value tags[tag] = value
# no more sub tags (and no possible composite), add this tag to done set (just to be faster):
if '<' not in value: done.add(tag)
# stop interpolation, if no replacements anymore:
if not repFlag: if not repFlag:
break break
return tags return tags

View File

@ -59,12 +59,62 @@ class CommandActionTest(LogCaptureTestCase):
# Unresolveable substition # Unresolveable substition
self.assertFalse(CommandAction.substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''})) self.assertFalse(CommandAction.substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
self.assertFalse(CommandAction.substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''})) self.assertFalse(CommandAction.substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
# No-recursion, just multiple replacement of tag <T>, should be successful # We need here an ordered, because the sequence of iteration is very important for this test
if OrderedDict: # we need here an ordered, because the sequence of iteration is very important for this test if OrderedDict:
self.assertEqual(CommandAction.substituteRecursiveTags( # No cyclic recursion, just multiple replacement of tag <T>, should be successful:
OrderedDict((('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>'))) self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict(
(('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'} ), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
) )
# No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict(
(('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
)
# No cyclic recursion, just multiple replacement of same tags, should be successful:
self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict((
('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
('ipmset', 'f2b-<name>'),
('name', 'any'),
('bantime', '600'),
('ipsetfamily', 'inet'),
('iptables', 'iptables <lockingopt>'),
('lockingopt', '-w'),
('chain', 'INPUT'),
('actiontype', '<multiport>'),
('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
('protocol', 'tcp'),
('port', 'ssh'),
('blocktype', 'REJECT',),
))
), OrderedDict((
('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('ipmset', 'f2b-any'),
('name', 'any'),
('bantime', '600'),
('ipsetfamily', 'inet'),
('iptables', 'iptables -w'),
('lockingopt', '-w'),
('chain', 'INPUT'),
('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
('protocol', 'tcp'),
('port', 'ssh'),
('blocktype', 'REJECT')
))
)
# Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
self.assertFalse(CommandAction.substituteRecursiveTags( OrderedDict((
('A', '<<B><C>>'),
('B', 'D'), ('C', 'E'),
('DE', 'cycle <A>'),
)) ))
self.assertFalse(CommandAction.substituteRecursiveTags( OrderedDict((
('DE', 'cycle <A>'),
('A', '<<B><C>>'),
('B', 'D'), ('C', 'E'),
)) ))
# missing tags are ok # missing tags are ok
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'}) self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C> <D> <X>','X':'fun'}), {'A': '<C> <D> fun', 'X':'fun'}) self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C> <D> <X>','X':'fun'}), {'A': '<C> <D> fun', 'X':'fun'})
@ -132,6 +182,7 @@ class CommandActionTest(LogCaptureTestCase):
CallingMap(matches=lambda: str(10))), CallingMap(matches=lambda: str(10))),
"09 10 11") "09 10 11")
def testReplaceNoTag(self):
# As tag not present, therefore callable should not be called # As tag not present, therefore callable should not be called
# Will raise ValueError if it is # Will raise ValueError if it is
self.assertEqual( self.assertEqual(