mirror of https://github.com/fail2ban/fail2ban
Merge branch 'ignore-file-ip-addr-set':
configuration `ignoreip` and fail2ban-client commands `addignoreip`/`delignoreip` extended with `file:...` syntax to ignore IPs from file-ip-set (containing IP, subnet, dns/fqdn or raw strings); the file would be read lazy on demand, by first ban (and automatically reloaded by update after small latency to avoid expensive stats check on every compare); the entries inside the file can be separated by comma, space or new line with optional comments (text following chars # or ; after space or newline would be ignored up to next newline)pull/3955/head
commit
c54f1a4603
|
@ -282,7 +282,18 @@ def excepthook(exctype, value, traceback):
|
||||||
"Unhandled exception in Fail2Ban:", exc_info=True)
|
"Unhandled exception in Fail2Ban:", exc_info=True)
|
||||||
return sys.__excepthook__(exctype, value, traceback)
|
return sys.__excepthook__(exctype, value, traceback)
|
||||||
|
|
||||||
def splitwords(s):
|
RE_REM_COMMENTS = re.compile(r'(?m)(?:^|\s)[\#;].*')
|
||||||
|
def removeComments(s):
|
||||||
|
"""Helper to remove comments:
|
||||||
|
# comment ...
|
||||||
|
; comment ...
|
||||||
|
no comment # comment ...
|
||||||
|
no comment ; comment ...
|
||||||
|
"""
|
||||||
|
return RE_REM_COMMENTS.sub('', s)
|
||||||
|
|
||||||
|
RE_SPLT_WORDS = re.compile(r'[\s,]+')
|
||||||
|
def splitwords(s, ignoreComments=False):
|
||||||
"""Helper to split words on any comma, space, or a new line
|
"""Helper to split words on any comma, space, or a new line
|
||||||
|
|
||||||
Returns empty list if input is empty (or None) and filters
|
Returns empty list if input is empty (or None) and filters
|
||||||
|
@ -290,7 +301,9 @@ def splitwords(s):
|
||||||
"""
|
"""
|
||||||
if not s:
|
if not s:
|
||||||
return []
|
return []
|
||||||
return list(filter(bool, [v.strip() for v in re.split(r'[\s,]+', s)]))
|
if ignoreComments:
|
||||||
|
s = removeComments(s)
|
||||||
|
return list(filter(bool, [v.strip() for v in RE_SPLT_WORDS.split(s)]))
|
||||||
|
|
||||||
def _merge_dicts(x, y):
|
def _merge_dicts(x, y):
|
||||||
"""Helper to merge dicts.
|
"""Helper to merge dicts.
|
||||||
|
|
|
@ -32,7 +32,7 @@ import time
|
||||||
|
|
||||||
from .actions import Actions
|
from .actions import Actions
|
||||||
from .failmanager import FailManagerEmpty, FailManager
|
from .failmanager import FailManagerEmpty, FailManager
|
||||||
from .ipdns import DNSUtils, IPAddr
|
from .ipdns import DNSUtils, IPAddr, FileIPAddrSet
|
||||||
from .observer import Observers
|
from .observer import Observers
|
||||||
from .ticket import FailTicket
|
from .ticket import FailTicket
|
||||||
from .jailthread import JailThread
|
from .jailthread import JailThread
|
||||||
|
@ -510,6 +510,12 @@ class Filter(JailThread):
|
||||||
# An empty string is always false
|
# An empty string is always false
|
||||||
if ipstr == "":
|
if ipstr == "":
|
||||||
return
|
return
|
||||||
|
# File?
|
||||||
|
ip = FileIPAddrSet.RE_FILE_IGN_IP.match(ipstr)
|
||||||
|
if ip:
|
||||||
|
ip = DNSUtils.getIPsFromFile(ip.group(1)) # FileIPAddrSet
|
||||||
|
self.__ignoreIpList.append(ip)
|
||||||
|
return
|
||||||
# Create IP address object
|
# Create IP address object
|
||||||
ip = IPAddr(ipstr)
|
ip = IPAddr(ipstr)
|
||||||
# Avoid exact duplicates
|
# Avoid exact duplicates
|
||||||
|
@ -532,6 +538,11 @@ class Filter(JailThread):
|
||||||
return
|
return
|
||||||
# delete by ip:
|
# delete by ip:
|
||||||
logSys.debug(" Remove %r from ignore list", ip)
|
logSys.debug(" Remove %r from ignore list", ip)
|
||||||
|
# File?
|
||||||
|
if FileIPAddrSet.RE_FILE_IGN_IP.match(ip):
|
||||||
|
self.__ignoreIpList.remove(ip)
|
||||||
|
return
|
||||||
|
# IP / DNS
|
||||||
if ip in self.__ignoreIpSet:
|
if ip in self.__ignoreIpSet:
|
||||||
self.__ignoreIpSet.remove(ip)
|
self.__ignoreIpSet.remove(ip)
|
||||||
else:
|
else:
|
||||||
|
@ -588,7 +599,7 @@ class Filter(JailThread):
|
||||||
return True
|
return True
|
||||||
for net in self.__ignoreIpList:
|
for net in self.__ignoreIpList:
|
||||||
if ip.isInNet(net):
|
if ip.isInNet(net):
|
||||||
self.logIgnoreIp(ip, log_ignore, ignore_source=("ip" if net.isValid else "dns"))
|
self.logIgnoreIp(ip, log_ignore, ignore_source=(net.instanceType))
|
||||||
if self.__ignoreCache: c.set(key, True)
|
if self.__ignoreCache: c.set(key, True)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
|
@ -23,10 +23,11 @@ __license__ = "GPL"
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
|
import os
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from .utils import Utils
|
from .utils import Utils
|
||||||
from ..helpers import getLogger
|
from ..helpers import getLogger, MyTime, splitwords
|
||||||
|
|
||||||
# Gets the instance of the logger.
|
# Gets the instance of the logger.
|
||||||
logSys = getLogger(__name__)
|
logSys = getLogger(__name__)
|
||||||
|
@ -79,6 +80,8 @@ class DNSUtils:
|
||||||
# todo: make configurable the expired time and max count of cache entries:
|
# todo: make configurable the expired time and max count of cache entries:
|
||||||
CACHE_nameToIp = Utils.Cache(maxCount=1000, maxTime=5*60)
|
CACHE_nameToIp = Utils.Cache(maxCount=1000, maxTime=5*60)
|
||||||
CACHE_ipToName = Utils.Cache(maxCount=1000, maxTime=5*60)
|
CACHE_ipToName = Utils.Cache(maxCount=1000, maxTime=5*60)
|
||||||
|
# static cache used to hold sets read from files:
|
||||||
|
CACHE_fileToIp = Utils.Cache(maxCount=100, maxTime=5*60)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def dnsToIp(dns):
|
def dnsToIp(dns):
|
||||||
|
@ -229,6 +232,20 @@ class DNSUtils:
|
||||||
DNSUtils.CACHE_nameToIp.set(DNSUtils._getSelfIPs_key, ips)
|
DNSUtils.CACHE_nameToIp.set(DNSUtils._getSelfIPs_key, ips)
|
||||||
return ips
|
return ips
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def getIPsFromFile(fileName, noError=True):
|
||||||
|
"""Get set of IP addresses or subnets from file"""
|
||||||
|
# to find cached IPs:
|
||||||
|
ips = DNSUtils.CACHE_fileToIp.get(fileName)
|
||||||
|
if ips is not None:
|
||||||
|
return ips
|
||||||
|
# try to obtain set from file:
|
||||||
|
ips = FileIPAddrSet(fileName)
|
||||||
|
#ips.load() - load on demand
|
||||||
|
# cache and return :
|
||||||
|
DNSUtils.CACHE_fileToIp.set(fileName, ips)
|
||||||
|
return ips
|
||||||
|
|
||||||
_IPv6IsAllowed = None
|
_IPv6IsAllowed = None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -457,6 +474,10 @@ class IPAddr(object):
|
||||||
def familyStr(self):
|
def familyStr(self):
|
||||||
return IPAddr.FAM2STR.get(self._family)
|
return IPAddr.FAM2STR.get(self._family)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def instanceType(self):
|
||||||
|
return "ip" if self.isValid else "dns"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def plen(self):
|
def plen(self):
|
||||||
return self._plen
|
return self._plen
|
||||||
|
@ -598,6 +619,9 @@ class IPAddr(object):
|
||||||
def isInNet(self, net):
|
def isInNet(self, net):
|
||||||
"""Return either the IP object is in the provided network
|
"""Return either the IP object is in the provided network
|
||||||
"""
|
"""
|
||||||
|
# if addr-set:
|
||||||
|
if isinstance(net, IPAddrSet):
|
||||||
|
return self in net
|
||||||
# if it isn't a valid IP address, try DNS resolution
|
# if it isn't a valid IP address, try DNS resolution
|
||||||
if not net.isValid and net.raw != "":
|
if not net.isValid and net.raw != "":
|
||||||
# Check if IP in DNS
|
# Check if IP in DNS
|
||||||
|
@ -675,15 +699,32 @@ IPAddr.IP6_4COMPAT = IPAddr("::ffff:0:0", 96)
|
||||||
|
|
||||||
class IPAddrSet(set):
|
class IPAddrSet(set):
|
||||||
|
|
||||||
hasSubNet = False
|
hasSubNet = 0
|
||||||
|
|
||||||
def __init__(self, ips=[]):
|
def __init__(self, ips=[]):
|
||||||
|
ips, subnet = IPAddrSet._list2set(ips)
|
||||||
|
set.__init__(self, ips)
|
||||||
|
self.hasSubNet = subnet
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _list2set(ips):
|
||||||
ips2 = set()
|
ips2 = set()
|
||||||
|
subnet = 0
|
||||||
for ip in ips:
|
for ip in ips:
|
||||||
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
|
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
|
||||||
ips2.add(ip)
|
ips2.add(ip)
|
||||||
self.hasSubNet |= not ip.isSingle
|
subnet += not ip.isSingle
|
||||||
set.__init__(self, ips2)
|
return ips2, subnet
|
||||||
|
|
||||||
|
@property
|
||||||
|
def instanceType(self):
|
||||||
|
return "ip-set"
|
||||||
|
|
||||||
|
def set(self, ips):
|
||||||
|
ips, subnet = IPAddrSet._list2set(ips)
|
||||||
|
self.clear()
|
||||||
|
self.update(ips)
|
||||||
|
self.hasSubNet = subnet
|
||||||
|
|
||||||
def add(self, ip):
|
def add(self, ip):
|
||||||
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
|
if not isinstance(ip, IPAddr): ip = IPAddr(ip)
|
||||||
|
@ -696,6 +737,69 @@ class IPAddrSet(set):
|
||||||
return set.__contains__(self, ip) or (self.hasSubNet and any(n.contains(ip) for n in self))
|
return set.__contains__(self, ip) or (self.hasSubNet and any(n.contains(ip) for n in self))
|
||||||
|
|
||||||
|
|
||||||
|
class FileIPAddrSet(IPAddrSet):
|
||||||
|
|
||||||
|
# RE matching file://...
|
||||||
|
RE_FILE_IGN_IP = re.compile(r'^file:/{0,2}(.*)$')
|
||||||
|
|
||||||
|
fileName = ''
|
||||||
|
_shortRepr = None
|
||||||
|
maxUpdateLatency = 1 # latency in seconds to update by changes
|
||||||
|
_nextCheck = 0
|
||||||
|
_fileStats = ()
|
||||||
|
|
||||||
|
def __init__(self, fileName=''):
|
||||||
|
self.fileName = fileName
|
||||||
|
# self.load() - lazy load on demand by first check (in, __contains__ etc)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def instanceType(self):
|
||||||
|
return repr(self)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if id(self) == id(other): return 1
|
||||||
|
# to allow remove file-set from list (delIgnoreIP) by its name:
|
||||||
|
if isinstance(other, FileIPAddrSet):
|
||||||
|
return self.fileName == other.fileName
|
||||||
|
m = FileIPAddrSet.RE_FILE_IGN_IP.match(other)
|
||||||
|
if m:
|
||||||
|
return self.fileName == m.group(1)
|
||||||
|
|
||||||
|
def load(self, ifNeeded=True, noError=True):
|
||||||
|
try:
|
||||||
|
if ifNeeded:
|
||||||
|
tm = MyTime.time()
|
||||||
|
if tm > self._nextCheck:
|
||||||
|
self._nextCheck = tm + self.maxUpdateLatency
|
||||||
|
stats = os.stat(self.fileName)
|
||||||
|
stats = stats.st_mtime, stats.st_ino, stats.st_size
|
||||||
|
if self._fileStats == stats:
|
||||||
|
return
|
||||||
|
self._fileStats = stats
|
||||||
|
with open(self.fileName, 'r') as f:
|
||||||
|
ips = f.read()
|
||||||
|
ips = splitwords(ips, ignoreComments=True)
|
||||||
|
self.set(ips)
|
||||||
|
except Exception as e: # pragma: no cover
|
||||||
|
if not noError: raise e
|
||||||
|
logSys.warning("Retrieving IPs set from %r failed: %s", self.fileName, e)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
if not self._shortRepr:
|
||||||
|
shortfn = os.path.basename(self.fileName)
|
||||||
|
if shortfn != self.fileName:
|
||||||
|
shortfn = '.../' + shortfn
|
||||||
|
self._shortRepr = 'file:' + shortfn + ')'
|
||||||
|
return self._shortRepr
|
||||||
|
|
||||||
|
def __contains__(self, ip):
|
||||||
|
# check it is uptodate (not often than maxUpdateLatency):
|
||||||
|
if self.fileName:
|
||||||
|
self.load(ifNeeded=True)
|
||||||
|
# inherited contains:
|
||||||
|
return IPAddrSet.__contains__(self, ip)
|
||||||
|
|
||||||
|
|
||||||
def _NetworkInterfacesAddrs(withMask=False):
|
def _NetworkInterfacesAddrs(withMask=False):
|
||||||
|
|
||||||
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand:
|
# Closure implementing lazy load modules and libc and define _NetworkInterfacesAddrs on demand:
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
test-local-net ; test local subnet
|
||||||
|
test-subnet-a, test-subnet-b ; further test subnets
|
||||||
|
192.0.2.200, 2001:0db8::00c8 # 2 IPs
|
||||||
|
192.0.2.216/29, 2001:db8::d8/125 # 2 subnets by IP/CIDR notation
|
|
@ -399,6 +399,82 @@ class IgnoreIP(LogCaptureTestCase):
|
||||||
self.filter.addIgnoreIP('192.168.1.0/255.255.0.0')
|
self.filter.addIgnoreIP('192.168.1.0/255.255.0.0')
|
||||||
self.assertRaises(ValueError, self.filter.addIgnoreIP, '192.168.1.0/255.255.0.128')
|
self.assertRaises(ValueError, self.filter.addIgnoreIP, '192.168.1.0/255.255.0.128')
|
||||||
|
|
||||||
|
def testIgnoreIPDNS(self):
|
||||||
|
# test subnets are pre-cached (as IPAddrSet), so it shall work even without network:
|
||||||
|
for dns in ("test-subnet-a", "test-subnet-b"):
|
||||||
|
self.filter.addIgnoreIP(dns)
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.0.2.1')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.0.2.7')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.0.2.16')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('192.0.2.23')))
|
||||||
|
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.0.2.8')))
|
||||||
|
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('192.0.2.15')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:db8::00')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:db8::07')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:0db8:0000:0000:0000:0000:0000:0000')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:0db8:0000:0000:0000:0000:0000:0007')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:db8::10')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:db8::17')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:0db8:0000:0000:0000:0000:0000:0010')))
|
||||||
|
self.assertTrue(self.filter.inIgnoreIPList(IPAddr('2001:0db8:0000:0000:0000:0000:0000:0017')))
|
||||||
|
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('2001:db8::08')))
|
||||||
|
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('2001:db8::0f')))
|
||||||
|
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('2001:0db8:0000:0000:0000:0000:0000:0008')))
|
||||||
|
self.assertFalse(self.filter.inIgnoreIPList(IPAddr('2001:0db8:0000:0000:0000:0000:0000:000f')))
|
||||||
|
|
||||||
|
# to test several IPs in ip-set from file "files/test-ign-ips-file":
|
||||||
|
TEST_IPS_IGN_FILE = {
|
||||||
|
'127.0.0.1': True,
|
||||||
|
'127.255.255.255': True,
|
||||||
|
'127.0.0.1/8': True,
|
||||||
|
'192.0.2.1': True,
|
||||||
|
'192.0.2.7': True,
|
||||||
|
'192.0.2.0/29': True,
|
||||||
|
'192.0.2.16': True,
|
||||||
|
'192.0.2.23': True,
|
||||||
|
'192.0.2.200': True,
|
||||||
|
'192.0.2.216': True,
|
||||||
|
'192.0.2.223': True,
|
||||||
|
'192.0.2.216/29': True,
|
||||||
|
'192.0.2.8': False,
|
||||||
|
'192.0.2.15': False,
|
||||||
|
'192.0.2.100': False,
|
||||||
|
'192.0.2.224': False,
|
||||||
|
'::1': True,
|
||||||
|
'2001:db8::00': True,
|
||||||
|
'2001:db8::07': True,
|
||||||
|
'2001:db8::0/125': True,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:0000': True,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:0007': True,
|
||||||
|
'2001:db8::10': True,
|
||||||
|
'2001:db8::17': True,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:0010': True,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:0017': True,
|
||||||
|
'2001:db8::c8': True,
|
||||||
|
'2001:db8::d8': True,
|
||||||
|
'2001:db8::df': True,
|
||||||
|
'2001:db8::d8/125': True,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:00d8': True,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:00df': True,
|
||||||
|
'2001:db8::08': False,
|
||||||
|
'2001:db8::0f': False,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:0008': False,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:000f': False,
|
||||||
|
'2001:db8::e0': False,
|
||||||
|
'2001:0db8:0000:0000:0000:0000:0000:00e0': False,
|
||||||
|
}
|
||||||
|
|
||||||
|
def testIgnoreIPFileIPAddr(self):
|
||||||
|
fname = 'file://' + os.path.join(TEST_FILES_DIR, "test-ign-ips-file")
|
||||||
|
self.filter.ignoreSelf = False
|
||||||
|
self.filter.addIgnoreIP(fname)
|
||||||
|
for ip, v in IgnoreIP.TEST_IPS_IGN_FILE.items():
|
||||||
|
self.assertEqual(self.filter.inIgnoreIPList(IPAddr(ip)), v, ("for %r in ignoreip, file://test-ign-ips-file)" % (ip,)))
|
||||||
|
# now remove it:
|
||||||
|
self.filter.delIgnoreIP(fname)
|
||||||
|
for ip in IgnoreIP.TEST_IPS_IGN_FILE.keys():
|
||||||
|
self.assertEqual(self.filter.inIgnoreIPList(IPAddr(ip)), False, ("for %r ignoreip, without file://test-ign-ips-file)" % (ip,)))
|
||||||
|
|
||||||
def testIgnoreInProcessLine(self):
|
def testIgnoreInProcessLine(self):
|
||||||
setUpMyTime()
|
setUpMyTime()
|
||||||
try:
|
try:
|
||||||
|
@ -2427,6 +2503,53 @@ class DNSUtilsNetworkTests(unittest.TestCase):
|
||||||
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getSelfIPs_key)
|
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getSelfIPs_key)
|
||||||
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getNetIntrfIPs_key)
|
DNSUtils.CACHE_nameToIp.unset(DNSUtils._getNetIntrfIPs_key)
|
||||||
|
|
||||||
|
def test_FileIPAddrSet(self):
|
||||||
|
fname = os.path.join(TEST_FILES_DIR, "test-ign-ips-file")
|
||||||
|
ips = DNSUtils.getIPsFromFile(fname)
|
||||||
|
for ip, v in IgnoreIP.TEST_IPS_IGN_FILE.items():
|
||||||
|
self.assertEqual(IPAddr(ip) in ips, v, ("for %r in test-ign-ips-file\n containing %s)" % (ip, set(ips))))
|
||||||
|
|
||||||
|
def test_FileIPAddrSet_Update(self):
|
||||||
|
fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.ips')
|
||||||
|
f = open(fname, 'wb')
|
||||||
|
try:
|
||||||
|
f.write(b"192.0.2.200, 192.0.2.201\n")
|
||||||
|
f.flush()
|
||||||
|
ips = DNSUtils.getIPsFromFile(fname)
|
||||||
|
self.assertTrue(IPAddr('192.0.2.200') in ips)
|
||||||
|
self.assertTrue(IPAddr('192.0.2.201') in ips)
|
||||||
|
self.assertFalse(IPAddr('192.0.2.202') in ips)
|
||||||
|
# +1m, jump to next minute to force next check for update:
|
||||||
|
MyTime.setTime(MyTime.time() + 60)
|
||||||
|
# add .202, some comment and check all 3 IPs are there:
|
||||||
|
f.write(b"""192.0.2.202\n
|
||||||
|
# 2001:db8::ca/127 ; IPv6 commented yet
|
||||||
|
""")
|
||||||
|
f.flush()
|
||||||
|
self.assertTrue(IPAddr('192.0.2.200') in ips)
|
||||||
|
self.assertTrue(IPAddr('192.0.2.201') in ips)
|
||||||
|
self.assertTrue(IPAddr('192.0.2.202') in ips)
|
||||||
|
self.assertFalse(IPAddr('2001:db8::ca') in ips)
|
||||||
|
self.assertFalse(IPAddr('2001:db8::cb') in ips)
|
||||||
|
# +1m, jump to next minute to force next check for update:
|
||||||
|
MyTime.setTime(MyTime.time() + 60)
|
||||||
|
# remove .200, add IPv6-subnet and check all new IPs are there:
|
||||||
|
f.seek(0); f.truncate()
|
||||||
|
f.write(b"""
|
||||||
|
# 192.0.2.200 ; commented
|
||||||
|
192.0.2.201, 192.0.2.202 # no .200 anymore
|
||||||
|
2001:db8::ca/127 ; but 2 new IPv6
|
||||||
|
""")
|
||||||
|
f.flush()
|
||||||
|
self.assertFalse(IPAddr('192.0.2.200') in ips)
|
||||||
|
self.assertTrue(IPAddr('192.0.2.201') in ips)
|
||||||
|
self.assertTrue(IPAddr('192.0.2.202') in ips)
|
||||||
|
self.assertTrue(IPAddr('2001:db8::ca') in ips)
|
||||||
|
self.assertTrue(IPAddr('2001:db8::cb') in ips)
|
||||||
|
finally:
|
||||||
|
tearDownMyTime()
|
||||||
|
_killfile(f, fname)
|
||||||
|
|
||||||
def testFQDN(self):
|
def testFQDN(self):
|
||||||
unittest.F2B.SkipIfNoNetwork()
|
unittest.F2B.SkipIfNoNetwork()
|
||||||
sname = DNSUtils.getHostname(fqdn=False)
|
sname = DNSUtils.getHostname(fqdn=False)
|
||||||
|
|
|
@ -34,7 +34,7 @@ from io import StringIO
|
||||||
from .utils import LogCaptureTestCase, logSys as DefLogSys
|
from .utils import LogCaptureTestCase, logSys as DefLogSys
|
||||||
|
|
||||||
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, \
|
from ..helpers import formatExceptionInfo, mbasename, TraceBack, FormatterWithTraceBack, getLogger, \
|
||||||
getVerbosityFormat, splitwords, uni_decode, uni_string
|
getVerbosityFormat, removeComments, splitwords, uni_decode, uni_string
|
||||||
from ..server.mytime import MyTime
|
from ..server.mytime import MyTime
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,6 +68,16 @@ class HelpersTest(unittest.TestCase):
|
||||||
self.assertEqual(splitwords(' 1\n 2, 3'), ['1', '2', '3'])
|
self.assertEqual(splitwords(' 1\n 2, 3'), ['1', '2', '3'])
|
||||||
self.assertEqual(splitwords('\t1\t 2,\r\n 3\n'), ['1', '2', '3']); # other spaces
|
self.assertEqual(splitwords('\t1\t 2,\r\n 3\n'), ['1', '2', '3']); # other spaces
|
||||||
|
|
||||||
|
def testSplitNoComments(self):
|
||||||
|
s = '''
|
||||||
|
# comment ...
|
||||||
|
; comment ...
|
||||||
|
line1 A # comment ...
|
||||||
|
line2 B ; comment ...
|
||||||
|
'''
|
||||||
|
self.assertEqual(splitwords(s, ignoreComments=True), ['line1', 'A', 'line2', 'B'])
|
||||||
|
self.assertEqual(splitwords(removeComments(s)), ['line1', 'A', 'line2', 'B'])
|
||||||
|
|
||||||
|
|
||||||
def _sh_call(cmd):
|
def _sh_call(cmd):
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
|
@ -39,7 +39,7 @@ from io import StringIO
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
from ..helpers import getLogger, str2LogLevel, getVerbosityFormat, uni_decode
|
from ..helpers import getLogger, str2LogLevel, getVerbosityFormat, uni_decode
|
||||||
from ..server.ipdns import IPAddr, DNSUtils
|
from ..server.ipdns import IPAddr, IPAddrSet, DNSUtils
|
||||||
from ..server.mytime import MyTime
|
from ..server.mytime import MyTime
|
||||||
from ..server.utils import Utils
|
from ..server.utils import Utils
|
||||||
# for action_d.test_smtp :
|
# for action_d.test_smtp :
|
||||||
|
@ -335,6 +335,12 @@ def initTests(opts):
|
||||||
ips = set([IPAddr('127.0.0.1'), IPAddr('::1')]); # DNSUtils.dnsToIp('localhost')
|
ips = set([IPAddr('127.0.0.1'), IPAddr('::1')]); # DNSUtils.dnsToIp('localhost')
|
||||||
for i in DNSUtils.getSelfNames():
|
for i in DNSUtils.getSelfNames():
|
||||||
c.set(i, ips)
|
c.set(i, ips)
|
||||||
|
# some test subnets (although normally they are not resolved to addr/cidr,
|
||||||
|
# we'll use IPAddrSet here to seek through the resolved subnet in tests):
|
||||||
|
c = DNSUtils.CACHE_nameToIp
|
||||||
|
c.set('test-local-net', IPAddrSet([IPAddr('127.0.0.1/8'), IPAddr('::1')]))
|
||||||
|
c.set('test-subnet-a', IPAddrSet([IPAddr('192.0.2.0/29'), IPAddr('2001:db8::0/125')])); # 192.0.2.0 .. 192.0.2.7, 2001:db8::00 .. 2001:db8::07
|
||||||
|
c.set('test-subnet-b', IPAddrSet([IPAddr('192.0.2.16/29'), IPAddr('2001:db8::10/125')])); # 192.0.2.16 .. 192.0.2.23, 2001:db8::10 .. 2001:db8::17
|
||||||
|
|
||||||
|
|
||||||
def mtimesleep():
|
def mtimesleep():
|
||||||
|
|
|
@ -247,7 +247,8 @@ Values can also be quoted (required when value includes a ","). More that one ac
|
||||||
boolean value (default true) indicates the banning of own IP addresses should be prevented
|
boolean value (default true) indicates the banning of own IP addresses should be prevented
|
||||||
.TP
|
.TP
|
||||||
.B ignoreip
|
.B ignoreip
|
||||||
list of IPs not to ban. They can include a DNS resp. CIDR mask too. The option affects additionally to \fBignoreself\fR (if true) and don't need to contain own DNS resp. IPs of the running host.
|
list of IPs not to ban. They can also include CIDR mask or can be DNS (FQDN), or even raw string (if jail banning IDs instead of IPs). The option affects additionally to \fBignoreself\fR (if true) and don't need to contain own DNS resp. IPs of the running host.
|
||||||
|
This can also contain a filename (prefixed with "file:") with entries to ignore, which will be lazy loaded to the runtime on demand by first ban and automatically reloaded by update after small latency.
|
||||||
.TP
|
.TP
|
||||||
.B ignorecommand
|
.B ignorecommand
|
||||||
command that is executed to determine if the current candidate IP for banning (or failure-ID for raw IDs) should not be banned. This option operates alongside the \fBignoreself\fR and \fBignoreip\fR options. It is executed first, only if neither \fBignoreself\fR nor \fBignoreip\fR match the criteria.
|
command that is executed to determine if the current candidate IP for banning (or failure-ID for raw IDs) should not be banned. This option operates alongside the \fBignoreself\fR and \fBignoreip\fR options. It is executed first, only if neither \fBignoreself\fR nor \fBignoreip\fR match the criteria.
|
||||||
|
|
Loading…
Reference in New Issue