mirror of https://github.com/huashengdun/webssh
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
341 lines
12 KiB
341 lines
12 KiB
import io
|
|
import unittest
|
|
import paramiko
|
|
|
|
from tornado.httputil import HTTPServerRequest
|
|
from tornado.options import options
|
|
from tests.utils import read_file, make_tests_data_path
|
|
from webssh import handler
|
|
from webssh import worker
|
|
from webssh.handler import (
|
|
IndexHandler, MixinHandler, WsockHandler, PrivateKey, InvalidValueError, SSHClient
|
|
)
|
|
|
|
try:
|
|
from unittest.mock import Mock
|
|
except ImportError:
|
|
from mock import Mock
|
|
|
|
|
|
class TestMixinHandler(unittest.TestCase):
|
|
|
|
def test_is_forbidden(self):
|
|
mhandler = MixinHandler()
|
|
handler.redirecting = True
|
|
options.fbidhttp = True
|
|
|
|
context = Mock(
|
|
address=('8.8.8.8', 8888),
|
|
trusted_downstream=['127.0.0.1'],
|
|
_orig_protocol='http'
|
|
)
|
|
hostname = '4.4.4.4'
|
|
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
|
|
|
context = Mock(
|
|
address=('8.8.8.8', 8888),
|
|
trusted_downstream=[],
|
|
_orig_protocol='http'
|
|
)
|
|
hostname = 'www.google.com'
|
|
self.assertEqual(mhandler.is_forbidden(context, hostname), False)
|
|
|
|
context = Mock(
|
|
address=('8.8.8.8', 8888),
|
|
trusted_downstream=[],
|
|
_orig_protocol='http'
|
|
)
|
|
hostname = '4.4.4.4'
|
|
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
|
|
|
context = Mock(
|
|
address=('192.168.1.1', 8888),
|
|
trusted_downstream=[],
|
|
_orig_protocol='http'
|
|
)
|
|
hostname = 'www.google.com'
|
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
|
|
|
options.fbidhttp = False
|
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
|
|
|
hostname = '4.4.4.4'
|
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
|
|
|
handler.redirecting = False
|
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
|
|
|
context._orig_protocol = 'https'
|
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
|
|
|
def test_get_redirect_url(self):
|
|
mhandler = MixinHandler()
|
|
hostname = 'www.example.com'
|
|
uri = '/'
|
|
port = 443
|
|
|
|
self.assertEqual(
|
|
mhandler.get_redirect_url(hostname, port, uri=uri),
|
|
'https://www.example.com/'
|
|
)
|
|
|
|
port = 4433
|
|
self.assertEqual(
|
|
mhandler.get_redirect_url(hostname, port, uri),
|
|
'https://www.example.com:4433/'
|
|
)
|
|
|
|
def test_get_client_addr(self):
|
|
mhandler = MixinHandler()
|
|
client_addr = ('8.8.8.8', 8888)
|
|
context_addr = ('127.0.0.1', 1234)
|
|
options.xheaders = True
|
|
|
|
mhandler.context = Mock(address=context_addr)
|
|
mhandler.get_real_client_addr = lambda: None
|
|
self.assertEqual(mhandler.get_client_addr(), context_addr)
|
|
|
|
mhandler.context = Mock(address=context_addr)
|
|
mhandler.get_real_client_addr = lambda: client_addr
|
|
self.assertEqual(mhandler.get_client_addr(), client_addr)
|
|
|
|
options.xheaders = False
|
|
mhandler.context = Mock(address=context_addr)
|
|
mhandler.get_real_client_addr = lambda: client_addr
|
|
self.assertEqual(mhandler.get_client_addr(), context_addr)
|
|
|
|
def test_get_real_client_addr(self):
|
|
x_forwarded_for = '1.1.1.1'
|
|
x_forwarded_port = 1111
|
|
x_real_ip = '2.2.2.2'
|
|
x_real_port = 2222
|
|
fake_port = 65535
|
|
|
|
mhandler = MixinHandler()
|
|
mhandler.request = HTTPServerRequest(uri='/')
|
|
mhandler.request.remote_ip = x_forwarded_for
|
|
|
|
self.assertIsNone(mhandler.get_real_client_addr())
|
|
|
|
mhandler.request.headers.add('X-Forwarded-For', x_forwarded_for)
|
|
self.assertEqual(mhandler.get_real_client_addr(),
|
|
(x_forwarded_for, fake_port))
|
|
|
|
mhandler.request.headers.add('X-Forwarded-Port', fake_port + 1)
|
|
self.assertEqual(mhandler.get_real_client_addr(),
|
|
(x_forwarded_for, fake_port))
|
|
|
|
mhandler.request.headers['X-Forwarded-Port'] = x_forwarded_port
|
|
self.assertEqual(mhandler.get_real_client_addr(),
|
|
(x_forwarded_for, x_forwarded_port))
|
|
|
|
mhandler.request.remote_ip = x_real_ip
|
|
|
|
mhandler.request.headers.add('X-Real-Ip', x_real_ip)
|
|
self.assertEqual(mhandler.get_real_client_addr(),
|
|
(x_real_ip, fake_port))
|
|
|
|
mhandler.request.headers.add('X-Real-Port', fake_port + 1)
|
|
self.assertEqual(mhandler.get_real_client_addr(),
|
|
(x_real_ip, fake_port))
|
|
|
|
mhandler.request.headers['X-Real-Port'] = x_real_port
|
|
self.assertEqual(mhandler.get_real_client_addr(),
|
|
(x_real_ip, x_real_port))
|
|
|
|
|
|
class TestPrivateKey(unittest.TestCase):
|
|
|
|
def get_pk_obj(self, fname, password=None):
|
|
key = read_file(make_tests_data_path(fname))
|
|
return PrivateKey(key, password=password, filename=fname)
|
|
|
|
def _test_with_encrypted_key(self, fname, password, klass):
|
|
pk = self.get_pk_obj(fname, password='')
|
|
with self.assertRaises(InvalidValueError) as ctx:
|
|
pk.get_pkey_obj()
|
|
self.assertIn('Need a passphrase', str(ctx.exception))
|
|
|
|
pk = self.get_pk_obj(fname, password='wrongpass')
|
|
with self.assertRaises(InvalidValueError) as ctx:
|
|
pk.get_pkey_obj()
|
|
self.assertIn('wrong passphrase', str(ctx.exception))
|
|
|
|
pk = self.get_pk_obj(fname, password=password)
|
|
self.assertIsInstance(pk.get_pkey_obj(), klass)
|
|
|
|
def test_class_with_invalid_key_length(self):
|
|
key = u'a' * (PrivateKey.max_length + 1)
|
|
|
|
with self.assertRaises(InvalidValueError) as ctx:
|
|
PrivateKey(key)
|
|
self.assertIn('Invalid key length', str(ctx.exception))
|
|
|
|
def test_get_pkey_obj_with_invalid_key(self):
|
|
key = u'a b c'
|
|
fname = 'abc'
|
|
|
|
pk = PrivateKey(key, filename=fname)
|
|
with self.assertRaises(InvalidValueError) as ctx:
|
|
pk.get_pkey_obj()
|
|
self.assertIn('Invalid key {}'.format(fname), str(ctx.exception))
|
|
|
|
def test_get_pkey_obj_with_plain_rsa_key(self):
|
|
pk = self.get_pk_obj('test_rsa.key')
|
|
self.assertIsInstance(pk.get_pkey_obj(), paramiko.RSAKey)
|
|
|
|
def test_get_pkey_obj_with_plain_ed25519_key(self):
|
|
pk = self.get_pk_obj('test_ed25519.key')
|
|
self.assertIsInstance(pk.get_pkey_obj(), paramiko.Ed25519Key)
|
|
|
|
def test_get_pkey_obj_with_encrypted_rsa_key(self):
|
|
fname = 'test_rsa_password.key'
|
|
password = 'television'
|
|
self._test_with_encrypted_key(fname, password, paramiko.RSAKey)
|
|
|
|
def test_get_pkey_obj_with_encrypted_ed25519_key(self):
|
|
fname = 'test_ed25519_password.key'
|
|
password = 'abc123'
|
|
self._test_with_encrypted_key(fname, password, paramiko.Ed25519Key)
|
|
|
|
def test_get_pkey_obj_with_encrypted_new_rsa_key(self):
|
|
fname = 'test_new_rsa_password.key'
|
|
password = '123456'
|
|
self._test_with_encrypted_key(fname, password, paramiko.RSAKey)
|
|
|
|
def test_get_pkey_obj_with_plain_new_dsa_key(self):
|
|
pk = self.get_pk_obj('test_new_dsa.key')
|
|
self.assertIsInstance(pk.get_pkey_obj(), paramiko.DSSKey)
|
|
|
|
def test_parse_name(self):
|
|
key = u'-----BEGIN PRIVATE KEY-----'
|
|
pk = PrivateKey(key)
|
|
name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
|
|
self.assertIsNone(name)
|
|
|
|
key = u'-----BEGIN xxx PRIVATE KEY-----'
|
|
pk = PrivateKey(key)
|
|
name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
|
|
self.assertIsNone(name)
|
|
|
|
key = u'-----BEGIN RSA PRIVATE KEY-----'
|
|
pk = PrivateKey(key)
|
|
name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
|
|
self.assertIsNone(name)
|
|
|
|
key = u'-----BEGIN RSA PRIVATE KEY-----'
|
|
pk = PrivateKey(key)
|
|
name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
|
|
self.assertIsNone(name)
|
|
|
|
key = u'-----BEGIN RSA PRIVATE KEY-----'
|
|
pk = PrivateKey(key)
|
|
name, _ = pk.parse_name(pk.iostr, pk.tag_to_name)
|
|
self.assertIsNone(name)
|
|
|
|
for tag, to_name in PrivateKey.tag_to_name.items():
|
|
key = u'-----BEGIN {} PRIVATE KEY----- \r\n'.format(tag)
|
|
pk = PrivateKey(key)
|
|
name, length = pk.parse_name(pk.iostr, pk.tag_to_name)
|
|
self.assertEqual(name, to_name)
|
|
self.assertEqual(length, len(key))
|
|
|
|
|
|
class TestWsockHandler(unittest.TestCase):
|
|
|
|
def test_check_origin(self):
|
|
request = HTTPServerRequest(uri='/')
|
|
obj = Mock(spec=WsockHandler, request=request)
|
|
|
|
obj.origin_policy = 'same'
|
|
request.headers['Host'] = 'www.example.com:4433'
|
|
origin = 'https://www.example.com:4433'
|
|
self.assertTrue(WsockHandler.check_origin(obj, origin))
|
|
|
|
origin = 'https://www.example.com'
|
|
self.assertFalse(WsockHandler.check_origin(obj, origin))
|
|
|
|
obj.origin_policy = 'primary'
|
|
self.assertTrue(WsockHandler.check_origin(obj, origin))
|
|
|
|
origin = 'https://blog.example.com'
|
|
self.assertTrue(WsockHandler.check_origin(obj, origin))
|
|
|
|
origin = 'https://blog.example.org'
|
|
self.assertFalse(WsockHandler.check_origin(obj, origin))
|
|
|
|
origin = 'https://blog.example.org'
|
|
obj.origin_policy = {'https://blog.example.org'}
|
|
self.assertTrue(WsockHandler.check_origin(obj, origin))
|
|
|
|
origin = 'http://blog.example.org'
|
|
obj.origin_policy = {'http://blog.example.org'}
|
|
self.assertTrue(WsockHandler.check_origin(obj, origin))
|
|
|
|
origin = 'http://blog.example.org'
|
|
obj.origin_policy = {'https://blog.example.org'}
|
|
self.assertFalse(WsockHandler.check_origin(obj, origin))
|
|
|
|
obj.origin_policy = '*'
|
|
origin = 'https://blog.example.org'
|
|
self.assertTrue(WsockHandler.check_origin(obj, origin))
|
|
|
|
def test_failed_weak_ref(self):
|
|
request = HTTPServerRequest(uri='/')
|
|
obj = Mock(spec=WsockHandler, request=request)
|
|
obj.src_addr = ("127.0.0.1", 8888)
|
|
|
|
class FakeWeakRef:
|
|
def __init__(self):
|
|
self.count = 0
|
|
|
|
def __call__(self):
|
|
self.count += 1
|
|
return None
|
|
|
|
ref = FakeWeakRef()
|
|
obj.worker_ref = ref
|
|
WsockHandler.on_message(obj, b'{"data": "somestuff"}')
|
|
self.assertGreaterEqual(ref.count, 1)
|
|
obj.close.assert_called_with(reason='No worker found')
|
|
|
|
def test_worker_closed(self):
|
|
request = HTTPServerRequest(uri='/')
|
|
obj = Mock(spec=WsockHandler, request=request)
|
|
obj.src_addr = ("127.0.0.1", 8888)
|
|
|
|
class Worker:
|
|
def __init__(self):
|
|
self.closed = True
|
|
|
|
class FakeWeakRef:
|
|
def __call__(self):
|
|
return Worker()
|
|
|
|
ref = FakeWeakRef()
|
|
obj.worker_ref = ref
|
|
WsockHandler.on_message(obj, b'{"data": "somestuff"}')
|
|
obj.close.assert_called_with(reason='Worker closed')
|
|
|
|
class TestIndexHandler(unittest.TestCase):
|
|
def test_null_in_encoding(self):
|
|
handler = Mock(spec=IndexHandler)
|
|
|
|
# This is a little nasty, but the index handler has a lot of
|
|
# dependencies to mock. Mocking out everything but the bits
|
|
# we want to test lets us test this case without needing to
|
|
# refactor the relevant code out of IndexHandler
|
|
def parse_encoding(data):
|
|
return IndexHandler.parse_encoding(handler, data)
|
|
handler.parse_encoding = parse_encoding
|
|
|
|
ssh = Mock(spec=SSHClient)
|
|
stdin = io.BytesIO()
|
|
stdout = io.BytesIO(initial_bytes=b"UTF-8\0")
|
|
stderr = io.BytesIO()
|
|
ssh.exec_command.return_value = (stdin, stdout, stderr)
|
|
|
|
encoding = IndexHandler.get_default_encoding(handler, ssh)
|
|
self.assertEquals("utf-8", encoding)
|
|
|