mirror of https://github.com/huashengdun/webssh
Added KEY_MAX_SIZE to validate the private key
parent
07dc33df1a
commit
3c139e05f8
|
@ -161,8 +161,7 @@ class TestApp(AsyncHTTPTestCase):
|
||||||
response = yield client.fetch(url)
|
response = yield client.fetch(url)
|
||||||
self.assertEqual(response.code, 200)
|
self.assertEqual(response.code, 200)
|
||||||
|
|
||||||
privatekey = read_file(os.path.join(base_dir, 'tests', 'user_rsa_key'))
|
privatekey = 'h' * 1024
|
||||||
privatekey = privatekey[:100] + 'bad' + privatekey[100:]
|
|
||||||
files = [('privatekey', 'user_rsa_key', privatekey)]
|
files = [('privatekey', 'user_rsa_key', privatekey)]
|
||||||
content_type, body = encode_multipart_formdata(self.body_dict.items(),
|
content_type, body = encode_multipart_formdata(self.body_dict.items(),
|
||||||
files)
|
files)
|
||||||
|
@ -172,9 +171,54 @@ class TestApp(AsyncHTTPTestCase):
|
||||||
response = yield client.fetch(url, method='POST', headers=headers,
|
response = yield client.fetch(url, method='POST', headers=headers,
|
||||||
body=body)
|
body=body)
|
||||||
data = json.loads(to_str(response.body))
|
data = json.loads(to_str(response.body))
|
||||||
self.assertIsNotNone(data['status'])
|
|
||||||
self.assertIsNone(data['id'])
|
self.assertIsNone(data['id'])
|
||||||
self.assertIsNone(data['encoding'])
|
self.assertIsNone(data['encoding'])
|
||||||
|
self.assertEqual(data['status'], 'Not a valid private key or wrong password for decrypting the key.') # noqa
|
||||||
|
|
||||||
|
@tornado.testing.gen_test
|
||||||
|
def test_app_auth_with_pubkey_exceeds_key_max_size(self):
|
||||||
|
url = self.get_url('/')
|
||||||
|
client = self.get_http_client()
|
||||||
|
response = yield client.fetch(url)
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
|
||||||
|
privatekey = 'h' * (handler.KEY_MAX_SIZE * 2)
|
||||||
|
files = [('privatekey', 'user_rsa_key', privatekey)]
|
||||||
|
content_type, body = encode_multipart_formdata(self.body_dict.items(),
|
||||||
|
files)
|
||||||
|
headers = {
|
||||||
|
'Content-Type': content_type, 'content-length': str(len(body))
|
||||||
|
}
|
||||||
|
response = yield client.fetch(url, method='POST', headers=headers,
|
||||||
|
body=body)
|
||||||
|
data = json.loads(to_str(response.body))
|
||||||
|
self.assertIsNone(data['id'])
|
||||||
|
self.assertIsNone(data['encoding'])
|
||||||
|
self.assertEqual(data['status'], 'Not a valid private key.')
|
||||||
|
|
||||||
|
@tornado.testing.gen_test
|
||||||
|
def test_app_auth_with_pubkey_cannot_be_decoded(self):
|
||||||
|
url = self.get_url('/')
|
||||||
|
client = self.get_http_client()
|
||||||
|
response = yield client.fetch(url)
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
|
||||||
|
privatekey = 'h' * 1024
|
||||||
|
files = [('privatekey', 'user_rsa_key', privatekey)]
|
||||||
|
content_type, body = encode_multipart_formdata(self.body_dict.items(),
|
||||||
|
files)
|
||||||
|
body = body.encode('utf-8')
|
||||||
|
# added some gbk bytes to the privatekey, make it cannot be decoded
|
||||||
|
body = body[:-100] + b'\xb4\xed\xce\xf3' + body[-100:]
|
||||||
|
headers = {
|
||||||
|
'Content-Type': content_type, 'content-length': str(len(body))
|
||||||
|
}
|
||||||
|
response = yield client.fetch(url, method='POST', headers=headers,
|
||||||
|
body=body)
|
||||||
|
data = json.loads(to_str(response.body))
|
||||||
|
self.assertIsNone(data['id'])
|
||||||
|
self.assertIsNone(data['encoding'])
|
||||||
|
self.assertEqual(data['status'], 'Not a valid private key.')
|
||||||
|
|
||||||
@tornado.testing.gen_test
|
@tornado.testing.gen_test
|
||||||
def test_app_post_form_with_large_body_size(self):
|
def test_app_post_form_with_large_body_size(self):
|
||||||
|
|
|
@ -28,6 +28,7 @@ except ImportError:
|
||||||
|
|
||||||
|
|
||||||
DELAY = 3
|
DELAY = 3
|
||||||
|
KEY_MAX_SIZE = 16384
|
||||||
|
|
||||||
|
|
||||||
def parse_encoding(data):
|
def parse_encoding(data):
|
||||||
|
@ -69,9 +70,16 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
|
||||||
def get_privatekey(self):
|
def get_privatekey(self):
|
||||||
try:
|
try:
|
||||||
data = self.request.files.get('privatekey')[0]['body']
|
data = self.request.files.get('privatekey')[0]['body']
|
||||||
except TypeError:
|
except TypeError: # no privatekey provided
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if len(data) < KEY_MAX_SIZE:
|
||||||
|
try:
|
||||||
return to_str(data)
|
return to_str(data)
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
raise ValueError('Not a valid private key.')
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_specific_pkey(cls, pkeycls, privatekey, password):
|
def get_specific_pkey(cls, pkeycls, privatekey, password):
|
||||||
|
@ -96,8 +104,8 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
|
||||||
or cls.get_specific_pkey(paramiko.Ed25519Key, privatekey,
|
or cls.get_specific_pkey(paramiko.Ed25519Key, privatekey,
|
||||||
password)
|
password)
|
||||||
if not pkey:
|
if not pkey:
|
||||||
raise ValueError('Not a valid private key file or '
|
raise ValueError('Not a valid private key or wrong password '
|
||||||
'wrong password for decrypting the private key.')
|
'for decrypting the key.')
|
||||||
return pkey
|
return pkey
|
||||||
|
|
||||||
def get_hostname(self):
|
def get_hostname(self):
|
||||||
|
|
Loading…
Reference in New Issue