import io
import random
import ssl
import sys
import os.path
import unittest
import paramiko
import tornado.options as options

from tests.utils import make_tests_data_path
from webssh.policy import load_host_keys
from webssh.settings import (
    get_host_keys_settings, get_policy_setting, base_dir, print_version,
    get_ssl_context, get_trusted_downstream, get_origin_setting
)
from webssh.utils import UnicodeType
from webssh._version import __version__


class TestSettings(unittest.TestCase):
    """Tests for the helper functions exposed by ``webssh.settings``."""

    def test_print_version(self):
        """print_version is silent for False; prints the version and exits for True."""
        saved_stdout = sys.stdout
        # Capture output in a buffer whose type matches the native str type.
        sys.stdout = io.StringIO() if UnicodeType == str else io.BytesIO()
        try:
            self.assertEqual(print_version(False), None)
            self.assertEqual(sys.stdout.getvalue(), '')

            with self.assertRaises(SystemExit):
                print_version(True)
            self.assertEqual(sys.stdout.getvalue(), __version__ + '\n')
        finally:
            # Always restore the real stdout, even when an assertion fails,
            # so a failure here cannot swallow output of later tests.
            sys.stdout = saved_stdout

    def test_get_host_keys_settings(self):
        """Host key settings use default paths when options are empty, else the given files."""
        options.hostfile = ''
        options.syshostfile = ''
        dic = get_host_keys_settings(options)

        # With no files configured, the defaults are expected to be used.
        filename = os.path.join(base_dir, 'known_hosts')
        self.assertEqual(dic['host_keys'], load_host_keys(filename))
        self.assertEqual(dic['host_keys_filename'], filename)
        self.assertEqual(
            dic['system_host_keys'],
            load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
        )

        options.hostfile = make_tests_data_path('known_hosts_example')
        options.syshostfile = make_tests_data_path('known_hosts_example2')
        dic2 = get_host_keys_settings(options)
        self.assertEqual(dic2['host_keys'], load_host_keys(options.hostfile))
        self.assertEqual(dic2['host_keys_filename'], options.hostfile)
        self.assertEqual(dic2['system_host_keys'],
                         load_host_keys(options.syshostfile))

    def test_get_policy_setting(self):
        """Each policy name maps to the matching paramiko missing-host-key policy."""
        options.policy = 'warning'
        options.hostfile = ''
        options.syshostfile = ''
        settings = get_host_keys_settings(options)
        instance = get_policy_setting(options, settings)
        self.assertIsInstance(instance, paramiko.client.WarningPolicy)

        options.policy = 'autoadd'
        options.hostfile = ''
        options.syshostfile = ''
        settings = get_host_keys_settings(options)
        instance = get_policy_setting(options, settings)
        self.assertIsInstance(instance, paramiko.client.AutoAddPolicy)
        # NOTE(review): presumably the autoadd setup creates this file and the
        # test removes it to clean up -- confirm against webssh.settings.
        os.unlink(settings['host_keys_filename'])

        options.policy = 'reject'
        options.hostfile = ''
        options.syshostfile = ''
        settings = get_host_keys_settings(options)
        try:
            instance = get_policy_setting(options, settings)
        except ValueError:
            # A ValueError is only acceptable when at least one of the two
            # host key collections is empty.
            self.assertFalse(
                settings['host_keys'] and settings['system_host_keys']
            )
        else:
            self.assertIsInstance(instance, paramiko.client.RejectPolicy)

    def test_get_ssl_context(self):
        """get_ssl_context validates the cert/key options before building a context."""
        # Neither file configured: no SSL context.
        options.certfile = ''
        options.keyfile = ''
        ssl_ctx = get_ssl_context(options)
        self.assertIsNone(ssl_ctx)

        # Only one of the two files configured.
        options.certfile = 'provided'
        options.keyfile = ''
        with self.assertRaises(ValueError) as ctx:
            get_ssl_context(options)
        self.assertEqual('keyfile is not provided', str(ctx.exception))

        options.certfile = ''
        options.keyfile = 'provided'
        with self.assertRaises(ValueError) as ctx:
            get_ssl_context(options)
        self.assertEqual('certfile is not provided', str(ctx.exception))

        # One of the configured paths does not exist.
        options.certfile = 'FileDoesNotExist'
        options.keyfile = make_tests_data_path('cert.key')
        with self.assertRaises(ValueError) as ctx:
            get_ssl_context(options)
        self.assertIn('does not exist', str(ctx.exception))

        options.certfile = make_tests_data_path('cert.key')
        options.keyfile = 'FileDoesNotExist'
        with self.assertRaises(ValueError) as ctx:
            get_ssl_context(options)
        self.assertIn('does not exist', str(ctx.exception))

        # Both files exist, but the key file is passed as the certificate.
        options.certfile = make_tests_data_path('cert.key')
        options.keyfile = make_tests_data_path('cert.key')
        with self.assertRaises(ssl.SSLError):
            get_ssl_context(options)

        # Matching certificate and key.
        options.certfile = make_tests_data_path('cert.crt')
        options.keyfile = make_tests_data_path('cert.key')
        ssl_ctx = get_ssl_context(options)
        self.assertIsNotNone(ssl_ctx)

    def test_get_trusted_downstream(self):
        """A comma-separated address list becomes a set; a malformed address is rejected."""
        tdstream = ''
        result = set()
        self.assertEqual(get_trusted_downstream(tdstream), result)

        tdstream = '1.1.1.1, 2.2.2.2'
        result = {'1.1.1.1', '2.2.2.2'}
        self.assertEqual(get_trusted_downstream(tdstream), result)

        # Duplicate entries collapse into one.
        tdstream = '1.1.1.1, 2.2.2.2, 2.2.2.2'
        result = {'1.1.1.1', '2.2.2.2'}
        self.assertEqual(get_trusted_downstream(tdstream), result)

        tdstream = '1.1.1.1, 2.2.2.'
        with self.assertRaises(ValueError):
            get_trusted_downstream(tdstream)

    def test_get_origin_setting(self):
        """Origin option handling: wildcard, keywords, empty values and host lists."""
        # The wildcard origin is rejected unless debug is enabled.
        options.debug = False
        options.origin = '*'
        with self.assertRaises(ValueError):
            get_origin_setting(options)

        options.debug = True
        self.assertEqual(get_origin_setting(options), '*')

        # Keyword origins come back lower-cased.
        options.origin = random.choice(['Same', 'Primary'])
        self.assertEqual(get_origin_setting(options), options.origin.lower())

        options.origin = ''
        with self.assertRaises(ValueError):
            get_origin_setting(options)

        options.origin = ','
        with self.assertRaises(ValueError):
            get_origin_setting(options)

        # A host without a scheme is expected to get 'http://'.
        options.origin = 'www.example.com, https://www.example.org'
        result = {'http://www.example.com', 'https://www.example.org'}
        self.assertEqual(get_origin_setting(options), result)

        # Ports 80 and 443 are expected to map to http and https with the
        # port dropped.
        options.origin = 'www.example.com:80, www.example.org:443'
        result = {'http://www.example.com', 'https://www.example.org'}
        self.assertEqual(get_origin_setting(options), result)