mirror of https://github.com/fail2ban/fail2ban
resolves a bottleneck within transmitting of large data between server and client: speedup search of communications end-marker and increase max buffer size (up to 32KB)
parent
5abc4ba4ae
commit
e8ee3ba544
|
@ -48,7 +48,8 @@ class CSocket:
|
||||||
def send(self, msg, nonblocking=False, timeout=None):
|
def send(self, msg, nonblocking=False, timeout=None):
|
||||||
# Convert every list member to string
|
# Convert every list member to string
|
||||||
obj = dumps(map(CSocket.convert, msg), HIGHEST_PROTOCOL)
|
obj = dumps(map(CSocket.convert, msg), HIGHEST_PROTOCOL)
|
||||||
self.__csock.send(obj + CSPROTO.END)
|
self.__csock.send(obj)
|
||||||
|
self.__csock.send(CSPROTO.END)
|
||||||
return self.receive(self.__csock, nonblocking, timeout)
|
return self.receive(self.__csock, nonblocking, timeout)
|
||||||
|
|
||||||
def settimeout(self, timeout):
|
def settimeout(self, timeout):
|
||||||
|
@ -81,9 +82,12 @@ class CSocket:
|
||||||
msg = CSPROTO.EMPTY
|
msg = CSPROTO.EMPTY
|
||||||
if nonblocking: sock.setblocking(0)
|
if nonblocking: sock.setblocking(0)
|
||||||
if timeout: sock.settimeout(timeout)
|
if timeout: sock.settimeout(timeout)
|
||||||
while msg.rfind(CSPROTO.END) == -1:
|
bufsize = 1024
|
||||||
chunk = sock.recv(512)
|
while msg.rfind(CSPROTO.END, -32) == -1:
|
||||||
if chunk in ('', b''): # python 3.x may return b'' instead of ''
|
chunk = sock.recv(bufsize)
|
||||||
raise RuntimeError("socket connection broken")
|
if not len(chunk):
|
||||||
|
raise socket.error(104, 'Connection reset by peer')
|
||||||
|
if chunk == CSPROTO.END: break
|
||||||
msg = msg + chunk
|
msg = msg + chunk
|
||||||
|
if bufsize < 32768: bufsize <<= 1
|
||||||
return loads(msg)
|
return loads(msg)
|
||||||
|
|
|
@ -152,7 +152,7 @@ class Socket(LogCaptureTestCase):
|
||||||
org_handler = RequestHandler.found_terminator
|
org_handler = RequestHandler.found_terminator
|
||||||
try:
|
try:
|
||||||
RequestHandler.found_terminator = lambda self: self.close()
|
RequestHandler.found_terminator = lambda self: self.close()
|
||||||
self.assertRaisesRegexp(RuntimeError, r"socket connection broken",
|
self.assertRaisesRegexp(Exception, r"reset by peer|Broken pipe",
|
||||||
lambda: client.send(testMessage, timeout=unittest.F2B.maxWaitTime(10)))
|
lambda: client.send(testMessage, timeout=unittest.F2B.maxWaitTime(10)))
|
||||||
finally:
|
finally:
|
||||||
RequestHandler.found_terminator = org_handler
|
RequestHandler.found_terminator = org_handler
|
||||||
|
@ -168,7 +168,7 @@ class Socket(LogCaptureTestCase):
|
||||||
org_handler = RequestHandler.found_terminator
|
org_handler = RequestHandler.found_terminator
|
||||||
try:
|
try:
|
||||||
RequestHandler.found_terminator = lambda self: TestMsgError()
|
RequestHandler.found_terminator = lambda self: TestMsgError()
|
||||||
#self.assertRaisesRegexp(RuntimeError, r"socket connection broken", client.send, testMessage)
|
#self.assertRaisesRegexp(Exception, r"reset by peer|Broken pipe", client.send, testMessage)
|
||||||
self.assertEqual(client.send(testMessage), 'ERROR: test unpickle error')
|
self.assertEqual(client.send(testMessage), 'ERROR: test unpickle error')
|
||||||
finally:
|
finally:
|
||||||
RequestHandler.found_terminator = org_handler
|
RequestHandler.found_terminator = org_handler
|
||||||
|
|
Loading…
Reference in New Issue