performance fix: minimizes connection overhead, using same socket by multiple commands without close it (ex.: 'start' sends several hundreds commands at once)

pull/1099/head
sebres 2015-07-06 12:23:53 +02:00
parent 38f8e1a82a
commit 81e659b760
3 changed files with 66 additions and 51 deletions

View File

@ -153,30 +153,36 @@ class Fail2banClient:
return self.__processCmd([["ping"]], False) return self.__processCmd([["ping"]], False)
def __processCmd(self, cmd, showRet = True): def __processCmd(self, cmd, showRet = True):
beautifier = Beautifier() client = None
streamRet = True try:
for c in cmd: beautifier = Beautifier()
beautifier.setInputCmd(c) streamRet = True
try: for c in cmd:
client = CSocket(self.__conf["socket"]) beautifier.setInputCmd(c)
ret = client.send(c) try:
if ret[0] == 0: if not client:
logSys.debug("OK : " + `ret[1]`) client = CSocket(self.__conf["socket"])
ret = client.send(c)
if ret[0] == 0:
logSys.debug("OK : " + `ret[1]`)
if showRet:
print beautifier.beautify(ret[1])
else:
logSys.error("NOK: " + `ret[1].args`)
if showRet:
print beautifier.beautifyError(ret[1])
streamRet = False
except socket.error:
if showRet: if showRet:
print beautifier.beautify(ret[1]) self.__logSocketError()
else: return False
logSys.error("NOK: " + `ret[1].args`) except Exception, e:
if showRet: if showRet:
print beautifier.beautifyError(ret[1]) logSys.error(e)
streamRet = False return False
except socket.error: finally:
if showRet: if client:
self.__logSocketError() client.close()
return False
except Exception, e:
if showRet:
logSys.error(e)
return False
return streamRet return streamRet
def __logSocketError(self): def __logSocketError(self):

View File

@ -29,39 +29,45 @@ from pickle import dumps, loads, HIGHEST_PROTOCOL
import socket import socket
import sys import sys
if sys.version_info >= (3,):
# b"" causes SyntaxError in python <= 2.5, so below implements equivalent
EMPTY_BYTES = bytes("", encoding="ascii")
else:
# python 2.x, string type is equivalent to bytes.
EMPTY_BYTES = ""
class CSocket: class CSocket:
EMPTY_BYTES = ""
END_STRING = "<F2B_END_COMMAND>"
CLOSE_STRING = "<F2B_CLOSE_COMMAND>"
# python 2.x, string type is equivalent to bytes.
if sys.version_info >= (3,): if sys.version_info >= (3,):
END_STRING = bytes("<F2B_END_COMMAND>", encoding='ascii') # b"" causes SyntaxError in python <= 2.5, so below implements equivalent
else: EMPTY_BYTES = bytes(EMPTY_BYTES, encoding="ascii")
END_STRING = "<F2B_END_COMMAND>" END_STRING = bytes(END_STRING, encoding='ascii')
CLOSE_STRING = bytes(CLOSE_STRING, encoding='ascii')
def __init__(self, sock = "/var/run/fail2ban/fail2ban.sock"): def __init__(self, sock="/var/run/fail2ban/fail2ban.sock"):
# Create an INET, STREAMing socket # Create an INET, STREAMing socket
#self.csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #self.csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__csock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.__csock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#self.csock.connect(("localhost", 2222)) #self.csock.connect(("localhost", 2222))
self.__csock.connect(sock) self.__csock.connect(sock)
def __del__(self):
self.close(False)
def send(self, msg): def send(self, msg):
# Convert every list member to string # Convert every list member to string
obj = dumps([str(m) for m in msg], HIGHEST_PROTOCOL) obj = dumps([str(m) for m in msg], HIGHEST_PROTOCOL)
self.__csock.send(obj + CSocket.END_STRING) self.__csock.send(obj + CSocket.END_STRING)
ret = self.receive(self.__csock) return self.receive(self.__csock)
def close(self, sendEnd=True):
if not self.__csock:
return
if sendEnd:
self.__csock.sendall(CSocket.CLOSE_STRING + CSocket.END_STRING)
self.__csock.close() self.__csock.close()
return ret self.__csock = None
@staticmethod @staticmethod
def receive(sock): def receive(sock):
msg = EMPTY_BYTES msg = CSocket.EMPTY_BYTES
while msg.rfind(CSocket.END_STRING) == -1: while msg.rfind(CSocket.END_STRING) == -1:
chunk = sock.recv(6) chunk = sock.recv(6)
if chunk == '': if chunk == '':

View File

@ -38,14 +38,6 @@ from ..helpers import getLogger,formatExceptionInfo
# Gets the instance of the logger. # Gets the instance of the logger.
logSys = getLogger(__name__) logSys = getLogger(__name__)
if sys.version_info >= (3,):
# b"" causes SyntaxError in python <= 2.5, so below implements equivalent
EMPTY_BYTES = bytes("", encoding="ascii")
else:
# python 2.x, string type is equivalent to bytes.
EMPTY_BYTES = ""
## ##
# Request handler class. # Request handler class.
# #
@ -54,10 +46,15 @@ else:
class RequestHandler(asynchat.async_chat): class RequestHandler(asynchat.async_chat):
# python 2.x, string type is equivalent to bytes.
EMPTY_BYTES = ""
END_STRING = "<F2B_END_COMMAND>"
CLOSE_STRING = "<F2B_CLOSE_COMMAND>"
if sys.version_info >= (3,): if sys.version_info >= (3,):
END_STRING = bytes("<F2B_END_COMMAND>", encoding="ascii") # b"" causes SyntaxError in python <= 2.5, so below implements equivalent
else: EMPTY_BYTES = bytes(EMPTY_BYTES, encoding="ascii")
END_STRING = "<F2B_END_COMMAND>" END_STRING = bytes(END_STRING, encoding="ascii")
CLOSE_STRING = bytes(CLOSE_STRING, encoding='ascii')
def __init__(self, conn, transmitter): def __init__(self, conn, transmitter):
asynchat.async_chat.__init__(self, conn) asynchat.async_chat.__init__(self, conn)
@ -76,16 +73,22 @@ class RequestHandler(asynchat.async_chat):
# This method is called once we have a complete request. # This method is called once we have a complete request.
def found_terminator(self): def found_terminator(self):
# Pop whole buffer
buf = self.__buffer
self.__buffer = []
# Joins the buffer items. # Joins the buffer items.
message = loads(EMPTY_BYTES.join(self.__buffer)) message = loads(RequestHandler.EMPTY_BYTES.join(buf))
# Close if close received
if message == RequestHandler.CLOSE_STRING:
# Closes the channel.
self.close_when_done()
return
# Gives the message to the transmitter. # Gives the message to the transmitter.
message = self.__transmitter.proceed(message) message = self.__transmitter.proceed(message)
# Serializes the response. # Serializes the response.
message = dumps(message, HIGHEST_PROTOCOL) message = dumps(message, HIGHEST_PROTOCOL)
# Sends the response to the client. # Sends the response to the client.
self.push(message + RequestHandler.END_STRING) self.push(message + RequestHandler.END_STRING)
# Closes the channel.
self.close_when_done()
def handle_error(self): def handle_error(self):
e1, e2 = formatExceptionInfo() e1, e2 = formatExceptionInfo()