diff --git a/fail2ban/server/observer.py b/fail2ban/server/observer.py index bd7cbe4a..c19549ba 100644 --- a/fail2ban/server/observer.py +++ b/fail2ban/server/observer.py @@ -146,9 +146,11 @@ class ObserverThread(JailThread): def pulse_notify(self): """Notify wakeup (sets /and resets/ notify event) """ - if not self._paused and self._notify: - self._notify.set() - #self._notify.clear() + if not self._paused: + n = self._notify + if n: + n.set() + #n.clear() def add(self, *event): """Add a event to queue and notify thread to wake up. @@ -237,6 +239,7 @@ class ObserverThread(JailThread): break ## end of main loop - exit logSys.info("Observer stopped, %s events remaining.", len(self._queue)) + self._notify = None #print("Observer stopped, %s events remaining." % len(self._queue)) except Exception as e: logSys.error('Observer stopped after error: %s', e, exc_info=True) @@ -262,9 +265,8 @@ class ObserverThread(JailThread): if not self.active: super(ObserverThread, self).start() - def stop(self): + def stop(self, wtime=5, forceQuit=True): if self.active and self._notify: - wtime = 5 logSys.info("Observer stop ... try to end queue %s seconds", wtime) #print("Observer stop ....") # just add shutdown job to make possible wait later until full (events remaining) @@ -276,10 +278,15 @@ class ObserverThread(JailThread): #self.pulse_notify() self._notify = None # wait max wtime seconds until full (events remaining) - self.wait_empty(wtime) - n.clear() - self.active = False - self.wait_idle(0.5) + if self.wait_empty(wtime) or forceQuit: + n.clear() + self.active = False; # leave outer (active) loop + self._paused = True; # leave inner (queue) loop + self.__db = None + else: + self._notify = n + return self.wait_idle(min(wtime, 0.5)) and not self.is_full + return True @property def is_full(self): diff --git a/fail2ban/server/server.py b/fail2ban/server/server.py index 15265822..feb3b399 100644 --- a/fail2ban/server/server.py +++ b/fail2ban/server/server.py @@ -193,23 +193,26 @@ class Server: signal.signal(s, sh) # Give observer a small chance to complete its work before exit - if Observers.Main is not None: - Observers.Main.stop() + obsMain = Observers.Main + if obsMain is not None: + if obsMain.stop(forceQuit=False): + obsMain = None + Observers.Main = None # Now stop all the jails self.stopAllJail() + # Stop observer ultimately + if obsMain is not None: + obsMain.stop() + # Explicit close database (server can leave in a thread, # so delayed GC can prevent commiting changes) if self.__db: self.__db.close() self.__db = None - # Stop observer and exit - if Observers.Main is not None: - Observers.Main.stop() - Observers.Main = None - # Stop async + # Stop async and exit if self.__asyncServer is not None: self.__asyncServer.stop() self.__asyncServer = None diff --git a/fail2ban/tests/fail2banclienttestcase.py b/fail2ban/tests/fail2banclienttestcase.py index 5caa4dd9..95f73ed3 100644 --- a/fail2ban/tests/fail2banclienttestcase.py +++ b/fail2ban/tests/fail2banclienttestcase.py @@ -343,6 +343,7 @@ def with_foreground_server_thread(startextra={}): # to wait for end of server, default accept any exit code, because multi-threaded, # thus server can exit in-between... def _stopAndWaitForServerEnd(code=(SUCCESS, FAILED)): + tearDownMyTime() # if seems to be down - try to catch end phase (wait a bit for end:True to recognize down state): if not phase.get('end', None) and not os.path.exists(pjoin(tmp, "f2b.pid")): Utils.wait_for(lambda: phase.get('end', None) is not None, MID_WAITTIME) @@ -1570,6 +1571,37 @@ class Fail2banServerTest(Fail2banClientServerBase): self.assertLogged( "192.0.2.11", "+ 600 =", all=True, wait=MID_WAITTIME) + # test stop with busy observer: + self.pruneLog("[test-phase end) stop on busy observer]") + tearDownMyTime() + a = {'state': 0} + obsMain = Observers.Main + def _long_action(): + logSys.info('++ observer enters busy state ...') + a['state'] = 1 + Utils.wait_for(lambda: a['state'] == 2, MAX_WAITTIME) + obsMain.db_purge(); # does nothing (db is already None) + logSys.info('-- observer leaves busy state.') + obsMain.add('call', _long_action) + obsMain.add('call', lambda: None) + # wait observer enter busy state: + Utils.wait_for(lambda: a['state'] == 1, MAX_WAITTIME) + # overwrite default wait time (normally 5 seconds): + obsMain_stop = obsMain.stop + def _stop(wtime=(0.01 if unittest.F2B.fast else 0.1), forceQuit=True): + return obsMain_stop(wtime, forceQuit) + obsMain.stop = _stop + # stop server and wait for end: + self.stopAndWaitForServerEnd(SUCCESS) + # check observer and db state: + self.assertNotLogged('observer leaves busy state') + self.assertFalse(obsMain.idle) + self.assertEqual(obsMain._ObserverThread__db, None) + # server is exited without wait for observer, stop it now: + a['state'] = 2 + self.assertLogged('observer leaves busy state', wait=True) + obsMain.join() + # test multiple start/stop of the server (threaded in foreground) -- if False: # pragma: no cover @with_foreground_server_thread()