BF: Remove unused imports and variables.

All highlighted by using pyflakes.
pull/652/head
Steven Hiscocks 2014-03-16 14:31:34 +00:00
parent 16125ec81a
commit 41cbbbc248
22 changed files with 50 additions and 116 deletions

View File

@ -17,7 +17,6 @@
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import sys
import socket
import smtplib
from email.mime.text import MIMEText

View File

@ -26,7 +26,6 @@ __license__ = "GPL"
import logging
from .configreader import ConfigReader
from .fail2banreader import Fail2banReader
from .jailsreader import JailsReader

View File

@ -21,7 +21,7 @@ __author__ = "Cyril Jaquier and Fail2Ban Contributors"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import sys, time, logging
import logging
from threading import Lock
from .datetemplate import DatePatternRegex, DateTai64n, DateEpoch

View File

@ -24,13 +24,10 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import re, time, calendar
import re
import logging
from abc import abstractmethod
from datetime import datetime
from datetime import timedelta
from .mytime import MyTime
from .strptime import reGroupDictStrptime, timeRE
logSys = logging.getLogger(__name__)

View File

@ -21,7 +21,7 @@ __author__ = "Cyril Jaquier and Fail2Ban Contributors"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2011-2013 Yaroslav Halchenko"
__license__ = "GPL"
import logging, re, os, fcntl, time, sys, locale, codecs
import logging, re, os, fcntl, sys, locale, codecs
from .failmanager import FailManagerEmpty, FailManager
from .ticket import FailTicket
@ -408,7 +408,6 @@ class Filter(JailThread):
"""Processes the line for failures and populates failManager
"""
for element in self.processLine(line, date)[1]:
failregex = element[0]
ip = element[1]
unixTime = element[2]
lines = element[3]
@ -470,9 +469,8 @@ class Filter(JailThread):
date = self.__lastDate
else:
# Lets split into time part and log part of the line
# Lets get the time part
date = dateTimeMatch[0]
timeMatch = dateTimeMatch[1]
self.__lastTimeText = timeText
self.__lastDate = date

View File

@ -23,10 +23,12 @@ __author__ = "Cyril Jaquier, Lee Clemens, Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2011-2012 Lee Clemens, 2012 Yaroslav Halchenko"
__license__ = "GPL"
import time, logging, pyinotify
import logging
from distutils.version import LooseVersion
from os.path import dirname, sep as pathsep
import pyinotify
from .failmanager import FailManagerEmpty
from .filter import FileFilter
from .mytime import MyTime
@ -157,7 +159,7 @@ class FilterPyinotify(FileFilter):
# Remove watches for the directory
# since there is no other monitored file under this directory
wdInt = self.__watches.pop(path_dir)
_ = self.__monitor.rm_watch(wdInt)
self.__monitor.rm_watch(wdInt)
logSys.debug("Removed monitor for the parent directory %s", path_dir)

View File

@ -114,29 +114,26 @@ class Jail:
def _initPolling(self):
logSys.info("Jail '%s' uses poller" % self.name)
from filterpoll import FilterPoll
logSys.info("Jail '%s' uses poller" % self.name)
self.__filter = FilterPoll(self)
def _initGamin(self):
# Try to import gamin
import gamin
logSys.info("Jail '%s' uses Gamin" % self.name)
from filtergamin import FilterGamin
logSys.info("Jail '%s' uses Gamin" % self.name)
self.__filter = FilterGamin(self)
def _initPyinotify(self):
# Try to import pyinotify
import pyinotify
logSys.info("Jail '%s' uses pyinotify" % self.name)
from filterpyinotify import FilterPyinotify
logSys.info("Jail '%s' uses pyinotify" % self.name)
self.__filter = FilterPyinotify(self)
def _initSystemd(self): # pragma: systemd no cover
# Try to import systemd
import systemd
logSys.info("Jail '%s' uses systemd" % self.name)
from filtersystemd import FilterSystemd
logSys.info("Jail '%s' uses systemd" % self.name)
self.__filter = FilterSystemd(self)
@property

View File

@ -32,7 +32,6 @@ from .filter import FileFilter, JournalFilter
from .transmitter import Transmitter
from .asyncserver import AsyncServer, AsyncServerException
from .database import Fail2BanDb
from .action import CommandAction
from .. import version
# Gets the instance of the logger.

View File

@ -49,7 +49,6 @@ def reGroupDictStrptime(found_dict):
year = month = day = hour = minute = None
hour = minute = None
second = fraction = 0
tz = -1
tzoffset = None
# Default to -1 to signify that values not known; not critical to have,
# though
@ -140,21 +139,6 @@ def reGroupDictStrptime(found_dict):
tzoffset += int(z[-2:]) # ...and minutes
if z.startswith("-"):
tzoffset = -tzoffset
elif group_key == 'Z':
# Since -1 is default value only need to worry about setting tz if
# it can be something other than -1.
found_zone = found_dict['Z'].lower()
for value, tz_values in enumerate(locale_time.timezone):
if found_zone in tz_values:
# Deal with bad locale setup where timezone names are the
# same and yet time.daylight is true; too ambiguous to
# be able to tell what timezone has daylight savings
if (time.tzname[0] == time.tzname[1] and
time.daylight and found_zone not in ("utc", "gmt")):
break
else:
tz = value
break
# Fail2Ban will assume it's this year
assume_year = False

View File

@ -20,10 +20,6 @@
import os
import unittest
import sys
if sys.version_info >= (3, 3):
import importlib
else:
import imp
from ..dummyjail import DummyJail

View File

@ -24,8 +24,9 @@ __author__ = "Daniel Black"
__copyright__ = "Copyright (c) 2013 Daniel Black"
__license__ = "GPL"
import unittest, time
import sys, os, tempfile
import time
import os
import tempfile
from ..server.actions import Actions
from .dummyjail import DummyJail

View File

@ -25,7 +25,6 @@ __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import time
import logging, sys
from ..server.action import CommandAction, CallingMap

View File

@ -21,7 +21,7 @@ __author__ = "Cyril Jaquier, Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2011-2013 Yaroslav Halchenko"
__license__ = "GPL"
import os, glob, shutil, sys, tempfile, unittest
import os, glob, shutil, tempfile, unittest
from ..client.configreader import ConfigReader
from ..client.jailreader import JailReader
@ -326,7 +326,6 @@ class FilterReaderTest(unittest.TestCase):
self.assertEqual(sorted(c), sorted(output))
def testFilterReaderSubstitionFail(self):
output = [['set', 'jailname', 'addfailregex', 'to=sour@example.com fromip=<IP>']]
filterReader = FilterReader('substition', "jailname", {'honeypot': '<sweet>', 'sweet': '<honeypot>'})
filterReader.setBaseDir(TEST_FILES_DIR)
filterReader.read()

View File

@ -24,7 +24,10 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import unittest, calendar, time, datetime, re, pprint
import unittest
import time
import datetime
from ..server.datedetector import DateDetector
from ..server.datetemplate import DateTemplate
from .utils import setUpMyTime, tearDownMyTime
@ -52,7 +55,6 @@ class DateDetectorTest(unittest.TestCase):
def testGetTime(self):
log = "Jan 23 21:59:59 [sshd] error: PAM: Authentication failure"
date = [2005, 1, 23, 21, 59, 59, 6, 23, -1]
dateUnix = 1106513999.0
# yoh: testing only up to 6 elements, since the day of the week
# is not correctly determined atm, since year is not present
@ -65,7 +67,6 @@ class DateDetectorTest(unittest.TestCase):
def testVariousTimes(self):
"""Test detection of various common date/time formats f2b should understand
"""
date = [2005, 1, 23, 21, 59, 59, 6, 23, -1]
dateUnix = 1106513999.0
for anchored, sdate in (

View File

@ -24,7 +24,7 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import unittest, socket, time, pickle
import unittest
from ..server.failmanager import FailManager, FailManagerEmpty
from ..server.ticket import FailTicket

View File

@ -1,6 +1,4 @@
from fail2ban.server.action import ActionBase
class TestAction():
def __init__(self, jail, name):

View File

@ -14,7 +14,7 @@ def auth(v):
nonce = v['nonce'][1:-1]
nc=v.get('nc') or ''
cnonce = v.get('cnonce') or ''
opaque = v.get('opaque') or ''
#opaque = v.get('opaque') or ''
qop = v['qop'][1:-1]
algorithm = v['algorithm']
response = md5.new(ha1 + ':' + nonce + ':' + nc + ':' + cnonce + ':' + qop + ':' + ha2).hexdigest()

View File

@ -28,6 +28,7 @@ import os
import sys
import time
import tempfile
import uuid
try:
from systemd import journal
@ -37,7 +38,7 @@ except ImportError:
from ..server.jail import Jail
from ..server.filterpoll import FilterPoll
from ..server.filter import Filter, FileFilter, DNSUtils
from ..server.failmanager import FailManager, FailManagerEmpty
from ..server.failmanager import FailManagerEmpty
from ..server.mytime import MyTime
from .utils import setUpMyTime, tearDownMyTime, mtimesleep, LogCaptureTestCase
from .dummyjail import DummyJail
@ -132,7 +133,7 @@ def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line
fin = in_
# Skip
for i in xrange(skip):
_ = fin.readline()
fin.readline()
# Read
i = 0
lines = []
@ -169,7 +170,7 @@ def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # p
})
# Skip
for i in xrange(skip):
_ = fin.readline()
fin.readline()
# Read/Write
i = 0
while n is None or i < n:
@ -444,8 +445,6 @@ class LogFileMonitor(LogCaptureTestCase):
self.assertEqual(self.filter.failManager.getFailTotal(), 3)
from threading import Lock
def get_monitor_failures_testcase(Filter_):
"""Generator of TestCase's for different filters/backends
"""
@ -674,7 +673,6 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
self.filter = Filter_(self.jail)
# UUID used to ensure that only meeages generated
# as part of this test are picked up by the filter
import uuid
self.test_uuid = str(uuid.uuid4())
self.name = "monitorjournalfailures-%s" % self.test_uuid
self.filter.addJournalMatch([
@ -720,7 +718,7 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
attempts = ticket.getAttempt()
ip = ticket.getIP()
matches = ticket.getMatches()
ticket.getMatches()
self.assertEqual(ip, test_ip)
self.assertEqual(attempts, test_attempts)
@ -915,8 +913,6 @@ class GetFailures(unittest.TestCase):
_assert_correct_last_attempt(self, self.filter, output)
def testGetFailuresIgnoreRegex(self):
output = ('141.3.81.106', 8, 1124017141.0)
self.filter.addLogPath(GetFailures.FILENAME_02)
self.filter.addFailRegex("Failed .* from <HOST>")
self.filter.addFailRegex("Accepted .* from <HOST>")
@ -1009,5 +1005,5 @@ class JailTests(unittest.TestCase):
def testSetBackend_gh83(self):
# smoke test
# Must not fail to initiate
jail = Jail('test', backend='polling')
Jail('test', backend='polling')

View File

@ -22,14 +22,20 @@ __copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
__license__ = "GPL"
import logging
import os, sys, unittest
import os
import sys
import unittest
import tempfile
import shutil
import fnmatch
import datetime
from glob import glob
from StringIO import StringIO
from .utils import mbasename, TraceBack, FormatterWithTraceBack
from ..helpers import formatExceptionInfo
from ..server.datetemplate import DatePatternRegex
class HelpersTest(unittest.TestCase):
@ -53,7 +59,6 @@ class HelpersTest(unittest.TestCase):
# based on
# http://stackoverflow.com/questions/2186525/use-a-glob-to-find-files-recursively-in-python
def recursive_glob(treeroot, pattern):
import fnmatch, os
results = []
for base, dirs, files in os.walk(treeroot):
goodfiles = fnmatch.filter(dirs + files, pattern)
@ -150,7 +155,6 @@ class TestsUtilsTest(unittest.TestCase):
def testFormatterWithTraceBack(self):
from StringIO import StringIO
strout = StringIO()
Formatter = FormatterWithTraceBack
@ -170,11 +174,6 @@ class TestsUtilsTest(unittest.TestCase):
self.assertTrue(pindex > 10) # we should have some traceback
self.assertEqual(s[:pindex], s[pindex+1:pindex*2 + 1])
import datetime
import time
from ..server.datetemplate import DatePatternRegex
iso8601 = DatePatternRegex("%Y-%m-%d[T ]%H:%M:%S(?:\.%f)?%z")
class CustomDateFormatsTest(unittest.TestCase):

View File

@ -24,14 +24,18 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
import unittest, socket, time, tempfile, os, locale, sys, logging
import unittest
import time
import tempfile
import os
import locale
import sys
import logging
from ..server.failregex import Regex, FailRegex, RegexException
from ..server.server import Server, logSys
from ..server.server import Server
from ..server.jail import Jail
from ..exceptions import UnknownJailException
from .utils import LogCaptureTestCase
#from bin.fail2ban-client import Fail2banClient
try:
from ..server import filtersystemd
except ImportError: # pragma: no cover
@ -39,37 +43,6 @@ except ImportError: # pragma: no cover
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
class StartStop(LogCaptureTestCase):
def setUp(self):
self.client = Fail2banClient()
LogCaptureTestCase.setUp(self)
sock_fd, sock_name = tempfile.mkstemp('fail2ban.sock', 'transmitter')
os.close(sock_fd)
os.remove(sock_name)
pidfile_fd, pidfile_name = tempfile.mkstemp(
'fail2ban.pid', 'transmitter')
os.close(pidfile_fd)
os.remove(pidfile_name)
self.client.__getCmdLineOptions([
('-c', os.path.join('fail2ban', 'tests', 'config')),
('-s', sock_name),
('-p', pidfile_name)])
self.client.__startServerAsync(sock_name, pidfile_name, False)
self.client.__waitOnServer()
def tearDown(self):
self.__server.quit()
LogCaptureTestCase.tearDown(self)
def testStartStopJail(self):
name = "TestCase"
self.__server.addJail(name, "auto")
self.__server.startJail(name)
time.sleep(1)
self.__server.stopJail(name)
self.printLog()
class TestServer(Server):
def setLogLevel(self, *args, **kwargs):
pass

View File

@ -22,10 +22,9 @@ __author__ = "Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
__license__ = "GPL"
import logging, os, re, traceback, time, unittest, sys
import logging, os, re, traceback, time, unittest
from os.path import basename, dirname
from StringIO import StringIO
import json
from ..server.mytime import MyTime

View File

@ -79,9 +79,7 @@ else:
# Get version number, avoiding importing fail2ban.
# This is due to tests not functioning for python3 as 2to3 takes place later
f = open(join("fail2ban", "version.py"))
exec(f.read())
f.close()
exec(open(join("fail2ban", "version.py")).read())
setup(
name = "fail2ban",