mirror of https://github.com/huashengdun/webssh
Refactored tests
parent
d389e32b85
commit
8922813142
|
@ -27,7 +27,54 @@ handler.DELAY = 0.1
|
|||
swallow_http_errors = handler.swallow_http_errors
|
||||
|
||||
|
||||
class TestAppBasic(AsyncHTTPTestCase):
|
||||
class TestAppBase(AsyncHTTPTestCase):
|
||||
|
||||
def get_httpserver_options(self):
|
||||
return get_server_settings(options)
|
||||
|
||||
def assert_response(self, bstr, response):
|
||||
if swallow_http_errors:
|
||||
self.assertEqual(response.code, 200)
|
||||
self.assertIn(bstr, response.body)
|
||||
else:
|
||||
self.assertEqual(response.code, 400)
|
||||
self.assertIn(b'Bad Request', response.body)
|
||||
|
||||
def assert_status_in(self, data, status):
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIn(status, data['status'])
|
||||
|
||||
def assert_status_equal(self, data, status=''):
|
||||
if not status:
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
else:
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertEqual(status, data['status'])
|
||||
|
||||
def sync_post(self, body, headers={}, url='/', method='POST'):
|
||||
headers.update(self.headers)
|
||||
return self.fetch(url, method=method, body=body, headers=headers)
|
||||
|
||||
def async_post(self, url, body, headers={}, method='POST'):
|
||||
if url.startswith('/'):
|
||||
url = self.get_url('/')
|
||||
|
||||
if isinstance(body, dict):
|
||||
body = urlencode(body)
|
||||
|
||||
if headers:
|
||||
headers.update(self.headers)
|
||||
else:
|
||||
headers = self.headers
|
||||
|
||||
client = self.get_http_client()
|
||||
return client.fetch(url, method=method, body=body, headers=headers)
|
||||
|
||||
|
||||
class TestAppBasic(TestAppBase):
|
||||
|
||||
running = [True]
|
||||
sshserver_port = 2200
|
||||
|
@ -65,26 +112,6 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
cls.running.pop()
|
||||
print('='*20)
|
||||
|
||||
def get_httpserver_options(self):
|
||||
return get_server_settings(options)
|
||||
|
||||
def assert_response(self, bstr, response):
|
||||
if swallow_http_errors:
|
||||
self.assertEqual(response.code, 200)
|
||||
self.assertIn(bstr, response.body)
|
||||
else:
|
||||
self.assertEqual(response.code, 400)
|
||||
self.assertIn(b'Bad Request', response.body)
|
||||
|
||||
def sync_post(self, body, headers={}, url='/', method='POST'):
|
||||
headers.update(self.headers)
|
||||
return self.fetch(url, method=method, body=body, headers=headers)
|
||||
|
||||
def async_post(self, url, body, headers={}, method='POST'):
|
||||
client = self.get_http_client()
|
||||
headers.update(self.headers)
|
||||
return client.fetch(url, method=method, body=body, headers=headers)
|
||||
|
||||
def test_app_with_invalid_form_for_missing_argument(self):
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 200)
|
||||
|
@ -142,17 +169,11 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
|
||||
def test_app_with_wrong_credentials(self):
|
||||
response = self.sync_post(self.body + 's')
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIn('Authentication failed.', data['status'])
|
||||
self.assert_status_in(json.loads(to_str(response.body)), 'Authentication failed.') # noqa
|
||||
|
||||
def test_app_with_correct_credentials(self):
|
||||
response = self.sync_post(self.body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), None)
|
||||
|
||||
def test_app_with_correct_credentials_but_with_no_port(self):
|
||||
default_port = handler.DEFAULT_PORT
|
||||
|
@ -161,18 +182,12 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
# with no port value
|
||||
body = self.body.replace(str(self.sshserver_port), '')
|
||||
response = self.sync_post(body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), None)
|
||||
|
||||
# with no port argument
|
||||
body = body.replace('port=&', '')
|
||||
response = self.sync_post(body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), None)
|
||||
|
||||
handler.DEFAULT_PORT = default_port
|
||||
|
||||
|
@ -181,9 +196,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
url = self.get_url('/')
|
||||
response = yield self.async_post(url, self.body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=' + data['id']
|
||||
|
@ -198,9 +211,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
url = self.get_url('/')
|
||||
response = yield self.async_post(url, self.body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=' + data['id']
|
||||
|
@ -214,9 +225,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
url = self.get_url('/')
|
||||
response = yield self.async_post(url, self.body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws'
|
||||
|
@ -230,9 +239,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
url = self.get_url('/')
|
||||
response = yield self.async_post(url, self.body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id='
|
||||
|
@ -246,9 +253,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
url = self.get_url('/')
|
||||
response = yield self.async_post(url, self.body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=1' + data['id']
|
||||
|
@ -263,9 +268,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
url = self.get_url('/')
|
||||
response = yield self.async_post(url, body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=' + data['id']
|
||||
|
@ -310,9 +313,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
body = urlencode(self.body_dict)
|
||||
response = yield self.async_post(url, body)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=' + data['id']
|
||||
|
@ -333,9 +334,7 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
}
|
||||
response = yield self.async_post(url, body, headers=headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=' + data['id']
|
||||
|
@ -404,7 +403,6 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_post_form_with_large_body_size_by_multipart_form(self):
|
||||
url = self.get_url('/')
|
||||
privatekey = 'h' * (2 * max_body_size)
|
||||
files = [('privatekey', 'user_rsa_key', privatekey)]
|
||||
content_type, body = encode_multipart_formdata(self.body_dict.items(),
|
||||
|
@ -414,32 +412,27 @@ class TestAppBasic(AsyncHTTPTestCase):
|
|||
}
|
||||
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
yield self.async_post(url, body, headers=headers)
|
||||
yield self.async_post('/', body, headers=headers)
|
||||
self.assertIn('Bad Request', ctx.exception.message)
|
||||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_post_form_with_large_body_size_by_urlencoded_form(self):
|
||||
url = self.get_url('/')
|
||||
privatekey = 'h' * (2 * max_body_size)
|
||||
body = self.body + '&privatekey=' + privatekey
|
||||
with self.assertRaises(HTTPError) as ctx:
|
||||
yield self.async_post(url, body)
|
||||
yield self.async_post('/', body)
|
||||
self.assertIn('Bad Request', ctx.exception.message)
|
||||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_user_keyonly_for_bad_authentication_type(self):
|
||||
url = self.get_url('/')
|
||||
self.body_dict.update(username='keyonly', password='foo')
|
||||
body = urlencode(self.body_dict)
|
||||
response = yield self.async_post(url, body)
|
||||
response = yield self.async_post('/', body)
|
||||
self.assertEqual(response.code, 200)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertIn('Bad authentication type', data['status'])
|
||||
self.assert_status_in(json.loads(to_str(response.body)), 'Bad authentication type') # noqa
|
||||
|
||||
|
||||
class OtherTestBase(AsyncHTTPTestCase):
|
||||
class OtherTestBase(TestAppBase):
|
||||
sshserver_port = 3300
|
||||
headers = {'Cookie': '_xsrf=yummy'}
|
||||
debug = False
|
||||
|
@ -472,9 +465,6 @@ class OtherTestBase(AsyncHTTPTestCase):
|
|||
app = make_app(make_handlers(loop, options), get_app_settings(options))
|
||||
return app
|
||||
|
||||
def get_httpserver_options(self):
|
||||
return get_server_settings(options)
|
||||
|
||||
def setUp(self):
|
||||
print('='*20)
|
||||
self.running = True
|
||||
|
@ -524,14 +514,10 @@ class TestAppWithLargeBuffer(OtherTestBase):
|
|||
@tornado.testing.gen_test
|
||||
def test_app_for_sending_message_with_large_size(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(dict(self.body, username='foo'))
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
response = yield self.async_post(url, body, self.headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['status'])
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assert_status_equal(data, None)
|
||||
|
||||
url = url.replace('http', 'ws')
|
||||
ws_url = url + 'ws?id=' + data['id']
|
||||
|
@ -559,14 +545,8 @@ class TestAppWithRejectPolicy(OtherTestBase):
|
|||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_hostname_not_in_hostkeys(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(self.body)
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
response = yield self.async_post('/', urlencode(self.body), self.headers) # noqa
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIsNone(data['encoding'])
|
||||
message = 'Connection to {}:{} is not allowed.'.format(self.body['hostname'], self.sshserver_port) # noqa
|
||||
self.assertEqual(message, data['status'])
|
||||
|
||||
|
@ -582,14 +562,8 @@ class TestAppWithBadHostKey(OtherTestBase):
|
|||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_bad_host_key(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(self.body)
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
response = yield self.async_post('/', urlencode(self.body), self.headers) # noqa
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertEqual('Bad host key.', data['status'])
|
||||
|
||||
|
||||
|
@ -690,23 +664,13 @@ class TestAppWithTooManyConnections(OtherTestBase):
|
|||
clients['127.0.0.1'] = {'fake_worker_id': None}
|
||||
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(self.body)
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
response = yield self.async_post(url, urlencode(self.body), self.headers) # noqa
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertEqual(data['status'], 'Too many connections.')
|
||||
self.assertEqual('Too many connections.', data['status']) # noqa
|
||||
|
||||
clients['127.0.0.1'].clear()
|
||||
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assertIsNone(data['status'])
|
||||
response = yield self.async_post(url, urlencode(self.body), self.headers) # noqa
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), None)
|
||||
|
||||
|
||||
class TestAppWithCrossOriginOperation(OtherTestBase):
|
||||
|
@ -715,58 +679,28 @@ class TestAppWithCrossOriginOperation(OtherTestBase):
|
|||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_wrong_event_origin(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(dict(self.body, _origin='localhost'))
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertEqual(
|
||||
'Cross origin operation is not allowed.', data['status']
|
||||
)
|
||||
response = yield self.async_post('/', body, self.headers)
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), 'Cross origin operation is not allowed.') # noqa
|
||||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_wrong_header_origin(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(self.body)
|
||||
headers = dict(self.headers, Origin='localhost')
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNone(data['id'])
|
||||
self.assertIsNone(data['encoding'])
|
||||
self.assertEqual(
|
||||
'Cross origin operation is not allowed.', data['status']
|
||||
)
|
||||
response = yield self.async_post('/', urlencode(self.body), headers)
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), 'Cross origin operation is not allowed.') # noqa
|
||||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_correct_event_origin(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(dict(self.body, _origin=self.origin))
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=self.headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assertIsNone(data['status'])
|
||||
response = yield self.async_post('/', body, self.headers)
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), None)
|
||||
self.assertIsNone(response.headers.get('Access-Control-Allow-Origin'))
|
||||
|
||||
@tornado.testing.gen_test
|
||||
def test_app_with_correct_header_origin(self):
|
||||
url = self.get_url('/')
|
||||
client = self.get_http_client()
|
||||
body = urlencode(self.body)
|
||||
headers = dict(self.headers, Origin=self.origin)
|
||||
response = yield client.fetch(url, method='POST', body=body,
|
||||
headers=headers)
|
||||
data = json.loads(to_str(response.body))
|
||||
self.assertIsNotNone(data['id'])
|
||||
self.assertIsNotNone(data['encoding'])
|
||||
self.assertIsNone(data['status'])
|
||||
response = yield self.async_post('/', urlencode(self.body), headers)
|
||||
self.assert_status_equal(json.loads(to_str(response.body)), None)
|
||||
self.assertEqual(
|
||||
response.headers.get('Access-Control-Allow-Origin'), self.origin
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue