mirror of
https://github.com/fail2ban/fail2ban.git
synced 2025-11-26 14:20:19 +08:00
Now it was already a mix, and Cyril is not working on this code any longer so no need to maintain this convention.
370 lines
11 KiB
Python
Executable File
370 lines
11 KiB
Python
Executable File
#!/usr/bin/python
|
|
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
|
|
# vi: set ft=python sts=4 ts=4 sw=4 noet :
|
|
#
|
|
# This file is part of Fail2Ban.
|
|
#
|
|
# Fail2Ban is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Fail2Ban is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Fail2Ban; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
__author__ = "Cyril Jaquier, Yaroslav Halchenko"
|
|
__copyright__ = "Copyright (c) 2004 Cyril Jaquier, 2012 Yaroslav Halchenko"
|
|
__license__ = "GPL"
|
|
|
|
import getopt, sys, time, logging, os
|
|
|
|
# Inserts our own modules path first in the list
|
|
# fix for bug #343821
|
|
try:
|
|
from common.version import version
|
|
except ImportError, e:
|
|
sys.path.insert(1, "/usr/share/fail2ban")
|
|
from common.version import version
|
|
|
|
from client.configparserinc import SafeConfigParserWithIncludes
|
|
from ConfigParser import NoOptionError, NoSectionError, MissingSectionHeaderError
|
|
from server.filter import Filter
|
|
from server.failregex import RegexException
|
|
|
|
# Gets the instance of the logger.
|
|
logSys = logging.getLogger("fail2ban.regex")
|
|
|
|
class RegexStat:
|
|
|
|
def __init__(self, failregex):
|
|
self.__stats = 0
|
|
self.__failregex = failregex
|
|
self.__ipList = list()
|
|
|
|
def __str__(self):
|
|
return "%s(%r) %d failed: %s" \
|
|
% (self.__class__, self.__failregex, self.__stats, self.__ipList)
|
|
|
|
def inc(self):
|
|
self.__stats += 1
|
|
|
|
def getStats(self):
|
|
return self.__stats
|
|
|
|
def getFailRegex(self):
|
|
return self.__failregex
|
|
|
|
def appendIP(self, value):
|
|
self.__ipList.extend(value)
|
|
|
|
def getIPList(self):
|
|
return self.__ipList
|
|
|
|
class Fail2banRegex:
|
|
|
|
test = None
|
|
|
|
CONFIG_DEFAULTS = {'configpath' : "/etc/fail2ban/"}
|
|
|
|
def __init__(self):
|
|
self.__filter = Filter(None)
|
|
self.__ignoreregex = list()
|
|
self.__failregex = list()
|
|
self.__verbose = False
|
|
# Setup logging
|
|
logging.getLogger("fail2ban").handlers = []
|
|
self.__hdlr = logging.StreamHandler(Fail2banRegex.test)
|
|
# set a format which is simpler for console use
|
|
formatter = logging.Formatter("%(message)s")
|
|
# tell the handler to use this format
|
|
self.__hdlr.setFormatter(formatter)
|
|
self.__logging_level = self.__verbose and logging.DEBUG or logging.WARN
|
|
logging.getLogger("fail2ban").addHandler(self.__hdlr)
|
|
logging.getLogger("fail2ban").setLevel(logging.ERROR)
|
|
|
|
#@staticmethod
|
|
def dispVersion():
|
|
print "Fail2Ban v" + version
|
|
print
|
|
print "Copyright (c) 2004-2008 Cyril Jaquier"
|
|
print "Copyright of modifications held by their respective authors."
|
|
print "Licensed under the GNU General Public License v2 (GPL)."
|
|
print
|
|
print "Written by Cyril Jaquier <cyril.jaquier@fail2ban.org>."
|
|
print "Many contributions by Yaroslav O. Halchenko <debian@onerussian.com>."
|
|
dispVersion = staticmethod(dispVersion)
|
|
|
|
#@staticmethod
|
|
def dispUsage():
|
|
print "Usage: "+sys.argv[0]+" [OPTIONS] <LOG> <REGEX> [IGNOREREGEX]"
|
|
print
|
|
print "Fail2Ban v" + version + " reads log file that contains password failure report"
|
|
print "and bans the corresponding IP addresses using firewall rules."
|
|
print
|
|
print "This tools can test regular expressions for \"fail2ban\"."
|
|
print
|
|
print "Options:"
|
|
print " -h, --help display this help message"
|
|
print " -V, --version print the version"
|
|
print " -v, --verbose verbose output"
|
|
print
|
|
print "Log:"
|
|
print " string a string representing a log line"
|
|
print " filename path to a log file (/var/log/auth.log)"
|
|
print
|
|
print "Regex:"
|
|
print " string a string representing a 'failregex'"
|
|
print " filename path to a filter file (filter.d/sshd.conf)"
|
|
print
|
|
print "IgnoreRegex:"
|
|
print " string a string representing an 'ignoreregex'"
|
|
print " filename path to a filter file (filter.d/sshd.conf)"
|
|
print
|
|
print "Report bugs to https://github.com/fail2ban/fail2ban/issues"
|
|
dispUsage = staticmethod(dispUsage)
|
|
|
|
def getCmdLineOptions(self, optList):
|
|
""" Gets the command line options
|
|
"""
|
|
for opt in optList:
|
|
if opt[0] in ["-h", "--help"]:
|
|
self.dispUsage()
|
|
sys.exit(0)
|
|
elif opt[0] in ["-V", "--version"]:
|
|
self.dispVersion()
|
|
sys.exit(0)
|
|
elif opt[0] in ["-v", "--verbose"]:
|
|
self.__verbose = True
|
|
|
|
#@staticmethod
|
|
def logIsFile(value):
|
|
return os.path.isfile(value)
|
|
logIsFile = staticmethod(logIsFile)
|
|
|
|
def readIgnoreRegex(self, value):
|
|
if os.path.isfile(value):
|
|
reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS)
|
|
try:
|
|
reader.read(value)
|
|
print "Use ignoreregex file : " + value
|
|
self.__ignoreregex = [RegexStat(m)
|
|
for m in reader.get("Definition", "ignoreregex").split('\n')]
|
|
except NoSectionError:
|
|
print "No [Definition] section in " + value
|
|
print
|
|
return False
|
|
except NoOptionError:
|
|
print "No failregex option in " + value
|
|
print
|
|
return False
|
|
except MissingSectionHeaderError:
|
|
print "No section headers in " + value
|
|
print
|
|
return False
|
|
else:
|
|
if len(value) > 53:
|
|
stripReg = value[0:50] + "..."
|
|
else:
|
|
stripReg = value
|
|
print "Use ignoreregex line : " + stripReg
|
|
self.__ignoreregex = [RegexStat(value)]
|
|
return True
|
|
|
|
def readRegex(self, value):
|
|
if os.path.isfile(value):
|
|
reader = SafeConfigParserWithIncludes(defaults=self.CONFIG_DEFAULTS)
|
|
try:
|
|
reader.read(value)
|
|
print "Use regex file : " + value
|
|
self.__failregex = [RegexStat(m)
|
|
for m in reader.get("Definition", "failregex").split('\n')]
|
|
except NoSectionError:
|
|
print "No [Definition] section in " + value
|
|
print
|
|
return False
|
|
except NoOptionError:
|
|
print "No failregex option in " + value
|
|
print
|
|
return False
|
|
except MissingSectionHeaderError:
|
|
print "No section headers in " + value
|
|
print
|
|
return False
|
|
else:
|
|
if len(value) > 53:
|
|
stripReg = value[0:50] + "..."
|
|
else:
|
|
stripReg = value
|
|
print "Use regex line : " + stripReg
|
|
self.__failregex = [RegexStat(value)]
|
|
return True
|
|
|
|
def testIgnoreRegex(self, line):
|
|
found = False
|
|
for regex in self.__ignoreregex:
|
|
logging.getLogger("fail2ban").setLevel(self.__logging_level)
|
|
try:
|
|
self.__filter.addIgnoreRegex(regex.getFailRegex())
|
|
try:
|
|
ret = self.__filter.ignoreLine(line)
|
|
if ret:
|
|
regex.inc()
|
|
except RegexException, e:
|
|
print e
|
|
return False
|
|
finally:
|
|
self.__filter.delIgnoreRegex(0)
|
|
logging.getLogger("fail2ban").setLevel(self.__logging_level)
|
|
|
|
def testRegex(self, line):
|
|
found = False
|
|
for regex in self.__ignoreregex:
|
|
self.__filter.addIgnoreRegex(regex.getFailRegex())
|
|
for regex in self.__failregex:
|
|
logging.getLogger("fail2ban").setLevel(logging.DEBUG)
|
|
try:
|
|
self.__filter.addFailRegex(regex.getFailRegex())
|
|
try:
|
|
ret = self.__filter.processLine(line)
|
|
if not len(ret) == 0:
|
|
if found == True:
|
|
ret[0].append(True)
|
|
else:
|
|
found = True
|
|
ret[0].append(False)
|
|
regex.inc()
|
|
regex.appendIP(ret)
|
|
except RegexException, e:
|
|
print e
|
|
return False
|
|
except IndexError:
|
|
print "Sorry, but no <host> found in regex"
|
|
return False
|
|
finally:
|
|
self.__filter.delFailRegex(0)
|
|
logging.getLogger("fail2ban").setLevel(logging.CRITICAL)
|
|
for regex in self.__ignoreregex:
|
|
self.__filter.delIgnoreRegex(0)
|
|
|
|
def printStats(self):
|
|
print
|
|
print "Results"
|
|
print "======="
|
|
print
|
|
|
|
def print_failregexes(title, failregexes):
|
|
# Print title
|
|
total, out = 0, []
|
|
for cnt, failregex in enumerate(failregexes):
|
|
match = failregex.getStats()
|
|
total += match
|
|
if (match or self.__verbose):
|
|
out.append("| %d) [%d] %s" % (cnt+1, match, failregex.getFailRegex()))
|
|
print "%s: %d total" % (title, total)
|
|
if len(out):
|
|
print "|- #) [# of hits] regular expression"
|
|
print '\n'.join(out)
|
|
print '`-'
|
|
print
|
|
return total
|
|
|
|
# Print title
|
|
total = print_failregexes("Failregex", self.__failregex)
|
|
_ = print_failregexes("Ignoreregex", self.__ignoreregex)
|
|
|
|
print "Summary"
|
|
print "======="
|
|
print
|
|
|
|
if total == 0:
|
|
print "Sorry, no match"
|
|
print
|
|
print "Look at the above section 'Running tests' which could contain important"
|
|
print "information."
|
|
return False
|
|
else:
|
|
# Print stats
|
|
print "Addresses found:"
|
|
for cnt, failregex in enumerate(self.__failregex):
|
|
if self.__verbose or len(failregex.getIPList()):
|
|
print "[%d]" % (cnt+1)
|
|
for ip in failregex.getIPList():
|
|
timeTuple = time.localtime(ip[1])
|
|
timeString = time.strftime("%a %b %d %H:%M:%S %Y", timeTuple)
|
|
print " %s (%s)%s" % (
|
|
ip[0], timeString, ip[2] and " (already matched)" or "")
|
|
print
|
|
|
|
print "Date template hits:"
|
|
for template in self.__filter.dateDetector.getTemplates():
|
|
if self.__verbose or template.getHits():
|
|
print `template.getHits()` + " hit(s): " + template.getName()
|
|
print
|
|
|
|
print "Success, the total number of match is " + str(total)
|
|
print
|
|
print "However, look at the above section 'Running tests' which could contain important"
|
|
print "information."
|
|
return True
|
|
|
|
|
|
if __name__ == "__main__":
|
|
fail2banRegex = Fail2banRegex()
|
|
# Reads the command line options.
|
|
try:
|
|
cmdOpts = 'hVcv'
|
|
cmdLongOpts = ['help', 'version', 'verbose']
|
|
optList, args = getopt.getopt(sys.argv[1:], cmdOpts, cmdLongOpts)
|
|
except getopt.GetoptError:
|
|
fail2banRegex.dispUsage()
|
|
sys.exit(-1)
|
|
# Process command line
|
|
fail2banRegex.getCmdLineOptions(optList)
|
|
|
|
# We need 2 or 3 parameters
|
|
if not len(args) in (2, 3):
|
|
fail2banRegex.dispUsage()
|
|
sys.exit(-1)
|
|
else:
|
|
print
|
|
print "Running tests"
|
|
print "============="
|
|
print
|
|
|
|
cmd_log, cmd_regex = args[:2]
|
|
|
|
if len(args) == 3:
|
|
fail2banRegex.readIgnoreRegex(args[2]) or sys.exit(-1)
|
|
|
|
fail2banRegex.readRegex(cmd_regex) or sys.exit(-1)
|
|
|
|
if fail2banRegex.logIsFile(cmd_log):
|
|
try:
|
|
hdlr = open(cmd_log)
|
|
print "Use log file : " + cmd_log
|
|
print
|
|
for line in hdlr:
|
|
fail2banRegex.testIgnoreRegex(line)
|
|
fail2banRegex.testRegex(line)
|
|
except IOError, e:
|
|
print e
|
|
print
|
|
sys.exit(-1)
|
|
else:
|
|
if len(sys.argv[1]) > 53:
|
|
stripLog = cmd_log[0:50] + "..."
|
|
else:
|
|
stripLog = cmd_log
|
|
print "Use single line: " + stripLog
|
|
print
|
|
fail2banRegex.testIgnoreRegex(cmd_log)
|
|
fail2banRegex.testRegex(cmd_log)
|
|
|
|
fail2banRegex.printStats() or sys.exit(-1)
|