|
|
|
@ -25,6 +25,8 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
|
|
|
|
|
__license__ = "GPL"
|
|
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
from server.filterpoll import FilterPoll
|
|
|
|
|
from server.filter import FileFilter
|
|
|
|
|
from server.failmanager import FailManager
|
|
|
|
@ -94,23 +96,34 @@ class GetFailures(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
|
"""Call after every test case."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _assertEqualEntries(self, found, output):
|
|
|
|
|
"""Little helper to unify comparisons with the target entries
|
|
|
|
|
|
|
|
|
|
and report helpful failure reports instead of millions of seconds ;)
|
|
|
|
|
"""
|
|
|
|
|
self.assertEqual(found[:2], output[:2])
|
|
|
|
|
found_time, output_time = \
|
|
|
|
|
time.localtime(found[2]),\
|
|
|
|
|
time.localtime(output[2])
|
|
|
|
|
self.assertEqual(found_time, output_time)
|
|
|
|
|
|
|
|
|
|
def testGetFailures01(self):
|
|
|
|
|
output = ('193.168.0.128', 3, 1124013599.0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.__filter.addLogPath(GetFailures.FILENAME_01)
|
|
|
|
|
self.__filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
|
|
|
|
|
|
|
|
|
|
self.__filter.getFailures(GetFailures.FILENAME_01)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ticket = self.__filter.failManager.toBan()
|
|
|
|
|
|
|
|
|
|
attempts = ticket.getAttempt()
|
|
|
|
|
date = ticket.getTime()
|
|
|
|
|
ip = ticket.getIP()
|
|
|
|
|
found = (ip, attempts, date)
|
|
|
|
|
|
|
|
|
|
self.assertEqual(found, output)
|
|
|
|
|
|
|
|
|
|
self._assertEqualEntries(found, output)
|
|
|
|
|
|
|
|
|
|
def testGetFailures02(self):
|
|
|
|
|
output = ('141.3.81.106', 4, 1124013539.0)
|
|
|
|
@ -127,7 +140,7 @@ class GetFailures(unittest.TestCase):
|
|
|
|
|
ip = ticket.getIP()
|
|
|
|
|
found = (ip, attempts, date)
|
|
|
|
|
|
|
|
|
|
self.assertEqual(found, output)
|
|
|
|
|
self._assertEqualEntries(found, output)
|
|
|
|
|
|
|
|
|
|
def testGetFailures03(self):
|
|
|
|
|
output = ('203.162.223.135', 6, 1124013544.0)
|
|
|
|
@ -144,7 +157,7 @@ class GetFailures(unittest.TestCase):
|
|
|
|
|
ip = ticket.getIP()
|
|
|
|
|
found = (ip, attempts, date)
|
|
|
|
|
|
|
|
|
|
self.assertEqual(found, output)
|
|
|
|
|
self._assertEqualEntries(found, output)
|
|
|
|
|
|
|
|
|
|
def testGetFailures04(self):
|
|
|
|
|
output = [('212.41.96.186', 4, 1124013600.0),
|
|
|
|
@ -182,7 +195,7 @@ class GetFailures(unittest.TestCase):
|
|
|
|
|
ip = ticket.getIP()
|
|
|
|
|
found = (ip, attempts, date)
|
|
|
|
|
|
|
|
|
|
self.assertEqual(found, output)
|
|
|
|
|
self._assertEqualEntries(found, output)
|
|
|
|
|
|
|
|
|
|
def testGetFailuresIgnoreRegex(self):
|
|
|
|
|
output = ('141.3.81.106', 8, 1124013541.0)
|
|
|
|
|