mirror of https://github.com/fail2ban/fail2ban
substituteRecursiveTags optimization + moved in helpers facilities (because currently used commonly in server and in client)
parent
3fae8a7e43
commit
a6318b159b
|
@ -28,6 +28,7 @@ import os
|
||||||
|
|
||||||
from .configreader import DefinitionInitConfigReader
|
from .configreader import DefinitionInitConfigReader
|
||||||
from ..helpers import getLogger
|
from ..helpers import getLogger
|
||||||
|
from ..server.action import CommandAction
|
||||||
|
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
@ -69,7 +70,8 @@ class ActionReader(DefinitionInitConfigReader):
|
||||||
return self._name
|
return self._name
|
||||||
|
|
||||||
def convert(self):
|
def convert(self):
|
||||||
opts = self.getCombined(ignore=('timeout', 'bantime'))
|
opts = self.getCombined(
|
||||||
|
ignore=CommandAction._escapedTags | set(('timeout', 'bantime')))
|
||||||
# type-convert only after combined (otherwise boolean converting prevents substitution):
|
# type-convert only after combined (otherwise boolean converting prevents substitution):
|
||||||
if opts.get('norestored'):
|
if opts.get('norestored'):
|
||||||
opts['norestored'] = self._convert_to_boolean(opts['norestored'])
|
opts['norestored'] = self._convert_to_boolean(opts['norestored'])
|
||||||
|
|
|
@ -29,8 +29,7 @@ import os
|
||||||
from ConfigParser import NoOptionError, NoSectionError
|
from ConfigParser import NoOptionError, NoSectionError
|
||||||
|
|
||||||
from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel
|
from .configparserinc import sys, SafeConfigParserWithIncludes, logLevel
|
||||||
from ..helpers import getLogger
|
from ..helpers import getLogger, substituteRecursiveTags
|
||||||
from ..server.action import CommandAction
|
|
||||||
|
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
@ -328,6 +327,11 @@ class DefinitionInitConfigReader(ConfigReader):
|
||||||
return value.lower() in ("1", "yes", "true", "on")
|
return value.lower() in ("1", "yes", "true", "on")
|
||||||
|
|
||||||
def getCombOption(self, optname):
|
def getCombOption(self, optname):
|
||||||
|
"""Get combined definition option (as string) using pre-set and init
|
||||||
|
options as preselection (values with higher precedence as specified in section).
|
||||||
|
|
||||||
|
Can be used only after calling of getOptions.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
return self._defCache[optname]
|
return self._defCache[optname]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -352,7 +356,7 @@ class DefinitionInitConfigReader(ConfigReader):
|
||||||
n, cond = cond.groups()
|
n, cond = cond.groups()
|
||||||
ignore.add(n)
|
ignore.add(n)
|
||||||
# substiture options already specified direct:
|
# substiture options already specified direct:
|
||||||
opts = CommandAction.substituteRecursiveTags(combinedopts,
|
opts = substituteRecursiveTags(combinedopts,
|
||||||
ignore=ignore, addrepl=self.getCombOption)
|
ignore=ignore, addrepl=self.getCombOption)
|
||||||
if not opts:
|
if not opts:
|
||||||
raise ValueError('recursive tag definitions unable to be resolved')
|
raise ValueError('recursive tag definitions unable to be resolved')
|
||||||
|
|
|
@ -200,6 +200,105 @@ else:
|
||||||
raise
|
raise
|
||||||
return uni_decode(x, enc, 'replace')
|
return uni_decode(x, enc, 'replace')
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# Following facilities used for safe recursive interpolation of
|
||||||
|
# tags (<tag>) in tagged options.
|
||||||
|
#
|
||||||
|
|
||||||
|
# max tag replacement count:
|
||||||
|
MAX_TAG_REPLACE_COUNT = 10
|
||||||
|
|
||||||
|
# compiled RE for tag name (replacement name)
|
||||||
|
TAG_CRE = re.compile(r'<([^ <>]+)>')
|
||||||
|
|
||||||
|
def substituteRecursiveTags(inptags, conditional='',
|
||||||
|
ignore=(), addrepl=None
|
||||||
|
):
|
||||||
|
"""Sort out tag definitions within other tags.
|
||||||
|
Since v.0.9.2 supports embedded interpolation (see test cases for examples).
|
||||||
|
|
||||||
|
so: becomes:
|
||||||
|
a = 3 a = 3
|
||||||
|
b = <a>_3 b = 3_3
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
inptags : dict
|
||||||
|
Dictionary of tags(keys) and their values.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
dict
|
||||||
|
Dictionary of tags(keys) and their values, with tags
|
||||||
|
within the values recursively replaced.
|
||||||
|
"""
|
||||||
|
# copy return tags dict to prevent modifying of inptags:
|
||||||
|
tags = inptags.copy()
|
||||||
|
t = TAG_CRE
|
||||||
|
ignore = set(ignore)
|
||||||
|
done = set()
|
||||||
|
# repeat substitution while embedded-recursive (repFlag is True)
|
||||||
|
while True:
|
||||||
|
repFlag = False
|
||||||
|
# substitute each value:
|
||||||
|
for tag in tags.iterkeys():
|
||||||
|
# ignore escaped or already done (or in ignore list):
|
||||||
|
if tag in ignore or tag in done: continue
|
||||||
|
value = orgval = str(tags[tag])
|
||||||
|
# search and replace all tags within value, that can be interpolated using other tags:
|
||||||
|
m = t.search(value)
|
||||||
|
refCounts = {}
|
||||||
|
#logSys.log(5, 'TAG: %s, value: %s' % (tag, value))
|
||||||
|
while m:
|
||||||
|
found_tag = m.group(1)
|
||||||
|
# don't replace tags that should be currently ignored (pre-replacement):
|
||||||
|
if found_tag in ignore:
|
||||||
|
m = t.search(value, m.end())
|
||||||
|
continue
|
||||||
|
#logSys.log(5, 'found: %s' % found_tag)
|
||||||
|
if found_tag == tag or refCounts.get(found_tag, 1) > MAX_TAG_REPLACE_COUNT:
|
||||||
|
# recursive definitions are bad
|
||||||
|
#logSys.log(5, 'recursion fail tag: %s value: %s' % (tag, value) )
|
||||||
|
raise ValueError(
|
||||||
|
"properties contain self referencing definitions "
|
||||||
|
"and cannot be resolved, fail tag: %s, found: %s in %s, value: %s" %
|
||||||
|
(tag, found_tag, refCounts, value))
|
||||||
|
repl = None
|
||||||
|
if conditional:
|
||||||
|
repl = tags.get(found_tag + '?' + conditional)
|
||||||
|
if repl is None:
|
||||||
|
repl = tags.get(found_tag)
|
||||||
|
# try to find tag using additional replacement (callable):
|
||||||
|
if repl is None and addrepl is not None:
|
||||||
|
repl = addrepl(found_tag)
|
||||||
|
if repl is None:
|
||||||
|
# Missing tags - just continue on searching after end of match
|
||||||
|
# Missing tags are ok - cInfo can contain aInfo elements like <HOST> and valid shell
|
||||||
|
# constructs like <STDIN>.
|
||||||
|
m = t.search(value, m.end())
|
||||||
|
continue
|
||||||
|
value = value.replace('<%s>' % found_tag, repl)
|
||||||
|
#logSys.log(5, 'value now: %s' % value)
|
||||||
|
# increment reference count:
|
||||||
|
refCounts[found_tag] = refCounts.get(found_tag, 0) + 1
|
||||||
|
# the next match for replace:
|
||||||
|
m = t.search(value, m.start())
|
||||||
|
#logSys.log(5, 'TAG: %s, newvalue: %s' % (tag, value))
|
||||||
|
# was substituted?
|
||||||
|
if orgval != value:
|
||||||
|
# check still contains any tag - should be repeated (possible embedded-recursive substitution):
|
||||||
|
if t.search(value):
|
||||||
|
repFlag = True
|
||||||
|
tags[tag] = value
|
||||||
|
# no more sub tags (and no possible composite), add this tag to done set (just to be faster):
|
||||||
|
if '<' not in value: done.add(tag)
|
||||||
|
# stop interpolation, if no replacements anymore:
|
||||||
|
if not repFlag:
|
||||||
|
break
|
||||||
|
return tags
|
||||||
|
|
||||||
|
|
||||||
class BgService(object):
|
class BgService(object):
|
||||||
"""Background servicing
|
"""Background servicing
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ __license__ = "GPL"
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import signal
|
import signal
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
|
@ -35,7 +34,7 @@ from collections import MutableMapping
|
||||||
from .ipdns import asip
|
from .ipdns import asip
|
||||||
from .mytime import MyTime
|
from .mytime import MyTime
|
||||||
from .utils import Utils
|
from .utils import Utils
|
||||||
from ..helpers import getLogger
|
from ..helpers import getLogger, substituteRecursiveTags
|
||||||
|
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
@ -46,12 +45,6 @@ _cmd_lock = threading.Lock()
|
||||||
# Todo: make it configurable resp. automatically set, ex.: `[ -f /proc/net/if_inet6 ] && echo 'yes' || echo 'no'`:
|
# Todo: make it configurable resp. automatically set, ex.: `[ -f /proc/net/if_inet6 ] && echo 'yes' || echo 'no'`:
|
||||||
allowed_ipv6 = True
|
allowed_ipv6 = True
|
||||||
|
|
||||||
# max tag replacement count:
|
|
||||||
MAX_TAG_REPLACE_COUNT = 10
|
|
||||||
|
|
||||||
# compiled RE for tag name (replacement name)
|
|
||||||
TAG_CRE = re.compile(r'<([^ <>]+)>')
|
|
||||||
|
|
||||||
|
|
||||||
class CallingMap(MutableMapping):
|
class CallingMap(MutableMapping):
|
||||||
"""A Mapping type which returns the result of callable values.
|
"""A Mapping type which returns the result of callable values.
|
||||||
|
@ -364,93 +357,6 @@ class CommandAction(ActionBase):
|
||||||
"""
|
"""
|
||||||
return self._executeOperation('<actionreload>', 'reloading')
|
return self._executeOperation('<actionreload>', 'reloading')
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def substituteRecursiveTags(cls, inptags, conditional='',
|
|
||||||
ignore=(), addrepl=None
|
|
||||||
):
|
|
||||||
"""Sort out tag definitions within other tags.
|
|
||||||
Since v.0.9.2 supports embedded interpolation (see test cases for examples).
|
|
||||||
|
|
||||||
so: becomes:
|
|
||||||
a = 3 a = 3
|
|
||||||
b = <a>_3 b = 3_3
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
inptags : dict
|
|
||||||
Dictionary of tags(keys) and their values.
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
dict
|
|
||||||
Dictionary of tags(keys) and their values, with tags
|
|
||||||
within the values recursively replaced.
|
|
||||||
"""
|
|
||||||
# copy return tags dict to prevent modifying of inptags:
|
|
||||||
tags = inptags.copy()
|
|
||||||
t = TAG_CRE
|
|
||||||
ignore = set(ignore)
|
|
||||||
done = cls._escapedTags.copy() | ignore
|
|
||||||
# repeat substitution while embedded-recursive (repFlag is True)
|
|
||||||
while True:
|
|
||||||
repFlag = False
|
|
||||||
# substitute each value:
|
|
||||||
for tag in tags.iterkeys():
|
|
||||||
# ignore escaped or already done (or in ignore list):
|
|
||||||
if tag in done: continue
|
|
||||||
value = orgval = str(tags[tag])
|
|
||||||
# search and replace all tags within value, that can be interpolated using other tags:
|
|
||||||
m = t.search(value)
|
|
||||||
refCounts = {}
|
|
||||||
#logSys.log(5, 'TAG: %s, value: %s' % (tag, value))
|
|
||||||
while m:
|
|
||||||
found_tag = m.group(1)
|
|
||||||
# don't replace tags that should be currently ignored (pre-replacement):
|
|
||||||
if found_tag in ignore:
|
|
||||||
m = t.search(value, m.end())
|
|
||||||
continue
|
|
||||||
#logSys.log(5, 'found: %s' % found_tag)
|
|
||||||
if found_tag == tag or refCounts.get(found_tag, 1) > MAX_TAG_REPLACE_COUNT:
|
|
||||||
# recursive definitions are bad
|
|
||||||
#logSys.log(5, 'recursion fail tag: %s value: %s' % (tag, value) )
|
|
||||||
raise ValueError(
|
|
||||||
"properties contain self referencing definitions "
|
|
||||||
"and cannot be resolved, fail tag: %s, found: %s in %s, value: %s" %
|
|
||||||
(tag, found_tag, refCounts, value))
|
|
||||||
repl = None
|
|
||||||
if found_tag not in cls._escapedTags:
|
|
||||||
if conditional:
|
|
||||||
repl = tags.get(found_tag + '?' + conditional)
|
|
||||||
if repl is None:
|
|
||||||
repl = tags.get(found_tag)
|
|
||||||
if repl is None and addrepl is not None:
|
|
||||||
repl = addrepl(found_tag)
|
|
||||||
if repl is None:
|
|
||||||
# Escaped or missing tags - just continue on searching after end of match
|
|
||||||
# Missing tags are ok - cInfo can contain aInfo elements like <HOST> and valid shell
|
|
||||||
# constructs like <STDIN>.
|
|
||||||
m = t.search(value, m.end())
|
|
||||||
continue
|
|
||||||
value = value.replace('<%s>' % found_tag, repl)
|
|
||||||
#logSys.log(5, 'value now: %s' % value)
|
|
||||||
# increment reference count:
|
|
||||||
refCounts[found_tag] = refCounts.get(found_tag, 0) + 1
|
|
||||||
# the next match for replace:
|
|
||||||
m = t.search(value, m.start())
|
|
||||||
#logSys.log(5, 'TAG: %s, newvalue: %s' % (tag, value))
|
|
||||||
# was substituted?
|
|
||||||
if orgval != value:
|
|
||||||
# check still contains any tag - should be repeated (possible embedded-recursive substitution):
|
|
||||||
if t.search(value):
|
|
||||||
repFlag = True
|
|
||||||
tags[tag] = value
|
|
||||||
# no more sub tags (and no possible composite), add this tag to done set (just to be faster):
|
|
||||||
if '<' not in value: done.add(tag)
|
|
||||||
# stop interpolation, if no replacements anymore:
|
|
||||||
if not repFlag:
|
|
||||||
break
|
|
||||||
return tags
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def escapeTag(value):
|
def escapeTag(value):
|
||||||
"""Escape characters which may be used for command injection.
|
"""Escape characters which may be used for command injection.
|
||||||
|
@ -501,7 +407,7 @@ class CommandAction(ActionBase):
|
||||||
return string
|
return string
|
||||||
# replace:
|
# replace:
|
||||||
string = query
|
string = query
|
||||||
aInfo = cls.substituteRecursiveTags(aInfo, conditional)
|
aInfo = substituteRecursiveTags(aInfo, conditional, ignore=cls._escapedTags)
|
||||||
for tag in aInfo:
|
for tag in aInfo:
|
||||||
if "<%s>" % tag in query:
|
if "<%s>" % tag in query:
|
||||||
value = aInfo.get(tag + '?' + conditional)
|
value = aInfo.get(tag + '?' + conditional)
|
||||||
|
|
|
@ -29,7 +29,7 @@ import tempfile
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from ..server.action import CommandAction, CallingMap
|
from ..server.action import CommandAction, CallingMap, substituteRecursiveTags
|
||||||
from ..server.actions import OrderedDict
|
from ..server.actions import OrderedDict
|
||||||
from ..server.utils import Utils
|
from ..server.utils import Utils
|
||||||
|
|
||||||
|
@ -56,30 +56,30 @@ class CommandActionTest(LogCaptureTestCase):
|
||||||
}
|
}
|
||||||
# Recursion is bad
|
# Recursion is bad
|
||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
lambda: CommandAction.substituteRecursiveTags({'A': '<A>'}))
|
lambda: substituteRecursiveTags({'A': '<A>'}))
|
||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
lambda: CommandAction.substituteRecursiveTags({'A': '<B>', 'B': '<A>'}))
|
lambda: substituteRecursiveTags({'A': '<B>', 'B': '<A>'}))
|
||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
lambda: CommandAction.substituteRecursiveTags({'A': '<B>', 'B': '<C>', 'C': '<A>'}))
|
lambda: substituteRecursiveTags({'A': '<B>', 'B': '<C>', 'C': '<A>'}))
|
||||||
# Unresolveable substition
|
# Unresolveable substition
|
||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
lambda: CommandAction.substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
|
lambda: substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
|
||||||
self.assertRaises(ValueError,
|
self.assertRaises(ValueError,
|
||||||
lambda: CommandAction.substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
|
lambda: substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
|
||||||
# We need here an ordered, because the sequence of iteration is very important for this test
|
# We need here an ordered, because the sequence of iteration is very important for this test
|
||||||
if OrderedDict:
|
if OrderedDict:
|
||||||
# No cyclic recursion, just multiple replacement of tag <T>, should be successful:
|
# No cyclic recursion, just multiple replacement of tag <T>, should be successful:
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict(
|
self.assertEqual(substituteRecursiveTags( OrderedDict(
|
||||||
(('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
|
(('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
|
||||||
), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
|
), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
|
||||||
)
|
)
|
||||||
# No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
|
# No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict(
|
self.assertEqual(substituteRecursiveTags( OrderedDict(
|
||||||
(('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
|
(('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
|
||||||
), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
|
), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
|
||||||
)
|
)
|
||||||
# No cyclic recursion, just multiple replacement of same tags, should be successful:
|
# No cyclic recursion, just multiple replacement of same tags, should be successful:
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict((
|
self.assertEqual(substituteRecursiveTags( OrderedDict((
|
||||||
('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
|
('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
|
||||||
('ipmset', 'f2b-<name>'),
|
('ipmset', 'f2b-<name>'),
|
||||||
('name', 'any'),
|
('name', 'any'),
|
||||||
|
@ -111,42 +111,42 @@ class CommandActionTest(LogCaptureTestCase):
|
||||||
))
|
))
|
||||||
)
|
)
|
||||||
# Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
|
# Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
|
||||||
self.assertRaises(ValueError, lambda: CommandAction.substituteRecursiveTags( OrderedDict((
|
self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
|
||||||
('A', '<<B><C>>'),
|
('A', '<<B><C>>'),
|
||||||
('B', 'D'), ('C', 'E'),
|
('B', 'D'), ('C', 'E'),
|
||||||
('DE', 'cycle <A>'),
|
('DE', 'cycle <A>'),
|
||||||
)) ))
|
)) ))
|
||||||
self.assertRaises(ValueError, lambda: CommandAction.substituteRecursiveTags( OrderedDict((
|
self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
|
||||||
('DE', 'cycle <A>'),
|
('DE', 'cycle <A>'),
|
||||||
('A', '<<B><C>>'),
|
('A', '<<B><C>>'),
|
||||||
('B', 'D'), ('C', 'E'),
|
('B', 'D'), ('C', 'E'),
|
||||||
)) ))
|
)) ))
|
||||||
|
|
||||||
# missing tags are ok
|
# missing tags are ok
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
|
self.assertEqual(substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C> <D> <X>','X':'fun'}), {'A': '<C> <D> fun', 'X':'fun'})
|
self.assertEqual(substituteRecursiveTags({'A': '<C> <D> <X>','X':'fun'}), {'A': '<C> <D> fun', 'X':'fun'})
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<C> <B>', 'B': 'cool'}), {'A': '<C> cool', 'B': 'cool'})
|
self.assertEqual(substituteRecursiveTags({'A': '<C> <B>', 'B': 'cool'}), {'A': '<C> cool', 'B': 'cool'})
|
||||||
# Escaped tags should be ignored
|
# Escaped tags should be ignored
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<matches> <B>', 'B': 'cool'}), {'A': '<matches> cool', 'B': 'cool'})
|
self.assertEqual(substituteRecursiveTags({'A': '<matches> <B>', 'B': 'cool'}), {'A': '<matches> cool', 'B': 'cool'})
|
||||||
# Multiple stuff on same line is ok
|
# Multiple stuff on same line is ok
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP> evilperson=<honeypot>', 'honeypot': 'pokie', 'ignoreregex': ''}),
|
self.assertEqual(substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP> evilperson=<honeypot>', 'honeypot': 'pokie', 'ignoreregex': ''}),
|
||||||
{ 'failregex': "to=pokie fromip=<IP> evilperson=pokie",
|
{ 'failregex': "to=pokie fromip=<IP> evilperson=pokie",
|
||||||
'honeypot': 'pokie',
|
'honeypot': 'pokie',
|
||||||
'ignoreregex': '',
|
'ignoreregex': '',
|
||||||
})
|
})
|
||||||
# rest is just cool
|
# rest is just cool
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags(aInfo),
|
self.assertEqual(substituteRecursiveTags(aInfo),
|
||||||
{ 'HOST': "192.0.2.0",
|
{ 'HOST': "192.0.2.0",
|
||||||
'ABC': '123 192.0.2.0',
|
'ABC': '123 192.0.2.0',
|
||||||
'xyz': '890 123 192.0.2.0',
|
'xyz': '890 123 192.0.2.0',
|
||||||
})
|
})
|
||||||
# obscure embedded case
|
# obscure embedded case
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4'}),
|
self.assertEqual(substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4'}),
|
||||||
{'A': '<IPV4HOST>', 'PREF': 'IPV4'})
|
{'A': '<IPV4HOST>', 'PREF': 'IPV4'})
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'}),
|
self.assertEqual(substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'}),
|
||||||
{'A': '1.2.3.4', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'})
|
{'A': '1.2.3.4', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'})
|
||||||
# more embedded within a string and two interpolations
|
# more embedded within a string and two interpolations
|
||||||
self.assertEqual(CommandAction.substituteRecursiveTags({'A': 'A <IP<PREF>HOST> B IP<PREF> C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}),
|
self.assertEqual(substituteRecursiveTags({'A': 'A <IP<PREF>HOST> B IP<PREF> C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}),
|
||||||
{'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
|
{'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
|
||||||
|
|
||||||
def testReplaceTag(self):
|
def testReplaceTag(self):
|
||||||
|
|
Loading…
Reference in New Issue