mirror of https://github.com/fail2ban/fail2ban
				
				
				
			filtersystemd.py: fixes wrong time point of "in operation" mode
todo: need more tests to cover any step of switch to inOperationMode (all branches)pull/3146/head
							parent
							
								
									7678f59827
								
							
						
					
					
						commit
						96661f25ab
					
				| 
						 | 
				
			
			@ -33,7 +33,10 @@ jobs:
 | 
			
		|||
        uses: actions/setup-python@v2
 | 
			
		||||
        with:
 | 
			
		||||
          python-version: ${{ matrix.python-version }}
 | 
			
		||||
      
 | 
			
		||||
 | 
			
		||||
      - name: Grant systemd-journal access
 | 
			
		||||
        run: sudo usermod -a -G systemd-journal "$USER" || echo 'no systemd-journal access'
 | 
			
		||||
 | 
			
		||||
      - name: Python version
 | 
			
		||||
        run: |
 | 
			
		||||
          F2B_PY=$(python -c "import sys; print(sys.version)")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -92,8 +92,8 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
 | 
			
		|||
		try:
 | 
			
		||||
			args['flags'] = int(kwargs.pop('journalflags'))
 | 
			
		||||
		except KeyError:
 | 
			
		||||
			# be sure all journal types will be opened if files specified (don't set flags):
 | 
			
		||||
			if 'files' not in args or not len(args['files']):
 | 
			
		||||
			# be sure all journal types will be opened if files/path specified (don't set flags):
 | 
			
		||||
			if ('files' not in args or not len(args['files'])) and ('path' not in args or not args['path']):
 | 
			
		||||
				args['flags'] = 4
 | 
			
		||||
				
 | 
			
		||||
		try:
 | 
			
		||||
| 
						 | 
				
			
			@ -258,6 +258,10 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
 | 
			
		|||
			date = datetime.datetime.fromtimestamp(date)
 | 
			
		||||
		self.__journal.seek_realtime(date)
 | 
			
		||||
 | 
			
		||||
	def inOperationMode(self):
 | 
			
		||||
		self.inOperation = True
 | 
			
		||||
		logSys.info("[%s] Jail is in operation now (process new journal entries)", self.jailName)
 | 
			
		||||
 | 
			
		||||
	##
 | 
			
		||||
	# Main loop.
 | 
			
		||||
	#
 | 
			
		||||
| 
						 | 
				
			
			@ -268,23 +272,44 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
 | 
			
		|||
 | 
			
		||||
		if not self.getJournalMatch():
 | 
			
		||||
			logSys.notice(
 | 
			
		||||
				"Jail started without 'journalmatch' set. "
 | 
			
		||||
				"[%s] Jail started without 'journalmatch' set. "
 | 
			
		||||
				"Jail regexs will be checked against all journal entries, "
 | 
			
		||||
				"which is not advised for performance reasons.")
 | 
			
		||||
				"which is not advised for performance reasons.", self.jailName)
 | 
			
		||||
 | 
			
		||||
		# Try to obtain the last known time (position of journal)
 | 
			
		||||
		start_time = 0
 | 
			
		||||
		if self.jail.database is not None:
 | 
			
		||||
			start_time = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0
 | 
			
		||||
		# Seek to max(last_known_time, now - findtime) in journal
 | 
			
		||||
		start_time = max( start_time, MyTime.time() - int(self.getFindTime()) )
 | 
			
		||||
		self.seekToTime(start_time)
 | 
			
		||||
		# Move back one entry to ensure do not end up in dead space
 | 
			
		||||
		# if start time beyond end of journal
 | 
			
		||||
		# Save current cursor position (to recognize in operation mode):
 | 
			
		||||
		logentry = None
 | 
			
		||||
		try:
 | 
			
		||||
			self.__journal.get_previous()
 | 
			
		||||
			self.__journal.seek_tail()
 | 
			
		||||
			logentry = self.__journal.get_previous()
 | 
			
		||||
			self.__journal.get_next()
 | 
			
		||||
		except OSError:
 | 
			
		||||
			pass # Reading failure, so safe to ignore
 | 
			
		||||
			logentry = None # Reading failure, so safe to ignore
 | 
			
		||||
		if logentry:
 | 
			
		||||
			# Try to obtain the last known time (position of journal)
 | 
			
		||||
			startTime = 0
 | 
			
		||||
			if self.jail.database is not None:
 | 
			
		||||
				startTime = self.jail.database.getJournalPos(self.jail, 'systemd-journal') or 0
 | 
			
		||||
			# Seek to max(last_known_time, now - findtime) in journal
 | 
			
		||||
			startTime = max( startTime, MyTime.time() - int(self.getFindTime()) )
 | 
			
		||||
			self.seekToTime(startTime)
 | 
			
		||||
			# Not in operation while we'll read old messages ...
 | 
			
		||||
			self.inOperation = False
 | 
			
		||||
			# Save current time in order to check time to switch "in operation" mode
 | 
			
		||||
			startTime = (1, MyTime.time(), logentry.get('__CURSOR'))
 | 
			
		||||
			# Move back one entry to ensure do not end up in dead space
 | 
			
		||||
			# if start time beyond end of journal
 | 
			
		||||
			try:
 | 
			
		||||
				self.__journal.get_previous()
 | 
			
		||||
			except OSError:
 | 
			
		||||
				pass # Reading failure, so safe to ignore
 | 
			
		||||
		else:
 | 
			
		||||
			# empty journal or no entries for current filter:
 | 
			
		||||
			self.inOperationMode()
 | 
			
		||||
			# seek_tail() seems to have a bug by no entries (could bypass some entries hereafter), so seek to now instead:
 | 
			
		||||
			startTime = MyTime.time()
 | 
			
		||||
			self.seekToTime(startTime)
 | 
			
		||||
			# for possible future switches of in-operation mode:
 | 
			
		||||
			startTime = (0, startTime)
 | 
			
		||||
 | 
			
		||||
		line = None
 | 
			
		||||
		while self.active:
 | 
			
		||||
| 
						 | 
				
			
			@ -317,12 +342,27 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
 | 
			
		|||
							e, exc_info=logSys.getEffectiveLevel() <= logging.DEBUG)
 | 
			
		||||
					self.ticks += 1
 | 
			
		||||
					if logentry:
 | 
			
		||||
						line = self.formatJournalEntry(logentry)
 | 
			
		||||
						self.processLineAndAdd(*line)
 | 
			
		||||
						line, tm = self.formatJournalEntry(logentry)
 | 
			
		||||
						# switch "in operation" mode if we'll find start entry (+ some delta):
 | 
			
		||||
						if not self.inOperation:
 | 
			
		||||
							if tm >= MyTime.time() - 1: # reached now (approximated):
 | 
			
		||||
								self.inOperationMode()
 | 
			
		||||
							elif startTime[0] == 1:
 | 
			
		||||
								# if it reached start entry (or get read time larger than start time)
 | 
			
		||||
								if logentry.get('__CURSOR') == startTime[2] or tm > startTime[1]:
 | 
			
		||||
									# give the filter same time it needed to reach the start entry:
 | 
			
		||||
									startTime = (0, MyTime.time()*2 - startTime[1])
 | 
			
		||||
							elif tm > startTime[1]: # reached start time (approximated):
 | 
			
		||||
								self.inOperationMode()
 | 
			
		||||
						# process line
 | 
			
		||||
						self.processLineAndAdd(line, tm)
 | 
			
		||||
						self.__modified += 1
 | 
			
		||||
						if self.__modified >= 100: # todo: should be configurable
 | 
			
		||||
							break
 | 
			
		||||
					else:
 | 
			
		||||
						# "in operation" mode since we don't have messages anymore (reached end of journal):
 | 
			
		||||
						if not self.inOperation:
 | 
			
		||||
							self.inOperationMode()
 | 
			
		||||
						break
 | 
			
		||||
				self.__modified = 0
 | 
			
		||||
				if self.ticks % 10 == 0:
 | 
			
		||||
| 
						 | 
				
			
			@ -334,7 +374,7 @@ class FilterSystemd(JournalFilter): # pragma: systemd no cover
 | 
			
		|||
				    or not self.active
 | 
			
		||||
				  )
 | 
			
		||||
				):
 | 
			
		||||
					self.jail.database.updateJournal(self.jail, 'systemd-journal', line[1], line[0][1])
 | 
			
		||||
					self.jail.database.updateJournal(self.jail, 'systemd-journal', tm, line[1])
 | 
			
		||||
					self.__nextUpdateTM = MyTime.time() + Utils.DEFAULT_SLEEP_TIME * 5
 | 
			
		||||
					line = None
 | 
			
		||||
			except Exception as e: # pragma: no cover
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1537,6 +1537,29 @@ def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
 | 
			
		|||
			_gen_falure("192.0.2.6")
 | 
			
		||||
			self.assertFalse(self.jail.getFailTicket())
 | 
			
		||||
 | 
			
		||||
			# now reset DB, so we'd find all messages before filter entering in operation mode:
 | 
			
		||||
			self.filter.stop()
 | 
			
		||||
			self.filter.join()
 | 
			
		||||
			self.jail.database.updateJournal(self.jail, 'systemd-journal', MyTime.time()-10000, 'TEST')
 | 
			
		||||
			self._initFilter()
 | 
			
		||||
			self.filter.setMaxRetry(1)
 | 
			
		||||
			states = []
 | 
			
		||||
			def _state(*args):
 | 
			
		||||
				self.assertNotIn("** in operation", states)
 | 
			
		||||
				self.assertFalse(self.filter.inOperation)
 | 
			
		||||
				states.append("** process line: %r" % (args,))
 | 
			
		||||
			self.filter.processLineAndAdd = _state
 | 
			
		||||
			def _inoper():
 | 
			
		||||
				self.assertNotIn("** in operation", states)
 | 
			
		||||
				self.assertEqual(len(states), 11)
 | 
			
		||||
				states.append("** in operation")
 | 
			
		||||
				self.filter.__class__.inOperationMode(self.filter)
 | 
			
		||||
			self.filter.inOperationMode = _inoper
 | 
			
		||||
			self.filter.start()
 | 
			
		||||
			self.waitForTicks(12)
 | 
			
		||||
			self.assertTrue(Utils.wait_for(lambda: len(states) == 12, _maxWaitTime(10)))
 | 
			
		||||
			self.assertEqual(states[-1], "** in operation")
 | 
			
		||||
 | 
			
		||||
		def test_delJournalMatch(self):
 | 
			
		||||
			self._initFilter()
 | 
			
		||||
			self.filter.start()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue