mirror of https://github.com/fail2ban/fail2ban
				
				
				
			ENH: tests much more robust now across pythons 2.4 -- 2.7
* needed additional sleeps for polling filter since that one relies on time-stamps and too rapid changes would not be caught by the PollFilter * in python 2.4, time stamps are up to a second (int's) so sleeps longer * test_new_bogus_file -- just to make sure that addition of new files does not alter our monitoringpull/8/merge
							parent
							
								
									d9248a6cf8
								
							
						
					
					
						commit
						a1a67d34a9
					
				| 
						 | 
				
			
			@ -24,6 +24,7 @@ __license__ = "GPL"
 | 
			
		|||
 | 
			
		||||
import unittest
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
import time
 | 
			
		||||
import tempfile
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -85,6 +86,10 @@ def _copy_lines_between_files(fin, fout, n=None, skip=0, mode='a', terminal_line
 | 
			
		|||
 | 
			
		||||
	Returns open fout
 | 
			
		||||
	"""
 | 
			
		||||
	if sys.version_info[:2] <= (2,4):
 | 
			
		||||
		# on old Python st_mtime is int, so we should give at least 1 sec so
 | 
			
		||||
		# polling filter could detect the change
 | 
			
		||||
		time.sleep(1)
 | 
			
		||||
	if isinstance(fin, str):
 | 
			
		||||
		fin = open(fin, 'r')
 | 
			
		||||
	if isinstance(fout, str):
 | 
			
		||||
| 
						 | 
				
			
			@ -315,6 +320,9 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
			self.filter.setActive(True)
 | 
			
		||||
			self.filter.addFailRegex("(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
 | 
			
		||||
			self.filter.start()
 | 
			
		||||
			# If filter is polling it would sleep a bit to guarantee that
 | 
			
		||||
			# we have initial time-stamp difference to trigger "actions"
 | 
			
		||||
			self._sleep_4_poll()
 | 
			
		||||
			#print "D: started filter %s" % self.filter
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -326,7 +334,7 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
			#print "D: WAITING FOR FILTER TO STOP"
 | 
			
		||||
			self.filter.join()		  # wait for the thread to terminate
 | 
			
		||||
			#print "D: KILLING THE FILE"
 | 
			
		||||
			#_killfile(self.file, self.name)
 | 
			
		||||
			_killfile(self.file, self.name)
 | 
			
		||||
			pass
 | 
			
		||||
 | 
			
		||||
		def __str__(self):
 | 
			
		||||
| 
						 | 
				
			
			@ -343,6 +351,19 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
				time.sleep(0.1)
 | 
			
		||||
			return False
 | 
			
		||||
 | 
			
		||||
		def _sleep_4_poll(self):
 | 
			
		||||
			# Since FilterPoll relies on time stamps and some
 | 
			
		||||
			# actions might be happening too fast in the tests,
 | 
			
		||||
			# sleep a bit to guarantee reliable time stamps
 | 
			
		||||
			if isinstance(self.filter, FilterPoll):
 | 
			
		||||
				if sys.version_info[:2] <= (2,4):
 | 
			
		||||
					# on old Python st_mtime is int, so we should give
 | 
			
		||||
					# at least 1 sec so polling filter could detect
 | 
			
		||||
					# the change
 | 
			
		||||
					time.sleep(0.5)
 | 
			
		||||
				else:
 | 
			
		||||
					time.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
		def isEmpty(self, delay=0.4):
 | 
			
		||||
			# shorter wait time for not modified status
 | 
			
		||||
			return not self.isFilled(delay)
 | 
			
		||||
| 
						 | 
				
			
			@ -354,7 +375,6 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
 | 
			
		||||
		def test_grow_file(self):
 | 
			
		||||
			# suck in lines from this sample log file
 | 
			
		||||
			#self.filter.getFailures(self.name)
 | 
			
		||||
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
 | 
			
		||||
 | 
			
		||||
			# Now let's feed it with entries from the file
 | 
			
		||||
| 
						 | 
				
			
			@ -365,7 +385,7 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
			# since it should have not been enough
 | 
			
		||||
 | 
			
		||||
			_copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
 | 
			
		||||
			self.isFilled(6)
 | 
			
		||||
			self.assertTrue(self.isFilled(6))
 | 
			
		||||
			# so we sleep for up to 2 sec for it not to become empty,
 | 
			
		||||
			# and meanwhile pass to other thread(s) and filter should
 | 
			
		||||
			# have gathered new failures and passed them into the
 | 
			
		||||
| 
						 | 
				
			
			@ -383,7 +403,6 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
 | 
			
		||||
 | 
			
		||||
		def test_rewrite_file(self):
 | 
			
		||||
			#
 | 
			
		||||
			# if we rewrite the file at once
 | 
			
		||||
			self.file.close()
 | 
			
		||||
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name)
 | 
			
		||||
| 
						 | 
				
			
			@ -399,11 +418,10 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
		def test_move_file(self):
 | 
			
		||||
			#
 | 
			
		||||
			# if we move file into a new location while it has been open already
 | 
			
		||||
			self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
 | 
			
		||||
												  n=14, mode='w')
 | 
			
		||||
			self.isFilled(6)
 | 
			
		||||
			self.assertTrue(self.isEmpty(2))
 | 
			
		||||
			self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
 | 
			
		||||
			self.assertEqual(self.filter.failManager.getFailTotal(), 2) # Fails with Poll from time to time
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -413,6 +431,25 @@ def get_monitor_failures_testcase(Filter_):
 | 
			
		|||
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
 | 
			
		||||
			self.assertEqual(self.filter.failManager.getFailTotal(), 3)
 | 
			
		||||
 | 
			
		||||
			# now remove the moved file
 | 
			
		||||
			_killfile(None, self.name + '.bak')
 | 
			
		||||
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
 | 
			
		||||
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
 | 
			
		||||
			self.assertEqual(self.filter.failManager.getFailTotal(), 6)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		def test_new_bogus_file(self):
 | 
			
		||||
			# to make sure that watching whole directory does not effect
 | 
			
		||||
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
 | 
			
		||||
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
 | 
			
		||||
 | 
			
		||||
			# create a bogus file in the same directory and see if that doesn't affect
 | 
			
		||||
			open(self.name + '.bak2', 'w').write('')
 | 
			
		||||
			_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
 | 
			
		||||
			self.assert_correct_last_attempt(GetFailures.FAILURES_01)
 | 
			
		||||
			self.assertEqual(self.filter.failManager.getFailTotal(), 6)
 | 
			
		||||
			_killfile(None, self.name + '.bak2')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		def test_delLogPath(self):
 | 
			
		||||
			# Smoke test for removing of the path from being watched
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue