mirror of https://github.com/fail2ban/fail2ban
TST: test to verify killing stuck children processes
parent
7112e4f6c6
commit
515ad6dc12
|
@ -24,12 +24,14 @@ __author__ = "Cyril Jaquier"
|
|||
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
|
||||
__license__ = "GPL"
|
||||
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
|
||||
from ..server.action import CommandAction, CallingMap
|
||||
|
||||
from .utils import LogCaptureTestCase
|
||||
|
||||
from .utils import pid_exists
|
||||
|
||||
class CommandActionTest(LogCaptureTestCase):
|
||||
|
||||
|
@ -202,6 +204,44 @@ class CommandActionTest(LogCaptureTestCase):
|
|||
or self._is_logged('sleep 60 -- timed out after 3 seconds'))
|
||||
self.assertTrue(self._is_logged('sleep 60 -- killed with SIGTERM'))
|
||||
|
||||
def testExecuteTimeoutWithNastyChildren(self):
|
||||
# temporary file for a nasty kid shell script
|
||||
tmpFilename = tempfile.mktemp(".sh", "fail2ban_")
|
||||
# Create a nasty script which would hang there for a while
|
||||
with open(tmpFilename, 'w') as f:
|
||||
f.write("""#!/bin/bash
|
||||
trap : HUP EXIT TERM
|
||||
|
||||
echo "$$" > %s.pid
|
||||
echo "my pid $$ . sleeping lo-o-o-ong"
|
||||
sleep 10000
|
||||
""" % tmpFilename)
|
||||
|
||||
def getnastypid():
|
||||
with open(tmpFilename + '.pid') as f:
|
||||
return int(f.read())
|
||||
|
||||
# First test if can kill the bastard
|
||||
self.assertRaises(
|
||||
RuntimeError, CommandAction.executeCmd, 'bash %s' % tmpFilename, timeout=.1)
|
||||
# Verify that the proccess itself got killed
|
||||
self.assertFalse(pid_exists(getnastypid())) # process should have been killed
|
||||
self.assertTrue(self._is_logged('timed out'))
|
||||
self.assertTrue(self._is_logged('killed with SIGTERM'))
|
||||
|
||||
# A bit evolved case even though, previous test already tests killing children processes
|
||||
self.assertRaises(
|
||||
RuntimeError, CommandAction.executeCmd, 'out=`bash %s`; echo ALRIGHT' % tmpFilename,
|
||||
timeout=.2)
|
||||
# Verify that the proccess itself got killed
|
||||
self.assertFalse(pid_exists(getnastypid()))
|
||||
self.assertTrue(self._is_logged('timed out'))
|
||||
self.assertTrue(self._is_logged('killed with SIGTERM'))
|
||||
|
||||
os.unlink(tmpFilename)
|
||||
os.unlink(tmpFilename + '.pid')
|
||||
|
||||
|
||||
def testCaptureStdOutErr(self):
|
||||
CommandAction.executeCmd('echo "How now brown cow"')
|
||||
self.assertTrue(self._is_logged("'How now brown cow\\n'"))
|
||||
|
|
|
@ -237,3 +237,30 @@ class LogCaptureTestCase(unittest.TestCase):
|
|||
|
||||
def printLog(self):
|
||||
print(self._log.getvalue())
|
||||
|
||||
# Solution from http://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid
|
||||
# under cc by-sa 3.0
|
||||
if os.name == 'posix':
|
||||
def pid_exists(pid):
|
||||
"""Check whether pid exists in the current process table."""
|
||||
import errno
|
||||
if pid < 0:
|
||||
return False
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
except OSError as e:
|
||||
return e.errno == errno.EPERM
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
def pid_exists(pid):
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
SYNCHRONIZE = 0x100000
|
||||
|
||||
process = kernel32.OpenProcess(SYNCHRONIZE, 0, pid)
|
||||
if process != 0:
|
||||
kernel32.CloseHandle(process)
|
||||
return True
|
||||
else:
|
||||
return False
|
Loading…
Reference in New Issue