diff --git a/fail2ban/client/csocket.py b/fail2ban/client/csocket.py index 6b478460..86dd17c9 100644 --- a/fail2ban/client/csocket.py +++ b/fail2ban/client/csocket.py @@ -61,6 +61,7 @@ class CSocket: return if sendEnd: self.__csock.sendall(CSPROTO.CLOSE + CSPROTO.END) + self.__csock.shutdown(socket.SHUT_RDWR) self.__csock.close() self.__csock = None diff --git a/fail2ban/helpers.py b/fail2ban/helpers.py index 98d59fa1..6a3ed2fd 100644 --- a/fail2ban/helpers.py +++ b/fail2ban/helpers.py @@ -393,7 +393,9 @@ class BgService(object): self.__count = self.__threshold; if hasattr(gc, 'set_threshold'): gc.set_threshold(0) - gc.disable() + # don't disable auto garbage, because of non-reference-counting python's (like pypy), + # otherwise it may leak there on objects like unix-socket, etc. + #gc.disable() def service(self, force=False, wait=False): self.__count -= 1 diff --git a/fail2ban/server/asyncserver.py b/fail2ban/server/asyncserver.py index e254979d..eb99c69a 100644 --- a/fail2ban/server/asyncserver.py +++ b/fail2ban/server/asyncserver.py @@ -42,21 +42,36 @@ from ..helpers import logging, getLogger, formatExceptionInfo # Gets the instance of the logger. logSys = getLogger(__name__) + ## # Request handler class. # # This class extends asynchat in order to provide a request handler for # incoming query. - class RequestHandler(asynchat.async_chat): def __init__(self, conn, transmitter): asynchat.async_chat.__init__(self, conn) + self.__conn = conn self.__transmitter = transmitter self.__buffer = [] # Sets the terminator. self.set_terminator(CSPROTO.END) + def __close(self): + if self.__conn: + conn = self.__conn + self.__conn = None + try: + conn.shutdown(socket.SHUT_RDWR) + conn.close() + except socket.error: # pragma: no cover - normally unreachable + pass + + def handle_close(self): + self.__close() + asynchat.async_chat.handle_close(self) + def collect_incoming_data(self, data): #logSys.debug("Received raw data: " + str(data)) self.__buffer.append(data) @@ -111,14 +126,15 @@ class RequestHandler(asynchat.async_chat): self.close_when_done() -def loop(active, timeout=None, use_poll=False): +def loop(active, timeout=None, use_poll=False, err_count=None): """Custom event loop implementation Uses poll instead of loop to respect `active` flag, to avoid loop timeout mistake: different in poll and poll2 (sec vs ms), and to prevent sporadic errors like EBADF 'Bad file descriptor' etc. (see gh-161) """ - errCount = 0 + if not err_count: err_count={} + err_count['listen'] = 0 if timeout is None: timeout = Utils.DEFAULT_SLEEP_TIME poll = asyncore.poll @@ -133,22 +149,29 @@ def loop(active, timeout=None, use_poll=False): while active(): try: poll(timeout) - if errCount: - errCount -= 1 + if err_count['listen']: + err_count['listen'] -= 1 except Exception as e: if not active(): break - errCount += 1 - if errCount < 20: + err_count['listen'] += 1 + if err_count['listen'] < 20: # errno.ENOTCONN - 'Socket is not connected' # errno.EBADF - 'Bad file descriptor' if e.args[0] in (errno.ENOTCONN, errno.EBADF): # pragma: no cover (too sporadic) logSys.info('Server connection was closed: %s', str(e)) else: logSys.error('Server connection was closed: %s', str(e)) - elif errCount == 20: + elif err_count['listen'] == 20: logSys.exception(e) logSys.error('Too many errors - stop logging connection errors') + elif err_count['listen'] > 100: # pragma: no cover - normally unreachable + if ( + e.args[0] == errno.EMFILE # [Errno 24] Too many open files + or sum(err_count.itervalues()) > 1000 + ): + logSys.critical("Too many errors - critical count reached %r", err_count) + break ## @@ -165,6 +188,7 @@ class AsyncServer(asyncore.dispatcher): self.__sock = "/var/run/fail2ban/fail2ban.sock" self.__init = False self.__active = False + self.__errCount = {'accept': 0, 'listen': 0} self.onstart = None ## @@ -176,12 +200,25 @@ class AsyncServer(asyncore.dispatcher): def handle_accept(self): try: conn, addr = self.accept() - except socket.error: # pragma: no cover - logSys.warning("Socket error") + except socket.error as e: # pragma: no cover + self.__errCount['accept'] += 1 + if self.__errCount['accept'] < 20: + logSys.warning("Socket error: %s", e) + elif self.__errCount['accept'] == 20: + logSys.error("Too many acceptor errors - stop logging errors") + elif self.__errCount['accept'] > 100: + if ( + e.args[0] == errno.EMFILE # [Errno 24] Too many open files + or sum(self.__errCount.itervalues()) > 1000 + ): + logSys.critical("Too many errors - critical count reached %r", err_count) + self.stop() return - except TypeError: # pragma: no cover - logSys.warning("Type error") + except TypeError as e: # pragma: no cover + logSys.warning("Type error: %s", e) return + if self.__errCount['accept']: + self.__errCount['accept'] -= 1; AsyncServer.__markCloseOnExec(conn) # Creates an instance of the handler class to handle the # request/response on the incoming connection. @@ -219,7 +256,7 @@ class AsyncServer(asyncore.dispatcher): if self.onstart: self.onstart() # Event loop as long as active: - loop(lambda: self.__loop, timeout=timeout, use_poll=use_poll) + loop(lambda: self.__loop, timeout=timeout, use_poll=use_poll, err_count=self.__errCount) self.__active = False # Cleanup all self.stop() @@ -246,13 +283,21 @@ class AsyncServer(asyncore.dispatcher): # Stops the communication server. def stop_communication(self): - logSys.debug("Stop communication") - self.__transmitter = None + if self.__transmitter: + logSys.debug("Stop communication") + self.__transmitter = None + # shutdown socket here: + if self.socket: + try: + self.socket.shutdown(socket.SHUT_RDWR) + except socket.error: # pragma: no cover - normally unreachable + pass ## # Stops the server. def stop(self): + self.stop_communication() self.close() # better remains a method (not a property) since used as a callable for wait_for