webssh/tests/test_handler.py

280 lines
9.6 KiB
Python
Raw Normal View History

2018-04-24 01:59:51 +00:00
import unittest
2018-04-25 13:05:01 +00:00
import paramiko
2018-04-24 01:59:51 +00:00
2018-04-24 02:57:25 +00:00
from tornado.httputil import HTTPServerRequest
from tornado.options import options
from tests.utils import read_file, make_tests_data_path
from webssh import handler
from webssh.handler import (
2019-06-27 04:52:19 +00:00
MixinHandler, WsockHandler, PrivateKey, InvalidValueError
)
2018-04-24 01:59:51 +00:00
try:
from unittest.mock import Mock
except ImportError:
from mock import Mock
2018-04-24 01:59:51 +00:00
class TestMixinHandler(unittest.TestCase):
    """Unit tests for the request helper methods of ``MixinHandler``."""

    def test_is_forbidden(self):
        """Check ``is_forbidden`` for several context/hostname combinations.

        The test toggles ``handler.redirecting`` and ``options.fbidhttp``;
        both are restored on exit so other tests see the values they had
        before this test ran.
        """
        # These are process-wide switches. Without restoring them, the
        # outcome of later tests would depend on execution order.
        self.addCleanup(setattr, options, 'fbidhttp', options.fbidhttp)
        self.addCleanup(
            setattr, handler, 'redirecting',
            getattr(handler, 'redirecting', None)
        )

        mhandler = MixinHandler()
        handler.redirecting = True
        options.fbidhttp = True

        # Peer 8.8.8.8 is not in the non-empty trusted_downstream list.
        context = Mock(
            address=('8.8.8.8', 8888),
            trusted_downstream=['127.0.0.1'],
            _orig_protocol='http'
        )
        hostname = '4.4.4.4'
        self.assertTrue(mhandler.is_forbidden(context, hostname))

        # Same peer, empty trusted_downstream, domain-name hostname:
        # the expected result is exactly False (not None).
        context = Mock(
            address=('8.8.8.8', 8888),
            trusted_downstream=[],
            _orig_protocol='http'
        )
        hostname = 'www.google.com'
        self.assertEqual(mhandler.is_forbidden(context, hostname), False)

        # Same peer, empty trusted_downstream, IP-literal hostname.
        context = Mock(
            address=('8.8.8.8', 8888),
            trusted_downstream=[],
            _orig_protocol='http'
        )
        hostname = '4.4.4.4'
        self.assertTrue(mhandler.is_forbidden(context, hostname))

        # Peer 192.168.1.1: None is expected for every remaining
        # combination of hostname, fbidhttp, redirecting and protocol.
        context = Mock(
            address=('192.168.1.1', 8888),
            trusted_downstream=[],
            _orig_protocol='http'
        )
        hostname = 'www.google.com'
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        options.fbidhttp = False
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        hostname = '4.4.4.4'
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        handler.redirecting = False
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        context._orig_protocol = 'https'
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

    def test_get_redirect_url(self):
        """Check the https URL built for port 443 and for another port."""
        mhandler = MixinHandler()
        hostname = 'www.example.com'
        uri = '/'

        # With port 443 the expected URL carries no explicit port.
        port = 443
        self.assertEqual(
            mhandler.get_redirect_url(hostname, port, uri=uri),
            'https://www.example.com/'
        )

        # With any other port the expected URL includes it.
        port = 4433
        self.assertEqual(
            mhandler.get_redirect_url(hostname, port, uri),
            'https://www.example.com:4433/'
        )

    def test_get_client_addr(self):
        """Check which address ``get_client_addr`` reports.

        ``options.xheaders`` is toggled by the test and restored on exit.
        """
        # Restore the process-wide option so later tests are unaffected.
        self.addCleanup(setattr, options, 'xheaders', options.xheaders)

        mhandler = MixinHandler()
        client_addr = ('8.8.8.8', 8888)
        context_addr = ('127.0.0.1', 1234)
        options.xheaders = True

        # xheaders on, no real client address: the context address wins.
        mhandler.context = Mock(address=context_addr)
        mhandler.get_real_client_addr = lambda: None
        self.assertEqual(mhandler.get_client_addr(), context_addr)

        # xheaders on, real client address available: it is reported.
        mhandler.context = Mock(address=context_addr)
        mhandler.get_real_client_addr = lambda: client_addr
        self.assertEqual(mhandler.get_client_addr(), client_addr)

        # xheaders off: the context address is reported regardless.
        options.xheaders = False
        mhandler.context = Mock(address=context_addr)
        mhandler.get_real_client_addr = lambda: client_addr
        self.assertEqual(mhandler.get_client_addr(), context_addr)

    def test_get_real_client_addr(self):
        """Check the address derived from X-Forwarded-* / X-Real-* headers."""
        x_forwarded_for = '1.1.1.1'
        x_forwarded_port = 1111
        x_real_ip = '2.2.2.2'
        x_real_port = 2222
        # Port expected whenever no usable port header is present.
        fake_port = 65535

        mhandler = MixinHandler()
        mhandler.request = HTTPServerRequest(uri='/')
        mhandler.request.remote_ip = x_forwarded_for

        # No forwarding headers at all.
        self.assertIsNone(mhandler.get_real_client_addr())

        # X-Forwarded-For only: the fallback port is expected.
        mhandler.request.headers.add('X-Forwarded-For', x_forwarded_for)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_forwarded_for, fake_port))

        # X-Forwarded-Port of 65536: the fallback port is still expected.
        mhandler.request.headers.add('X-Forwarded-Port', fake_port + 1)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_forwarded_for, fake_port))

        mhandler.request.headers['X-Forwarded-Port'] = x_forwarded_port
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_forwarded_for, x_forwarded_port))

        # Once X-Real-Ip is present, the X-Real-* values are the ones
        # expected, even though the X-Forwarded-* headers are still set.
        mhandler.request.remote_ip = x_real_ip

        mhandler.request.headers.add('X-Real-Ip', x_real_ip)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_real_ip, fake_port))

        mhandler.request.headers.add('X-Real-Port', fake_port + 1)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_real_ip, fake_port))

        mhandler.request.headers['X-Real-Port'] = x_real_port
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_real_ip, x_real_port))
2018-04-25 13:05:01 +00:00
2019-06-27 04:52:19 +00:00
class TestPrivateKey(unittest.TestCase):
    """Unit tests for ``PrivateKey`` parsing and validation."""

    def get_pk_obj(self, fname, password=None):
        """Build a ``PrivateKey`` from a file in the tests data directory.

        :param fname: file name, resolved with ``make_tests_data_path``.
        :param password: passphrase forwarded to ``PrivateKey``.
        :return: the constructed ``PrivateKey`` object.
        """
        key = read_file(make_tests_data_path(fname))
        return PrivateKey(key, password=password, filename=fname)

    def _test_with_encrypted_key(self, fname, password, klass):
        """Run the empty / wrong / correct passphrase checks for one key.

        :param fname: encrypted key file in the tests data directory.
        :param password: the correct passphrase for that key.
        :param klass: class the loaded key object must be an instance of.
        """
        # An empty passphrase must be reported as missing.
        pk = self.get_pk_obj(fname, password='')
        with self.assertRaises(InvalidValueError) as ctx:
            pk.get_pkey_obj()
        self.assertIn('Need a passphrase', str(ctx.exception))

        # A wrong passphrase must be reported as such.
        pk = self.get_pk_obj(fname, password='wrongpass')
        with self.assertRaises(InvalidValueError) as ctx:
            pk.get_pkey_obj()
        self.assertIn('wrong passphrase', str(ctx.exception))

        # The correct passphrase yields a key of the expected class.
        pk = self.get_pk_obj(fname, password=password)
        self.assertIsInstance(pk.get_pkey_obj(), klass)

    def test_class_with_invalid_key_length(self):
        """A key one character longer than ``max_length`` is rejected."""
        key = u'a' * (PrivateKey.max_length + 1)

        with self.assertRaises(InvalidValueError) as ctx:
            PrivateKey(key)
        self.assertIn('Invalid key length', str(ctx.exception))

    def test_get_pkey_obj_with_invalid_key(self):
        """Unparseable key text is reported together with the file name."""
        key = u'a b c'
        fname = 'abc'

        pk = PrivateKey(key, filename=fname)
        with self.assertRaises(InvalidValueError) as ctx:
            pk.get_pkey_obj()
        self.assertIn('Invalid key {}'.format(fname), str(ctx.exception))

    def test_get_pkey_obj_with_plain_rsa_key(self):
        """An unencrypted RSA key loads as ``paramiko.RSAKey``."""
        pk = self.get_pk_obj('test_rsa.key')
        self.assertIsInstance(pk.get_pkey_obj(), paramiko.RSAKey)

    def test_get_pkey_obj_with_plain_ed25519_key(self):
        """An unencrypted Ed25519 key loads as ``paramiko.Ed25519Key``."""
        pk = self.get_pk_obj('test_ed25519.key')
        self.assertIsInstance(pk.get_pkey_obj(), paramiko.Ed25519Key)

    def test_get_pkey_obj_with_encrypted_rsa_key(self):
        """Passphrase handling for the encrypted RSA key fixture."""
        fname = 'test_rsa_password.key'
        password = 'television'
        self._test_with_encrypted_key(fname, password, paramiko.RSAKey)

    def test_get_pkey_obj_with_encrypted_ed25519_key(self):
        """Passphrase handling for the encrypted Ed25519 key fixture."""
        fname = 'test_ed25519_password.key'
        password = 'abc123'
        self._test_with_encrypted_key(fname, password, paramiko.Ed25519Key)

    def test_get_pkey_obj_with_encrypted_new_rsa_key(self):
        """Passphrase handling for the ``test_new_rsa_password.key`` fixture."""
        fname = 'test_new_rsa_password.key'
        password = '123456'
        self._test_with_encrypted_key(fname, password, paramiko.RSAKey)

    def test_get_pkey_obj_with_plain_new_dsa_key(self):
        """The ``test_new_dsa.key`` fixture loads as ``paramiko.DSSKey``."""
        pk = self.get_pk_obj('test_new_dsa.key')
        self.assertIsInstance(pk.get_pkey_obj(), paramiko.DSSKey)

    def test_parse_name(self):
        """``parse_name`` yields None for bad headers, a name for known tags."""
        # Headers that must not resolve to a key name: no tag, the tag
        # 'xxx', and three RSA headers that are each malformed by a
        # doubled space in a different position. The doubled spaces are
        # significant: with single spaces the header would have exactly
        # the shape the loop further down expects to be accepted.
        rejected_headers = (
            u'-----BEGIN PRIVATE KEY-----',
            u'-----BEGIN xxx PRIVATE KEY-----',
            u'-----BEGIN  RSA PRIVATE KEY-----',
            u'-----BEGIN RSA  PRIVATE KEY-----',
            u'-----BEGIN RSA PRIVATE  KEY-----',
        )
        for key in rejected_headers:
            pk = PrivateKey(key)
            name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
            # Pass the header as the message so a failure names the case.
            self.assertIsNone(name, key)

        # Every known tag must map to its name, and the reported length
        # must equal the full header line, trailing ' \r\n' included.
        for tag, to_name in PrivateKey.tag_to_name.items():
            key = u'-----BEGIN {} PRIVATE KEY----- \r\n'.format(tag)
            pk = PrivateKey(key)
            name, length = pk.parse_name(pk.iostr, pk.tag_to_name)
            self.assertEqual(name, to_name)
            self.assertEqual(length, len(key))
class TestWsockHandler(unittest.TestCase):
    """Unit tests for ``WsockHandler.check_origin``."""

    def test_check_origin(self):
        """Exercise the 'same', 'primary', explicit-set and '*' policies."""
        req = HTTPServerRequest(uri='/')
        stub = Mock(spec=WsockHandler, request=req)

        def allowed(origin):
            # Call the function through the class so the real
            # implementation runs with the stub standing in for ``self``.
            return WsockHandler.check_origin(stub, origin)

        # 'same': the origin has to match the Host header, port included.
        stub.origin_policy = 'same'
        req.headers['Host'] = 'www.example.com:4433'
        self.assertTrue(allowed('https://www.example.com:4433'))
        self.assertFalse(allowed('https://www.example.com'))

        # 'primary': example.com origins pass, example.org does not.
        stub.origin_policy = 'primary'
        self.assertTrue(allowed('https://www.example.com'))
        self.assertTrue(allowed('https://blog.example.com'))
        self.assertFalse(allowed('https://blog.example.org'))

        # Explicit set of origins: the scheme is part of the comparison.
        stub.origin_policy = {'https://blog.example.org'}
        self.assertTrue(allowed('https://blog.example.org'))

        stub.origin_policy = {'http://blog.example.org'}
        self.assertTrue(allowed('http://blog.example.org'))

        stub.origin_policy = {'https://blog.example.org'}
        self.assertFalse(allowed('http://blog.example.org'))

        # '*': the origin is accepted.
        stub.origin_policy = '*'
        self.assertTrue(allowed('https://blog.example.org'))