mirror of https://github.com/fail2ban/fail2ban
test coverage
parent
57e9c25449
commit
32f3c1dbf3
|
@ -109,33 +109,40 @@ class ConfigReader():
|
||||||
self._cfg = ConfigReaderUnshared(**self._cfg_share_kwargs)
|
self._cfg = ConfigReaderUnshared(**self._cfg_share_kwargs)
|
||||||
|
|
||||||
def sections(self):
|
def sections(self):
|
||||||
if self._cfg is not None:
|
try:
|
||||||
return self._cfg.sections()
|
return self._cfg.sections()
|
||||||
return []
|
except AttributeError:
|
||||||
|
return []
|
||||||
|
|
||||||
def has_section(self, sec):
|
def has_section(self, sec):
|
||||||
if self._cfg is not None:
|
try:
|
||||||
return self._cfg.has_section(sec)
|
return self._cfg.has_section(sec)
|
||||||
return False
|
except AttributeError:
|
||||||
|
return False
|
||||||
def merge_section(self, *args, **kwargs):
|
|
||||||
if self._cfg is not None:
|
|
||||||
return self._cfg.merge_section(*args, **kwargs)
|
|
||||||
|
|
||||||
|
def merge_section(self, section, *args, **kwargs):
|
||||||
|
try:
|
||||||
|
return self._cfg.merge_section(section, *args, **kwargs)
|
||||||
|
except AttributeError:
|
||||||
|
raise NoSectionError(section)
|
||||||
|
|
||||||
def options(self, section, onlyOwn=False):
|
def options(self, section, onlyOwn=False):
|
||||||
if self._cfg is not None:
|
try:
|
||||||
return self._cfg.options(section, onlyOwn)
|
return self._cfg.options(section, onlyOwn)
|
||||||
return {}
|
except AttributeError:
|
||||||
|
raise NoSectionError(section)
|
||||||
|
|
||||||
def get(self, sec, opt, raw=False, vars={}):
|
def get(self, sec, opt, raw=False, vars={}):
|
||||||
if self._cfg is not None:
|
try:
|
||||||
return self._cfg.get(sec, opt, raw=raw, vars=vars)
|
return self._cfg.get(sec, opt, raw=raw, vars=vars)
|
||||||
return None
|
except AttributeError:
|
||||||
|
raise NoSectionError(sec)
|
||||||
|
|
||||||
def getOptions(self, *args, **kwargs):
|
def getOptions(self, section, *args, **kwargs):
|
||||||
if self._cfg is not None:
|
try:
|
||||||
return self._cfg.getOptions(*args, **kwargs)
|
return self._cfg.getOptions(section, *args, **kwargs)
|
||||||
return {}
|
except AttributeError:
|
||||||
|
raise NoSectionError(section)
|
||||||
|
|
||||||
|
|
||||||
class ConfigReaderUnshared(SafeConfigParserWithIncludes):
|
class ConfigReaderUnshared(SafeConfigParserWithIncludes):
|
||||||
|
|
|
@ -319,7 +319,7 @@ class Utils():
|
||||||
return e.errno == errno.EPERM
|
return e.errno == errno.EPERM
|
||||||
else:
|
else:
|
||||||
return True
|
return True
|
||||||
else:
|
else: # pragma : no cover (no windows currently supported)
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def pid_exists(pid):
|
def pid_exists(pid):
|
||||||
import ctypes
|
import ctypes
|
||||||
|
|
|
@ -28,7 +28,7 @@ import re
|
||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
from ..client.configreader import ConfigReader, ConfigReaderUnshared
|
from ..client.configreader import ConfigReader, ConfigReaderUnshared, NoSectionError
|
||||||
from ..client import configparserinc
|
from ..client import configparserinc
|
||||||
from ..client.jailreader import JailReader
|
from ..client.jailreader import JailReader
|
||||||
from ..client.filterreader import FilterReader
|
from ..client.filterreader import FilterReader
|
||||||
|
@ -317,7 +317,17 @@ class JailReaderTest(LogCaptureTestCase):
|
||||||
self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2)
|
self.assertLogged('File %s is a dangling link, thus cannot be monitored' % f2)
|
||||||
self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), [])
|
self.assertEqual(JailReader._glob(os.path.join(d, 'nonexisting')), [])
|
||||||
|
|
||||||
|
def testCommonFunction(self):
|
||||||
|
c = ConfigReader(share_config={})
|
||||||
|
# test common functionalities (no shared, without read of config):
|
||||||
|
self.assertEqual(c.sections(), [])
|
||||||
|
self.assertFalse(c.has_section('test'))
|
||||||
|
self.assertRaises(NoSectionError, c.merge_section, 'test', {})
|
||||||
|
self.assertRaises(NoSectionError, c.options, 'test')
|
||||||
|
self.assertRaises(NoSectionError, c.get, 'test', 'any')
|
||||||
|
self.assertRaises(NoSectionError, c.getOptions, 'test', {})
|
||||||
|
|
||||||
|
|
||||||
class FilterReaderTest(unittest.TestCase):
|
class FilterReaderTest(unittest.TestCase):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
@ -712,6 +722,7 @@ class JailsReaderTest(LogCaptureTestCase):
|
||||||
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
|
self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock')
|
||||||
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
|
self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid')
|
||||||
|
|
||||||
|
configurator.readAll()
|
||||||
configurator.getOptions()
|
configurator.getOptions()
|
||||||
configurator.convertToProtocol()
|
configurator.convertToProtocol()
|
||||||
commands = configurator.getConfigStream()
|
commands = configurator.getConfigStream()
|
||||||
|
|
Loading…
Reference in New Issue