You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
jumpserver/run_websocket.py

294 lines
9.2 KiB

9 years ago
# coding: utf-8
import time
import datetime
9 years ago
import json
9 years ago
import os
import sys
9 years ago
import os.path
import threading
import urllib
9 years ago
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
import tornado.httpserver
9 years ago
import tornado.gen
import tornado.httpclient
from tornado.websocket import WebSocketClosedError
9 years ago
from tornado.options import define, options
from pyinotify import WatchManager, Notifier, ProcessEvent, IN_DELETE, IN_CREATE, IN_MODIFY, AsyncNotifier
# from gevent import monkey
# monkey.patch_all()
# import gevent
# from gevent.socket import wait_read, wait_write
import struct, fcntl, signal, socket, select, fnmatch
import paramiko
from connect import Tty
from connect import TtyLog, Log
try:
import simplejson as json
except ImportError:
import json
9 years ago
define("port", default=3000, help="run on the given port", type=int)
9 years ago
define("host", default='0.0.0.0', help="run port on", type=str)
9 years ago
def require_auth(func):
def _deco(request, *args, **kwargs):
username = request.get_argument('username', '')
asset_name = request.get_argument('asset_name', '')
token = request.get_argument('token', '')
print username, asset_name, token
client = tornado.httpclient.HTTPClient()
# response = client.fetch('http://some/url') + urllib.urlencode({'username': username,
# 'asset_name': asset_name, 'token': token})
# return request.close()
return func(request, *args, **kwargs)
return _deco
class MyThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(MyThread, self).__init__(*args, **kwargs)
def run(self):
try:
super(MyThread, self).run()
except WebSocketClosedError:
pass
9 years ago
class EventHandler(ProcessEvent):
def __init__(self, client=None):
self.client = client
def process_IN_CREATE(self, event):
print "Create file:%s." % os.path.join(event.path, event.name)
def process_IN_DELETE(self, event):
print "Delete file:%s." % os.path.join(event.path, event.name)
def process_IN_MODIFY(self, event):
print "Modify file:%s." % os.path.join(event.path, event.name)
self.client.write_message(f.read())
9 years ago
def file_monitor(path='.', client=None):
wm = WatchManager()
mask = IN_DELETE | IN_CREATE | IN_MODIFY
notifier = AsyncNotifier(wm, EventHandler(client))
9 years ago
wm.add_watch(path, mask, auto_add=True, rec=True)
if not os.path.isfile(path):
print "You should monitor a file"
sys.exit(3)
else:
print "now starting monitor %s." % path
9 years ago
global f
f = open(path, 'r')
st_size = os.stat(path)[6]
f.seek(st_size)
while True:
try:
notifier.process_events()
if notifier.check_events():
notifier.read_events()
except KeyboardInterrupt:
print "keyboard Interrupt."
notifier.stop()
break
9 years ago
class Application(tornado.web.Application):
def __init__(self):
handlers = [
9 years ago
(r'/monitor', MonitorHandler),
(r'/terminal', WebTerminalHandler),
(r'/kill', WebTerminalKillHandler),
9 years ago
]
setting = {
'cookie_secret': 'DFksdfsasdfkasdfFKwlwfsdfsa1204mx',
'template_path': os.path.join(os.path.dirname(__file__), 'templates'),
'static_path': os.path.join(os.path.dirname(__file__), 'static'),
'debug': True,
9 years ago
}
tornado.web.Application.__init__(self, handlers, **setting)
9 years ago
class MonitorHandler(tornado.websocket.WebSocketHandler):
clients = []
threads = []
9 years ago
9 years ago
def __init__(self, *args, **kwargs):
self.file_path = None
super(self.__class__, self).__init__(*args, **kwargs)
9 years ago
def check_origin(self, origin):
return True
@require_auth
9 years ago
def open(self):
9 years ago
# 获取监控的path
self.file_path = self.get_argument('file_path', '')
MonitorHandler.clients.append(self)
thread = MyThread(target=file_monitor, args=('%s.log' % self.file_path, self))
MonitorHandler.threads.append(thread)
9 years ago
self.stream.set_nodelay(True)
try:
for t in MonitorHandler.threads:
if t.is_alive():
continue
t.setDaemon(True)
t.start()
except WebSocketClosedError:
client_index = MonitorHandler.clients.index(self)
MonitorHandler.threads[client_index].stop()
MonitorHandler.clients.remove(self)
MonitorHandler.threads.remove(MonitorHandler.threads[client_index])
9 years ago
print len(MonitorHandler.threads), len(MonitorHandler.clients)
def on_message(self, message):
# 监控日志,发生变动发向客户端
pass
9 years ago
def on_close(self):
# 客户端主动关闭
# self.close()
print "Close websocket."
client_index = MonitorHandler.clients.index(self)
9 years ago
MonitorHandler.clients.remove(self)
MonitorHandler.threads.remove(MonitorHandler.threads[client_index])
9 years ago
class WebTty(Tty):
def __init__(self, *args, **kwargs):
super(WebTty, self).__init__(*args, **kwargs)
self.login_type = 'web'
self.ws = None
self.data = ''
self.input_mode = False
class WebTerminalKillHandler(tornado.web.RequestHandler):
def get(self):
ws_id = self.get_argument('id')
Log.objects.filter(id=ws_id).update(is_finished=True)
for ws in WebTerminalHandler.clients:
print ws.id
if ws.id == int(ws_id):
print "killed"
ws.log.save()
ws.close()
print len(WebTerminalHandler.clients)
class WebTerminalHandler(tornado.websocket.WebSocketHandler):
clients = []
tasks = []
def __init__(self, *args, **kwargs):
self.term = None
self.log_file_f = None
self.log_time_f = None
self.log = None
self.id = 0
super(WebTerminalHandler, self).__init__(*args, **kwargs)
def check_origin(self, origin):
return True
@require_auth
def open(self):
asset_name = self.get_argument('asset_name', '')
username = self.get_argument('username', '')
token = self.get_argument('token', '')
print asset_name, username, token
self.term = WebTty('a', 'b')
self.term.get_connection()
self.term.channel = self.term.ssh.invoke_shell(term='xterm')
WebTerminalHandler.tasks.append(MyThread(target=self.forward_outbound))
WebTerminalHandler.clients.append(self)
for t in WebTerminalHandler.tasks:
if t.is_alive():
continue
t.setDaemon(True)
t.start()
def on_message(self, message):
data = json.loads(message)
if not data:
return
if data.get('data'):
self.term.input_mode = True
if str(data['data']) in ['\r', '\n', '\r\n']:
TtyLog(log=self.log, datetime=datetime.datetime.now(), cmd=self.term.deal_command(self.term.data, self.term.ssh)).save()
self.term.data = ''
self.term.input_mode = False
self.term.channel.send(data['data'])
def on_close(self):
print 'On_close'
if self in WebTerminalHandler.clients:
WebTerminalHandler.clients.remove(self)
try:
self.log_file_f.write('End time is %s' % datetime.datetime.now())
self.log.is_finished = True
self.log.end_time = datetime.datetime.now()
self.log.save()
self.close()
except AttributeError:
pass
def forward_outbound(self):
self.log_file_f, self.log_time_f, self.log = self.term.get_log()
self.id = self.log.id
try:
data = ''
pre_timestamp = time.time()
while True:
r, w, e = select.select([self.term.channel, sys.stdin], [], [])
if self.term.channel in r:
recv = self.term.channel.recv(1024)
if not len(recv):
return
data += recv
try:
self.write_message(json.dumps({'data': data}))
now_timestamp = time.time()
self.log_time_f.write('%s %s\n' % (round(now_timestamp-pre_timestamp, 4), len(data)))
self.log_file_f.write(data)
pre_timestamp = now_timestamp
self.log_file_f.flush()
self.log_time_f.flush()
if self.term.input_mode and not self.term.is_output(data):
self.term.data += data
data = ''
except UnicodeDecodeError:
pass
finally:
self.close()
9 years ago
if __name__ == '__main__':
tornado.options.parse_command_line()
app = Application()
server = tornado.httpserver.HTTPServer(app)
9 years ago
server.bind(options.port, options.host)
# server.listen(options.port)
server.start(num_processes=1)
9 years ago
tornado.ioloop.IOLoop.instance().start()