From 32f303474e2664a49b0668020dc2ad33b7095117 Mon Sep 17 00:00:00 2001 From: Sheng Date: Thu, 15 Nov 2018 20:23:42 +0800 Subject: [PATCH] Updated test_app.py --- tests/data/test_known_hosts | 1 + tests/test_app.py | 47 ++++++++++++++++++++++++++++++++++--- webssh/handler.py | 6 ++--- webssh/settings.py | 3 ++- 4 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 tests/data/test_known_hosts diff --git a/tests/data/test_known_hosts b/tests/data/test_known_hosts new file mode 100644 index 0000000..f1413d8 --- /dev/null +++ b/tests/data/test_known_hosts @@ -0,0 +1 @@ +[127.0.0.1]:2222 ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAINwZGQmNFADnAAlm5uFLQTrdxqpNxHdgg4JPbB3sR2kr diff --git a/tests/test_app.py b/tests/test_app.py index c00c003..bfc21aa 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -443,6 +443,7 @@ class OtherTestBase(AsyncHTTPTestCase): headers = {'Cookie': '_xsrf=yummy'} debug = False policy = None + xsrf = True hostfile = '' syshostfile = '' tdstream = '' @@ -458,6 +459,7 @@ class OtherTestBase(AsyncHTTPTestCase): self.body.update(port=str(self.sshserver_port)) loop = self.io_loop options.debug = self.debug + options.xsrf = self.xsrf options.policy = self.policy if self.policy else random.choice(['warning', 'autoadd']) # noqa options.hostfile = self.hostfile options.syshostfile = self.syshostfile @@ -486,7 +488,7 @@ class OtherTestBase(AsyncHTTPTestCase): super(OtherTestBase, self).tearDown() -class TestAppInDebug(OtherTestBase): +class TestAppInDebugMode(OtherTestBase): debug = True @@ -512,7 +514,7 @@ class TestAppInDebug(OtherTestBase): self.assertIn(b'novalidate>', response.body) -class TestAppMiscell(OtherTestBase): +class TestAppWithLargeBuffer(OtherTestBase): @tornado.testing.gen_test def test_app_for_sending_message_with_large_size(self): 
@@ -564,6 +566,28 @@ class TestAppWithRejectPolicy(OtherTestBase): self.assertEqual(message, data['status']) +class TestAppWithBadHostKey(OtherTestBase): + + policy = random.choice(['warning', 'autoadd', 'reject']) + hostfile = make_tests_data_path('test_known_hosts') + + def setUp(self): + self.sshserver_port = 2222 + super(TestAppWithBadHostKey, self).setUp() + + @tornado.testing.gen_test + def test_app_with_bad_host_key(self): + url = self.get_url('/') + client = self.get_http_client() + body = urlencode(dict(self.body, username='foo')) + response = yield client.fetch(url, method='POST', body=body, + headers=self.headers) + data = json.loads(to_str(response.body)) + self.assertIsNone(data['id']) + self.assertIsNone(data['encoding']) + self.assertEqual('Bad host key.', data['status']) + + class TestAppWithTrustedStream(OtherTestBase): tdstream = '127.0.0.2' @@ -616,7 +640,7 @@ class TestAppNotFoundHandler(OtherTestBase): self.assertIn(b'404: Not Found', response.body) -class TestAppHeadRequest(OtherTestBase): +class TestAppWithHeadRequest(OtherTestBase): def test_with_index_path(self): response = self.fetch('/', method='HEAD') @@ -629,3 +653,20 @@ class TestAppHeadRequest(OtherTestBase): def test_with_not_found_path(self): response = self.fetch('/notfound', method='HEAD') self.assertEqual(response.code, 404) + + +class TestAppWithPutRequest(OtherTestBase): + + xsrf = False + + @tornado.testing.gen_test + def test_app_with_method_not_supported(self): + url = self.get_url('/') + client = self.get_http_client() + body = urlencode(dict(self.body, username='foo')) + + with self.assertRaises(HTTPError) as ctx: + yield client.fetch( + url, method='PUT', body=body, headers=self.headers + ) + self.assertIn('Method Not Allowed', ctx.exception.message) diff --git a/webssh/handler.py b/webssh/handler.py index f9bfd0d..680a40b 100644 --- a/webssh/handler.py +++ b/webssh/handler.py 
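Review bot: `policy = random.choice(['warning', 'autoadd', 'reject'])` in TestAppWithBadHostKey is evaluated once at class definition, so each run checks the "Bad host key." rejection under only one of the three policies. This is the test that guards against a changed server key being accepted, so a regression in a single policy would pass most runs unnoticed. I would pin the value here, e.g. `policy = 'warning'`, and add one small subclass each for `policy = 'autoadd'` and `policy = 'reject'`. The fixed `self.sshserver_port = 2222` has to match the `[127.0.0.1]:2222` entry in tests/data/test_known_hosts, which deserves a one-line comment. How the base setUp binds that port is outside this hunk.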
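Review bot: TestAppWithPutRequest sets `xsrf = False` without saying why, and the reason is not obvious from the test name. Presumably the XSRF check would otherwise reject the PUT before the 405 is produced; a comment such as `# XSRF check off so the request reaches method dispatch and returns 405` would record that. Indentation is lost in this view, so confirm that `self.assertIn('Method Not Allowed', ctx.exception.message)` sits after the `with` block rather than inside it, where it would never run once the fetch raises. If `HTTPError` here is the HTTP client's exception, also asserting `ctx.exception.code == 405` would make the check independent of the message text.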
@@ -158,9 +158,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler): self.result = dict(id=None, status=None, encoding=None) def write_error(self, status_code, **kwargs): - if self.request.method != 'POST' or not swallow_http_errors: - super(IndexHandler, self).write_error(status_code, **kwargs) - else: + if swallow_http_errors and self.request.method == 'POST': exc_info = kwargs.get('exc_info') if exc_info: reason = getattr(exc_info[1], 'log_message', None) @@ -169,6 +167,8 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler): self.result.update(status=self._reason) self.set_status(200) self.finish(self.result) + else: + super(IndexHandler, self).write_error(status_code, **kwargs) def get_ssh_client(self): ssh = paramiko.SSHClient() diff --git a/webssh/settings.py b/webssh/settings.py index 4054fba..ae3a9d4 100644 --- a/webssh/settings.py +++ b/webssh/settings.py @@ -33,6 +33,7 @@ define('redirect', type=bool, default=True, help='Redirecting http to https') define('fbidhttp', type=bool, default=True, help='Forbid public plain http incoming requests') define('xheaders', type=bool, default=True, help='Support xheaders') +define('xsrf', type=bool, default=True, help='CSRF protection') define('wpintvl', type=int, default=0, help='Websocket ping interval') define('version', type=bool, help='Show version information', callback=print_version) @@ -48,7 +49,7 @@ def get_app_settings(options): static_path=os.path.join(base_dir, 'webssh', 'static'), websocket_ping_interval=options.wpintvl, debug=options.debug, - xsrf_cookies=True + xsrf_cookies=options.xsrf ) return settings
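Review bot: `write_error` in IndexHandler has no docstring, and the reordered branch is a good place to add one. It should state that when `swallow_http_errors` is set and the request is a POST, the error becomes a 200 response whose JSON `status` carries the reason, and that every other case falls through to the parent class's `write_error`. The swallowed branch reads `log_message` from the exception and appears to pass it on to the client, so a comment that messages raised on this path must be safe to show to the user would help future callers. The function body spans this hunk and the next one (`@@ -169`), and the lines between the `getattr` call and `self.result.update(...)` are not visible here.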
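Review bot: The new `xsrf` option lets an operator switch off the XSRF cookie check for the whole application, yet its help text `'CSRF protection'` does not say what turning it off exposes. The index POST appears to be the request that starts an SSH connection, so with the check disabled that form is no longer protected against cross-site submission from a visitor's browser. If the switch exists mainly for the tests, say so in the help, e.g. `help='Enable XSRF cookie check (disable only for testing)'`, and log a warning in `get_app_settings` when `options.xsrf` is false. The `xsrf_cookies=options.xsrf` line in the `@@ -48` hunk is where that warning belongs. `get_app_settings` starts and ends outside the hunk.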