mirror of https://github.com/fail2ban/fail2ban
prevent to early exit from main loop (tast case bug by multi-threaded execution / wait for completion);
idle state fixed (if observer really sleeps only);pull/716/head
parent
00fdf5ce0a
commit
a82cc3bcbf
|
@ -182,15 +182,6 @@ class ObserverThread(threading.Thread):
|
||||||
self.add('is_alive')
|
self.add('is_alive')
|
||||||
## if we should stop - break a main loop
|
## if we should stop - break a main loop
|
||||||
while self.active:
|
while self.active:
|
||||||
## going sleep, wait for events (in queue)
|
|
||||||
self.idle = True
|
|
||||||
n = self._notify
|
|
||||||
if n:
|
|
||||||
n.wait(self.sleeptime)
|
|
||||||
## wake up - reset signal now (we don't need it so long as we reed from queue)
|
|
||||||
n.clear()
|
|
||||||
if self._paused:
|
|
||||||
continue
|
|
||||||
self.idle = False
|
self.idle = False
|
||||||
## check events available and execute all events from queue
|
## check events available and execute all events from queue
|
||||||
while not self._paused:
|
while not self._paused:
|
||||||
|
@ -209,6 +200,21 @@ class ObserverThread(threading.Thread):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
#logSys.error('%s', e, exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
|
#logSys.error('%s', e, exc_info=logSys.getEffectiveLevel()<=logging.DEBUG)
|
||||||
logSys.error('%s', e, exc_info=True)
|
logSys.error('%s', e, exc_info=True)
|
||||||
|
## going sleep, wait for events (in queue)
|
||||||
|
n = self._notify
|
||||||
|
if n:
|
||||||
|
self.idle = True
|
||||||
|
n.wait(self.sleeptime)
|
||||||
|
## wake up - reset signal now (we don't need it so long as we reed from queue)
|
||||||
|
n.clear()
|
||||||
|
if self._paused:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
## notify event deleted (shutdown) - just sleep a litle bit (waiting for shutdown events, prevent high cpu usage)
|
||||||
|
time.sleep(0.001)
|
||||||
|
## stop by shutdown and empty queue :
|
||||||
|
if not self.is_full:
|
||||||
|
break
|
||||||
## end of main loop - exit
|
## end of main loop - exit
|
||||||
logSys.info("Observer stopped, %s events remaining.", len(self._queue))
|
logSys.info("Observer stopped, %s events remaining.", len(self._queue))
|
||||||
#print("Observer stopped, %s events remaining." % len(self._queue))
|
#print("Observer stopped, %s events remaining." % len(self._queue))
|
||||||
|
@ -249,10 +255,10 @@ class ObserverThread(threading.Thread):
|
||||||
self._notify.set()
|
self._notify.set()
|
||||||
#self.pulse_notify()
|
#self.pulse_notify()
|
||||||
self._notify = None
|
self._notify = None
|
||||||
self.active = False
|
|
||||||
# wait max wtime seconds until full (events remaining)
|
# wait max wtime seconds until full (events remaining)
|
||||||
self.wait_empty(wtime)
|
self.wait_empty(wtime)
|
||||||
n.clear()
|
n.clear()
|
||||||
|
self.active = False
|
||||||
self.wait_idle(0.5)
|
self.wait_idle(0.5)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -264,8 +270,6 @@ class ObserverThread(threading.Thread):
|
||||||
"""Wait observer is running and returns if observer has no more events (queue is empty)
|
"""Wait observer is running and returns if observer has no more events (queue is empty)
|
||||||
"""
|
"""
|
||||||
time.sleep(0.001)
|
time.sleep(0.001)
|
||||||
if not self.is_full:
|
|
||||||
return not self.is_full
|
|
||||||
if sleeptime is not None:
|
if sleeptime is not None:
|
||||||
e = MyTime.time() + sleeptime
|
e = MyTime.time() + sleeptime
|
||||||
# block queue with not operation to be sure all really jobs are executed if nop goes from queue :
|
# block queue with not operation to be sure all really jobs are executed if nop goes from queue :
|
||||||
|
|
Loading…
Reference in New Issue