speedup server start/stop (waiting for communicate, etc);

extend server socket with timeouts, extend ping with timeout parameter;
pull/1562/head
sebres 2016-09-29 21:02:37 +02:00
parent 542419acab
commit 62b8664175
4 changed files with 105 additions and 40 deletions

View File

@ -32,10 +32,13 @@ import sys
class CSocket: class CSocket:
def __init__(self, sock="/var/run/fail2ban/fail2ban.sock"): def __init__(self, sock="/var/run/fail2ban/fail2ban.sock", timeout=-1):
# Create an INET, STREAMing socket # Create an INET, STREAMing socket
#self.csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #self.csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__csock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.__csock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.__deftout = self.__csock.gettimeout()
if timeout != -1:
self.settimeout(timeout)
#self.csock.connect(("localhost", 2222)) #self.csock.connect(("localhost", 2222))
self.__csock.connect(sock) self.__csock.connect(sock)
@ -50,6 +53,9 @@ class CSocket:
self.__csock.send(obj + CSPROTO.END) self.__csock.send(obj + CSPROTO.END)
return self.receive(self.__csock) return self.receive(self.__csock)
def settimeout(self, timeout):
self.__csock.settimeout(timeout if timeout != -1 else self.__deftout)
def close(self, sendEnd=True): def close(self, sendEnd=True):
if not self.__csock: if not self.__csock:
return return

View File

@ -36,6 +36,8 @@ from .beautifier import Beautifier
from .fail2bancmdline import Fail2banCmdLine, ServerExecutionException, ExitException, \ from .fail2bancmdline import Fail2banCmdLine, ServerExecutionException, ExitException, \
logSys, exit, output logSys, exit, output
from ..server.utils import Utils
PROMPT = "fail2ban> " PROMPT = "fail2ban> "
@ -69,8 +71,9 @@ class Fail2banClient(Fail2banCmdLine, Thread):
logSys.warning("Caught signal %d. Exiting" % signum) logSys.warning("Caught signal %d. Exiting" % signum)
exit(-1) exit(-1)
def __ping(self): def __ping(self, timeout=0.1):
return self.__processCmd([["ping"]], False) return self.__processCmd([["ping"] + ([timeout] if timeout != -1 else [])],
False, timeout=timeout)
@property @property
def beautifier(self): def beautifier(self):
@ -79,7 +82,7 @@ class Fail2banClient(Fail2banCmdLine, Thread):
self._beautifier = Beautifier() self._beautifier = Beautifier()
return self._beautifier return self._beautifier
def __processCmd(self, cmd, showRet=True): def __processCmd(self, cmd, showRet=True, timeout=-1):
client = None client = None
try: try:
beautifier = self.beautifier beautifier = self.beautifier
@ -88,7 +91,11 @@ class Fail2banClient(Fail2banCmdLine, Thread):
beautifier.setInputCmd(c) beautifier.setInputCmd(c)
try: try:
if not client: if not client:
client = CSocket(self._conf["socket"]) client = CSocket(self._conf["socket"], timeout=timeout)
elif timeout != -1:
client.settimeout(timeout)
if self._conf["verbose"] > 2:
logSys.log(5, "CMD: %r", c)
ret = client.send(c) ret = client.send(c)
if ret[0] == 0: if ret[0] == 0:
logSys.log(5, "OK : %r", ret[1]) logSys.log(5, "OK : %r", ret[1])
@ -101,10 +108,10 @@ class Fail2banClient(Fail2banCmdLine, Thread):
streamRet = False streamRet = False
except socket.error as e: except socket.error as e:
if showRet or self._conf["verbose"] > 1: if showRet or self._conf["verbose"] > 1:
if showRet or c != ["ping"]: if showRet or c[0] != "ping":
self.__logSocketError() self.__logSocketError(e, c[0] == "ping")
else: else:
logSys.log(5, " -- ping failed -- %r", e) logSys.log(5, " -- %s failed -- %r", c, e)
return False return False
except Exception as e: # pragma: no cover except Exception as e: # pragma: no cover
if showRet or self._conf["verbose"] > 1: if showRet or self._conf["verbose"] > 1:
@ -125,14 +132,18 @@ class Fail2banClient(Fail2banCmdLine, Thread):
sys.stdout.flush() sys.stdout.flush()
return streamRet return streamRet
def __logSocketError(self): def __logSocketError(self, prevError="", errorOnly=False):
try: try:
if os.access(self._conf["socket"], os.F_OK): # pragma: no cover if os.access(self._conf["socket"], os.F_OK): # pragma: no cover
# This doesn't check if path is a socket, # This doesn't check if path is a socket,
# but socket.error should be raised # but socket.error should be raised
if os.access(self._conf["socket"], os.W_OK): if os.access(self._conf["socket"], os.W_OK):
# Permissions look good, but socket.error was raised # Permissions look good, but socket.error was raised
logSys.error("Unable to contact server. Is it running?") if errorOnly:
logSys.error(prevError)
else:
logSys.error("%sUnable to contact server. Is it running?",
("[%s] " % prevError) if prevError else '')
else: else:
logSys.error("Permission denied to socket: %s," logSys.error("Permission denied to socket: %s,"
" (you must be root)", self._conf["socket"]) " (you must be root)", self._conf["socket"])
@ -188,7 +199,7 @@ class Fail2banClient(Fail2banCmdLine, Thread):
# Start the server or just initialize started one: # Start the server or just initialize started one:
try: try:
if background: if background:
# Start server daemon as fork of client process: # Start server daemon as fork of client process (or new process):
Fail2banServer.startServerAsync(self._conf) Fail2banServer.startServerAsync(self._conf)
# Send config stream to server: # Send config stream to server:
if not self.__processStartStreamAfterWait(stream, False): if not self.__processStartStreamAfterWait(stream, False):
@ -233,6 +244,11 @@ class Fail2banClient(Fail2banCmdLine, Thread):
logSys.log(5, ' client phase %s', phase) logSys.log(5, ' client phase %s', phase)
if not stream: if not stream:
return False return False
# wait a litle bit for phase "start-ready" before enter active waiting:
if phase is not None:
Utils.wait_for(lambda: phase.get('start-ready', None) is not None, 0.5, 0.001)
phase['configure'] = (True if stream else False)
logSys.log(5, ' client phase %s', phase)
# configure server with config stream: # configure server with config stream:
ret = self.__processStartStreamAfterWait(stream, False) ret = self.__processStartStreamAfterWait(stream, False)
if phase is not None: if phase is not None:
@ -293,7 +309,7 @@ class Fail2banClient(Fail2banCmdLine, Thread):
return False return False
# stop options - jail name or --all # stop options - jail name or --all
break break
if self.__ping(): if self.__ping(timeout=-1):
if len(cmd) == 1: if len(cmd) == 1:
jail = '--all' jail = '--all'
ret, stream = self.readConfig() ret, stream = self.readConfig()
@ -311,6 +327,9 @@ class Fail2banClient(Fail2banCmdLine, Thread):
logSys.error("Could not find server") logSys.error("Could not find server")
return False return False
elif len(cmd) > 1 and cmd[0] == "ping":
return self.__processCmd([cmd], timeout=float(cmd[1]))
else: else:
return self.__processCmd([cmd]) return self.__processCmd([cmd])
@ -342,21 +361,23 @@ class Fail2banClient(Fail2banCmdLine, Thread):
# Wait for the server to start (the server has 30 seconds to answer ping) # Wait for the server to start (the server has 30 seconds to answer ping)
starttime = time.time() starttime = time.time()
logSys.log(5, "__waitOnServer: %r", (alive, maxtime)) logSys.log(5, "__waitOnServer: %r", (alive, maxtime))
test = lambda: os.path.exists(self._conf["socket"]) and self.__ping()
with VisualWait(self._conf["verbose"]) as vis:
sltime = 0.0125 / 2 sltime = 0.0125 / 2
test = lambda: os.path.exists(self._conf["socket"]) and self.__ping(timeout=sltime)
with VisualWait(self._conf["verbose"]) as vis:
while self._alive: while self._alive:
runf = test() runf = test()
if runf == alive: if runf == alive:
return True return True
now = time.time() waittime = time.time() - starttime
logSys.log(5, " wait-time: %s", waittime)
# Wonderful visual :) # Wonderful visual :)
if now > starttime + 1: if waittime > 1:
vis.heartbeat() vis.heartbeat()
# f end time reached: # f end time reached:
if now - starttime >= maxtime: if waittime >= maxtime:
raise ServerExecutionException("Failed to start server") raise ServerExecutionException("Failed to start server")
sltime = min(sltime * 2, 0.5) # first 200ms faster:
sltime = min(sltime * 2, 0.5 if waittime > 0.2 else 0.1)
time.sleep(sltime) time.sleep(sltime)
return False return False

View File

@ -178,13 +178,17 @@ class Fail2banServer(Fail2banCmdLine):
logSys.debug('Configure via async client thread') logSys.debug('Configure via async client thread')
cli.configureServer(async=True, phase=phase) cli.configureServer(async=True, phase=phase)
# wait, do not continue if configuration is not 100% valid: # wait, do not continue if configuration is not 100% valid:
Utils.wait_for(lambda: phase.get('ready', None) is not None, self._conf["timeout"]) Utils.wait_for(lambda: phase.get('ready', None) is not None, self._conf["timeout"], 0.001)
logSys.log(5, ' server phase %s', phase)
if not phase.get('start', False): if not phase.get('start', False):
raise ServerExecutionException('Async configuration of server failed') raise ServerExecutionException('Async configuration of server failed')
# Start server, daemonize it, etc. # Start server, daemonize it, etc.
pid = os.getpid() pid = os.getpid()
server = Fail2banServer.startServerDirect(self._conf, background) server = Fail2banServer.startServerDirect(self._conf, background)
if not async:
phase['start-ready'] = True
logSys.log(5, ' server phase %s', phase)
# If forked - just exit other processes # If forked - just exit other processes
if pid != os.getpid(): # pragma: no cover if pid != os.getpid(): # pragma: no cover
os._exit(0) os._exit(0)
@ -193,7 +197,7 @@ class Fail2banServer(Fail2banCmdLine):
# wait for client answer "done": # wait for client answer "done":
if not async and cli: if not async and cli:
Utils.wait_for(lambda: phase.get('done', None) is not None, self._conf["timeout"]) Utils.wait_for(lambda: phase.get('done', None) is not None, self._conf["timeout"], 0.001)
if not phase.get('done', False): if not phase.get('done', False):
if server: # pragma: no cover if server: # pragma: no cover
server.quit() server.quit()

View File

@ -184,6 +184,19 @@ def _start_params(tmp, use_stock=False, logtarget="/dev/null", db=":memory:"):
"--timeout", str(fail2bancmdline.MAX_WAITTIME), "--timeout", str(fail2bancmdline.MAX_WAITTIME),
) )
def _get_pid_from_file(pidfile):
f = pid = None
try:
f = open(pidfile)
pid = f.read()
pid = re.match(r'\S+', pid).group()
return int(pid)
except Exception as e: # pragma: no cover
logSys.debug(e)
finally:
if f is not None:
f.close()
return pid
def _kill_srv(pidfile): def _kill_srv(pidfile):
logSys.debug("cleanup: %r", (pidfile, isdir(pidfile))) logSys.debug("cleanup: %r", (pidfile, isdir(pidfile)))
@ -193,23 +206,22 @@ def _kill_srv(pidfile):
if not isfile(pidfile): # pragma: no cover if not isfile(pidfile): # pragma: no cover
pidfile = pjoin(piddir, "fail2ban.pid") pidfile = pjoin(piddir, "fail2ban.pid")
# output log in heavydebug (to see possible start errors):
if unittest.F2B.log_level < logging.DEBUG: # pragma: no cover
logfile = pjoin(piddir, "f2b.log")
if isfile(logfile):
_out_file(logfile)
else:
logSys.log(5, 'no logfile %r', logfile)
if not isfile(pidfile): if not isfile(pidfile):
logSys.debug("cleanup: no pidfile for %r", piddir) logSys.debug("cleanup: no pidfile for %r", piddir)
return True return True
f = pid = None
try:
logSys.debug("cleanup pidfile: %r", pidfile) logSys.debug("cleanup pidfile: %r", pidfile)
f = open(pidfile) pid = _get_pid_from_file(pidfile)
pid = f.read() if pid is None: # pragma: no cover
pid = re.match(r'\S+', pid).group()
pid = int(pid)
except Exception as e: # pragma: no cover
logSys.debug(e)
return False return False
finally:
if f is not None:
f.close()
try: try:
logSys.debug("cleanup pid: %r", pid) logSys.debug("cleanup pid: %r", pid)
@ -443,6 +455,10 @@ class Fail2banClientTest(Fail2banClientServerBase):
def testClientStartBackgroundCall(self, tmp): def testClientStartBackgroundCall(self, tmp):
global INTERACT global INTERACT
startparams = _start_params(tmp, logtarget=pjoin(tmp, "f2b.log")) startparams = _start_params(tmp, logtarget=pjoin(tmp, "f2b.log"))
# if fast, start server process from client started direct here:
if unittest.F2B.fast: # pragma: no cover
self.execSuccess(startparams + ("start",))
else:
# start (in new process, using the same python version): # start (in new process, using the same python version):
cmd = (sys.executable, pjoin(BIN, CLIENT)) cmd = (sys.executable, pjoin(BIN, CLIENT))
logSys.debug('Start %s ...', cmd) logSys.debug('Start %s ...', cmd)
@ -459,6 +475,24 @@ class Fail2banClientTest(Fail2banClientServerBase):
self.assertLogged("TEST-ECHO") self.assertLogged("TEST-ECHO")
self.assertLogged("Exit with code 0") self.assertLogged("Exit with code 0")
self.pruneLog() self.pruneLog()
# test ping timeout:
self.execSuccess(startparams, "ping", "0.1")
self.assertLogged("Server replied: pong")
self.pruneLog()
# python 3 seems to bypass such short timeouts also,
# so suspend/resume server process and test between it...
pid = _get_pid_from_file(pjoin(tmp, "f2b.pid"))
try:
# suspend:
os.kill(pid, signal.SIGSTOP); # or SIGTSTP?
time.sleep(Utils.DEFAULT_SHORT_INTERVAL)
# test ping with short timeout:
self.execFailed(startparams, "ping", "1e-10")
finally:
# resume:
os.kill(pid, signal.SIGCONT)
self.assertLogged("timed out")
self.pruneLog()
# interactive client chat with started server: # interactive client chat with started server:
INTERACT += [ INTERACT += [
"echo INTERACT-ECHO", "echo INTERACT-ECHO",