jumpserver/run_websocket.py

362 lines
12 KiB
Python
Raw Normal View History

# coding: utf-8
import datetime
import functools
import json
import os
import os.path
import select
import sys
import threading
import time
import urllib

import tornado.gen
import tornado.httpclient
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
from pyinotify import WatchManager, Notifier, ProcessEvent, IN_DELETE, IN_CREATE, IN_MODIFY, AsyncNotifier
from tornado.options import define, options
from tornado.websocket import WebSocketClosedError

from connect import Tty, User, Asset, PermRole, logger, get_object
from connect import TtyLog, Log, Session, user_have_perm

try:
    import simplejson as json
except ImportError:
    import json

# Command-line options; they are read back through ``options.port`` and
# ``options.host`` once ``tornado.options.parse_command_line()`` has run.
define("port", default=3000, help="run on the given port", type=int)
define("host", default='0.0.0.0', help="bind to the given host address", type=str)

def require_auth(role='user'):
    """Build a decorator that authenticates a Tornado handler method.

    The session key is taken from the ``sessionid`` cookie, falling back to
    the ``sessionid`` request argument.  When the session exists, has not
    expired and maps to a user, that user is stored on the handler as
    ``request.user`` and the wrapped method is called.

    On any failure the handler's ``close()`` is called when it has one
    (websocket handlers do, plain request handlers do not), a warning is
    logged and ``None`` is returned without calling the wrapped method.

    Args:
        role: ``'admin'`` additionally requires ``user.role`` to be ``'SU'``
            or ``'GA'``; any other value admits every authenticated user.

    Returns:
        A decorator for handler methods whose first argument is the handler.
    """
    def _deco(func):
        # Keep the wrapped method's __name__/__doc__ for logs and debugging.
        @functools.wraps(func)
        def _deco2(request, *args, **kwargs):
            session_key = (request.get_cookie('sessionid') or
                           request.get_argument('sessionid', ''))

            logger.debug('Websocket: session_key: %s' % session_key)
            if session_key:
                session = get_object(Session, session_key=session_key)
                logger.debug('Websocket: session: %s' % session)
                if session and datetime.datetime.now() < session.expire_date:
                    user_id = session.get_decoded().get('_auth_user_id')
                    user = get_object(User, id=user_id)
                    if user:
                        logger.debug('Websocket: user [ %s ] request websocket' % user.username)
                        request.user = user
                        if role == 'admin':
                            if user.role in ['SU', 'GA']:
                                return func(request, *args, **kwargs)
                            logger.debug('Websocket: user [ %s ] is not admin.' % user.username)
                        else:
                            return func(request, *args, **kwargs)
                else:
                    logger.debug('Websocket: session expired: %s' % session_key)

            # Authentication failed: drop the connection when the handler
            # supports it; handlers without close() are left as they are.
            try:
                request.close()
            except AttributeError:
                pass
            logger.warning('Websocket: Request auth failed.')
        return _deco2
    return _deco


class MyThread(threading.Thread):
    """Thread whose target may hit a closed websocket without a traceback."""

    def run(self):
        """Run the thread's target, ending quietly on ``WebSocketClosedError``."""
        try:
            threading.Thread.run(self)
        except WebSocketClosedError:
            # The websocket peer is gone, so this thread has nothing left to do.
            return


class EventHandler(ProcessEvent):
    """pyinotify handler that forwards newly written file content to a client.

    Args:
        client: object exposing ``write_message`` (a websocket handler here).
        fileobj: open file to read from on every modification event.  Each
            handler owns its file object, so concurrent monitors no longer
            share (and overwrite) one module-level file handle.
    """

    def __init__(self, client=None, fileobj=None):
        self.client = client
        self.fileobj = fileobj

    def process_IN_MODIFY(self, event):
        """Send whatever was written since the previous read to the client."""
        if self.client is None or self.fileobj is None:
            return
        # NOTE(review): this runs in the monitor thread; Tornado documents
        # IOLoop.add_callback as the only thread-safe entry point -- confirm
        # whether write_message should be scheduled on the IOLoop instead.
        self.client.write_message(self.fileobj.read())


def file_monitor(path='.', client=None):
    """Watch ``path`` and push appended data to ``client`` until interrupted.

    Reading starts at the current end of the file, so only content written
    after the call is forwarded.  The loop ends on ``KeyboardInterrupt`` or
    when an exception (for example from ``client.write_message``) escapes;
    in every case the notifier is stopped and the file is closed.

    Args:
        path: file to monitor.
        client: object exposing ``write_message``.

    Raises:
        SystemExit: with code 3 when ``path`` is not an existing file.
    """
    # Check first, so no inotify resources are allocated for a missing file.
    if not os.path.isfile(path):
        logger.debug("File %s does not exist." % path)
        sys.exit(3)

    logger.debug("Now starting monitor file %s." % path)
    log_file = open(path, 'r')
    try:
        log_file.seek(0, os.SEEK_END)
        wm = WatchManager()
        notifier = AsyncNotifier(wm, EventHandler(client, fileobj=log_file))
        wm.add_watch(path, IN_DELETE | IN_CREATE | IN_MODIFY, auto_add=True, rec=True)
        try:
            while True:
                notifier.process_events()
                if notifier.check_events():
                    notifier.read_events()
        except KeyboardInterrupt:
            print("keyboard Interrupt.")
        finally:
            notifier.stop()
    finally:
        log_file.close()


class Application(tornado.web.Application):
    """Tornado application exposing the monitor, terminal and kill endpoints."""

    def __init__(self):
        base_dir = os.path.dirname(__file__)
        routes = [
            (r'/monitor', MonitorHandler),
            (r'/terminal', WebTerminalHandler),
            (r'/kill', WebTerminalKillHandler),
        ]
        # NOTE(review): the cookie secret is hard-coded and debug mode is
        # enabled unconditionally -- confirm both are intended for production.
        super(Application, self).__init__(
            routes,
            cookie_secret='DFksdfsasdfkasdfFKwlwfsdfsa1204mx',
            template_path=os.path.join(base_dir, 'templates'),
            static_path=os.path.join(base_dir, 'static'),
            debug=True
        )


class MonitorHandler(tornado.websocket.WebSocketHandler):
    """Websocket endpoint that streams a growing ``.log`` file to an admin."""

    # Parallel lists: threads[i] is the monitor thread serving clients[i].
    clients = []
    threads = []

    def __init__(self, *args, **kwargs):
        self.file_path = None
        # Name the class explicitly: super(self.__class__, ...) recurses
        # forever as soon as this handler is subclassed.
        super(MonitorHandler, self).__init__(*args, **kwargs)

    def check_origin(self, origin):
        # NOTE(review): every origin is accepted -- confirm this is intended.
        return True

    @require_auth('admin')
    def open(self):
        """Start a daemon thread that forwards ``<file_path>.log`` to this client."""
        # Path of the file to monitor, without the '.log' suffix.
        # NOTE(review): the value comes straight from the request and is used
        # as a filesystem path -- confirm callers restrict it.
        self.file_path = self.get_argument('file_path', '')
        thread = MyThread(target=file_monitor, args=('%s.log' % self.file_path, self))
        thread.setDaemon(True)
        MonitorHandler.clients.append(self)
        MonitorHandler.threads.append(thread)
        self.stream.set_nodelay(True)

        # Start only the thread created for this client.  Threads of other
        # clients are either already running or have finished, and calling
        # start() on a finished thread raises RuntimeError.
        thread.start()

        logger.debug("Websocket: Monitor client num: %s, thread num: %s" % (len(MonitorHandler.clients),
                                                                            len(MonitorHandler.threads)))

    def on_message(self, message):
        # Data only flows from the log file to the client; incoming
        # messages are ignored.
        pass

    def on_close(self):
        """Forget this client and its monitor thread once the socket is closed."""
        logger.debug("Websocket: Monitor client close request")
        try:
            client_index = MonitorHandler.clients.index(self)
        except ValueError:
            # Never registered (open() did not run for this connection).
            return
        del MonitorHandler.clients[client_index]
        del MonitorHandler.threads[client_index]


class WebTty(Tty):
    """``Tty`` flavour whose ``login_type`` is ``'web'``."""

    def __init__(self, *args, **kwargs):
        super(WebTty, self).__init__(*args, **kwargs)
        # Session state, assigned after the base initializer has run.
        self.ws = None
        self.data = ''
        self.input_mode = False
        self.login_type = 'web'


class WebTerminalKillHandler(tornado.web.RequestHandler):
    """HTTP endpoint that lets an admin terminate a running web terminal."""

    @require_auth('admin')
    def get(self):
        """Mark log ``id`` as finished and close the matching web terminal(s).

        Raises:
            tornado.web.HTTPError: 400 when ``id`` is not an integer.
        """
        # Validate before touching the database, so a malformed id yields a
        # 400 response instead of an unhandled ValueError.
        try:
            ws_id = int(self.get_argument('id'))
        except ValueError:
            raise tornado.web.HTTPError(400)

        Log.objects.filter(id=ws_id).update(is_finished=True)
        # Iterate over a snapshot: closing a socket ends in on_close(), which
        # removes the handler from WebTerminalHandler.clients.
        for ws in list(WebTerminalHandler.clients):
            if ws.id == ws_id:
                logger.debug("Kill log id %s" % ws_id)
                ws.log.save()
                ws.close()
        logger.debug('Websocket: web terminal client num: %s' % len(WebTerminalHandler.clients))


class WebTerminalHandler(tornado.websocket.WebSocketHandler):
    """Websocket endpoint bridging a browser terminal to a remote shell.

    ``open`` validates the requested asset and role, opens the shell through
    ``WebTty`` and starts a thread running ``forward_outbound``;
    ``on_message`` forwards browser input to the shell and records commands.
    """

    clients = []  # handlers that currently have a terminal open
    tasks = []    # forwarding threads that are still running

    def __init__(self, *args, **kwargs):
        self.term = None
        self.log_file_f = None
        self.log_time_f = None
        self.log = None
        self.id = 0
        self.user = None
        super(WebTerminalHandler, self).__init__(*args, **kwargs)

    def check_origin(self, origin):
        # NOTE(review): every origin is accepted -- confirm this is intended.
        return True

    @require_auth('user')
    def open(self):
        """Open a shell on asset ``id`` as role ``role`` for the current user.

        The socket is closed when the asset does not exist or when the user
        has no role with the requested name on that asset.
        """
        logger.debug('Websocket: Open request')
        role_name = self.get_argument('role', 'sb')
        asset_id = self.get_argument('id', 9999)
        asset = get_object(Asset, id=asset_id)
        if not asset:
            logger.warning('Websocket: No that Host: %s User: %s ' % (asset_id, self.user.username))
            self.close()
            return

        roles = user_have_perm(self.user, asset)
        logger.debug(roles)
        logger.debug('rolename: %s' % role_name)
        login_role = next((role for role in roles if role.name == role_name), None)
        if not login_role:
            logger.warning('Websocket: Not that Role %s for Host: %s User: %s ' % (role_name, asset.hostname,
                                                                                   self.user.username))
            self.close()
            return

        logger.debug('Websocket: request web terminal Host: %s User: %s Role: %s' % (asset.hostname,
                                                                                     self.user.username,
                                                                                     login_role.name))
        self.term = WebTty(self.user, asset, login_role)
        self.term.remote_ip = self.request.remote_ip
        self.term.get_connection()
        self.term.channel = self.term.ssh.invoke_shell(term='xterm')

        # Drop finished threads so the class-level list cannot grow forever,
        # then start only the thread created for this connection (a finished
        # thread cannot be started again).
        WebTerminalHandler.tasks[:] = [t for t in WebTerminalHandler.tasks if t.is_alive()]
        task = MyThread(target=self.forward_outbound)
        task.setDaemon(True)
        WebTerminalHandler.tasks.append(task)
        WebTerminalHandler.clients.append(self)
        task.start()

    def on_message(self, message):
        """Forward browser input to the shell and log a command on Enter.

        ``message`` is a JSON object; only a non-empty ``data`` member is
        acted upon.  Malformed or unexpected messages are ignored.
        """
        try:
            data = json.loads(message)
        except ValueError:
            logger.warning('Websocket: ignore message that is not valid JSON')
            return
        if not isinstance(data, dict) or not data.get('data'):
            return
        if self.term is None:
            # No terminal was opened for this connection.
            return

        keys = data['data']
        self.term.input_mode = True
        # Compare directly rather than through str(): under Python 2,
        # str() of non-ASCII unicode input raises UnicodeEncodeError.
        if keys in ('\r', '\n', '\r\n'):
            if self.term.vim_flag:
                match = self.term.ps1_pattern.search(self.term.vim_data)
                if match:
                    self.term.vim_flag = False
                    vim_data = self.term.deal_command(self.term.vim_data)[0:200]
                    # Test the extracted command, not the JSON message
                    # (which is never empty at this point).
                    if len(vim_data) > 0:
                        TtyLog(log=self.log, datetime=datetime.datetime.now(), cmd=vim_data).save()

            TtyLog(log=self.log, datetime=datetime.datetime.now(),
                   cmd=self.term.deal_command(self.term.data)[0:200]).save()
            self.term.vim_data = ''
            self.term.data = ''
            self.term.input_mode = False
        self.term.channel.send(keys)

    def on_close(self):
        """Unregister the client and finalize the session log, if any."""
        logger.debug('Websocket: Close request')
        if self in WebTerminalHandler.clients:
            WebTerminalHandler.clients.remove(self)

        if self.log is None:
            # forward_outbound() never created the session log, so there is
            # nothing to finalize.
            return

        # NOTE(review): the shell channel / SSH connection opened in open()
        # is not closed here -- confirm where it is released.
        end_time = datetime.datetime.now()
        self.log_file_f.write('End time is %s' % end_time)
        self.log.is_finished = True
        self.log.end_time = end_time
        self.log.save()
        # Close both session files; the output file used to be left open.
        self.log_file_f.close()
        self.log_time_f.close()
        self.close()

    def forward_outbound(self):
        """Thread body: copy shell output to the browser and the session logs.

        Each chunk is sent as ``{"data": ...}``, appended to the output log,
        and its delay and length are written to the timing log.  The loop
        ends when the channel returns no data; the websocket is closed on
        the way out.
        """
        # NOTE(review): write_message()/close() are called from this worker
        # thread; Tornado documents IOLoop.add_callback as the only
        # thread-safe entry point -- confirm.
        self.log_file_f, self.log_time_f, self.log = self.term.get_log()
        self.id = self.log.id
        channel = self.term.channel
        try:
            data = ''
            pre_timestamp = time.time()
            while True:
                # Wait on the shell channel only.  sys.stdin used to be
                # watched too: with stdin at EOF select() returns at once on
                # every call and the loop spins; with stdin closed it fails.
                select.select([channel], [], [])
                recv = channel.recv(1024)
                if not len(recv):
                    return
                data += recv
                if self.term.vim_flag:
                    self.term.vim_data += recv
                try:
                    self.write_message(json.dumps({'data': data}))
                    now_timestamp = time.time()
                    self.log_time_f.write('%s %s\n' % (round(now_timestamp - pre_timestamp, 4), len(data)))
                    self.log_file_f.write(data)
                    pre_timestamp = now_timestamp
                    self.log_file_f.flush()
                    self.log_time_f.flush()
                    if self.term.input_mode and not self.term.is_output(data):
                        self.term.data += data
                    data = ''
                except UnicodeDecodeError:
                    # ``data`` could not be encoded yet; keep it and retry
                    # once the next chunk has been appended.
                    pass
        finally:
            self.close()


if __name__ == '__main__':
    # Parse --port/--host, then serve the application from a single process.
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.bind(options.port, options.host)
    http_server.start(num_processes=1)
    print("Run server on %s:%s" % (options.host, options.port))
    tornado.ioloop.IOLoop.instance().start()