- Organized imports (pylint)

git-svn-id: https://fail2ban.svn.sourceforge.net/svnroot/fail2ban/trunk@382 a942ae1a-1317-0410-a47c-b1dcaea8d605
0.x
Cyril Jaquier 2006-09-25 17:03:48 +00:00
parent 66526b550f
commit 4502c0f6ba
24 changed files with 106 additions and 108 deletions

View File

@ -24,7 +24,8 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import socket, cPickle
from cPickle import dumps, loads
import socket
class CSocket:
@ -40,7 +41,7 @@ class CSocket:
def send(self, msg):
# Convert every list member to string
obj = cPickle.dumps(map(str, msg))
obj = dumps(map(str, msg))
self.__csock.send(obj + CSocket.END_STRING)
ret = self.receive(self.__csock)
self.__csock.close()
@ -53,4 +54,4 @@ class CSocket:
if chunk == '':
raise RuntimeError, "socket connection broken"
msg = msg + chunk
return cPickle.loads(msg)
return loads(msg)

View File

@ -24,8 +24,8 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import time, logging, os
from subprocess import call
import logging, os
#from subprocess import call
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.actions.action")

View File

@ -25,10 +25,9 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from banmanager import BanManager
from failmanager import FailManager, FailManagerEmpty
from jailthread import JailThread
from action import Action
import time, logging, os
import time, logging
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.actions")

View File

@ -24,7 +24,7 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import time, logging
import logging
from ticket import Ticket
# Gets the instance of the logger.

View File

@ -24,7 +24,7 @@ __date__ = "$Date: 2006-09-04 21:19:58 +0200 (Mon, 04 Sep 2006) $"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import re, time
import time
from datetemplate import DateTemplate

View File

@ -25,7 +25,7 @@ __date__ = "$Date: 2006-09-04 21:19:58 +0200 (Mon, 04 Sep 2006) $"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import re, time
import time
from datetemplate import DateTemplate

View File

@ -24,7 +24,7 @@ __date__ = "$Date: 2006-09-04 21:19:58 +0200 (Mon, 04 Sep 2006) $"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import re, time
import time
from datetemplate import DateTemplate

View File

@ -24,7 +24,7 @@ __date__ = "$Date: 2006-09-04 21:19:58 +0200 (Mon, 04 Sep 2006) $"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import re, time
import re
class DateTemplate:

View File

@ -24,7 +24,7 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import time, logging
import logging
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban")

View File

@ -27,7 +27,7 @@ __license__ = "GPL"
from faildata import FailData
from failticket import FailTicket
from threading import Lock
import time, logging
import logging
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.filter")

View File

@ -24,7 +24,7 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import time, logging
import logging
from ticket import Ticket
# Gets the instance of the logger.

View File

@ -25,12 +25,11 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from failmanager import FailManager
from failmanager import FailManagerEmpty
from failticket import FailTicket
from jailthread import JailThread
from datedetector import DateDetector
import time, logging, os, re, sys, socket
import time, logging, re, socket
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.filter")
@ -286,7 +285,7 @@ class Filter(JailThread):
##
# Open the log file.
def openLogFile(self, filename):
def __openLogFile(self, filename):
""" Opens the log file specified on init.
"""
try:
@ -343,7 +342,7 @@ class Filter(JailThread):
def getFailures(self, filename):
ipList = dict()
ret = self.openLogFile(filename)
ret = self.__openLogFile(filename)
if not ret:
logSys.error("Unable to get failures in " + filename)
return False

View File

@ -24,10 +24,7 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from failmanager import FailManager
from failmanager import FailManagerEmpty
from failticket import FailTicket
from datedetector import DateDetector
from filter import Filter
import time, logging, gamin

View File

@ -24,10 +24,7 @@ __date__ = "$Date: 2006-09-13 23:31:22 +0200 (Wed, 13 Sep 2006) $"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from failmanager import FailManager
from failmanager import FailManagerEmpty
from failticket import FailTicket
from datedetector import DateDetector
from filter import Filter
import time, logging, os

View File

@ -25,6 +25,10 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from threading import Thread
import logging
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.server")
class JailThread(Thread):

View File

@ -26,7 +26,7 @@ __license__ = "GPL"
from jails import Jails
from transmitter import Transmitter
import locale, logging, logging.handlers, sys, os, signal
import logging, logging.handlers, sys, os, signal
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.server")
@ -126,7 +126,7 @@ class Server:
def setFindTime(self, name, value):
self.__jails.getFilter(name).setFindTime(value)
def getFindTime(self):
def getFindTime(self, name):
return self.__jails.getFilter(name).getFindTime()
def setFailRegex(self, name, value):

View File

@ -25,7 +25,8 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from threading import Thread
import socket, time, logging, cPickle, os, os.path
from cPickle import dumps, loads
import socket, logging, os, os.path
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.comm")
@ -109,7 +110,7 @@ class SocketWorker(Thread):
logSys.debug("Connection closed")
def __send(self, socket, msg):
obj = cPickle.dumps(msg)
obj = dumps(msg)
socket.send(obj + SSocket.END_STRING)
def __receive(self, socket):
@ -119,7 +120,7 @@ class SocketWorker(Thread):
if chunk == '':
raise RuntimeError, "socket connection broken"
msg = msg + chunk
return cPickle.loads(msg)
return loads(msg)
class SSocketErrorException(Exception):

View File

@ -24,7 +24,7 @@ __date__ = "$Date$"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import time, logging
import logging
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban")

View File

@ -27,7 +27,7 @@ __license__ = "GPL"
from ssocket import SSocket
from ssocket import SSocketErrorException
from threading import Lock
import re, pickle, logging
import logging, time
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban.comm")

View File

@ -32,25 +32,25 @@ class AddFailure(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.ticket = BanTicket('193.168.0.128', 1167605999.0)
self.banManager = BanManager()
self.assertTrue(self.banManager.addBanTicket(self.ticket))
self.__ticket = BanTicket('193.168.0.128', 1167605999.0)
self.__banManager = BanManager()
self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
def tearDown(self):
"""Call after every test case."""
def testAdd(self):
self.assertEqual(self.banManager.size(), 1)
self.assertEqual(self.__banManager.size(), 1)
def testAddDuplicate(self):
self.assertFalse(self.banManager.addBanTicket(self.ticket))
self.assertEqual(self.banManager.size(), 1)
self.assertFalse(self.__banManager.addBanTicket(self.__ticket))
self.assertEqual(self.__banManager.size(), 1)
def _testInListOK(self):
ticket = BanTicket('193.168.0.128', 1167605999.0)
self.assertTrue(self.banManager.inBanList(ticket))
self.assertTrue(self.__banManager.inBanList(ticket))
def _testInListNOK(self):
ticket = BanTicket('111.111.1.111', 1167605999.0)
self.assertFalse(self.banManager.inBanList(ticket))
self.assertFalse(self.__banManager.inBanList(ticket))

View File

@ -32,8 +32,8 @@ class DateDetectorTest(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.datedetector = DateDetector()
self.datedetector.addDefaultTemplate()
self.__datedetector = DateDetector()
self.__datedetector.addDefaultTemplate()
def tearDown(self):
"""Call after every test case."""
@ -43,18 +43,18 @@ class DateDetectorTest(unittest.TestCase):
date = [2006, 1, 23, 20, 59, 59, 0, 23, 0]
dateUnix = 1138046399.0
self.assertEqual(self.datedetector.getTime(log), date)
self.assertEqual(self.datedetector.getUnixTime(log), dateUnix)
self.assertEqual(self.__datedetector.getTime(log), date)
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix)
def testGetTime(self):
log = "Jan 23 21:59:59 [sshd] error: PAM: Authentication failure"
date = [2006, 1, 23, 21, 59, 59, 1, 23, -1]
dateUnix = 1138049999.0
self.assertEqual(self.datedetector.getTime(log), date)
self.assertEqual(self.datedetector.getTime(log), date)
self.assertEqual(self.datedetector.getTime(log), date)
self.assertEqual(self.datedetector.getUnixTime(log), dateUnix)
self.assertEqual(self.datedetector.getUnixTime(log), dateUnix)
self.assertEqual(self.datedetector.getUnixTime(log), dateUnix)
self.assertEqual(self.__datedetector.getTime(log), date)
self.assertEqual(self.__datedetector.getTime(log), date)
self.assertEqual(self.__datedetector.getTime(log), date)
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix)
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix)
self.assertEqual(self.__datedetector.getUnixTime(log), dateUnix)

View File

@ -33,47 +33,47 @@ class AddFailure(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.items = [['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['87.142.124.10', 1167605999.0],
['87.142.124.10', 1167605999.0],
['87.142.124.10', 1167605999.0]]
self.__items = [['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['193.168.0.128', 1167605999.0],
['87.142.124.10', 1167605999.0],
['87.142.124.10', 1167605999.0],
['87.142.124.10', 1167605999.0]]
self.failManager = FailManager()
for i in self.items:
self.failManager.addFailure(FailTicket(i[0], i[1]))
self.__failManager = FailManager()
for i in self.__items:
self.__failManager.addFailure(FailTicket(i[0], i[1]))
def tearDown(self):
"""Call after every test case."""
def testAdd(self):
self.assertEqual(self.failManager.size(), 2)
self.assertEqual(self.__failManager.size(), 2)
def _testDel(self):
self.failManager.delFailure('193.168.0.128')
self.failManager.delFailure('111.111.1.111')
self.__failManager.delFailure('193.168.0.128')
self.__failManager.delFailure('111.111.1.111')
self.assertEqual(self.failManager.size(), 1)
self.assertEqual(self.__failManager.size(), 1)
def testCleanupOK(self):
timestamp = 1167606999.0
self.failManager.cleanup(timestamp)
self.assertEqual(self.failManager.size(), 0)
self.__failManager.cleanup(timestamp)
self.assertEqual(self.__failManager.size(), 0)
def testCleanupNOK(self):
timestamp = 1167605990.0
self.failManager.cleanup(timestamp)
self.assertEqual(self.failManager.size(), 2)
self.__failManager.cleanup(timestamp)
self.assertEqual(self.__failManager.size(), 2)
def testbanOK(self):
self.failManager.setMaxRetry(5)
self.__failManager.setMaxRetry(5)
#ticket = FailTicket('193.168.0.128', None)
ticket = self.failManager.toBan()
ticket = self.__failManager.toBan()
self.assertEqual(ticket.getIP(), "193.168.0.128")
def testbanNOK(self):
self.failManager.setMaxRetry(10)
self.assertRaises(FailManagerEmpty, self.failManager.toBan)
self.__failManager.setMaxRetry(10)
self.assertRaises(FailManagerEmpty, self.__failManager.toBan)

View File

@ -32,7 +32,7 @@ class IgnoreIP(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.filter = FilterPoll(None)
self.__filter = FilterPoll(None)
def tearDown(self):
"""Call after every test case."""
@ -40,44 +40,44 @@ class IgnoreIP(unittest.TestCase):
def testIgnoreIPOK(self):
ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
for ip in ipList:
self.filter.addIgnoreIP(ip)
self.assertTrue(self.filter.inIgnoreIPList(ip))
self.__filter.addIgnoreIP(ip)
self.assertTrue(self.__filter.inIgnoreIPList(ip))
def testIgnoreIPNOK(self):
ipList = "", "999.999.999.999", "abcdef", "192.168.0."
for ip in ipList:
self.filter.addIgnoreIP(ip)
self.assertFalse(self.filter.inIgnoreIPList(ip))
self.__filter.addIgnoreIP(ip)
self.assertFalse(self.__filter.inIgnoreIPList(ip))
class LogFile(unittest.TestCase):
filename = "testcases/files/testcase01.log"
FILENAME = "testcases/files/testcase01.log"
def setUp(self):
"""Call before every test case."""
self.filter = FilterPoll(None)
self.filter.addLogPath(LogFile.filename)
self.__filter = FilterPoll(None)
self.__filter.addLogPath(LogFile.FILENAME)
def tearDown(self):
"""Call after every test case."""
def testOpen(self):
self.filter.openLogFile(LogFile.filename)
#def testOpen(self):
# self.__filter.openLogFile(LogFile.FILENAME)
def testIsModified(self):
self.assertTrue(self.filter.isModified(LogFile.filename))
self.assertTrue(self.__filter.isModified(LogFile.FILENAME))
class GetFailures(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.filter = FilterPoll(None)
self.filter.addLogPath("testcases/files/testcase01.log")
self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
self.filter.setTimePattern("%b %d %H:%M:%S")
self.filter.setFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) (?:::f{4,6}:)?(?P<host>\S*)")
self.__filter = FilterPoll(None)
self.__filter.addLogPath("testcases/files/testcase01.log")
self.__filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
self.__filter.setTimePattern("%b %d %H:%M:%S")
self.__filter.setFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) (?:::f{4,6}:)?(?P<host>\S*)")
def tearDown(self):
"""Call after every test case."""
@ -86,12 +86,12 @@ class GetFailures(unittest.TestCase):
output = [('87.142.124.10', 3, 1167605999.0),
('193.168.0.128', 3, 1167605999.0)]
self.filter.openLogFile()
self.filter.getFailures()
self.__filter.openLogFile()
self.__filter.getFailures()
found = []
for ip in self.filter.failManager.failList:
fData = self.filter.failManager.failList[ip]
for ip in self.__filter.failManager.failList:
fData = self.__filter.failManager.failList[ip]
retry = fData.getRetry()
lTime = fData.getLastTime()
found.append((ip, retry, lTime))

View File

@ -31,33 +31,33 @@ class StartStop(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.server = Server()
self.server.setLogLevel(0)
self.server.start(False)
self.__server = Server()
self.__server.setLogLevel(0)
self.__server.start(False)
def tearDown(self):
"""Call after every test case."""
self.server.quit()
self.__server.quit()
def testStartStopJail(self):
name = "TestCase"
self.server.addJail(name)
self.server.startJail(name)
self.__server.addJail(name)
self.__server.startJail(name)
time.sleep(1)
self.server.stopJail(name)
self.__server.stopJail(name)
class Transmitter(unittest.TestCase):
def setUp(self):
"""Call before every test case."""
self.server = Server()
self.server.setLogLevel(0)
self.server.start(False)
self.__server = Server()
self.__server.setLogLevel(0)
self.__server.start(False)
def tearDown(self):
"""Call after every test case."""
self.server.quit()
self.__server.quit()
def testSetActionOK(self):
name = "TestCase"
@ -79,7 +79,7 @@ class Transmitter(unittest.TestCase):
cnt = 0
for cmd in cmdList:
self.assertEqual(self.server.transm.proceed(cmd), outList[cnt])
self.assertEqual(self.__server.transm.proceed(cmd), outList[cnt])
cnt += 1
def testSetActionNOK(self):
@ -102,7 +102,7 @@ class Transmitter(unittest.TestCase):
cnt = 0
for cmd in cmdList:
msg = self.server.transm.proceed(cmd)
msg = self.__server.transm.proceed(cmd)
self.assertEqual(msg[0], outList[cnt])
cnt += 1
@ -118,10 +118,10 @@ class Transmitter(unittest.TestCase):
["quit"]]
for cmd in cmdList:
self.server.transm.proceed(cmd)
self.__server.transm.proceed(cmd)
if cmd == ["start", name]:
time.sleep(2)
jail = self.server.jails[name]
jail = self.__server.jails[name]
self.assertEqual(jail.getFilter().failManager.size(), 0)
self.assertEqual(jail.getAction().banManager.size(), 2)