mirror of https://github.com/huashengdun/webssh
Refactored method is_forbidden
parent
9f6d900b23
commit
8a8d741230
|
@ -19,30 +19,31 @@ class TestMixinHandler(unittest.TestCase):
|
||||||
|
|
||||||
def test_is_forbidden(self):
|
def test_is_forbidden(self):
|
||||||
mhandler = MixinHandler()
|
mhandler = MixinHandler()
|
||||||
handler.https_server_enabled = True
|
handler.redirecting = True
|
||||||
options.fbidhttp = True
|
options.fbidhttp = True
|
||||||
options.redirect = True
|
|
||||||
|
|
||||||
context = Mock(
|
context = Mock(
|
||||||
address=('8.8.8.8', 8888),
|
address=('8.8.8.8', 8888),
|
||||||
trusted_downstream=['127.0.0.1'],
|
trusted_downstream=['127.0.0.1'],
|
||||||
_orig_protocol='http'
|
_orig_protocol='http'
|
||||||
)
|
)
|
||||||
self.assertTrue(mhandler.is_forbidden(context, ''))
|
hostname = '4.4.4.4'
|
||||||
|
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
||||||
|
|
||||||
context = Mock(
|
context = Mock(
|
||||||
address=('8.8.8.8', 8888),
|
address=('8.8.8.8', 8888),
|
||||||
trusted_downstream=[],
|
trusted_downstream=[],
|
||||||
_orig_protocol='http'
|
_orig_protocol='http'
|
||||||
)
|
)
|
||||||
|
|
||||||
hostname = 'www.google.com'
|
hostname = 'www.google.com'
|
||||||
self.assertEqual(mhandler.is_forbidden(context, hostname), False)
|
self.assertEqual(mhandler.is_forbidden(context, hostname), False)
|
||||||
|
|
||||||
handler.https_server_enabled = False
|
context = Mock(
|
||||||
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
address=('8.8.8.8', 8888),
|
||||||
|
trusted_downstream=[],
|
||||||
options.redirect = False
|
_orig_protocol='http'
|
||||||
|
)
|
||||||
|
hostname = '4.4.4.4'
|
||||||
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
||||||
|
|
||||||
context = Mock(
|
context = Mock(
|
||||||
|
@ -50,26 +51,21 @@ class TestMixinHandler(unittest.TestCase):
|
||||||
trusted_downstream=[],
|
trusted_downstream=[],
|
||||||
_orig_protocol='http'
|
_orig_protocol='http'
|
||||||
)
|
)
|
||||||
self.assertIsNone(mhandler.is_forbidden(context, ''))
|
hostname = 'www.google.com'
|
||||||
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
||||||
context = Mock(
|
|
||||||
address=('8.8.8.8', 8888),
|
|
||||||
trusted_downstream=[],
|
|
||||||
_orig_protocol='https'
|
|
||||||
)
|
|
||||||
self.assertIsNone(mhandler.is_forbidden(context, ''))
|
|
||||||
|
|
||||||
context = Mock(
|
|
||||||
address=('8.8.8.8', 8888),
|
|
||||||
trusted_downstream=[],
|
|
||||||
_orig_protocol='http'
|
|
||||||
)
|
|
||||||
hostname = '8.8.8.8'
|
|
||||||
self.assertTrue(mhandler.is_forbidden(context, hostname))
|
|
||||||
|
|
||||||
options.fbidhttp = False
|
options.fbidhttp = False
|
||||||
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
||||||
|
|
||||||
|
hostname = '4.4.4.4'
|
||||||
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
||||||
|
|
||||||
|
handler.redirecting = False
|
||||||
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
||||||
|
|
||||||
|
context._orig_protocol = 'https'
|
||||||
|
self.assertIsNone(mhandler.is_forbidden(context, hostname))
|
||||||
|
|
||||||
def test_get_redirect_url(self):
|
def test_get_redirect_url(self):
|
||||||
mhandler = MixinHandler()
|
mhandler = MixinHandler()
|
||||||
hostname = 'www.example.com'
|
hostname = 'www.example.com'
|
||||||
|
|
|
@ -11,12 +11,12 @@ class TestMain(unittest.TestCase):
|
||||||
app = Application()
|
app = Application()
|
||||||
app.listen = lambda x, y, **kwargs: 1
|
app.listen = lambda x, y, **kwargs: 1
|
||||||
|
|
||||||
handler.https_server_enabled = False
|
handler.redirecting = None
|
||||||
server_settings = dict()
|
server_settings = dict()
|
||||||
app_listen(app, 80, '127.0.0.1', server_settings)
|
app_listen(app, 80, '127.0.0.1', server_settings)
|
||||||
self.assertFalse(handler.https_server_enabled)
|
self.assertFalse(handler.redirecting)
|
||||||
|
|
||||||
handler.https_server_enabled = False
|
handler.redirecting = None
|
||||||
server_settings = dict(ssl_options='enabled')
|
server_settings = dict(ssl_options='enabled')
|
||||||
app_listen(app, 80, '127.0.0.1', server_settings)
|
app_listen(app, 80, '127.0.0.1', server_settings)
|
||||||
self.assertTrue(handler.https_server_enabled)
|
self.assertTrue(handler.redirecting)
|
||||||
|
|
|
@ -38,7 +38,7 @@ KEY_MAX_SIZE = 16384
|
||||||
DEFAULT_PORT = 22
|
DEFAULT_PORT = 22
|
||||||
|
|
||||||
swallow_http_errors = True
|
swallow_http_errors = True
|
||||||
https_server_enabled = False
|
redirecting = None
|
||||||
|
|
||||||
|
|
||||||
class InvalidValueError(Exception):
|
class InvalidValueError(Exception):
|
||||||
|
@ -78,6 +78,7 @@ class MixinHandler(object):
|
||||||
def is_forbidden(self, context, hostname):
|
def is_forbidden(self, context, hostname):
|
||||||
ip = context.address[0]
|
ip = context.address[0]
|
||||||
lst = context.trusted_downstream
|
lst = context.trusted_downstream
|
||||||
|
ip_address = None
|
||||||
|
|
||||||
if lst and ip not in lst:
|
if lst and ip not in lst:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
|
@ -85,15 +86,19 @@ class MixinHandler(object):
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if context._orig_protocol == 'http' and \
|
if context._orig_protocol == 'http':
|
||||||
not to_ip_address(ip).is_private:
|
if redirecting and not is_ip_hostname(hostname):
|
||||||
if options.redirect and https_server_enabled:
|
ip_address = to_ip_address(ip)
|
||||||
if not is_ip_hostname(hostname):
|
if not ip_address.is_private:
|
||||||
# redirecting
|
# redirecting
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if options.fbidhttp:
|
if options.fbidhttp:
|
||||||
logging.warning('Public plain http request is forbidden.')
|
if ip_address is None:
|
||||||
return True
|
ip_address = to_ip_address(ip)
|
||||||
|
if not ip_address.is_private:
|
||||||
|
logging.warning('Public plain http request is forbidden.')
|
||||||
|
return True
|
||||||
|
|
||||||
def get_redirect_url(self, hostname, port, uri):
|
def get_redirect_url(self, hostname, port, uri):
|
||||||
port = '' if port == 443 else ':%s' % port
|
port = '' if port == 443 else ':%s' % port
|
||||||
|
|
|
@ -34,7 +34,7 @@ def app_listen(app, port, address, server_settings):
|
||||||
server_type = 'http'
|
server_type = 'http'
|
||||||
else:
|
else:
|
||||||
server_type = 'https'
|
server_type = 'https'
|
||||||
handler.https_server_enabled = True
|
handler.redirecting = True if options.redirect else False
|
||||||
logging.info(
|
logging.info(
|
||||||
'Listening on {}:{} ({})'.format(address, port, server_type)
|
'Listening on {}:{} ({})'.format(address, port, server_type)
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue