mirror of https://github.com/huashengdun/webssh
Updated test_app.py
parent
646621279b
commit
32f303474e
|
@ -0,0 +1 @@
|
||||||
|
[127.0.0.1]:2222 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINwZGQmNFADnAAlm5uFLQTrdxqpNxHdgg4JPbB3sR2kr
|
|
@ -443,6 +443,7 @@ class OtherTestBase(AsyncHTTPTestCase):
|
||||||
headers = {'Cookie': '_xsrf=yummy'}
|
headers = {'Cookie': '_xsrf=yummy'}
|
||||||
debug = False
|
debug = False
|
||||||
policy = None
|
policy = None
|
||||||
|
xsrf = True
|
||||||
hostfile = ''
|
hostfile = ''
|
||||||
syshostfile = ''
|
syshostfile = ''
|
||||||
tdstream = ''
|
tdstream = ''
|
||||||
|
@ -458,6 +459,7 @@ class OtherTestBase(AsyncHTTPTestCase):
|
||||||
self.body.update(port=str(self.sshserver_port))
|
self.body.update(port=str(self.sshserver_port))
|
||||||
loop = self.io_loop
|
loop = self.io_loop
|
||||||
options.debug = self.debug
|
options.debug = self.debug
|
||||||
|
options.xsrf = self.xsrf
|
||||||
options.policy = self.policy if self.policy else random.choice(['warning', 'autoadd']) # noqa
|
options.policy = self.policy if self.policy else random.choice(['warning', 'autoadd']) # noqa
|
||||||
options.hostfile = self.hostfile
|
options.hostfile = self.hostfile
|
||||||
options.syshostfile = self.syshostfile
|
options.syshostfile = self.syshostfile
|
||||||
|
@ -486,7 +488,7 @@ class OtherTestBase(AsyncHTTPTestCase):
|
||||||
super(OtherTestBase, self).tearDown()
|
super(OtherTestBase, self).tearDown()
|
||||||
|
|
||||||
|
|
||||||
class TestAppInDebug(OtherTestBase):
|
class TestAppInDebugMode(OtherTestBase):
|
||||||
|
|
||||||
debug = True
|
debug = True
|
||||||
|
|
||||||
|
@ -512,7 +514,7 @@ class TestAppInDebug(OtherTestBase):
|
||||||
self.assertIn(b'novalidate>', response.body)
|
self.assertIn(b'novalidate>', response.body)
|
||||||
|
|
||||||
|
|
||||||
class TestAppMiscell(OtherTestBase):
|
class TestAppWithLargeBuffer(OtherTestBase):
|
||||||
|
|
||||||
@tornado.testing.gen_test
|
@tornado.testing.gen_test
|
||||||
def test_app_for_sending_message_with_large_size(self):
|
def test_app_for_sending_message_with_large_size(self):
|
||||||
|
@ -564,6 +566,28 @@ class TestAppWithRejectPolicy(OtherTestBase):
|
||||||
self.assertEqual(message, data['status'])
|
self.assertEqual(message, data['status'])
|
||||||
|
|
||||||
|
|
||||||
|
class TestAppWithBadHostKey(OtherTestBase):
|
||||||
|
|
||||||
|
policy = random.choice(['warning', 'autoadd', 'reject'])
|
||||||
|
hostfile = make_tests_data_path('test_known_hosts')
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.sshserver_port = 2222
|
||||||
|
super(TestAppWithBadHostKey, self).setUp()
|
||||||
|
|
||||||
|
@tornado.testing.gen_test
|
||||||
|
def test_app_with_bad_host_key(self):
|
||||||
|
url = self.get_url('/')
|
||||||
|
client = self.get_http_client()
|
||||||
|
body = urlencode(dict(self.body, username='foo'))
|
||||||
|
response = yield client.fetch(url, method='POST', body=body,
|
||||||
|
headers=self.headers)
|
||||||
|
data = json.loads(to_str(response.body))
|
||||||
|
self.assertIsNone(data['id'])
|
||||||
|
self.assertIsNone(data['encoding'])
|
||||||
|
self.assertEqual('Bad host key.', data['status'])
|
||||||
|
|
||||||
|
|
||||||
class TestAppWithTrustedStream(OtherTestBase):
|
class TestAppWithTrustedStream(OtherTestBase):
|
||||||
tdstream = '127.0.0.2'
|
tdstream = '127.0.0.2'
|
||||||
|
|
||||||
|
@ -616,7 +640,7 @@ class TestAppNotFoundHandler(OtherTestBase):
|
||||||
self.assertIn(b'404: Not Found', response.body)
|
self.assertIn(b'404: Not Found', response.body)
|
||||||
|
|
||||||
|
|
||||||
class TestAppHeadRequest(OtherTestBase):
|
class TestAppWithHeadRequest(OtherTestBase):
|
||||||
|
|
||||||
def test_with_index_path(self):
|
def test_with_index_path(self):
|
||||||
response = self.fetch('/', method='HEAD')
|
response = self.fetch('/', method='HEAD')
|
||||||
|
@ -629,3 +653,20 @@ class TestAppHeadRequest(OtherTestBase):
|
||||||
def test_with_not_found_path(self):
|
def test_with_not_found_path(self):
|
||||||
response = self.fetch('/notfound', method='HEAD')
|
response = self.fetch('/notfound', method='HEAD')
|
||||||
self.assertEqual(response.code, 404)
|
self.assertEqual(response.code, 404)
|
||||||
|
|
||||||
|
|
||||||
|
class TestAppWithPutRequest(OtherTestBase):
|
||||||
|
|
||||||
|
xsrf = False
|
||||||
|
|
||||||
|
@tornado.testing.gen_test
|
||||||
|
def test_app_with_method_not_supported(self):
|
||||||
|
url = self.get_url('/')
|
||||||
|
client = self.get_http_client()
|
||||||
|
body = urlencode(dict(self.body, username='foo'))
|
||||||
|
|
||||||
|
with self.assertRaises(HTTPError) as ctx:
|
||||||
|
yield client.fetch(
|
||||||
|
url, method='PUT', body=body, headers=self.headers
|
||||||
|
)
|
||||||
|
self.assertIn('Method Not Allowed', ctx.exception.message)
|
||||||
|
|
|
@ -158,9 +158,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
|
||||||
self.result = dict(id=None, status=None, encoding=None)
|
self.result = dict(id=None, status=None, encoding=None)
|
||||||
|
|
||||||
def write_error(self, status_code, **kwargs):
|
def write_error(self, status_code, **kwargs):
|
||||||
if self.request.method != 'POST' or not swallow_http_errors:
|
if swallow_http_errors and self.request.method == 'POST':
|
||||||
super(IndexHandler, self).write_error(status_code, **kwargs)
|
|
||||||
else:
|
|
||||||
exc_info = kwargs.get('exc_info')
|
exc_info = kwargs.get('exc_info')
|
||||||
if exc_info:
|
if exc_info:
|
||||||
reason = getattr(exc_info[1], 'log_message', None)
|
reason = getattr(exc_info[1], 'log_message', None)
|
||||||
|
@ -169,6 +167,8 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
|
||||||
self.result.update(status=self._reason)
|
self.result.update(status=self._reason)
|
||||||
self.set_status(200)
|
self.set_status(200)
|
||||||
self.finish(self.result)
|
self.finish(self.result)
|
||||||
|
else:
|
||||||
|
super(IndexHandler, self).write_error(status_code, **kwargs)
|
||||||
|
|
||||||
def get_ssh_client(self):
|
def get_ssh_client(self):
|
||||||
ssh = paramiko.SSHClient()
|
ssh = paramiko.SSHClient()
|
||||||
|
|
|
@ -33,6 +33,7 @@ define('redirect', type=bool, default=True, help='Redirecting http to https')
|
||||||
define('fbidhttp', type=bool, default=True,
|
define('fbidhttp', type=bool, default=True,
|
||||||
help='Forbid public plain http incoming requests')
|
help='Forbid public plain http incoming requests')
|
||||||
define('xheaders', type=bool, default=True, help='Support xheaders')
|
define('xheaders', type=bool, default=True, help='Support xheaders')
|
||||||
|
define('xsrf', type=bool, default=True, help='CSRF protection')
|
||||||
define('wpintvl', type=int, default=0, help='Websocket ping interval')
|
define('wpintvl', type=int, default=0, help='Websocket ping interval')
|
||||||
define('version', type=bool, help='Show version information',
|
define('version', type=bool, help='Show version information',
|
||||||
callback=print_version)
|
callback=print_version)
|
||||||
|
@ -48,7 +49,7 @@ def get_app_settings(options):
|
||||||
static_path=os.path.join(base_dir, 'webssh', 'static'),
|
static_path=os.path.join(base_dir, 'webssh', 'static'),
|
||||||
websocket_ping_interval=options.wpintvl,
|
websocket_ping_interval=options.wpintvl,
|
||||||
debug=options.debug,
|
debug=options.debug,
|
||||||
xsrf_cookies=True
|
xsrf_cookies=options.xsrf
|
||||||
)
|
)
|
||||||
return settings
|
return settings
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue