webssh/tests/test_app.py

689 lines
25 KiB
Python
Raw Normal View History

2018-04-26 13:51:01 +00:00
import json
import random
import threading
import tornado.websocket
import tornado.gen
from tornado.testing import AsyncHTTPTestCase
from tornado.httpclient import HTTPError
2018-04-26 13:51:01 +00:00
from tornado.options import options
2018-05-30 13:29:44 +00:00
from tests.sshserver import run_ssh_server, banner
from tests.utils import encode_multipart_formdata, read_file, make_tests_data_path # noqa
from webssh import handler
from webssh.main import make_app, make_handlers
2018-08-27 23:43:43 +00:00
from webssh.settings import (
get_app_settings, get_server_settings, max_body_size
2018-08-27 23:43:43 +00:00
)
2018-08-20 10:35:50 +00:00
from webssh.utils import to_str
from webssh.worker import clients
2018-04-26 13:51:01 +00:00
2018-08-24 22:53:21 +00:00
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
2018-04-26 13:51:01 +00:00
# Set the handler's DELAY for the test run.
# NOTE(review): presumably the time window a freshly created worker waits for
# its websocket connection -- confirm in webssh.handler. The timeout test in
# this module sleeps for DELAY + 0.1 seconds before connecting.
handler.DELAY = 0.1
# Local alias; the tests branch on it to choose the expected error response.
swallow_http_errors = handler.swallow_http_errors
2018-04-26 13:51:01 +00:00
2019-02-13 03:40:44 +00:00
class TestAppBase(AsyncHTTPTestCase):
    """Common assertions and request helpers shared by the app test cases.

    Subclasses must provide a ``headers`` class attribute holding the default
    request headers that ``fetch_request`` adds to every request.
    """

    def get_httpserver_options(self):
        """Return the HTTP server settings derived from the global options."""
        return get_server_settings(options)

    def assert_response(self, bstr, response):
        """Check an error response in either error-reporting mode.

        When ``swallow_http_errors`` is set the response must be a 200 whose
        body contains ``bstr``; otherwise it must be a 400 'Bad Request'.
        """
        if swallow_http_errors:
            self.assertEqual(response.code, 200)
            self.assertIn(bstr, response.body)
        else:
            self.assertEqual(response.code, 400)
            self.assertIn(b'Bad Request', response.body)

    def assert_status_in(self, data, status):
        """Assert a failed result whose status message contains ``status``."""
        self.assertIsNone(data['encoding'])
        self.assertIsNone(data['id'])
        self.assertIn(status, data['status'])

    def assert_status_equal(self, data, status):
        """Assert a failed result whose status message equals ``status``."""
        self.assertIsNone(data['encoding'])
        self.assertIsNone(data['id'])
        self.assertEqual(status, data['status'])

    def assert_status_none(self, data):
        """Assert a successful result: id and encoding set, status unset."""
        self.assertIsNotNone(data['encoding'])
        self.assertIsNotNone(data['id'])
        self.assertIsNone(data['status'])

    def fetch_request(self, url, method='GET', body='', headers=None,
                      sync=True):
        """Send a request with the class default headers merged in.

        :param url: request path or absolute URL; a path is expanded with
            ``get_url`` for asynchronous requests.
        :param method: HTTP method name.
        :param body: request body; a dict is url-encoded first.
        :param headers: optional extra headers. The mapping is never
            modified; entries of ``self.headers`` take precedence.
        :param sync: when true use the blocking ``self.fetch``; otherwise
            use the asynchronous HTTP client and return its future.
        :return: whatever the selected client's ``fetch`` returns.
        """
        if not sync and url.startswith('/'):
            url = self.get_url(url)

        if isinstance(body, dict):
            body = urlencode(body)

        # Merge into a fresh dict so that neither the caller's mapping nor
        # the class-level ``self.headers`` is mutated or shared.
        merged = dict(headers) if headers else {}
        merged.update(self.headers)

        client = self if sync else self.get_http_client()
        return client.fetch(url, method=method, body=body, headers=merged)

    def sync_post(self, url, body, headers=None):
        """POST ``body`` to ``url`` synchronously and return the response."""
        return self.fetch_request(url, 'POST', body, headers)

    def async_post(self, url, body, headers=None):
        """POST ``body`` to ``url`` asynchronously and return the future."""
        return self.fetch_request(url, 'POST', body, headers, sync=False)
2019-02-13 03:40:44 +00:00
class TestAppBasic(TestAppBase):
    """Tests of the index page and websocket endpoint against one SSH server.

    A test SSH server (``run_ssh_server``) is started once for the whole
    class in a daemon thread listening on ``sshserver_port``.
    """

    # Mutable flag handed to the SSH server thread; emptied in tearDownClass.
    running = [True]
    sshserver_port = 2200
    body = 'hostname=127.0.0.1&port={}&_xsrf=yummy&username=robey&password=foo'.format(sshserver_port)  # noqa
    headers = {'Cookie': '_xsrf=yummy'}

    def get_app(self):
        """Reset the per-test form dict and options, then build the app."""
        self.body_dict = {
            'hostname': '127.0.0.1',
            'port': str(self.sshserver_port),
            'username': 'robey',
            'password': '',
            '_xsrf': 'yummy'
        }
        options.debug = False
        options.policy = random.choice(['warning', 'autoadd'])
        options.hostfile = ''
        options.syshostfile = ''
        options.tdstream = ''
        handlers = make_handlers(self.io_loop, options)
        return make_app(handlers, get_app_settings(options))

    @classmethod
    def setUpClass(cls):
        """Start the test SSH server in a background daemon thread."""
        print('='*20)
        server = threading.Thread(
            target=run_ssh_server, args=(cls.sshserver_port, cls.running)
        )
        # ``daemon`` attribute instead of the deprecated ``setDaemon()``.
        server.daemon = True
        server.start()

    @classmethod
    def tearDownClass(cls):
        """Empty the shared ``running`` flag list."""
        cls.running.pop()
        print('='*20)

    def test_app_with_invalid_form_for_missing_argument(self):
        response = self.fetch('/')
        self.assertEqual(response.code, 200)

        cases = [
            ('port=7000&username=admin&password&_xsrf=yummy',
             b'Missing argument hostname'),
            ('hostname=127.0.0.1&port=7000&password&_xsrf=yummy',
             b'Missing argument username'),
            ('hostname=&port=&username=&password&_xsrf=yummy',
             b'Missing value hostname'),
            ('hostname=127.0.0.1&port=7000&username=&password&_xsrf=yummy',
             b'Missing value username'),
        ]
        for body, expected in cases:
            response = self.sync_post('/', body)
            self.assert_response(expected, response)

    def test_app_with_invalid_form_for_invalid_value(self):
        cases = [
            ('hostname=127.0.0&port=22&username=&password&_xsrf=yummy',
             b'Invalid hostname'),
            ('hostname=http://www.googe.com&port=22&username=&password&_xsrf=yummy',  # noqa
             b'Invalid hostname'),
            ('hostname=127.0.0.1&port=port&username=&password&_xsrf=yummy',
             b'Invalid port'),
            ('hostname=127.0.0.1&port=70000&username=&password&_xsrf=yummy',
             b'Invalid port'),
        ]
        for body, expected in cases:
            response = self.sync_post('/', body)
            self.assert_response(expected, response)

    def test_app_with_wrong_hostname_ip(self):
        body = 'hostname=127.0.0.2&port=2200&username=admin&_xsrf=yummy'
        response = self.sync_post('/', body)
        self.assertEqual(response.code, 200)
        self.assertIn(b'Unable to connect to', response.body)

    def test_app_with_wrong_hostname_domain(self):
        body = 'hostname=xxxxxxxxxxxx&port=2200&username=admin&_xsrf=yummy'
        response = self.sync_post('/', body)
        self.assertEqual(response.code, 200)
        self.assertIn(b'Unable to connect to', response.body)

    def test_app_with_wrong_port(self):
        body = 'hostname=127.0.0.1&port=7000&username=admin&_xsrf=yummy'
        response = self.sync_post('/', body)
        self.assertEqual(response.code, 200)
        self.assertIn(b'Unable to connect to', response.body)

    def test_app_with_wrong_credentials(self):
        # Appending a character corrupts the trailing password value.
        response = self.sync_post('/', self.body + 's')
        data = json.loads(to_str(response.body))
        self.assert_status_in(data, 'Authentication failed.')

    def test_app_with_correct_credentials(self):
        response = self.sync_post('/', self.body)
        self.assert_status_none(json.loads(to_str(response.body)))

    def test_app_with_correct_credentials_but_with_no_port(self):
        saved_port = handler.DEFAULT_PORT
        handler.DEFAULT_PORT = self.sshserver_port
        # Restore the patched module constant even when an assertion fails,
        # so the override cannot leak into later tests.
        try:
            # with no port value
            body = self.body.replace(str(self.sshserver_port), '')
            response = self.sync_post('/', body)
            self.assert_status_none(json.loads(to_str(response.body)))

            # with no port argument
            body = body.replace('port=&', '')
            response = self.sync_post('/', body)
            self.assert_status_none(json.loads(to_str(response.body)))
        finally:
            handler.DEFAULT_PORT = saved_port

    @tornado.testing.gen_test
    def test_app_with_correct_credentials_timeout(self):
        url = self.get_url('/')
        response = yield self.async_post(url, self.body)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id=' + data['id']
        # Connect only after more than handler.DELAY seconds have passed.
        yield tornado.gen.sleep(handler.DELAY + 0.1)
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertIsNone(msg)
        self.assertEqual(ws.close_reason, 'Websocket authentication failed.')

    @tornado.testing.gen_test
    def test_app_with_correct_credentials_user_robey(self):
        url = self.get_url('/')
        response = yield self.async_post(url, self.body)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id=' + data['id']
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertEqual(to_str(msg, data['encoding']), banner)
        ws.close()

    @tornado.testing.gen_test
    def test_app_with_correct_credentials_but_without_id_argument(self):
        url = self.get_url('/')
        response = yield self.async_post(url, self.body)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws'
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertIsNone(msg)
        self.assertIn('Missing argument id', ws.close_reason)

    @tornado.testing.gen_test
    def test_app_with_correct_credentials_but_empty_id(self):
        url = self.get_url('/')
        response = yield self.async_post(url, self.body)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id='
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertIsNone(msg)
        self.assertIn('Missing value id', ws.close_reason)

    @tornado.testing.gen_test
    def test_app_with_correct_credentials_but_wrong_id(self):
        url = self.get_url('/')
        response = yield self.async_post(url, self.body)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        # Prefixing the real id with '1' yields an id the server never issued.
        ws_url = url.replace('http', 'ws') + 'ws?id=1' + data['id']
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertIsNone(msg)
        self.assertIn('Websocket authentication failed', ws.close_reason)

    @tornado.testing.gen_test
    def test_app_with_correct_credentials_user_bar(self):
        body = self.body.replace('robey', 'bar')
        url = self.get_url('/')
        response = yield self.async_post(url, body)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id=' + data['id']
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertEqual(to_str(msg, data['encoding']), banner)

        # messages below will be ignored silently
        # (they only exercise the stability of the webssh websocket)
        ignored_messages = [
            'hello',
            '"hello"',
            '[hello]',
            json.dumps({'resize': []}),
            json.dumps({'resize': {}}),
            json.dumps({'resize': 'ab'}),
            json.dumps({'resize': ['a', 'b']}),
            json.dumps({'resize': {'a': 1, 'b': 2}}),
            json.dumps({'resize': [100]}),
            json.dumps({'resize': [100]*10}),
            json.dumps({'resize': [-1, -1]}),
            json.dumps({'data': [1]}),
            json.dumps({'data': (1,)}),
            json.dumps({'data': {'a': 2}}),
            json.dumps({'data': 1}),
            json.dumps({'data': 2.1}),
            json.dumps({'key-non-existed': 'hello'}),
        ]
        for message in ignored_messages:
            yield ws.write_message(message)

        yield ws.write_message(json.dumps({'resize': [79, 23]}))
        msg = yield ws.read_message()
        self.assertEqual(b'resized', msg)

        yield ws.write_message(json.dumps({'data': 'bye'}))
        msg = yield ws.read_message()
        self.assertEqual(b'bye', msg)
        ws.close()

    @tornado.testing.gen_test
    def test_app_auth_with_valid_pubkey_by_urlencoded_form(self):
        url = self.get_url('/')
        privatekey = read_file(make_tests_data_path('user_rsa_key'))
        self.body_dict.update(privatekey=privatekey)
        response = yield self.async_post(url, self.body_dict)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id=' + data['id']
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertEqual(to_str(msg, data['encoding']), banner)
        ws.close()

    @tornado.testing.gen_test
    def test_app_auth_with_valid_pubkey_by_multipart_form(self):
        url = self.get_url('/')
        privatekey = read_file(make_tests_data_path('user_rsa_key'))
        files = [('privatekey', 'user_rsa_key', privatekey)]
        content_type, body = encode_multipart_formdata(
            self.body_dict.items(), files
        )
        headers = {
            'Content-Type': content_type, 'content-length': str(len(body))
        }
        response = yield self.async_post(url, body, headers=headers)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id=' + data['id']
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertEqual(to_str(msg, data['encoding']), banner)
        ws.close()

    @tornado.testing.gen_test
    def test_app_auth_with_invalid_pubkey_for_user_robey(self):
        url = self.get_url('/')
        privatekey = 'h' * 1024
        files = [('privatekey', 'user_rsa_key', privatekey)]
        content_type, body = encode_multipart_formdata(
            self.body_dict.items(), files
        )
        headers = {
            'Content-Type': content_type, 'content-length': str(len(body))
        }

        if swallow_http_errors:
            response = yield self.async_post(url, body, headers=headers)
            self.assertIn(b'Invalid key', response.body)
        else:
            with self.assertRaises(HTTPError) as ctx:
                yield self.async_post(url, body, headers=headers)
            self.assertIn('Bad Request', ctx.exception.message)

    @tornado.testing.gen_test
    def test_app_auth_with_pubkey_exceeds_key_max_size(self):
        url = self.get_url('/')
        # One character longer than the maximum key length.
        privatekey = 'h' * (handler.PrivateKey.max_length + 1)
        files = [('privatekey', 'user_rsa_key', privatekey)]
        content_type, body = encode_multipart_formdata(
            self.body_dict.items(), files
        )
        headers = {
            'Content-Type': content_type, 'content-length': str(len(body))
        }

        if swallow_http_errors:
            response = yield self.async_post(url, body, headers=headers)
            self.assertIn(b'Invalid key', response.body)
        else:
            with self.assertRaises(HTTPError) as ctx:
                yield self.async_post(url, body, headers=headers)
            self.assertIn('Bad Request', ctx.exception.message)

    @tornado.testing.gen_test
    def test_app_auth_with_pubkey_cannot_be_decoded_by_multipart_form(self):
        url = self.get_url('/')
        privatekey = 'h' * 1024
        files = [('privatekey', 'user_rsa_key', privatekey)]
        content_type, body = encode_multipart_formdata(
            self.body_dict.items(), files
        )
        body = body.encode('utf-8')
        # added some gbk bytes to the privatekey, make it cannot be decoded
        body = body[:-100] + b'\xb4\xed\xce\xf3' + body[-100:]
        headers = {
            'Content-Type': content_type, 'content-length': str(len(body))
        }

        if swallow_http_errors:
            response = yield self.async_post(url, body, headers=headers)
            self.assertIn(b'Invalid unicode', response.body)
        else:
            with self.assertRaises(HTTPError) as ctx:
                yield self.async_post(url, body, headers=headers)
            self.assertIn('Bad Request', ctx.exception.message)

    def test_app_post_form_with_large_body_size_by_multipart_form(self):
        privatekey = 'h' * (2 * max_body_size)
        files = [('privatekey', 'user_rsa_key', privatekey)]
        content_type, body = encode_multipart_formdata(
            self.body_dict.items(), files
        )
        headers = {
            'Content-Type': content_type, 'content-length': str(len(body))
        }
        response = self.sync_post('/', body, headers=headers)
        self.assertIn(response.code, [400, 599])

    def test_app_post_form_with_large_body_size_by_urlencoded_form(self):
        privatekey = 'h' * (2 * max_body_size)
        body = self.body + '&privatekey=' + privatekey
        response = self.sync_post('/', body)
        self.assertIn(response.code, [400, 599])

    @tornado.testing.gen_test
    def test_app_with_user_keyonly_for_bad_authentication_type(self):
        self.body_dict.update(username='keyonly', password='foo')
        response = yield self.async_post('/', self.body_dict)
        self.assertEqual(response.code, 200)
        data = json.loads(to_str(response.body))
        self.assert_status_in(data, 'Bad authentication type')
2018-08-29 01:19:35 +00:00
2019-02-13 03:40:44 +00:00
class OtherTestBase(TestAppBase):
    """Base class for test cases that need their own options and SSH server.

    Subclasses override the class attributes below; ``get_app`` copies them
    onto the global ``options`` before the application is built. ``setUp``
    starts a test SSH server thread for every test.
    """

    # Incremented on the class for every test run by ``setUp``.
    sshserver_port = 3300
    headers = {'Cookie': '_xsrf=yummy'}
    debug = False
    # ``None`` means: pick 'warning' or 'autoadd' at random in ``get_app``.
    policy = None
    xsrf = True
    hostfile = ''
    syshostfile = ''
    tdstream = ''
    maxconn = 20
    origin = 'same'
    # Template form data; ``get_app`` builds a per-instance copy with the
    # real port filled in.
    body = {
        'hostname': '127.0.0.1',
        'port': '',
        'username': 'robey',
        'password': 'foo',
        '_xsrf': 'yummy'
    }

    def get_app(self):
        """Apply the class-level settings to ``options`` and build the app."""
        # Copy instead of updating in place: ``body`` is a class attribute
        # shared by every subclass and must not be mutated by one test.
        self.body = dict(self.body, port=str(self.sshserver_port))
        options.debug = self.debug
        options.xsrf = self.xsrf
        if self.policy:
            options.policy = self.policy
        else:
            options.policy = random.choice(['warning', 'autoadd'])
        options.hostfile = self.hostfile
        options.syshostfile = self.syshostfile
        options.tdstream = self.tdstream
        options.maxconn = self.maxconn
        options.origin = self.origin
        handlers = make_handlers(self.io_loop, options)
        return make_app(handlers, get_app_settings(options))

    def setUp(self):
        """Start a test SSH server thread, then run the base setUp."""
        print('='*20)
        # A mutable list (as in TestAppBasic) so that emptying it in
        # tearDown is visible to the server thread; a plain bool is passed
        # by value and rebinding it here would never reach the thread.
        self.running = [True]
        OtherTestBase.sshserver_port += 1

        server = threading.Thread(
            target=run_ssh_server, args=(self.sshserver_port, self.running)
        )
        # ``daemon`` attribute instead of the deprecated ``setDaemon()``.
        server.daemon = True
        server.start()
        super(OtherTestBase, self).setUp()

    def tearDown(self):
        """Clear the running flag shared with the server thread."""
        del self.running[:]
        print('='*20)
        super(OtherTestBase, self).tearDown()
2018-11-15 12:23:42 +00:00
class TestAppInDebugMode(OtherTestBase):
    """Run the application with ``options.debug`` enabled."""

    debug = True

    def assert_response(self, bstr, response):
        """Expect ``bstr`` in a 200, or a 500 'Uncaught exception' page."""
        if not swallow_http_errors:
            self.assertEqual(response.code, 500)
            self.assertIn(b'Uncaught exception', response.body)
            return
        self.assertEqual(response.code, 200)
        self.assertIn(bstr, response.body)

    def test_server_error_for_post_method(self):
        # The extra 'error' field is added on a copy of the form data.
        form = dict(self.body, error='raise')
        result = self.sync_post('/', form)
        self.assert_response(b'"status": "Internal Server Error"', result)

    def test_html(self):
        page = self.fetch('/', method='GET')
        self.assertIn(b'novalidate>', page.body)
2018-08-29 01:19:35 +00:00
2018-11-15 12:23:42 +00:00
class TestAppWithLargeBuffer(OtherTestBase):
    """Send a large websocket message and compare it with what comes back."""

    @tornado.testing.gen_test
    def test_app_for_sending_message_with_large_size(self):
        url = self.get_url('/')
        form = dict(self.body, username='foo')
        response = yield self.async_post(url, form)
        data = json.loads(to_str(response.body))
        self.assert_status_none(data)

        ws_url = url.replace('http', 'ws') + 'ws?id=' + data['id']
        ws = yield tornado.websocket.websocket_connect(ws_url)
        msg = yield ws.read_message()
        self.assertEqual(to_str(msg, data['encoding']), banner)

        send = 'h' * (64 * 1024) + '\r\n\r\n'
        yield ws.write_message(json.dumps({'data': send}))

        # Collect chunks until one ends with the terminator sent above.
        chunks = []
        finished = False
        while not finished:
            chunk = yield ws.read_message()
            chunks.append(chunk)
            finished = chunk.endswith(b'\r\n\r\n')

        recv = b''.join(chunks).decode(data['encoding'])
        self.assertEqual(send, recv)
        ws.close()
class TestAppWithRejectPolicy(OtherTestBase):
    """Run with the 'reject' policy and the example known-hosts file."""

    policy = 'reject'
    hostfile = make_tests_data_path('known_hosts_example')

    @tornado.testing.gen_test
    def test_app_with_hostname_not_in_hostkeys(self):
        response = yield self.async_post('/', self.body)
        data = json.loads(to_str(response.body))
        expected = 'Connection to {}:{} is not allowed.'.format(
            self.body['hostname'], self.sshserver_port
        )
        self.assertEqual(expected, data['status'])
2018-10-16 06:51:15 +00:00
2018-11-15 12:23:42 +00:00
class TestAppWithBadHostKey(OtherTestBase):
    """Connect on fixed port 2222 using the 'test_known_hosts' host file."""

    # Chosen once, when the class body is executed.
    policy = random.choice(['warning', 'autoadd', 'reject'])
    hostfile = make_tests_data_path('test_known_hosts')

    def setUp(self):
        """Pin this instance's SSH server port before the base setUp runs."""
        self.sshserver_port = 2222
        super(TestAppWithBadHostKey, self).setUp()

    @tornado.testing.gen_test
    def test_app_with_bad_host_key(self):
        response = yield self.async_post('/', self.body)
        result = json.loads(to_str(response.body))
        self.assertEqual('Bad host key.', result['status'])
2018-10-16 06:51:15 +00:00
class TestAppWithTrustedStream(OtherTestBase):
    """With ``tdstream`` set to '127.0.0.2' every request must get a 403."""

    tdstream = '127.0.0.2'

    def _assert_forbidden(self, response):
        """Assert a 403 response whose error message mentions 'Forbidden'."""
        self.assertEqual(response.code, 403)
        self.assertIn('Forbidden', response.error.message)

    def test_with_forbidden_get_request(self):
        self._assert_forbidden(self.fetch('/', method='GET'))

    def test_with_forbidden_post_request(self):
        self._assert_forbidden(self.sync_post('/', self.body))

    def test_with_forbidden_put_request(self):
        self._assert_forbidden(
            self.fetch_request('/', method='PUT', body=self.body)
        )
2018-10-16 06:51:15 +00:00
class TestAppNotFoundHandler(OtherTestBase):
    """Unknown paths answer 404 and still carry the custom Server header."""

    custom_headers = handler.MixinHandler.custom_headers

    def _assert_not_found(self, response):
        """Check status code, 'Server' header and body of a 404 response."""
        self.assertEqual(response.code, 404)
        self.assertEqual(
            response.headers['Server'], self.custom_headers['Server']
        )
        self.assertIn(b'404: Not Found', response.body)

    def test_with_not_found_get_request(self):
        self._assert_not_found(self.fetch('/pathnotfound', method='GET'))

    def test_with_not_found_post_request(self):
        self._assert_not_found(self.sync_post('/pathnotfound', self.body))

    def test_with_not_found_put_request(self):
        self._assert_not_found(
            self.fetch_request('/pathnotfound', method='PUT', body=self.body)
        )
2018-10-16 08:15:39 +00:00
2018-11-15 12:23:42 +00:00
class TestAppWithHeadRequest(OtherTestBase):
    """Status codes returned for HEAD requests on different paths."""

    def test_with_index_path(self):
        status = self.fetch('/', method='HEAD').code
        self.assertEqual(status, 200)

    def test_with_ws_path(self):
        status = self.fetch('/ws', method='HEAD').code
        self.assertEqual(status, 405)

    def test_with_not_found_path(self):
        status = self.fetch('/notfound', method='HEAD').code
        self.assertEqual(status, 404)
2018-11-15 12:23:42 +00:00
class TestAppWithPutRequest(OtherTestBase):
    """PUT on the index path must fail with 'Method Not Allowed'."""

    # XSRF checking is switched off for this case.
    xsrf = False

    @tornado.testing.gen_test
    def test_app_with_method_not_supported(self):
        with self.assertRaises(HTTPError) as raised:
            yield self.fetch_request('/', 'PUT', self.body, sync=False)
        self.assertIn('Method Not Allowed', raised.exception.message)
class TestAppWithTooManyConnections(OtherTestBase):
    """With ``maxconn`` of 1, an existing client entry blocks new ones."""

    maxconn = 1

    def setUp(self):
        """Empty the shared ``clients`` registry before the base setUp."""
        clients.clear()
        super(TestAppWithTooManyConnections, self).setUp()

    @tornado.testing.gen_test
    def test_app_with_too_many_connections(self):
        # Register a fake worker for the client address first.
        clients['127.0.0.1'] = {'fake_worker_id': None}

        url = self.get_url('/')
        response = yield self.async_post(url, self.body)
        result = json.loads(to_str(response.body))
        self.assertEqual('Too many live connections.', result['status'])

        # Once the fake worker is removed the same request succeeds.
        clients['127.0.0.1'].clear()
        response = yield self.async_post(url, self.body)
        result = json.loads(to_str(response.body))
        self.assert_status_none(result)
2019-01-19 11:19:45 +00:00
2019-01-23 13:48:03 +00:00
class TestAppWithCrossOriginOperation(OtherTestBase):
    """Origin checks with ``options.origin`` set to one specific origin."""

    origin = 'http://www.example.com'

    @tornado.testing.gen_test
    def test_app_with_wrong_event_origin(self):
        form = dict(self.body, _origin='localhost')
        response = yield self.async_post('/', form)
        result = json.loads(to_str(response.body))
        self.assert_status_equal(
            result, 'Cross origin operation is not allowed.'
        )

    @tornado.testing.gen_test
    def test_app_with_wrong_header_origin(self):
        extra_headers = {'Origin': 'localhost'}
        response = yield self.async_post(
            '/', self.body, headers=extra_headers
        )
        result = json.loads(to_str(response.body))
        self.assert_status_equal(
            result, 'Cross origin operation is not allowed.'
        )

    @tornado.testing.gen_test
    def test_app_with_correct_event_origin(self):
        form = dict(self.body, _origin=self.origin)
        response = yield self.async_post('/', form)
        result = json.loads(to_str(response.body))
        self.assert_status_none(result)
        # The origin came from the form, so no CORS header is expected.
        allow_origin = response.headers.get('Access-Control-Allow-Origin')
        self.assertIsNone(allow_origin)

    @tornado.testing.gen_test
    def test_app_with_correct_header_origin(self):
        extra_headers = {'Origin': self.origin}
        response = yield self.async_post(
            '/', self.body, headers=extra_headers
        )
        result = json.loads(to_str(response.body))
        self.assert_status_none(result)
        allow_origin = response.headers.get('Access-Control-Allow-Origin')
        self.assertEqual(allow_origin, self.origin)