shutdown sockets before close, avoid socket leakage by use of the explicit socket close in async_chat;

better error handling with error counting, differentiate special case ([Errno 24] Too many open files), with resulting stop of the server
(avoid flood the log file, closes gh-991 and similar issues);
restored auto-garbage, because of non-reference-counting python's (like pypy), otherwise it may leak there on objects like unix-socket, etc.
pull/2061/head
sebres 2018-03-02 17:08:23 +01:00
parent fa520f36c3
commit 5f021aa648
3 changed files with 64 additions and 16 deletions

View File

@ -61,6 +61,7 @@ class CSocket:
return return
if sendEnd: if sendEnd:
self.__csock.sendall(CSPROTO.CLOSE + CSPROTO.END) self.__csock.sendall(CSPROTO.CLOSE + CSPROTO.END)
self.__csock.shutdown(socket.SHUT_RDWR)
self.__csock.close() self.__csock.close()
self.__csock = None self.__csock = None

View File

@ -393,7 +393,9 @@ class BgService(object):
self.__count = self.__threshold; self.__count = self.__threshold;
if hasattr(gc, 'set_threshold'): if hasattr(gc, 'set_threshold'):
gc.set_threshold(0) gc.set_threshold(0)
gc.disable() # don't disable auto garbage, because of non-reference-counting python's (like pypy),
# otherwise it may leak there on objects like unix-socket, etc.
#gc.disable()
def service(self, force=False, wait=False): def service(self, force=False, wait=False):
self.__count -= 1 self.__count -= 1

View File

@ -42,21 +42,36 @@ from ..helpers import logging, getLogger, formatExceptionInfo
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
## ##
# Request handler class. # Request handler class.
# #
# This class extends asynchat in order to provide a request handler for # This class extends asynchat in order to provide a request handler for
# incoming query. # incoming query.
class RequestHandler(asynchat.async_chat): class RequestHandler(asynchat.async_chat):
def __init__(self, conn, transmitter): def __init__(self, conn, transmitter):
asynchat.async_chat.__init__(self, conn) asynchat.async_chat.__init__(self, conn)
self.__conn = conn
self.__transmitter = transmitter self.__transmitter = transmitter
self.__buffer = [] self.__buffer = []
# Sets the terminator. # Sets the terminator.
self.set_terminator(CSPROTO.END) self.set_terminator(CSPROTO.END)
def __close(self):
if self.__conn:
conn = self.__conn
self.__conn = None
try:
conn.shutdown(socket.SHUT_RDWR)
conn.close()
except socket.error: # pragma: no cover - normally unreachable
pass
def handle_close(self):
self.__close()
asynchat.async_chat.handle_close(self)
def collect_incoming_data(self, data): def collect_incoming_data(self, data):
#logSys.debug("Received raw data: " + str(data)) #logSys.debug("Received raw data: " + str(data))
self.__buffer.append(data) self.__buffer.append(data)
@ -111,14 +126,15 @@ class RequestHandler(asynchat.async_chat):
self.close_when_done() self.close_when_done()
def loop(active, timeout=None, use_poll=False): def loop(active, timeout=None, use_poll=False, err_count=None):
"""Custom event loop implementation """Custom event loop implementation
Uses poll instead of loop to respect `active` flag, Uses poll instead of loop to respect `active` flag,
to avoid loop timeout mistake: different in poll and poll2 (sec vs ms), to avoid loop timeout mistake: different in poll and poll2 (sec vs ms),
and to prevent sporadic errors like EBADF 'Bad file descriptor' etc. (see gh-161) and to prevent sporadic errors like EBADF 'Bad file descriptor' etc. (see gh-161)
""" """
errCount = 0 if not err_count: err_count={}
err_count['listen'] = 0
if timeout is None: if timeout is None:
timeout = Utils.DEFAULT_SLEEP_TIME timeout = Utils.DEFAULT_SLEEP_TIME
poll = asyncore.poll poll = asyncore.poll
@ -133,22 +149,29 @@ def loop(active, timeout=None, use_poll=False):
while active(): while active():
try: try:
poll(timeout) poll(timeout)
if errCount: if err_count['listen']:
errCount -= 1 err_count['listen'] -= 1
except Exception as e: except Exception as e:
if not active(): if not active():
break break
errCount += 1 err_count['listen'] += 1
if errCount < 20: if err_count['listen'] < 20:
# errno.ENOTCONN - 'Socket is not connected' # errno.ENOTCONN - 'Socket is not connected'
# errno.EBADF - 'Bad file descriptor' # errno.EBADF - 'Bad file descriptor'
if e.args[0] in (errno.ENOTCONN, errno.EBADF): # pragma: no cover (too sporadic) if e.args[0] in (errno.ENOTCONN, errno.EBADF): # pragma: no cover (too sporadic)
logSys.info('Server connection was closed: %s', str(e)) logSys.info('Server connection was closed: %s', str(e))
else: else:
logSys.error('Server connection was closed: %s', str(e)) logSys.error('Server connection was closed: %s', str(e))
elif errCount == 20: elif err_count['listen'] == 20:
logSys.exception(e) logSys.exception(e)
logSys.error('Too many errors - stop logging connection errors') logSys.error('Too many errors - stop logging connection errors')
elif err_count['listen'] > 100: # pragma: no cover - normally unreachable
if (
e.args[0] == errno.EMFILE # [Errno 24] Too many open files
or sum(err_count.itervalues()) > 1000
):
logSys.critical("Too many errors - critical count reached %r", err_count)
break
## ##
@ -165,6 +188,7 @@ class AsyncServer(asyncore.dispatcher):
self.__sock = "/var/run/fail2ban/fail2ban.sock" self.__sock = "/var/run/fail2ban/fail2ban.sock"
self.__init = False self.__init = False
self.__active = False self.__active = False
self.__errCount = {'accept': 0, 'listen': 0}
self.onstart = None self.onstart = None
## ##
@ -176,12 +200,25 @@ class AsyncServer(asyncore.dispatcher):
def handle_accept(self): def handle_accept(self):
try: try:
conn, addr = self.accept() conn, addr = self.accept()
except socket.error: # pragma: no cover except socket.error as e: # pragma: no cover
logSys.warning("Socket error") self.__errCount['accept'] += 1
if self.__errCount['accept'] < 20:
logSys.warning("Socket error: %s", e)
elif self.__errCount['accept'] == 20:
logSys.error("Too many acceptor errors - stop logging errors")
elif self.__errCount['accept'] > 100:
if (
e.args[0] == errno.EMFILE # [Errno 24] Too many open files
or sum(self.__errCount.itervalues()) > 1000
):
logSys.critical("Too many errors - critical count reached %r", err_count)
self.stop()
return return
except TypeError: # pragma: no cover except TypeError as e: # pragma: no cover
logSys.warning("Type error") logSys.warning("Type error: %s", e)
return return
if self.__errCount['accept']:
self.__errCount['accept'] -= 1;
AsyncServer.__markCloseOnExec(conn) AsyncServer.__markCloseOnExec(conn)
# Creates an instance of the handler class to handle the # Creates an instance of the handler class to handle the
# request/response on the incoming connection. # request/response on the incoming connection.
@ -219,7 +256,7 @@ class AsyncServer(asyncore.dispatcher):
if self.onstart: if self.onstart:
self.onstart() self.onstart()
# Event loop as long as active: # Event loop as long as active:
loop(lambda: self.__loop, timeout=timeout, use_poll=use_poll) loop(lambda: self.__loop, timeout=timeout, use_poll=use_poll, err_count=self.__errCount)
self.__active = False self.__active = False
# Cleanup all # Cleanup all
self.stop() self.stop()
@ -246,13 +283,21 @@ class AsyncServer(asyncore.dispatcher):
# Stops the communication server. # Stops the communication server.
def stop_communication(self): def stop_communication(self):
logSys.debug("Stop communication") if self.__transmitter:
self.__transmitter = None logSys.debug("Stop communication")
self.__transmitter = None
# shutdown socket here:
if self.socket:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error: # pragma: no cover - normally unreachable
pass
## ##
# Stops the server. # Stops the server.
def stop(self): def stop(self):
self.stop_communication()
self.close() self.close()
# better remains a method (not a property) since used as a callable for wait_for # better remains a method (not a property) since used as a callable for wait_for