mirror of https://github.com/fail2ban/fail2ban
improved wait for observer stop on server quit (second stop would force quit), this also cause reset db in observer (to avoid out of sequence errors) before database gets ultimately closed at end of server stop process (gh-2608)
parent
d2d3762ba9
commit
3befbb1770
|
@ -146,9 +146,11 @@ class ObserverThread(JailThread):
|
|||
def pulse_notify(self):
|
||||
"""Notify wakeup (sets /and resets/ notify event)
|
||||
"""
|
||||
if not self._paused and self._notify:
|
||||
self._notify.set()
|
||||
#self._notify.clear()
|
||||
if not self._paused:
|
||||
n = self._notify
|
||||
if n:
|
||||
n.set()
|
||||
#n.clear()
|
||||
|
||||
def add(self, *event):
|
||||
"""Add a event to queue and notify thread to wake up.
|
||||
|
@ -237,6 +239,7 @@ class ObserverThread(JailThread):
|
|||
break
|
||||
## end of main loop - exit
|
||||
logSys.info("Observer stopped, %s events remaining.", len(self._queue))
|
||||
self._notify = None
|
||||
#print("Observer stopped, %s events remaining." % len(self._queue))
|
||||
except Exception as e:
|
||||
logSys.error('Observer stopped after error: %s', e, exc_info=True)
|
||||
|
@ -262,9 +265,8 @@ class ObserverThread(JailThread):
|
|||
if not self.active:
|
||||
super(ObserverThread, self).start()
|
||||
|
||||
def stop(self):
|
||||
def stop(self, wtime=5, forceQuit=True):
|
||||
if self.active and self._notify:
|
||||
wtime = 5
|
||||
logSys.info("Observer stop ... try to end queue %s seconds", wtime)
|
||||
#print("Observer stop ....")
|
||||
# just add shutdown job to make possible wait later until full (events remaining)
|
||||
|
@ -276,10 +278,15 @@ class ObserverThread(JailThread):
|
|||
#self.pulse_notify()
|
||||
self._notify = None
|
||||
# wait max wtime seconds until full (events remaining)
|
||||
self.wait_empty(wtime)
|
||||
n.clear()
|
||||
self.active = False
|
||||
self.wait_idle(0.5)
|
||||
if self.wait_empty(wtime) or forceQuit:
|
||||
n.clear()
|
||||
self.active = False; # leave outer (active) loop
|
||||
self._paused = True; # leave inner (queue) loop
|
||||
self.__db = None
|
||||
else:
|
||||
self._notify = n
|
||||
return self.wait_idle(min(wtime, 0.5)) and not self.is_full
|
||||
return True
|
||||
|
||||
@property
|
||||
def is_full(self):
|
||||
|
|
|
@ -193,23 +193,26 @@ class Server:
|
|||
signal.signal(s, sh)
|
||||
|
||||
# Give observer a small chance to complete its work before exit
|
||||
if Observers.Main is not None:
|
||||
Observers.Main.stop()
|
||||
obsMain = Observers.Main
|
||||
if obsMain is not None:
|
||||
if obsMain.stop(forceQuit=False):
|
||||
obsMain = None
|
||||
Observers.Main = None
|
||||
|
||||
# Now stop all the jails
|
||||
self.stopAllJail()
|
||||
|
||||
# Stop observer ultimately
|
||||
if obsMain is not None:
|
||||
obsMain.stop()
|
||||
|
||||
# Explicit close database (server can leave in a thread,
|
||||
# so delayed GC can prevent commiting changes)
|
||||
if self.__db:
|
||||
self.__db.close()
|
||||
self.__db = None
|
||||
|
||||
# Stop observer and exit
|
||||
if Observers.Main is not None:
|
||||
Observers.Main.stop()
|
||||
Observers.Main = None
|
||||
# Stop async
|
||||
# Stop async and exit
|
||||
if self.__asyncServer is not None:
|
||||
self.__asyncServer.stop()
|
||||
self.__asyncServer = None
|
||||
|
|
|
@ -343,6 +343,7 @@ def with_foreground_server_thread(startextra={}):
|
|||
# to wait for end of server, default accept any exit code, because multi-threaded,
|
||||
# thus server can exit in-between...
|
||||
def _stopAndWaitForServerEnd(code=(SUCCESS, FAILED)):
|
||||
tearDownMyTime()
|
||||
# if seems to be down - try to catch end phase (wait a bit for end:True to recognize down state):
|
||||
if not phase.get('end', None) and not os.path.exists(pjoin(tmp, "f2b.pid")):
|
||||
Utils.wait_for(lambda: phase.get('end', None) is not None, MID_WAITTIME)
|
||||
|
@ -1570,6 +1571,37 @@ class Fail2banServerTest(Fail2banClientServerBase):
|
|||
self.assertLogged(
|
||||
"192.0.2.11", "+ 600 =", all=True, wait=MID_WAITTIME)
|
||||
|
||||
# test stop with busy observer:
|
||||
self.pruneLog("[test-phase end) stop on busy observer]")
|
||||
tearDownMyTime()
|
||||
a = {'state': 0}
|
||||
obsMain = Observers.Main
|
||||
def _long_action():
|
||||
logSys.info('++ observer enters busy state ...')
|
||||
a['state'] = 1
|
||||
Utils.wait_for(lambda: a['state'] == 2, MAX_WAITTIME)
|
||||
obsMain.db_purge(); # does nothing (db is already None)
|
||||
logSys.info('-- observer leaves busy state.')
|
||||
obsMain.add('call', _long_action)
|
||||
obsMain.add('call', lambda: None)
|
||||
# wait observer enter busy state:
|
||||
Utils.wait_for(lambda: a['state'] == 1, MAX_WAITTIME)
|
||||
# overwrite default wait time (normally 5 seconds):
|
||||
obsMain_stop = obsMain.stop
|
||||
def _stop(wtime=(0.01 if unittest.F2B.fast else 0.1), forceQuit=True):
|
||||
return obsMain_stop(wtime, forceQuit)
|
||||
obsMain.stop = _stop
|
||||
# stop server and wait for end:
|
||||
self.stopAndWaitForServerEnd(SUCCESS)
|
||||
# check observer and db state:
|
||||
self.assertNotLogged('observer leaves busy state')
|
||||
self.assertFalse(obsMain.idle)
|
||||
self.assertEqual(obsMain._ObserverThread__db, None)
|
||||
# server is exited without wait for observer, stop it now:
|
||||
a['state'] = 2
|
||||
self.assertLogged('observer leaves busy state', wait=True)
|
||||
obsMain.join()
|
||||
|
||||
# test multiple start/stop of the server (threaded in foreground) --
|
||||
if False: # pragma: no cover
|
||||
@with_foreground_server_thread()
|
||||
|
|
Loading…
Reference in New Issue