- Improved server communication. Start a new thread for each incoming request. Allow concurrent accesses to the server. However, the server is not really thread-safe yet...

git-svn-id: https://fail2ban.svn.sourceforge.net/svnroot/fail2ban/trunk@337 a942ae1a-1317-0410-a47c-b1dcaea8d605
0.x
Cyril Jaquier 2006-09-07 22:24:17 +00:00
parent 46dee1bd9a
commit c7747d1f17
2 changed files with 28 additions and 17 deletions

View File

@ -181,6 +181,11 @@ class Fail2banClient:
else: else:
logSys.error("Could not find server") logSys.error("Could not find server")
return False return False
elif len(cmd) == 1 and cmd[0] == "stop":
# Workaround for the shutdown
ret = self.processCmd([cmd])
self.ping()
return ret
else: else:
return self.processCmd([cmd]) return self.processCmd([cmd])

View File

@ -54,31 +54,22 @@ class SSocket(Thread):
#self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ssock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.ssock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#self.ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #self.ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.ssock.setblocking(False) #self.ssock.setblocking(False)
self.ssock.setblocking(True)
# Bind the socket to a public host and a well-known port # Bind the socket to a public host and a well-known port
#self.ssock.bind(("localhost", 2222)) #self.ssock.bind(("localhost", 2222))
self.ssock.bind(SSocket.SOCKET_FILE) self.ssock.bind(SSocket.SOCKET_FILE)
# Become a server socket # Become a server socket
self.ssock.listen(1) self.ssock.listen(5)
def run(self): def run(self):
self.isRunning = True self.isRunning = True
stime = 1.0
while self.isRunning: while self.isRunning:
try: # TODO Fix shutdown. A new request is required because accept()
# Accept connections from outside # is blocking.
(csock, address) = self.ssock.accept() (csock, address) = self.ssock.accept()
stime /= 10 thread = SocketWorker(csock, self.transmit)
logSys.debug("Connection accepted") thread.start()
msg = self.receive(csock)
msg = self.transmit.proceed(msg)
self.send(csock, msg)
csock.close()
except Exception:
time.sleep(stime)
stime += 0.05
if stime > 1.0:
stime = 1.0
self.ssock.close() self.ssock.close()
# Remove socket # Remove socket
if os.path.exists(SSocket.SOCKET_FILE): if os.path.exists(SSocket.SOCKET_FILE):
@ -96,6 +87,21 @@ class SSocket(Thread):
def stop(self): def stop(self):
self.isRunning = False self.isRunning = False
class SocketWorker(Thread):
def __init__(self, csock, transmitter):
Thread.__init__(self)
self.csock = csock
self.transmit = transmitter
def run(self):
logSys.debug("Starting new thread to handle the request")
msg = self.receive(self.csock)
msg = self.transmit.proceed(msg)
self.send(self.csock, msg)
self.csock.close()
def send(self, socket, msg): def send(self, socket, msg):
obj = pickle.dumps(msg) obj = pickle.dumps(msg)
socket.send(obj + SSocket.END_STRING) socket.send(obj + SSocket.END_STRING)