From 5bd32566c30f92bdfae4ce85cbd02b23b5d9e9d6 Mon Sep 17 00:00:00 2001 From: Cyril Jaquier Date: Fri, 14 Dec 2007 21:19:00 +0000 Subject: [PATCH] - Moved socket to /var/run/fail2ban. - Rewrote the communication server. git-svn-id: https://fail2ban.svn.sourceforge.net/svnroot/fail2ban/branches/FAIL2BAN-0_8@628 a942ae1a-1317-0410-a47c-b1dcaea8d605 --- CHANGELOG | 2 + MANIFEST | 2 +- fail2ban-server | 2 +- server/asyncserver.py | 153 ++++++++++++++++++++++++++++++++++++++++++ server/server.py | 17 ++--- server/ssocket.py | 136 ------------------------------------- 6 files changed, 163 insertions(+), 149 deletions(-) create mode 100644 server/asyncserver.py delete mode 100644 server/ssocket.py diff --git a/CHANGELOG b/CHANGELOG index 603777f6..c3e852bd 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -22,6 +22,8 @@ ver. 0.8.2 (2007/??/??) - stable to Iain Lea - Fixed "ignore IPs". Only the first value was taken into account. Thanks to Adrien Clerc +- Moved socket to /var/run/fail2ban. +- Rewrote the communication server. ver. 0.8.1 (2007/08/14) - stable ---------- diff --git a/MANIFEST b/MANIFEST index df9970c2..9c5a9ecd 100644 --- a/MANIFEST +++ b/MANIFEST @@ -17,7 +17,7 @@ client/actionreader.py client/__init__.py client/configurator.py client/csocket.py -server/ssocket.py +server/asyncserver.py server/banticket.py server/filter.py server/filtergamin.py diff --git a/fail2ban-server b/fail2ban-server index ca449798..ce84c69d 100755 --- a/fail2ban-server +++ b/fail2ban-server @@ -50,7 +50,7 @@ class Fail2banServer: self.__conf = dict() self.__conf["background"] = True self.__conf["force"] = False - self.__conf["socket"] = "/tmp/fail2ban.sock" + self.__conf["socket"] = "/var/run/fail2ban/fail2ban.sock" def dispVersion(self): print "Fail2Ban v" + version diff --git a/server/asyncserver.py b/server/asyncserver.py new file mode 100644 index 00000000..f5c129d8 --- /dev/null +++ b/server/asyncserver.py @@ -0,0 +1,153 @@ +# This file is part of Fail2Ban. +# +# Fail2Ban is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# Fail2Ban is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Fail2Ban; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +# Author: Cyril Jaquier +# +# $Revision: 567 $ + +__author__ = "Cyril Jaquier" +__version__ = "$Revision: 567 $" +__date__ = "$Date: 2007-03-26 23:17:31 +0200 (Mon, 26 Mar 2007) $" +__copyright__ = "Copyright (c) 2004 Cyril Jaquier" +__license__ = "GPL" + +from pickle import dumps, loads, HIGHEST_PROTOCOL +import asyncore, asynchat, socket, os, logging, sys + +# Gets the instance of the logger. +logSys = logging.getLogger("fail2ban.server") + +## +# Request handler class. +# +# This class extends asynchat in order to provide a request handler for +# incoming query. + +class RequestHandler(asynchat.async_chat): + + END_STRING = "" + + def __init__(self, conn, transmitter): + asynchat.async_chat.__init__(self, conn) + self.__transmitter = transmitter + self.__buffer = [] + # Sets the terminator. + self.set_terminator(RequestHandler.END_STRING) + self.found_terminator = self.handle_request_line + + def collect_incoming_data(self, data): + logSys.debug("Received raw data: " + str(data)) + self.__buffer.append(data) + + ## + # Handles a new request. + # + # This method is called once we have a complete request. + + def handle_request_line(self): + # Joins the buffer items. + message = loads("".join(self.__buffer)) + # Gives the message to the transmitter. + message = self.__transmitter.proceed(message) + # Serializes the response. + message = dumps(message, HIGHEST_PROTOCOL) + # Sends the response to the client. + self.send(message + RequestHandler.END_STRING) + # Closes the channel. + self.close_when_done() + + def handle_error(self): + logSys.error("Unexpected communication error") + self.close() + +## +# Asynchronous server class. +# +# This class extends asyncore and dispatches connection requests to +# RequestHandler. + +class AsyncServer(asyncore.dispatcher): + + def __init__(self, transmitter): + asyncore.dispatcher.__init__(self) + self.__transmitter = transmitter + self.__sock = "/var/run/fail2ban/fail2ban.sock" + self.__init = False + + ## + # Returns False as we only read the socket first. + + def writable(self): + return False + + def handle_accept(self): + try: + conn, addr = self.accept() + except socket.error: + logSys.warning("Socket error") + return + except TypeError: + logSys.warning("Type error") + return + # Creates an instance of the handler class to handle the + # request/response on the incoming connection. + RequestHandler(conn, self.__transmitter) + + ## + # Starts the communication server. + # + # @param sock: socket file. + # @param force: remove the socket file if exists. + + def start(self, sock, force): + self.__sock = sock + # Remove socket + if os.path.exists(sock): + logSys.error("Fail2ban seems to be already running") + if force: + logSys.warn("Forcing execution of the server") + os.remove(sock) + else: + raise AsyncServerException("Server already running") + # Creates the socket. + self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind(sock) + self.listen(1) + # Sets the init flag. + self.__init = True + # TODO Add try..catch + asyncore.loop(timeout = 2) + + ## + # Stops the communication server. + + def stop(self): + if self.__init: + # Only closes the socket if it was initialized first. + self.close() + # Remove socket + if os.path.exists(self.__sock): + logSys.debug("Removed socket file " + self.__sock) + os.remove(self.__sock) + logSys.debug("Socket shutdown") + + +## +# AsyncServerException is used to wrap communication exceptions. + +class AsyncServerException(Exception): + pass diff --git a/server/server.py b/server/server.py index 108fac81..bcd56f66 100644 --- a/server/server.py +++ b/server/server.py @@ -27,8 +27,8 @@ __license__ = "GPL" from threading import Lock, RLock from jails import Jails from transmitter import Transmitter -from ssocket import SSocket -from ssocket import SSocketErrorException +from asyncserver import AsyncServer +from asyncserver import AsyncServerException import logging, logging.handlers, sys, os, signal # Gets the instance of the logger. @@ -42,7 +42,7 @@ class Server: self.__jails = Jails() self.__daemon = daemon self.__transm = Transmitter(self) - self.__socket = SSocket(self.__transm) + self.__asyncServer = AsyncServer(self.__transm) self.__logLevel = 3 self.__logTarget = "STDOUT" # Set logging level @@ -72,20 +72,15 @@ class Server: # Start the communication logSys.debug("Starting communication") try: - self.__socket.initialize(sock, force) - self.__socket.start() - # Workaround (???) for join() bug. - # https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1167930&group_id=5470 - while self.__socket.isAlive(): - self.__socket.join(1) - except SSocketErrorException: + self.__asyncServer.start(sock, force) + except AsyncServerException: logSys.error("Could not start server") logSys.info("Exiting Fail2ban") def quit(self): self.stopAllJail() # Stop communication - self.__socket.stop() + self.__asyncServer.stop() def addJail(self, name, backend): self.__jails.add(name, backend) diff --git a/server/ssocket.py b/server/ssocket.py deleted file mode 100644 index f56f7199..00000000 --- a/server/ssocket.py +++ /dev/null @@ -1,136 +0,0 @@ -# This file is part of Fail2Ban. -# -# Fail2Ban is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# Fail2Ban is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Fail2Ban; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -# Author: Cyril Jaquier -# -# $Revision$ - -__author__ = "Cyril Jaquier" -__version__ = "$Revision$" -__date__ = "$Date$" -__copyright__ = "Copyright (c) 2004 Cyril Jaquier" -__license__ = "GPL" - -from threading import Thread -# cPickle generates an exception with Python 2.5 -#from cPickle import dumps, loads, HIGHEST_PROTOCOL -from pickle import dumps, loads, HIGHEST_PROTOCOL -import socket, logging, os, os.path - -# Gets the instance of the logger. -logSys = logging.getLogger("fail2ban.comm") - -class SSocket(Thread): - - END_STRING = "" - - def __init__(self, transmitter): - Thread.__init__(self) - self.__transmit = transmitter - self.__isRunning = False - self.__socket = "/tmp/fail2ban.sock" - self.__ssock = None - logSys.debug("Created SSocket") - - def initialize(self, sock = "/tmp/fail2ban.sock", force = False): - self.__socket = sock - # Remove socket - if os.path.exists(sock): - logSys.error("Fail2ban seems to be already running") - if force: - logSys.warn("Forcing execution of the server") - os.remove(sock) - else: - raise SSocketErrorException("Server already running") - # Create an INET, STREAMing socket - #self.__ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.__ssock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - #self.__ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - #self.__ssock.setblocking(False) - # Do not use a blocking socket as there is problem at shutdown. - # Use a timeout instead. Daemon exits at most 'timeout' seconds - # after the command. - self.__ssock.settimeout(1) - # Bind the socket to a public host and a well-known port - #self.__ssock.bind(("localhost", 2222)) - self.__ssock.bind(sock) - # Become a server socket - self.__ssock.listen(1) - - def run(self): - self.__isRunning = True - while self.__isRunning: - try: - (csock, address) = self.__ssock.accept() - thread = SocketWorker(csock, self.__transmit) - thread.start() - except socket.timeout: - # Do nothing here - pass - except socket.error: - # Do nothing here - pass - self.__ssock.close() - # Remove socket - if os.path.exists(self.__socket): - logSys.debug("Removed socket file " + self.__socket) - os.remove(self.__socket) - logSys.debug("Socket shutdown") - return True - - ## - # Stop the thread. - # - # Set the isRunning flag to False. - # @bug It seems to be some concurrency problem with this flag - - def stop(self): - self.__isRunning = False - - -class SocketWorker(Thread): - - def __init__(self, csock, transmitter): - Thread.__init__(self) - self.__csock = csock - self.__transmit = transmitter - - def run(self): - logSys.debug("Starting new thread to handle the request") - msg = self.__receive(self.__csock) - msg = self.__transmit.proceed(msg) - self.__send(self.__csock, msg) - self.__csock.close() - logSys.debug("Connection closed") - - @staticmethod - def __send(sock, msg): - obj = dumps(msg, HIGHEST_PROTOCOL) - sock.send(obj + SSocket.END_STRING) - - @staticmethod - def __receive(sock): - msg = '' - while msg.rfind(SSocket.END_STRING) == -1: - chunk = sock.recv(128) - if chunk == '': - raise RuntimeError, "socket connection broken" - msg = msg + chunk - return loads(msg) - - -class SSocketErrorException(Exception): - pass