- Added firewall check command. Thanks to Yaroslav Halchenko

git-svn-id: https://fail2ban.svn.sourceforge.net/svnroot/fail2ban/branches/FAIL2BAN-0_5@204 a942ae1a-1317-0410-a47c-b1dcaea8d605
0.5
Cyril Jaquier 2005-10-26 21:47:46 +00:00
parent 31abe6f37c
commit 2701d7500b
1 changed files with 78 additions and 8 deletions

View File

@ -28,6 +28,10 @@ import time, os, logging, re
from utils.process import executeCmd
from utils.strings import replaceTag
# unfortunately but I have to bring ExternalError in especially for
# flushBanList: if one of IPs got flushed manually outside or something, we
# might endup with not "full" flush unless we handle exception within the loop
from utils.process import ExternalError
# Gets the instance of the logger.
logSys = logging.getLogger("fail2ban")
@ -37,41 +41,102 @@ class Firewall:
the IP.
"""
def __init__(self, banRule, unBanRule, banTime):
def __init__(self, startRule, endRule, banRule, unBanRule, checkRule,
banTime):
self.banRule = banRule
self.unBanRule = unBanRule
self.checkRule = checkRule
self.startRule = startRule
self.endRule = endRule
self.banTime = banTime
self.banList = dict()
self.section = ""
self.mustCheck = True
def setSection(self, section):
""" Set optional section name for clarify of logging
"""
self.section = section
def getMustCheck(self):
""" Return true if the runCheck test is executed
"""
return self.mustCheck
def setMustCheck(self, value):
""" Enable or disable the execution of runCheck test
"""
self.mustCheck = value
def initialize(self, debug):
logSys.debug("%s: Initialize firewall rules"%self.section)
executeCmd(self.startRule, debug)
def restore(self, debug):
logSys.debug("%s: Restore firewall rules"%self.section)
try:
self.flushBanList(debug)
executeCmd(self.endRule, debug)
except ExternalError:
pass
def addBanIP(self, aInfo, debug):
""" Bans an IP.
"""
ip = aInfo["ip"]
if not self.inBanList(ip):
crtTime = time.time()
logSys.warn("Ban " + ip)
logSys.warn("%s: Ban "%self.section + ip)
self.banList[ip] = crtTime
aInfo["bantime"] = crtTime
executeCmd(self.banIP(aInfo), debug)
self.runCheck(debug)
cmd = self.banIP(aInfo)
if executeCmd(cmd, debug):
raise ExternalError("Firewall: execution of fwban command " +
"'%s' failed"%cmd)
else:
logSys.error(ip+" already in ban list")
self.runCheck(debug)
logSys.error("%s: "%self.section+ip+" already in ban list")
def delBanIP(self, aInfo, debug):
""" Unban an IP.
"""
ip = aInfo["ip"]
if self.inBanList(ip):
logSys.warn("Unban "+ip)
logSys.warn("%s: Unban "%self.section + ip)
del self.banList[ip]
self.runCheck(debug)
executeCmd(self.unBanIP(aInfo), debug)
else:
logSys.error(ip+" not in ban list")
logSys.error("%s: "%self.section+ip+" not in ban list")
def reBan(self, debug):
""" Re-Bans known IPs.
TODO: implement "failures" and "failtime"
"""
for ip in self.banList:
aInfo = {"ip": ip,
"bantime":self.banList[ip]}
logSys.warn("%s: ReBan "%self.section + ip)
# next piece is similar to the on in addBanIp
# so might be one more function will not hurt
self.runCheck(debug)
executeCmd(self.banIP(aInfo), debug)
def inBanList(self, ip):
""" Checks if IP is in ban list.
"""
return self.banList.has_key(ip)
def runCheck(self, debug):
""" Runs fwcheck command and throws an exception if it returns non-0
result
"""
if self.mustCheck:
executeCmd(self.checkRule, debug)
else:
return None
def checkForUnBan(self, debug):
""" Check for IP to remove from ban list.
"""
@ -93,7 +158,12 @@ class Firewall:
aInfo = {"ip": element[0],
"bantime": element[1],
"unbantime": time.time()}
self.delBanIP(aInfo, debug)
try:
self.delBanIP(aInfo, debug)
except ExternalError:
# we must let it fail here in the loop, or we don't
# flush properly
pass
def banIP(self, aInfo):
""" Returns query to ban IP.