|
|
|
@ -302,12 +302,18 @@ class DatabaseTest(LogCaptureTestCase):
|
|
|
|
|
def testGetBansMerged_MaxEntries(self):
|
|
|
|
|
self.testAddJail()
|
|
|
|
|
maxEntries = 2
|
|
|
|
|
failures = ["abc\n", "123\n", "ABC\n", "1234\n"]
|
|
|
|
|
failures = [
|
|
|
|
|
{"matches": ["abc\n"], "user": set(['test'])},
|
|
|
|
|
{"matches": ["123\n"], "user": set(['test'])},
|
|
|
|
|
{"matches": ["ABC\n"], "user": set(['test', 'root'])},
|
|
|
|
|
{"matches": ["1234\n"], "user": set(['test', 'root'])},
|
|
|
|
|
]
|
|
|
|
|
matches2find = [f["matches"][0] for f in failures]
|
|
|
|
|
# add failures sequential:
|
|
|
|
|
i = 80
|
|
|
|
|
for f in failures:
|
|
|
|
|
i -= 10
|
|
|
|
|
ticket = FailTicket("127.0.0.1", MyTime.time() - i, [f])
|
|
|
|
|
ticket = FailTicket("127.0.0.1", MyTime.time() - i, data=f)
|
|
|
|
|
ticket.setAttempt(1)
|
|
|
|
|
self.db.addBan(self.jail, ticket)
|
|
|
|
|
# should retrieve 2 matches only, but count of all attempts:
|
|
|
|
@ -316,9 +322,10 @@ class DatabaseTest(LogCaptureTestCase):
|
|
|
|
|
self.assertEqual(ticket.getIP(), "127.0.0.1")
|
|
|
|
|
self.assertEqual(ticket.getAttempt(), len(failures))
|
|
|
|
|
self.assertEqual(len(ticket.getMatches()), maxEntries)
|
|
|
|
|
self.assertEqual(ticket.getMatches(), failures[len(failures) - maxEntries:])
|
|
|
|
|
self.assertEqual(ticket.getMatches(), matches2find[-maxEntries:])
|
|
|
|
|
# add more failures at once:
|
|
|
|
|
ticket = FailTicket("127.0.0.1", MyTime.time() - 10, failures)
|
|
|
|
|
ticket = FailTicket("127.0.0.1", MyTime.time() - 10, matches2find,
|
|
|
|
|
data={"user": set(['test', 'root'])})
|
|
|
|
|
ticket.setAttempt(len(failures))
|
|
|
|
|
self.db.addBan(self.jail, ticket)
|
|
|
|
|
# should retrieve 2 matches only, but count of all attempts:
|
|
|
|
@ -326,7 +333,13 @@ class DatabaseTest(LogCaptureTestCase):
|
|
|
|
|
ticket = self.db.getBansMerged("127.0.0.1")
|
|
|
|
|
self.assertEqual(ticket.getAttempt(), 2 * len(failures))
|
|
|
|
|
self.assertEqual(len(ticket.getMatches()), maxEntries)
|
|
|
|
|
self.assertEqual(ticket.getMatches(), failures[len(failures) - maxEntries:])
|
|
|
|
|
self.assertEqual(ticket.getMatches(), matches2find[-maxEntries:])
|
|
|
|
|
# also using getCurrentBans:
|
|
|
|
|
ticket = self.db.getCurrentBans(self.jail, "127.0.0.1", fromtime=MyTime.time()-100)
|
|
|
|
|
self.assertTrue(ticket is not None)
|
|
|
|
|
self.assertEqual(ticket.getAttempt(), len(failures))
|
|
|
|
|
self.assertEqual(len(ticket.getMatches()), maxEntries)
|
|
|
|
|
self.assertEqual(ticket.getMatches(), matches2find[-maxEntries:])
|
|
|
|
|
|
|
|
|
|
def testGetBansMerged(self):
|
|
|
|
|
self.testAddJail()
|
|
|
|
|