mirror of https://github.com/fail2ban/fail2ban
fix sporadic bugs within asyncserver, cherry picked from "f2b-perfom-prepare-716-cs":
- differentiate between in loop and active (reset active only after really leaves a loop); - differentiate between foreign and self loop-thread by stop/close connection; - stops logging connection errors if too many errors; - safe remove socket (no sporadic errors if too fast start/stop occurred);pull/1346/head
parent
deca0b80ab
commit
01b379ab2e
|
@ -32,6 +32,7 @@ import fcntl
|
|||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
from .utils import Utils
|
||||
|
@ -98,10 +99,11 @@ def loop(active, timeout=None, use_poll=False):
|
|||
to avoid loop timeout mistake: different in poll and poll2 (sec vs ms),
|
||||
and to prevent sporadic errors like EBADF 'Bad file descriptor' etc. (see gh-161)
|
||||
"""
|
||||
errCount = 0
|
||||
if timeout is None:
|
||||
timeout = Utils.DEFAULT_SLEEP_TIME
|
||||
poll = asyncore.poll
|
||||
if use_poll and asyncore.poll2 and hasattr(asyncore.select, 'poll'): # pragma: no cover
|
||||
if use_poll and asyncore.poll2 and hasattr(asyncore.select, 'poll'): # pragma: no cover
|
||||
logSys.debug('Server listener (select) uses poll')
|
||||
# poll2 expected a timeout in milliseconds (but poll and loop in seconds):
|
||||
timeout = float(timeout) / 1000
|
||||
|
@ -110,11 +112,20 @@ def loop(active, timeout=None, use_poll=False):
|
|||
while active():
|
||||
try:
|
||||
poll(timeout)
|
||||
except Exception as e: # pragma: no cover
|
||||
if e.args[0] in (errno.ENOTCONN, errno.EBADF): # (errno.EBADF, 'Bad file descriptor')
|
||||
logSys.info('Server connection was closed: %s', str(e))
|
||||
else:
|
||||
logSys.error('Server connection was closed: %s', str(e))
|
||||
if errCount:
|
||||
errCount -= 1
|
||||
except Exception as e: # pragma: no cover
|
||||
if not active():
|
||||
break
|
||||
errCount += 1
|
||||
if errCount < 20:
|
||||
if e.args[0] in (errno.ENOTCONN, errno.EBADF): # (errno.EBADF, 'Bad file descriptor')
|
||||
logSys.info('Server connection was closed: %s', str(e))
|
||||
else:
|
||||
logSys.error('Server connection was closed: %s', str(e))
|
||||
elif errCount == 20:
|
||||
logSys.info('Too many errors - stop logging connection errors')
|
||||
logSys.exception(e)
|
||||
|
||||
|
||||
##
|
||||
|
@ -159,13 +170,14 @@ class AsyncServer(asyncore.dispatcher):
|
|||
# @param force: remove the socket file if exists.
|
||||
|
||||
def start(self, sock, force, use_poll=False):
|
||||
self.__worker = threading.current_thread()
|
||||
self.__sock = sock
|
||||
# Remove socket
|
||||
if os.path.exists(sock):
|
||||
logSys.error("Fail2ban seems to be already running")
|
||||
if force:
|
||||
logSys.warning("Forcing execution of the server")
|
||||
os.remove(sock)
|
||||
self._remove_sock()
|
||||
else:
|
||||
raise AsyncServerException("Server already running")
|
||||
# Creates the socket.
|
||||
|
@ -178,19 +190,25 @@ class AsyncServer(asyncore.dispatcher):
|
|||
AsyncServer.__markCloseOnExec(self.socket)
|
||||
self.listen(1)
|
||||
# Sets the init flag.
|
||||
self.__init = self.__active = True
|
||||
self.__init = self.__loop = self.__active = True
|
||||
# Event loop as long as active:
|
||||
loop(lambda: self.__active, use_poll=use_poll)
|
||||
loop(lambda: self.__loop, use_poll=use_poll)
|
||||
self.__active = False
|
||||
# Cleanup all
|
||||
self.stop()
|
||||
|
||||
|
||||
def close(self):
|
||||
if self.__active:
|
||||
self.__loop = False
|
||||
asyncore.dispatcher.close(self)
|
||||
# If not the loop thread (stops self in handler), wait (a little bit)
|
||||
# for the server leaves loop, before remove socket
|
||||
if threading.current_thread() != self.__worker:
|
||||
Utils.wait_for(lambda: not self.__active, 1)
|
||||
# Remove socket (file) only if it was created:
|
||||
if self.__init and os.path.exists(self.__sock):
|
||||
os.remove(self.__sock)
|
||||
self._remove_sock()
|
||||
logSys.debug("Removed socket file " + self.__sock)
|
||||
logSys.debug("Socket shutdown")
|
||||
self.__active = False
|
||||
|
@ -205,6 +223,16 @@ class AsyncServer(asyncore.dispatcher):
|
|||
def isActive(self):
|
||||
return self.__active
|
||||
|
||||
##
|
||||
# Safe remove (in multithreaded mode):
|
||||
|
||||
def _remove_sock(self):
|
||||
try:
|
||||
os.remove(self.__sock)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
##
|
||||
# Marks socket as close-on-exec to avoid leaking file descriptors when
|
||||
# running actions involving command execution.
|
||||
|
|
Loading…
Reference in New Issue