mirror of https://github.com/fail2ban/fail2ban
fixes deprecated DNSUtils.IsValidIP in fakegooglebot ignore command + test covered now;
Closes #1559pull/1569/head
parent
973ac9a45c
commit
fa8184d4cc
|
@ -5,31 +5,34 @@
|
|||
# presence of host and cut commands
|
||||
#
|
||||
import sys
|
||||
from fail2ban.server.ipdns import DNSUtils, IPAddr
|
||||
|
||||
def process_args(argv):
|
||||
if len(argv) != 2:
|
||||
sys.stderr.write("Please provide a single IP as an argument. Got: %s\n"
|
||||
raise ValueError("Please provide a single IP as an argument. Got: %s\n"
|
||||
% (argv[1:]))
|
||||
sys.exit(2)
|
||||
|
||||
ip = argv[1]
|
||||
|
||||
from fail2ban.server.ipdns import DNSUtils
|
||||
if not DNSUtils.isValidIP(ip):
|
||||
sys.stderr.write("Argument must be a single valid IP. Got: %s\n"
|
||||
if not IPAddr(ip).isValid:
|
||||
raise ValueError("Argument must be a single valid IP. Got: %s\n"
|
||||
% ip)
|
||||
sys.exit(3)
|
||||
return ip
|
||||
|
||||
google_ips = None
|
||||
|
||||
def is_googlebot(ip):
|
||||
import re
|
||||
from fail2ban.server.ipdns import DNSUtils
|
||||
|
||||
host = DNSUtils.ipToName(ip)
|
||||
if not host or not re.match('.*\.google(bot)?\.com$', host):
|
||||
sys.exit(1)
|
||||
return False
|
||||
host_ips = DNSUtils.dnsToIp(host)
|
||||
sys.exit(0 if ip in host_ips else 1)
|
||||
return (ip in host_ips)
|
||||
|
||||
if __name__ == '__main__':
|
||||
is_googlebot(process_args(sys.argv))
|
||||
if __name__ == '__main__': # pragma: no cover
|
||||
try:
|
||||
ret = is_googlebot(process_args(sys.argv))
|
||||
except ValueError as e:
|
||||
sys.stderr.write(str(e))
|
||||
sys.exit(2)
|
||||
sys.exit(0 if ret else 1)
|
||||
|
|
|
@ -28,10 +28,6 @@ import logging
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
if sys.version_info >= (3, 3):
|
||||
import importlib.machinery
|
||||
else:
|
||||
import imp
|
||||
from collections import Mapping
|
||||
try:
|
||||
from collections import OrderedDict
|
||||
|
@ -87,18 +83,11 @@ class Actions(JailThread, Mapping):
|
|||
|
||||
@staticmethod
|
||||
def _load_python_module(pythonModule):
|
||||
pythonModuleName = os.path.splitext(
|
||||
os.path.basename(pythonModule))[0]
|
||||
if sys.version_info >= (3, 3):
|
||||
mod = importlib.machinery.SourceFileLoader(
|
||||
pythonModuleName, pythonModule).load_module()
|
||||
else:
|
||||
mod = imp.load_source(
|
||||
pythonModuleName, pythonModule)
|
||||
if not hasattr(mod, "Action"):
|
||||
mod = Utils.load_python_module(pythonModule)
|
||||
if not hasattr(mod, "Action"): # pragma: no cover
|
||||
raise RuntimeError(
|
||||
"%s module does not have 'Action' class" % pythonModule)
|
||||
elif not issubclass(mod.Action, ActionBase):
|
||||
elif not issubclass(mod.Action, ActionBase): # pragma: no cover
|
||||
raise RuntimeError(
|
||||
"%s module %s does not implement required methods" % (
|
||||
pythonModule, mod.Action.__name__))
|
||||
|
|
|
@ -30,6 +30,11 @@ import sys
|
|||
import time
|
||||
from ..helpers import getLogger, uni_decode
|
||||
|
||||
if sys.version_info >= (3, 3):
|
||||
import importlib.machinery
|
||||
else:
|
||||
import imp
|
||||
|
||||
# Gets the instance of the logger.
|
||||
logSys = getLogger(__name__)
|
||||
|
||||
|
@ -290,3 +295,15 @@ class Utils():
|
|||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def load_python_module(pythonModule):
|
||||
pythonModuleName = os.path.splitext(
|
||||
os.path.basename(pythonModule))[0]
|
||||
if sys.version_info >= (3, 3):
|
||||
mod = importlib.machinery.SourceFileLoader(
|
||||
pythonModuleName, pythonModule).load_module()
|
||||
else:
|
||||
mod = imp.load_source(
|
||||
pythonModuleName, pythonModule)
|
||||
return mod
|
||||
|
|
|
@ -48,6 +48,9 @@ from .dummyjail import DummyJail
|
|||
|
||||
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
|
||||
|
||||
STOCK_CONF_DIR = "config"
|
||||
STOCK = os.path.exists(os.path.join(STOCK_CONF_DIR, 'fail2ban.conf'))
|
||||
|
||||
|
||||
# yoh: per Steven Hiscocks's insight while troubleshooting
|
||||
# https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
|
||||
|
@ -409,6 +412,33 @@ class IgnoreIPDNS(LogCaptureTestCase):
|
|||
self.assertFalse(self.filter.inIgnoreIPList("128.178.50.11"))
|
||||
self.assertFalse(self.filter.inIgnoreIPList("128.178.50.13"))
|
||||
|
||||
def testIgnoreCmdApacheFakegooglebot(self):
|
||||
if not STOCK: # pragma: no cover
|
||||
raise unittest.SkipTest('Skip test because of no STOCK config')
|
||||
cmd = os.path.join(STOCK_CONF_DIR, "filter.d/ignorecommands/apache-fakegooglebot")
|
||||
## below test direct as python module:
|
||||
mod = Utils.load_python_module(cmd)
|
||||
self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "128.178.50.12"])))
|
||||
self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "192.0.2.1"])))
|
||||
bot_ips = ['66.249.66.1']
|
||||
for ip in bot_ips:
|
||||
self.assertTrue(mod.is_googlebot(mod.process_args([cmd, str(ip)])), "test of googlebot ip %s failed" % ip)
|
||||
self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd])))
|
||||
self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd, "192.0"])))
|
||||
## via command:
|
||||
self.filter.setIgnoreCommand(cmd + " <ip>")
|
||||
for ip in bot_ips:
|
||||
self.assertTrue(self.filter.inIgnoreIPList(str(ip)), "test of googlebot ip %s failed" % ip)
|
||||
self.assertLogged('-- returned successfully')
|
||||
self.pruneLog()
|
||||
self.assertFalse(self.filter.inIgnoreIPList("192.0"))
|
||||
self.assertLogged('Argument must be a single valid IP.')
|
||||
self.pruneLog()
|
||||
self.filter.setIgnoreCommand(cmd + " bad arguments <ip>")
|
||||
self.assertFalse(self.filter.inIgnoreIPList("192.0"))
|
||||
self.assertLogged('Please provide a single IP as an argument.')
|
||||
|
||||
|
||||
|
||||
class LogFile(LogCaptureTestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue