|
|
|
@ -14,6 +14,11 @@ from webssh.main import make_app, make_handlers
|
|
|
|
|
from webssh.settings import get_app_settings, max_body_size
|
|
|
|
|
from webssh.utils import to_str
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
from urllib.parse import urlencode
|
|
|
|
|
except ImportError:
|
|
|
|
|
from urllib import urlencode
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
handler.DELAY = 0.1
|
|
|
|
|
|
|
|
|
@ -123,10 +128,102 @@ class TestApp(AsyncHTTPTestCase):
|
|
|
|
|
ws = yield tornado.websocket.websocket_connect(ws_url)
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertIsNone(msg)
|
|
|
|
|
self.assertEqual(ws.close_reason, 'Websocket authentication failed.')
|
|
|
|
|
|
|
|
|
|
@tornado.testing.gen_test
|
|
|
|
|
def test_app_with_correct_credentials_user_robey(self):
|
|
|
|
|
url = self.get_url('/')
|
|
|
|
|
client = self.get_http_client()
|
|
|
|
|
response = yield client.fetch(url)
|
|
|
|
|
self.assertEqual(response.code, 200)
|
|
|
|
|
|
|
|
|
|
response = yield client.fetch(url, method='POST', body=self.body)
|
|
|
|
|
data = json.loads(to_str(response.body))
|
|
|
|
|
self.assertIsNone(data['status'])
|
|
|
|
|
self.assertIsNotNone(data['id'])
|
|
|
|
|
self.assertIsNotNone(data['encoding'])
|
|
|
|
|
|
|
|
|
|
url = url.replace('http', 'ws')
|
|
|
|
|
ws_url = url + 'ws?id=' + data['id']
|
|
|
|
|
ws = yield tornado.websocket.websocket_connect(ws_url)
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(to_str(msg, data['encoding']), banner)
|
|
|
|
|
ws.close()
|
|
|
|
|
|
|
|
|
|
@tornado.testing.gen_test
|
|
|
|
|
def test_app_with_correct_credentials_user_bar(self):
|
|
|
|
|
url = self.get_url('/')
|
|
|
|
|
client = self.get_http_client()
|
|
|
|
|
response = yield client.fetch(url)
|
|
|
|
|
self.assertEqual(response.code, 200)
|
|
|
|
|
|
|
|
|
|
body = self.body.replace('robey', 'bar')
|
|
|
|
|
response = yield client.fetch(url, method='POST', body=body)
|
|
|
|
|
data = json.loads(to_str(response.body))
|
|
|
|
|
self.assertIsNone(data['status'])
|
|
|
|
|
self.assertIsNotNone(data['id'])
|
|
|
|
|
self.assertIsNotNone(data['encoding'])
|
|
|
|
|
|
|
|
|
|
url = url.replace('http', 'ws')
|
|
|
|
|
ws_url = url + 'ws?id=' + data['id']
|
|
|
|
|
ws = yield tornado.websocket.websocket_connect(ws_url)
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(to_str(msg, data['encoding']), banner)
|
|
|
|
|
|
|
|
|
|
# messages below will be ignored silently
|
|
|
|
|
yield ws.write_message('hello')
|
|
|
|
|
yield ws.write_message('"hello"')
|
|
|
|
|
yield ws.write_message('[hello]')
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': []}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': {}}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': 'ab'}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': ['a', 'b']}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [100]}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [100]*10}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [-1, -1]}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': [1]}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': (1,)}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': {'a': 2}}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': 1}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': 2.1}))
|
|
|
|
|
yield ws.write_message(json.dumps({'key-non-existed': 'hello'}))
|
|
|
|
|
# end - those just for testing webssh websocket stablity
|
|
|
|
|
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [79, 23]}))
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(b'resized', msg)
|
|
|
|
|
|
|
|
|
|
yield ws.write_message(json.dumps({'data': 'bye'}))
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(b'bye', msg)
|
|
|
|
|
ws.close()
|
|
|
|
|
|
|
|
|
|
@tornado.testing.gen_test
|
|
|
|
|
def test_app_auth_with_valid_pubkey_by_urlencoded_form(self):
|
|
|
|
|
url = self.get_url('/')
|
|
|
|
|
client = self.get_http_client()
|
|
|
|
|
response = yield client.fetch(url)
|
|
|
|
|
self.assertEqual(response.code, 200)
|
|
|
|
|
|
|
|
|
|
privatekey = read_file(make_tests_data_path('user_rsa_key'))
|
|
|
|
|
self.body_dict.update(privatekey=privatekey)
|
|
|
|
|
body = urlencode(self.body_dict)
|
|
|
|
|
response = yield client.fetch(url, method='POST', body=body)
|
|
|
|
|
data = json.loads(to_str(response.body))
|
|
|
|
|
self.assertIsNone(data['status'])
|
|
|
|
|
self.assertIsNotNone(data['id'])
|
|
|
|
|
self.assertIsNotNone(data['encoding'])
|
|
|
|
|
|
|
|
|
|
url = url.replace('http', 'ws')
|
|
|
|
|
ws_url = url + 'ws?id=' + data['id']
|
|
|
|
|
ws = yield tornado.websocket.websocket_connect(ws_url)
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(to_str(msg, data['encoding']), banner)
|
|
|
|
|
ws.close()
|
|
|
|
|
|
|
|
|
|
@tornado.testing.gen_test
|
|
|
|
|
def test_app_auth_with_valid_pubkey_for_user_robey(self):
|
|
|
|
|
def test_app_auth_with_valid_pubkey_by_multipart_form(self):
|
|
|
|
|
url = self.get_url('/')
|
|
|
|
|
client = self.get_http_client()
|
|
|
|
|
response = yield client.fetch(url)
|
|
|
|
@ -237,72 +334,3 @@ class TestApp(AsyncHTTPTestCase):
|
|
|
|
|
|
|
|
|
|
with self.assertRaises(HTTPError):
|
|
|
|
|
yield client.fetch(url, method='POST', headers=headers, body=body)
|
|
|
|
|
|
|
|
|
|
@tornado.testing.gen_test
|
|
|
|
|
def test_app_with_correct_credentials_user_robey(self):
|
|
|
|
|
url = self.get_url('/')
|
|
|
|
|
client = self.get_http_client()
|
|
|
|
|
response = yield client.fetch(url)
|
|
|
|
|
self.assertEqual(response.code, 200)
|
|
|
|
|
|
|
|
|
|
response = yield client.fetch(url, method='POST', body=self.body)
|
|
|
|
|
data = json.loads(to_str(response.body))
|
|
|
|
|
self.assertIsNone(data['status'])
|
|
|
|
|
self.assertIsNotNone(data['id'])
|
|
|
|
|
self.assertIsNotNone(data['encoding'])
|
|
|
|
|
|
|
|
|
|
url = url.replace('http', 'ws')
|
|
|
|
|
ws_url = url + 'ws?id=' + data['id']
|
|
|
|
|
ws = yield tornado.websocket.websocket_connect(ws_url)
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(to_str(msg, data['encoding']), banner)
|
|
|
|
|
ws.close()
|
|
|
|
|
|
|
|
|
|
@tornado.testing.gen_test
|
|
|
|
|
def test_app_with_correct_credentials_user_bar(self):
|
|
|
|
|
url = self.get_url('/')
|
|
|
|
|
client = self.get_http_client()
|
|
|
|
|
response = yield client.fetch(url)
|
|
|
|
|
self.assertEqual(response.code, 200)
|
|
|
|
|
|
|
|
|
|
body = self.body.replace('robey', 'bar')
|
|
|
|
|
response = yield client.fetch(url, method='POST', body=body)
|
|
|
|
|
data = json.loads(to_str(response.body))
|
|
|
|
|
self.assertIsNone(data['status'])
|
|
|
|
|
self.assertIsNotNone(data['id'])
|
|
|
|
|
self.assertIsNotNone(data['encoding'])
|
|
|
|
|
|
|
|
|
|
url = url.replace('http', 'ws')
|
|
|
|
|
ws_url = url + 'ws?id=' + data['id']
|
|
|
|
|
ws = yield tornado.websocket.websocket_connect(ws_url)
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(to_str(msg, data['encoding']), banner)
|
|
|
|
|
|
|
|
|
|
# messages below will be ignored silently
|
|
|
|
|
yield ws.write_message('hello')
|
|
|
|
|
yield ws.write_message('"hello"')
|
|
|
|
|
yield ws.write_message('[hello]')
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': []}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': {}}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': 'ab'}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': ['a', 'b']}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [100]}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [100]*10}))
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [-1, -1]}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': [1]}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': (1,)}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': {'a': 2}}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': 1}))
|
|
|
|
|
yield ws.write_message(json.dumps({'data': 2.1}))
|
|
|
|
|
yield ws.write_message(json.dumps({'key-non-existed': 'hello'}))
|
|
|
|
|
# end - those just for testing webssh websocket stablity
|
|
|
|
|
|
|
|
|
|
yield ws.write_message(json.dumps({'resize': [79, 23]}))
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(b'resized', msg)
|
|
|
|
|
|
|
|
|
|
yield ws.write_message(json.dumps({'data': 'bye'}))
|
|
|
|
|
msg = yield ws.read_message()
|
|
|
|
|
self.assertEqual(b'bye', msg)
|
|
|
|
|
ws.close()
|
|
|
|
|