多线程修复监控阻塞bug

pull/26/head
ibuler 2015-11-03 23:03:31 +08:00
parent b30697ea20
commit 9d75ad6cea
2 changed files with 39 additions and 9 deletions

View File

@ -5,6 +5,7 @@ import json
import os import os
import sys import sys
import os.path import os.path
import threading
import tornado.ioloop import tornado.ioloop
import tornado.options import tornado.options
@ -12,9 +13,10 @@ import tornado.web
import tornado.websocket import tornado.websocket
import tornado.httpserver import tornado.httpserver
import tornado.gen import tornado.gen
from tornado.websocket import WebSocketClosedError
from tornado.options import define, options from tornado.options import define, options
from pyinotify import WatchManager, Notifier, ProcessEvent, IN_DELETE, IN_CREATE, IN_MODIFY from pyinotify import WatchManager, Notifier, ProcessEvent, IN_DELETE, IN_CREATE, IN_MODIFY, AsyncNotifier, TornadoAsyncNotifier, ThreadedNotifier
define("port", default=8080, help="run on the given port", type=int) define("port", default=8080, help="run on the given port", type=int)
define("host", default='0.0.0.0', help="run port on", type=str) define("host", default='0.0.0.0', help="run port on", type=str)
@ -32,13 +34,16 @@ class EventHandler(ProcessEvent):
def process_IN_MODIFY(self, event): def process_IN_MODIFY(self, event):
print "Modify file:%s." % os.path.join(event.path, event.name) print "Modify file:%s." % os.path.join(event.path, event.name)
self.client.write_message(f.read()) try:
self.client.write_message(f.read())
except WebSocketClosedError:
raise WebSocketClosedError
def file_monitor(path='.', client=None): def file_monitor(path='.', client=None):
wm = WatchManager() wm = WatchManager()
mask = IN_DELETE | IN_CREATE | IN_MODIFY mask = IN_DELETE | IN_CREATE | IN_MODIFY
notifier = Notifier(wm, EventHandler(client)) notifier = AsyncNotifier(wm, EventHandler(client))
wm.add_watch(path, mask, auto_add=True, rec=True) wm.add_watch(path, mask, auto_add=True, rec=True)
if not os.path.isfile(path): if not os.path.isfile(path):
print "You should monitor a file" print "You should monitor a file"
@ -52,6 +57,7 @@ def file_monitor(path='.', client=None):
while True: while True:
try: try:
print "hello world"
notifier.process_events() notifier.process_events()
if notifier.check_events(): if notifier.check_events():
notifier.read_events() notifier.read_events()
@ -72,13 +78,15 @@ class Application(tornado.web.Application):
'cookie_secret': 'DFksdfsasdfkasdfFKwlwfsdfsa1204mx', 'cookie_secret': 'DFksdfsasdfkasdfFKwlwfsdfsa1204mx',
'template_path': os.path.join(os.path.dirname(__file__), 'templates'), 'template_path': os.path.join(os.path.dirname(__file__), 'templates'),
'static_path': os.path.join(os.path.dirname(__file__), 'static'), 'static_path': os.path.join(os.path.dirname(__file__), 'static'),
'debug': True,
} }
tornado.web.Application.__init__(self, handlers, **setting) tornado.web.Application.__init__(self, handlers, **setting)
class MonitorHandler(tornado.websocket.WebSocketHandler): class MonitorHandler(tornado.websocket.WebSocketHandler):
clients = set() clients = []
threads = []
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.file_path = None self.file_path = None
@ -90,20 +98,38 @@ class MonitorHandler(tornado.websocket.WebSocketHandler):
def open(self): def open(self):
# 获取监控的path # 获取监控的path
self.file_path = self.get_argument('file_path', '') self.file_path = self.get_argument('file_path', '')
MonitorHandler.clients.add(self) MonitorHandler.clients.append(self)
thread = threading.Thread(target=file_monitor, args=('%s.log' % self.file_path, self))
MonitorHandler.threads.append(thread)
self.stream.set_nodelay(True) self.stream.set_nodelay(True)
print len(MonitorHandler.threads), len(MonitorHandler.clients)
def on_message(self, message): def on_message(self, message):
self.write_message('Connect WebSocket Success. <br/>') self.write_message('Connect WebSocket Success. <br/>')
# 监控日志,发生变动发向客户端 # 监控日志,发生变动发向客户端
file_monitor('%s.log' % self.file_path, client=self)
self.write_message('Disconnect WebSocket.<br/>') try:
for t in MonitorHandler.threads:
if t.is_alive():
continue
t.setDaemon(True)
t.start()
except WebSocketClosedError, e:
client_index = MonitorHandler.clients.index(self)
MonitorHandler.threads[client_index].stop()
MonitorHandler.clients.remove(self)
MonitorHandler.threads.remove(MonitorHandler.threads[client_index])
def on_close(self): def on_close(self):
# 客户端主动关闭 # 客户端主动关闭
self.close() # self.close()
self.finish()
print "Close websocket."
client_index = MonitorHandler.clients.index(self)
MonitorHandler.clients.remove(self) MonitorHandler.clients.remove(self)
MonitorHandler.threads.remove(MonitorHandler.threads[client_index])
class MainHandler(tornado.web.RequestHandler): class MainHandler(tornado.web.RequestHandler):

View File

@ -114,6 +114,10 @@
socket.send(file_path) socket.send(file_path)
}; };
window.onbeforeunload = function(){
socket.close()
};
var username = obj.closest('tr').find('#username').text(); var username = obj.closest('tr').find('#username').text();
var ip = obj.closest('tr').find('#ip').text(); var ip = obj.closest('tr').find('#ip').text();