BF: usedns=no was not working at all

it was not adding any detected address, IP or not to the list of failed attempts
This commit also adds appropriate unittest
pull/43/merge
Yaroslav Halchenko 2012-06-15 23:43:11 -04:00
parent 971406f722
commit 3989d24967
3 changed files with 70 additions and 38 deletions

View File

@ -54,7 +54,7 @@ class Filter(JailThread):
# Initialize the filter object with default values.
# @param jail the jail object
def __init__(self, jail):
def __init__(self, jail, useDns='warn'):
JailThread.__init__(self)
## The jail which contains this filter.
self.jail = jail
@ -65,7 +65,7 @@ class Filter(JailThread):
## The regular expression list with expressions to ignore.
self.__ignoreRegex = list()
## Use DNS setting
self.__useDns = "warn"
self.setUseDns(useDns)
## The amount of time to look back.
self.__findTime = 6000
## The ignore IP list.
@ -149,6 +149,13 @@ class Filter(JailThread):
# @param value the usedns mode
def setUseDns(self, value):
if isinstance(value, bool):
value = {True: 'yes', False: 'no'}[value]
value = value.lower() # must be a string by now
if not (value in ('yes', 'no', 'warn')):
logSys.error("Incorrect value %r specified for usedns. "
"Using safe 'no'" % (value,))
value = 'no'
logSys.debug("Setting usedns = %s for %s" % (value, self))
self.__useDns = value
@ -373,8 +380,8 @@ class Filter(JailThread):
class FileFilter(Filter):
def __init__(self, jail):
Filter.__init__(self, jail)
def __init__(self, jail, **kwargs):
Filter.__init__(self, jail, **kwargs)
## The log file path.
self.__logPath = []
@ -589,25 +596,24 @@ class DNSUtils:
def textToIp(text, useDns):
""" Return the IP of DNS found in a given text.
"""
if useDns == "no":
return None
else:
ipList = list()
# Search for plain IP
plainIP = DNSUtils.searchIP(text)
if not plainIP is None:
plainIPStr = plainIP.group(0)
if DNSUtils.isValidIP(plainIPStr):
ipList.append(plainIPStr)
if not ipList:
# Try to get IP from possible DNS
ip = DNSUtils.dnsToIp(text)
for e in ip:
ipList.append(e)
if useDns == "warn":
logSys.warning("Determined IP using DNS Reverse Lookup: %s = %s",
text, ipList)
return ipList
ipList = list()
# Search for plain IP
plainIP = DNSUtils.searchIP(text)
if not plainIP is None:
plainIPStr = plainIP.group(0)
if DNSUtils.isValidIP(plainIPStr):
ipList.append(plainIPStr)
# If we are allowed to resolve -- give it a try if nothing was found
if useDns in ("yes", "warn") and not ipList:
# Try to get IP from possible DNS
ip = DNSUtils.dnsToIp(text)
ipList.extend(ip)
if ip and useDns == "warn":
logSys.warning("Determined IP using DNS Reverse Lookup: %s = %s",
text, ipList)
return ipList
textToIp = staticmethod(textToIp)
#@staticmethod

View File

@ -0,0 +1,2 @@
Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2
Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2

View File

@ -84,14 +84,11 @@ class GetFailures(unittest.TestCase):
FILENAME_02 = "testcases/files/testcase02.log"
FILENAME_03 = "testcases/files/testcase03.log"
FILENAME_04 = "testcases/files/testcase04.log"
FILENAME_USEDNS = "testcases/files/testcase-usedns.log"
def setUp(self):
"""Call before every test case."""
# TODO: RF tests so we could do parametric testing easily.
# useDns='warn' and useDns='yes'
# both must be tested appropriately but ATM more
# critical is to assure correct behavior of 'no'
self.__filter = FileFilter(None, useDns='warn')
self.__filter = FileFilter(None)
self.__filter.setActive(True)
# TODO Test this
#self.__filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
@ -111,14 +108,14 @@ class GetFailures(unittest.TestCase):
time.localtime(output[2])
self.assertEqual(found_time, output_time)
if len(output) > 3: # match matches
self.assertEqual(found[3], output[3])
self.assertEqual(repr(found[3]), repr(output[3]))
def _assertCorrectLastAtempt(self, output):
def _assertCorrectLastAtempt(self, filter_, output):
"""Additional helper to wrap most common test case
Test current filter to contain target ticket
Test filter to contain target ticket
"""
ticket = self.__filter.failManager.toBan()
ticket = filter_.failManager.toBan()
attempts = ticket.getAttempt()
date = ticket.getTime()
@ -136,7 +133,7 @@ class GetFailures(unittest.TestCase):
self.__filter.addLogPath(GetFailures.FILENAME_01)
self.__filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_01)
self._assertCorrectLastAtempt(output)
self._assertCorrectLastAtempt(self.__filter, output)
def testGetFailures02(self):
@ -147,7 +144,7 @@ class GetFailures(unittest.TestCase):
self.__filter.addLogPath(GetFailures.FILENAME_02)
self.__filter.addFailRegex("Failed .* from <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_02)
self._assertCorrectLastAtempt(output)
self._assertCorrectLastAtempt(self.__filter, output)
def testGetFailures03(self):
output = ('203.162.223.135', 6, 1124013544.0)
@ -155,7 +152,7 @@ class GetFailures(unittest.TestCase):
self.__filter.addLogPath(GetFailures.FILENAME_03)
self.__filter.addFailRegex("error,relay=<HOST>,.*550 User unknown")
self.__filter.getFailures(GetFailures.FILENAME_03)
self._assertCorrectLastAtempt(output)
self._assertCorrectLastAtempt(self.__filter, output)
def testGetFailures04(self):
output = [('212.41.96.186', 4, 1124013600.0),
@ -167,10 +164,37 @@ class GetFailures(unittest.TestCase):
try:
for i, out in enumerate(output):
self._assertCorrectLastAtempt(out)
self._assertCorrectLastAtempt(self.__filter, out)
except FailManagerEmpty:
pass
def testGetFailuresUseDNS(self):
# We should still catch failures with usedns = no ;-)
output_yes = ('192.0.43.10', 2, 1124013539.0,
['Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2\n',
'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
output_no = ('192.0.43.10', 1, 1124013539.0,
['Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:192.0.43.10 port 51332 ssh2\n'])
# Actually no exception would be raised -- it will be just set to 'no'
#self.assertRaises(ValueError,
# FileFilter, None, useDns='wrong_value_for_useDns')
for useDns, output in (('yes', output_yes),
('no', output_no),
('warn', output_yes)):
filter_ = FileFilter(None, useDns=useDns)
filter_.setActive(True)
filter_.failManager.setMaxRetry(1) # we might have just few failures
filter_.addLogPath(GetFailures.FILENAME_USEDNS)
filter_.addFailRegex("Failed .* from <HOST>")
filter_.getFailures(GetFailures.FILENAME_USEDNS)
self._assertCorrectLastAtempt(filter_, output)
def testGetFailuresMultiRegex(self):
output = ('141.3.81.106', 8, 1124013541.0)
@ -178,7 +202,7 @@ class GetFailures(unittest.TestCase):
self.__filter.addFailRegex("Failed .* from <HOST>")
self.__filter.addFailRegex("Accepted .* from <HOST>")
self.__filter.getFailures(GetFailures.FILENAME_02)
self._assertCorrectLastAtempt(output)
self._assertCorrectLastAtempt(self.__filter, output)
def testGetFailuresIgnoreRegex(self):
output = ('141.3.81.106', 8, 1124013541.0)
@ -196,7 +220,7 @@ class DNSUtilsTests(unittest.TestCase):
def testUseDns(self):
res = DNSUtils.textToIp('www.example.com', 'no')
self.assertEqual(res, None)
self.assertEqual(res, [])
res = DNSUtils.textToIp('www.example.com', 'warn')
self.assertEqual(res, ['192.0.43.10'])
res = DNSUtils.textToIp('www.example.com', 'yes')