import unittest
import paramiko

from tornado.httputil import HTTPServerRequest
from tornado.options import options
from tests.utils import read_file, make_tests_data_path
from webssh import handler
from webssh import worker
from webssh.handler import (
    MixinHandler, WsockHandler, PrivateKey, InvalidValueError
)

try:
    from unittest.mock import Mock
except ImportError:
    from mock import Mock


class TestMixinHandler(unittest.TestCase):
    """Tests for the ``MixinHandler`` helper methods.

    Some tests overwrite process-wide settings (attributes of
    ``tornado.options.options`` and ``webssh.handler.redirecting``).  Each
    such value is saved first and restored once the test finishes, so the
    tests do not leak configuration into whatever runs after them.
    """

    def _restore_after_test(self, owner, name):
        # Register a cleanup that puts ``owner.<name>`` back to its current
        # state.  Cleanups run whether the test passes, fails or errors.
        try:
            saved = getattr(owner, name)
        except AttributeError:
            # The attribute does not exist yet; undo by removing it again.
            self.addCleanup(delattr, owner, name)
        else:
            self.addCleanup(setattr, owner, name, saved)

    def test_is_forbidden(self):
        # Both globals are reassigned below; make sure they are put back.
        self._restore_after_test(handler, 'redirecting')
        self._restore_after_test(options, 'fbidhttp')

        mhandler = MixinHandler()
        handler.redirecting = True
        options.fbidhttp = True

        # Client 8.8.8.8 is not in the non-empty trusted_downstream list.
        context = Mock(
            address=('8.8.8.8', 8888),
            trusted_downstream=['127.0.0.1'],
            _orig_protocol='http'
        )
        hostname = '4.4.4.4'
        self.assertTrue(mhandler.is_forbidden(context, hostname))

        # Empty trusted list, plain http, domain-name host: exactly False
        # is expected here (not None).
        context = Mock(
            address=('8.8.8.8', 8888),
            trusted_downstream=[],
            _orig_protocol='http'
        )
        hostname = 'www.google.com'
        self.assertEqual(mhandler.is_forbidden(context, hostname), False)

        # Same client, but the host is given as an IP address.
        context = Mock(
            address=('8.8.8.8', 8888),
            trusted_downstream=[],
            _orig_protocol='http'
        )
        hostname = '4.4.4.4'
        self.assertTrue(mhandler.is_forbidden(context, hostname))

        # Client on a private (RFC 1918) address.  Every assertion from here
        # on reuses this context and expects None.
        context = Mock(
            address=('192.168.1.1', 8888),
            trusted_downstream=[],
            _orig_protocol='http'
        )
        hostname = 'www.google.com'
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        options.fbidhttp = False
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        hostname = '4.4.4.4'
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        handler.redirecting = False
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

        context._orig_protocol = 'https'
        self.assertIsNone(mhandler.is_forbidden(context, hostname))

    def test_get_redirect_url(self):
        mhandler = MixinHandler()
        hostname = 'www.example.com'
        uri = '/'
        port = 443

        # With port 443 the expected URL carries no explicit port.
        self.assertEqual(
            mhandler.get_redirect_url(hostname, port, uri=uri),
            'https://www.example.com/'
        )

        # Any other port is expected to appear in the URL; uri is passed
        # positionally this time.
        port = 4433
        self.assertEqual(
            mhandler.get_redirect_url(hostname, port, uri),
            'https://www.example.com:4433/'
        )

    def test_get_client_addr(self):
        # options.xheaders is reassigned below; make sure it is put back.
        self._restore_after_test(options, 'xheaders')

        mhandler = MixinHandler()
        client_addr = ('8.8.8.8', 8888)
        context_addr = ('127.0.0.1', 1234)
        options.xheaders = True

        # xheaders on, no real client address available: context address.
        mhandler.context = Mock(address=context_addr)
        mhandler.get_real_client_addr = lambda: None
        self.assertEqual(mhandler.get_client_addr(), context_addr)

        # xheaders on, real client address available: it wins.
        mhandler.context = Mock(address=context_addr)
        mhandler.get_real_client_addr = lambda: client_addr
        self.assertEqual(mhandler.get_client_addr(), client_addr)

        # xheaders off: the context address is used regardless.
        options.xheaders = False
        mhandler.context = Mock(address=context_addr)
        mhandler.get_real_client_addr = lambda: client_addr
        self.assertEqual(mhandler.get_client_addr(), context_addr)

    def test_get_real_client_addr(self):
        x_forwarded_for = '1.1.1.1'
        x_forwarded_port = 1111
        x_real_ip = '2.2.2.2'
        x_real_port = 2222
        # Port the handler is expected to report when the header port is
        # absent or, as with fake_port + 1 below, not usable.
        fake_port = 65535

        mhandler = MixinHandler()
        mhandler.request = HTTPServerRequest(uri='/')
        mhandler.request.remote_ip = x_forwarded_for

        # No forwarding headers at all.
        self.assertIsNone(mhandler.get_real_client_addr())

        # X-Forwarded-For alone.
        mhandler.request.headers.add('X-Forwarded-For', x_forwarded_for)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_forwarded_for, fake_port))

        # X-Forwarded-Port of 65536 is expected to be replaced by fake_port.
        mhandler.request.headers.add('X-Forwarded-Port', fake_port + 1)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_forwarded_for, fake_port))

        mhandler.request.headers['X-Forwarded-Port'] = x_forwarded_port
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_forwarded_for, x_forwarded_port))

        mhandler.request.remote_ip = x_real_ip

        # Once X-Real-Ip is present the expected address switches to it,
        # even though the X-Forwarded-* headers are still set.
        mhandler.request.headers.add('X-Real-Ip', x_real_ip)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_real_ip, fake_port))

        mhandler.request.headers.add('X-Real-Port', fake_port + 1)
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_real_ip, fake_port))

        mhandler.request.headers['X-Real-Port'] = x_real_port
        self.assertEqual(mhandler.get_real_client_addr(),
                         (x_real_ip, x_real_port))


class TestPrivateKey(unittest.TestCase):
    """Tests for ``PrivateKey``: length check, key loading and header parsing.

    Key material is read from fixture files located through
    ``make_tests_data_path``.
    """

    def get_pk_obj(self, fname, password=None):
        # Build a PrivateKey from the text of the fixture file ``fname``.
        path = make_tests_data_path(fname)
        return PrivateKey(read_file(path), password=password, filename=fname)

    def _test_with_encrypted_key(self, fname, password, klass):
        # Passwords that must be rejected, paired with the text expected in
        # the resulting error message.
        rejected = (
            ('', 'Need a passphrase'),
            ('wrongpass', 'wrong passphrase'),
        )
        for attempt, fragment in rejected:
            candidate = self.get_pk_obj(fname, password=attempt)
            with self.assertRaises(InvalidValueError) as caught:
                candidate.get_pkey_obj()
            self.assertIn(fragment, str(caught.exception))

        # The right password must produce a key object of type ``klass``.
        unlocked = self.get_pk_obj(fname, password=password).get_pkey_obj()
        self.assertIsInstance(unlocked, klass)

    def test_class_with_invalid_key_length(self):
        # One character past the allowed maximum.
        oversized = u'a' * (PrivateKey.max_length + 1)

        with self.assertRaises(InvalidValueError) as caught:
            PrivateKey(oversized)
        self.assertIn('Invalid key length', str(caught.exception))

    def test_get_pkey_obj_with_invalid_key(self):
        fname = 'abc'
        broken = PrivateKey(u'a b c', filename=fname)

        with self.assertRaises(InvalidValueError) as caught:
            broken.get_pkey_obj()
        # The error message is expected to mention the filename.
        self.assertIn('Invalid key {}'.format(fname), str(caught.exception))

    def test_get_pkey_obj_with_plain_rsa_key(self):
        loaded = self.get_pk_obj('test_rsa.key').get_pkey_obj()
        self.assertIsInstance(loaded, paramiko.RSAKey)

    def test_get_pkey_obj_with_plain_ed25519_key(self):
        loaded = self.get_pk_obj('test_ed25519.key').get_pkey_obj()
        self.assertIsInstance(loaded, paramiko.Ed25519Key)

    def test_get_pkey_obj_with_encrypted_rsa_key(self):
        self._test_with_encrypted_key(
            'test_rsa_password.key', 'television', paramiko.RSAKey
        )

    def test_get_pkey_obj_with_encrypted_ed25519_key(self):
        self._test_with_encrypted_key(
            'test_ed25519_password.key', 'abc123', paramiko.Ed25519Key
        )

    def test_get_pkey_obj_with_encrypted_new_rsa_key(self):
        self._test_with_encrypted_key(
            'test_new_rsa_password.key', '123456', paramiko.RSAKey
        )

    def test_get_pkey_obj_with_plain_new_dsa_key(self):
        loaded = self.get_pk_obj('test_new_dsa.key').get_pkey_obj()
        self.assertIsInstance(loaded, paramiko.DSSKey)

    def test_parse_name(self):
        # Header lines for which no key name is expected: no tag, an
        # unrecognised tag, and a doubled space in each possible position.
        unnamed_headers = (
            u'-----BEGIN PRIVATE KEY-----',
            u'-----BEGIN xxx PRIVATE KEY-----',
            u'-----BEGIN  RSA PRIVATE KEY-----',
            u'-----BEGIN RSA  PRIVATE KEY-----',
            u'-----BEGIN RSA PRIVATE  KEY-----',
        )
        for header in unnamed_headers:
            parsed = PrivateKey(header)
            name, _ = parsed.parse_name(parsed.iostr, parsed.tag_to_name)
            self.assertIsNone(name)

        # Every known tag must map to its name, and the reported length must
        # cover the whole line including the trailing ' \r\n'.
        for tag, expected in PrivateKey.tag_to_name.items():
            header = u'-----BEGIN {} PRIVATE KEY----- \r\n'.format(tag)
            parsed = PrivateKey(header)
            name, length = parsed.parse_name(parsed.iostr, parsed.tag_to_name)
            self.assertEqual(name, expected)
            self.assertEqual(length, len(header))


class TestWsockHandler(unittest.TestCase):
    """Tests for ``WsockHandler`` methods, called unbound on a spec'd Mock."""

    def _make_handler(self):
        # A Mock restricted to WsockHandler's attributes, carrying a real
        # tornado request for '/'.
        return Mock(spec=WsockHandler, request=HTTPServerRequest(uri='/'))

    def test_check_origin(self):
        stub = self._make_handler()
        stub.request.headers['Host'] = 'www.example.com:4433'

        # (origin_policy, origin, expected truthiness), evaluated in order.
        cases = (
            ('same', 'https://www.example.com:4433', True),
            ('same', 'https://www.example.com', False),
            ('primary', 'https://www.example.com', True),
            ('primary', 'https://blog.example.com', True),
            ('primary', 'https://blog.example.org', False),
            ({'https://blog.example.org'}, 'https://blog.example.org', True),
            ({'http://blog.example.org'}, 'http://blog.example.org', True),
            ({'https://blog.example.org'}, 'http://blog.example.org', False),
            ('*', 'https://blog.example.org', True),
        )
        for policy, origin, allowed in cases:
            stub.origin_policy = policy
            outcome = WsockHandler.check_origin(stub, origin)
            if allowed:
                self.assertTrue(outcome)
            else:
                self.assertFalse(outcome)

    def test_failed_weak_ref(self):
        class DeadRef:
            # Callable that always yields None and counts its invocations.
            def __init__(self):
                self.count = 0

            def __call__(self):
                self.count += 1
                return None

        stub = self._make_handler()
        stub.src_addr = ("127.0.0.1", 8888)
        dead = DeadRef()
        stub.worker_ref = dead

        WsockHandler.on_message(stub, b'{"data": "somestuff"}')

        # The reference must have been called at least once, and the
        # handler closed with the matching reason.
        self.assertGreaterEqual(dead.count, 1)
        stub.close.assert_called_with(reason='No worker found')

    def test_worker_closed(self):
        class ClosedWorker:
            # Minimal worker stand-in that only reports itself as closed.
            def __init__(self):
                self.closed = True

        class LiveRef:
            # Callable that hands out a fresh closed worker on every call.
            def __call__(self):
                return ClosedWorker()

        stub = self._make_handler()
        stub.src_addr = ("127.0.0.1", 8888)
        stub.worker_ref = LiveRef()

        WsockHandler.on_message(stub, b'{"data": "somestuff"}')

        stub.close.assert_called_with(reason='Worker closed')