# BSD 2-Clause License # # Apprise - Push Notification Library. # Copyright (c) 2025, Chris Caron # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. from inspect import cleandoc # Disable logging for a cleaner testing output import logging import os import re import sys from unittest import mock from urllib.parse import unquote from apprise import NotificationManager, utils logging.disable(logging.CRITICAL) # Ensure we don't create .pyc files for these tests sys.dont_write_bytecode = True # Grant access to our Notification Manager Singleton N_MGR = NotificationManager() def test_parse_qsd(): "utils: parse_qsd() testing" result = utils.parse.parse_qsd("a=1&b=&c&d=abcd") assert isinstance(result, dict) assert len(result) == 4 assert "qsd" in result assert "qsd+" in result assert "qsd-" in result assert "qsd:" in result assert len(result["qsd"]) == 4 assert "a" in result["qsd"] assert "b" in result["qsd"] assert "c" in result["qsd"] assert "d" in result["qsd"] assert len(result["qsd-"]) == 0 assert len(result["qsd+"]) == 0 assert len(result["qsd:"]) == 0 def test_parse_url_general(): "utils: parse_url() testing" result = utils.parse.parse_url("http://hostname") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # GitHub Ticket 1234 - Unparseable Hostname result = utils.parse.parse_url("http://5t4m59hl-34343.euw.devtunnels.ms") assert result["schema"] == "http" assert result["host"] == "5t4m59hl-34343.euw.devtunnels.ms" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://5t4m59hl-34343.euw.devtunnels.ms" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("http://hostname/") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # colon after hostname without port number is no good assert utils.parse.parse_url("http://hostname:") is None # An invalid port result = utils.parse.parse_url( "http://hostname:invalid", verify_host=False, strict_port=True ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == "invalid" assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:invalid" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # However if we don't verify the host, it is okay result = utils.parse.parse_url("http://hostname:", verify_host=False) assert result["schema"] == "http" assert result["host"] == "hostname:" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # A port of Zero is not valid with strict port checking assert utils.parse.parse_url("http://hostname:0", strict_port=True) is None # Without strict port checking however, it is okay result = utils.parse.parse_url("http://hostname:0", strict_port=False) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 0 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:0" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # A negative port is not valid assert ( utils.parse.parse_url("http://hostname:-92", strict_port=True) is None ) result = utils.parse.parse_url( "http://hostname:-92", verify_host=False, strict_port=True ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == -92 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:-92" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # A port that is too large is not valid assert ( utils.parse.parse_url("http://hostname:65536", strict_port=True) is None ) # This is an accetable port (the maximum) result = utils.parse.parse_url("http://hostname:65535", strict_port=True) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 65535 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:65535" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # This is an accetable port (the maximum) result = utils.parse.parse_url("http://hostname:1", strict_port=True) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 1 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:1" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # A port that was identfied as a string is invalid assert ( utils.parse.parse_url("http://hostname:invalid", strict_port=True) is None ) result = utils.parse.parse_url( "http://hostname:invalid", verify_host=False, strict_port=True ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == "invalid" assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:invalid" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "http://hostname:invalid", verify_host=False, strict_port=False ) assert result["schema"] == "http" assert result["host"] == "hostname:invalid" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:invalid" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "http://hostname:invalid?key=value&-minuskey=mvalue", verify_host=False, strict_port=False, ) assert result["schema"] == "http" assert result["host"] == "hostname:invalid" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:invalid" assert unquote(result["qsd"]["-minuskey"]) == "mvalue" assert unquote(result["qsd"]["key"]) == "value" assert unquote(result["qsd-"]["minuskey"]) == "mvalue" assert result["qsd+"] == {} assert result["qsd:"] == {} # Handling of floats assert ( utils.parse.parse_url("http://hostname:4.2", strict_port=True) is None ) result = utils.parse.parse_url( "http://hostname:4.2", verify_host=False, strict_port=True ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == "4.2" assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:4.2" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # A Port of zero is not acceptable for a regular hostname assert utils.parse.parse_url("http://hostname:0", strict_port=True) is None # No host verification (zero is an acceptable port when this is the case result = utils.parse.parse_url("http://hostname:0", verify_host=False) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 0 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:0" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080", verify_host=False, ) assert result["schema"] == "http" assert result["host"] == "[2001:db8:002a:3256:adfe:05c0:0003:0006]" assert result["port"] == 8080 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert ( result["url"] == "http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080" ) assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "http://hostname:0", verify_host=False, strict_port=True ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 0 assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://hostname:0" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("http://hostname/?-KeY=Value") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert "-key" in result["qsd"] assert unquote(result["qsd"]["-key"]) == "Value" assert "KeY" in result["qsd-"] assert unquote(result["qsd-"]["KeY"]) == "Value" assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("http://hostname/?+KeY=Value") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert "+key" in result["qsd"] assert "KeY" in result["qsd+"] assert result["qsd+"]["KeY"] == "Value" assert result["qsd-"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("http://hostname/?:kEy=vALUE") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert ":key" in result["qsd"] assert "kEy" in result["qsd:"] assert result["qsd:"]["kEy"] == "vALUE" assert result["qsd+"] == {} assert result["qsd-"] == {} result = utils.parse.parse_url( "http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:colon=y" ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert "+key" in result["qsd"] assert "-key" in result["qsd"] assert ":colon" in result["qsd"] assert result["qsd:"]["colon"] == "y" assert "key" in result["qsd"] assert "KeY" in result["qsd+"] assert result["qsd+"]["KeY"] == "ValueA" assert "kEy" in result["qsd-"] assert result["qsd-"]["kEy"] == "ValueB" assert result["qsd"]["key"] == "Value +C" assert result["qsd"]["+key"] == result["qsd+"]["KeY"] assert result["qsd"]["-key"] == result["qsd-"]["kEy"] result = utils.parse.parse_url( "http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:colon=y", plus_to_space=True, ) assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert "+key" in result["qsd"] assert "-key" in result["qsd"] assert ":colon" in result["qsd"] assert result["qsd:"]["colon"] == "y" assert "key" in result["qsd"] assert "KeY" in result["qsd+"] assert result["qsd+"]["KeY"] == "ValueA" assert "kEy" in result["qsd-"] assert result["qsd-"]["kEy"] == "ValueB" assert result["qsd"]["key"] == "Value C" assert result["qsd"]["+key"] == result["qsd+"]["KeY"] assert result["qsd"]["-key"] == result["qsd-"]["kEy"] result = utils.parse.parse_url("http://hostname////") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("http://hostname:40////") assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 40 assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname:40/" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("HTTP://HoStNaMe:40/test.php") assert result["schema"] == "http" assert result["host"] == "HoStNaMe" assert result["port"] == 40 assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/test.php" assert result["path"] == "/" assert result["query"] == "test.php" assert result["url"] == "http://HoStNaMe:40/test.php" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("HTTPS://user@hostname/test.py") assert result["schema"] == "https" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] == "user" assert result["password"] is None assert result["fullpath"] == "/test.py" assert result["path"] == "/" assert result["query"] == "test.py" assert result["url"] == "https://user@hostname/test.py" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url(" HTTPS://///user@@@hostname///test.py ") assert result["schema"] == "https" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] == "user" assert result["password"] is None assert result["fullpath"] == "/test.py" assert result["path"] == "/" assert result["query"] == "test.py" assert result["url"] == "https://user@hostname/test.py" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "HTTPS://user:password@otherHost/full///path/name/", ) assert result["schema"] == "https" assert result["host"] == "otherHost" assert result["port"] is None assert result["user"] == "user" assert result["password"] == "password" assert result["fullpath"] == "/full/path/name/" assert result["path"] == "/full/path/name/" assert result["query"] is None assert result["url"] == "https://user:password@otherHost/full/path/name/" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "HTTPS://hostname/a/path/ending/with/slash/?key=value", ) assert result["schema"] == "https" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/a/path/ending/with/slash/" assert result["path"] == "/a/path/ending/with/slash/" assert result["query"] is None assert result["url"] == "https://hostname/a/path/ending/with/slash/" assert result["qsd"] == {"key": "value"} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # Handle garbage assert utils.parse.parse_url(None) is None result = utils.parse.parse_url( "mailto://user:password@otherHost/lead2gold@gmail.com" + "?from=test@test.com&name=Chris%20Caron&format=text" ) assert result["schema"] == "mailto" assert result["host"] == "otherHost" assert result["port"] is None assert result["user"] == "user" assert result["password"] == "password" assert unquote(result["fullpath"]) == "/lead2gold@gmail.com" assert result["path"] == "/" assert unquote(result["query"]) == "lead2gold@gmail.com" assert ( unquote(result["url"]) == "mailto://user:password@otherHost/lead2gold@gmail.com" ) assert len(result["qsd"]) == 3 assert "name" in result["qsd"] assert unquote(result["qsd"]["name"]) == "Chris Caron" assert "from" in result["qsd"] assert unquote(result["qsd"]["from"]) == "test@test.com" assert "format" in result["qsd"] assert unquote(result["qsd"]["format"]) == "text" assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # Test Passwords with question marks ?; not supported result = utils.parse.parse_url("http://user:pass.with.?question@host") assert result is None # just hostnames result = utils.parse.parse_url("nuxref.com") assert result["schema"] == "http" assert result["host"] == "nuxref.com" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "http://nuxref.com" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # just host and path result = utils.parse.parse_url("invalid/host") assert result["schema"] == "http" assert result["host"] == "invalid" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/host" assert result["path"] == "/" assert result["query"] == "host" assert result["url"] == "http://invalid/host" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # just all out invalid assert utils.parse.parse_url("?") is None assert utils.parse.parse_url("/") is None # Test some illegal strings result = utils.parse.parse_url(object, verify_host=False) assert result is None result = utils.parse.parse_url(None, verify_host=False) assert result is None # Just a schema; invalid host result = utils.parse.parse_url("test://") assert result is None # Do it again without host validation result = utils.parse.parse_url("test://", verify_host=False) assert result["schema"] == "test" # It's worth noting that the hostname is an empty string and is NEVER set # to None if it wasn't specified. assert result["host"] == "" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None assert result["url"] == "test://" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("testhostname") assert result["schema"] == "http" assert result["host"] == "testhostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None # The default_schema kicks in here assert result["url"] == "http://testhostname" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url("example.com", default_schema="unknown") assert result["schema"] == "unknown" assert result["host"] == "example.com" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None # The default_schema kicks in here assert result["url"] == "unknown://example.com" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # An empty string without a hostame is still valid if verify_host is set result = utils.parse.parse_url("", verify_host=False) assert result["schema"] == "http" assert result["host"] == "" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] is None assert result["path"] is None assert result["query"] is None # The default_schema kicks in here assert result["url"] == "http://" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # A messed up URL result = utils.parse.parse_url("test://:@/", verify_host=False) assert result["schema"] == "test" assert result["host"] == "" assert result["port"] is None assert result["user"] == "" assert result["password"] == "" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "test://:@/" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} result = utils.parse.parse_url( "crazy://:@//_/@^&/jack.json", verify_host=False ) assert result["schema"] == "crazy" assert result["host"] == "" assert result["port"] is None assert result["user"] == "" assert result["password"] == "" assert unquote(result["fullpath"]) == "/_/@^&/jack.json" assert unquote(result["path"]) == "/_/@^&/" assert result["query"] == "jack.json" assert unquote(result["url"]) == "crazy://:@/_/@^&/jack.json" assert result["qsd"] == {} assert result["qsd-"] == {} assert result["qsd+"] == {} assert result["qsd:"] == {} # Sanitizing result = utils.parse.parse_url( "hTTp://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:cOlON=YeS", sanitize=False, ) assert len(result["qsd-"]) == 1 assert len(result["qsd+"]) == 1 assert len(result["qsd"]) == 4 assert len(result["qsd:"]) == 1 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/" assert result["path"] == "/" assert result["query"] is None assert result["url"] == "http://hostname/" assert "+KeY" in result["qsd"] assert "-kEy" in result["qsd"] assert ":cOlON" in result["qsd"] assert result["qsd:"]["cOlON"] == "YeS" assert "key" not in result["qsd"] assert "KeY" in result["qsd+"] assert result["qsd+"]["KeY"] == "ValueA" assert "kEy" in result["qsd-"] assert result["qsd-"]["kEy"] == "ValueB" assert result["qsd"]["KEY"] == "Value +C" assert result["qsd"]["+KeY"] == result["qsd+"]["KeY"] assert result["qsd"]["-kEy"] == result["qsd-"]["kEy"] # Testing Defect 1264 - whitespaces in url result = utils.parse.parse_url( "posts://example.com/my endpoint?-token=ab cdefg" ) assert len(result["qsd-"]) == 1 assert len(result["qsd+"]) == 0 assert len(result["qsd"]) == 1 assert len(result["qsd:"]) == 0 assert result["schema"] == "posts" assert result["host"] == "example.com" assert result["port"] is None assert result["user"] is None assert result["password"] is None assert result["fullpath"] == "/my%20endpoint" assert result["path"] == "/" assert result["query"] == "my%20endpoint" assert result["url"] == "posts://example.com/my%20endpoint" assert "-token" in result["qsd"] assert result["qsd-"]["token"] == "ab cdefg" def test_parse_url_simple(): "utils: parse_url() testing" result = utils.parse.parse_url("http://hostname", simple=True) assert len(result) == 3 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["url"] == "http://hostname" result = utils.parse.parse_url("http://hostname/", simple=True) assert len(result) == 5 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname/" # colon after hostname without port number is no good assert utils.parse.parse_url("http://hostname:", simple=True) is None # An invalid port result = utils.parse.parse_url( "http://hostname:invalid", verify_host=False, strict_port=True, simple=True, ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == "invalid" assert result["url"] == "http://hostname:invalid" # However if we don't verify the host, it is okay result = utils.parse.parse_url( "http://hostname:", verify_host=False, simple=True ) assert len(result) == 3 assert result["schema"] == "http" assert result["host"] == "hostname:" assert result["url"] == "http://hostname:" # A port of Zero is not valid with strict port checking assert ( utils.parse.parse_url( "http://hostname:0", strict_port=True, simple=True ) is None ) # Without strict port checking however, it is okay result = utils.parse.parse_url( "http://hostname:0", strict_port=False, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["port"] == 0 assert result["host"] == "hostname" assert result["url"] == "http://hostname:0" # A negative port is not valid assert ( utils.parse.parse_url( "http://hostname:-92", strict_port=True, simple=True ) is None ) result = utils.parse.parse_url( "http://hostname:-92", verify_host=False, strict_port=True, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == -92 assert result["url"] == "http://hostname:-92" # A port that is too large is not valid assert ( utils.parse.parse_url( "http://hostname:65536", strict_port=True, simple=True ) is None ) # This is an accetable port (the maximum) result = utils.parse.parse_url( "http://hostname:65535", strict_port=True, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 65535 assert result["url"] == "http://hostname:65535" # This is an accetable port (the maximum) result = utils.parse.parse_url( "http://hostname:1", strict_port=True, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 1 assert result["url"] == "http://hostname:1" # A port that was identfied as a string is invalid assert ( utils.parse.parse_url( "http://hostname:invalid", strict_port=True, simple=True ) is None ) result = utils.parse.parse_url( "http://hostname:invalid", verify_host=False, strict_port=True, simple=True, ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == "invalid" assert result["url"] == "http://hostname:invalid" result = utils.parse.parse_url( "http://hostname:invalid", verify_host=False, strict_port=False, simple=True, ) assert len(result) == 3 assert result["schema"] == "http" assert result["host"] == "hostname:invalid" assert result["url"] == "http://hostname:invalid" result = utils.parse.parse_url( "http://hostname:invalid?key=value&-minuskey=mvalue", verify_host=False, strict_port=False, simple=True, ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname:invalid" assert result["url"] == "http://hostname:invalid" assert isinstance(result["qsd"], dict) assert len(result["qsd"]) == 2 assert unquote(result["qsd"]["-minuskey"]) == "mvalue" assert unquote(result["qsd"]["key"]) == "value" # Handling of floats assert ( utils.parse.parse_url( "http://hostname:4.2", strict_port=True, simple=True ) is None ) result = utils.parse.parse_url( "http://hostname:4.2", verify_host=False, strict_port=True, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == "4.2" assert result["url"] == "http://hostname:4.2" # A Port of zero is not acceptable for a regular hostname assert ( utils.parse.parse_url( "http://hostname:0", strict_port=True, simple=True ) is None ) # No host verification (zero is an acceptable port when this is the case result = utils.parse.parse_url( "http://hostname:0", verify_host=False, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 0 assert result["url"] == "http://hostname:0" result = utils.parse.parse_url( "http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080", verify_host=False, simple=True, ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "[2001:db8:002a:3256:adfe:05c0:0003:0006]" assert result["port"] == 8080 assert ( result["url"] == "http://[2001:db8:002a:3256:adfe:05c0:0003:0006]:8080" ) result = utils.parse.parse_url( "http://hostname:0", verify_host=False, strict_port=True, simple=True ) assert len(result) == 4 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 0 assert result["url"] == "http://hostname:0" result = utils.parse.parse_url("http://hostname/?-KeY=Value", simple=True) assert len(result) == 6 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname/" assert "-key" in result["qsd"] assert unquote(result["qsd"]["-key"]) == "Value" result = utils.parse.parse_url("http://hostname/?+KeY=Value", simple=True) assert len(result) == 6 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname/" assert "+key" in result["qsd"] assert result["qsd"]["+key"] == "Value" result = utils.parse.parse_url("http://hostname/?:kEy=vALUE", simple=True) assert len(result) == 6 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname/" assert ":key" in result["qsd"] assert result["qsd"][":key"] == "vALUE" result = utils.parse.parse_url( "http://hostname/?+KeY=ValueA&-kEy=ValueB&KEY=Value%20+C&:colon=y", simple=True, ) assert len(result) == 6 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname/" assert "+key" in result["qsd"] assert "-key" in result["qsd"] assert ":colon" in result["qsd"] assert result["qsd"][":colon"] == "y" assert result["qsd"]["key"] == "Value +C" assert result["qsd"]["+key"] == "ValueA" assert result["qsd"]["-key"] == "ValueB" result = utils.parse.parse_url("http://hostname////", simple=True) assert len(result) == 5 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname/" result = utils.parse.parse_url("http://hostname:40////", simple=True) assert len(result) == 6 assert result["schema"] == "http" assert result["host"] == "hostname" assert result["port"] == 40 assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "http://hostname:40/" result = utils.parse.parse_url("HTTP://HoStNaMe:40/test.php", simple=True) assert len(result) == 7 assert result["schema"] == "http" assert result["host"] == "HoStNaMe" assert result["port"] == 40 assert result["fullpath"] == "/test.php" assert result["path"] == "/" assert result["query"] == "test.php" assert result["url"] == "http://HoStNaMe:40/test.php" result = utils.parse.parse_url( "HTTPS://user@hostname/test.py", simple=True ) assert len(result) == 7 assert result["schema"] == "https" assert result["host"] == "hostname" assert result["user"] == "user" assert result["fullpath"] == "/test.py" assert result["path"] == "/" assert result["query"] == "test.py" assert result["url"] == "https://user@hostname/test.py" result = utils.parse.parse_url( " HTTPS://///user@@@hostname///test.py ", simple=True ) assert len(result) == 7 assert result["schema"] == "https" assert result["host"] == "hostname" assert result["user"] == "user" assert result["fullpath"] == "/test.py" assert result["path"] == "/" assert result["query"] == "test.py" assert result["url"] == "https://user@hostname/test.py" result = utils.parse.parse_url( "HTTPS://user:password@otherHost/full///path/name/", simple=True, ) assert len(result) == 7 assert result["schema"] == "https" assert result["host"] == "otherHost" assert result["user"] == "user" assert result["password"] == "password" assert result["fullpath"] == "/full/path/name/" assert result["path"] == "/full/path/name/" assert result["url"] == "https://user:password@otherHost/full/path/name/" # Handle garbage assert utils.parse.parse_url(None) is None result = utils.parse.parse_url( "mailto://user:password@otherHost/lead2gold@gmail.com" + "?from=test@test.com&name=Chris%20Caron&format=text", simple=True, ) assert len(result) == 9 assert result["schema"] == "mailto" assert result["host"] == "otherHost" assert result["user"] == "user" assert result["password"] == "password" assert unquote(result["fullpath"]) == "/lead2gold@gmail.com" assert result["path"] == "/" assert unquote(result["query"]) == "lead2gold@gmail.com" assert ( unquote(result["url"]) == "mailto://user:password@otherHost/lead2gold@gmail.com" ) assert len(result["qsd"]) == 3 assert "name" in result["qsd"] assert unquote(result["qsd"]["name"]) == "Chris Caron" assert "from" in result["qsd"] assert unquote(result["qsd"]["from"]) == "test@test.com" assert "format" in result["qsd"] assert unquote(result["qsd"]["format"]) == "text" # Test Passwords with question marks ?; not supported result = utils.parse.parse_url( "http://user:pass.with.?question@host", simple=True ) assert result is None # just hostnames result = utils.parse.parse_url( "nuxref.com", simple=True, ) assert len(result) == 3 assert result["schema"] == "http" assert result["host"] == "nuxref.com" assert result["url"] == "http://nuxref.com" # just host and path result = utils.parse.parse_url("invalid/host", simple=True) assert len(result) == 6 assert result["schema"] == "http" assert result["host"] == "invalid" assert result["fullpath"] == "/host" assert result["path"] == "/" assert result["query"] == "host" assert result["url"] == "http://invalid/host" # just all out invalid assert utils.parse.parse_url("?", simple=True) is None assert utils.parse.parse_url("/", simple=True) is None # Test some illegal strings result = utils.parse.parse_url(object, verify_host=False, simple=True) assert result is None result = utils.parse.parse_url(None, verify_host=False, simple=True) assert result is None # Just a schema; invalid host result = utils.parse.parse_url("test://", simple=True) assert result is None # Do it again without host validation result = utils.parse.parse_url("test://", verify_host=False, simple=True) assert len(result) == 2 assert result["schema"] == "test" assert result["url"] == "test://" result = utils.parse.parse_url("testhostname", simple=True) assert len(result) == 3 assert result["schema"] == "http" assert result["host"] == "testhostname" # The default_schema kicks in here assert result["url"] == "http://testhostname" result = utils.parse.parse_url( "example.com", default_schema="unknown", simple=True ) assert len(result) == 3 assert result["schema"] == "unknown" assert result["host"] == "example.com" # The default_schema kicks in here assert result["url"] == "unknown://example.com" # An empty string without a hostame is still valid if verify_host is set result = utils.parse.parse_url("", verify_host=False, simple=True) assert len(result) == 2 assert result["schema"] == "http" # The default_schema kicks in here assert result["url"] == "http://" # A messed up URL result = utils.parse.parse_url( "test://:@/", verify_host=False, simple=True ) assert len(result) == 6 assert result["schema"] == "test" assert result["user"] == "" assert result["password"] == "" assert result["fullpath"] == "/" assert result["path"] == "/" assert result["url"] == "test://:@/" result = utils.parse.parse_url( "crazy://:@//_/@^&/jack.json", verify_host=False, simple=True ) assert len(result) == 7 assert result["schema"] == "crazy" assert result["user"] == "" assert result["password"] == "" assert unquote(result["fullpath"]) == "/_/@^&/jack.json" assert unquote(result["path"]) == "/_/@^&/" assert result["query"] == "jack.json" assert unquote(result["url"]) == "crazy://:@/_/@^&/jack.json" def test_url_assembly(): """ "utils: url_assembly() testing.""" url = "schema://user:password@hostname:port/path/?key=value" assert ( utils.parse.url_assembly( **utils.parse.parse_url(url, verify_host=False) ) == url ) # Same URL without trailing slash after path url = "schema://user:password@hostname:port/path?key=value" assert ( utils.parse.url_assembly( **utils.parse.parse_url(url, verify_host=False) ) == url ) url = "schema://user@hostname:port/path?key=value" assert ( utils.parse.url_assembly( **utils.parse.parse_url(url, verify_host=False) ) == url ) url = "schema://hostname:10/a/file.php" assert ( utils.parse.url_assembly( **utils.parse.parse_url(url, verify_host=False) ) == url ) # When spaces and special characters are introduced, the URL # is hard to mimic what was entered. Instead it is normalized url = ( "schema://hostname:10/a space/file.php?" "arg=a+space&arg2=a%20space&arg3=a space" ) assert ( utils.parse.url_assembly( **utils.parse.parse_url(url, verify_host=False) ) == "schema://hostname:10/a%20space/file.php?" "arg=a%2Bspace&arg2=a+space&arg3=a+space" ) # encode=True should only be used if you're passing in un-assembled # content... hence the following is likely not what is expected: assert ( utils.parse.url_assembly( **utils.parse.parse_url(url, verify_host=False), encode=True ) == "schema://hostname:10/a%2520space/file.php?" "arg=a%2Bspace&arg2=a+space&arg3=a+space" ) # But the following utilizes the encode=True and produces the # desired effects: content = { "host": "hostname", # Note that fullpath requires escaping in this case "fullpath": "/a space/file.php", "path": "/a space/", "query": "file.php", "schema": "schema", # our query arguments also require escaping as well "qsd": {"arg": "a+space", "arg2": "a space", "arg3": "a space"}, } assert ( utils.parse.url_assembly(**content, encode=True) == "schema://hostname/a%20space/file.php?" "arg=a%2Bspace&arg2=a+space&arg3=a+space" ) def test_parse_bool(): "utils: parse_bool() testing" assert utils.parse.parse_bool("Enabled", None) is True assert utils.parse.parse_bool("Disabled", None) is False assert utils.parse.parse_bool("Allow", None) is True assert utils.parse.parse_bool("Deny", None) is False assert utils.parse.parse_bool("Yes", None) is True assert utils.parse.parse_bool("YES", None) is True assert utils.parse.parse_bool("Always", None) is True assert utils.parse.parse_bool("No", None) is False assert utils.parse.parse_bool("NO", None) is False assert utils.parse.parse_bool("NEVER", None) is False assert utils.parse.parse_bool("TrUE", None) is True assert utils.parse.parse_bool("tRUe", None) is True assert utils.parse.parse_bool("FAlse", None) is False assert utils.parse.parse_bool("F", None) is False assert utils.parse.parse_bool("T", None) is True assert utils.parse.parse_bool("0", None) is False assert utils.parse.parse_bool("1", None) is True assert utils.parse.parse_bool("True", None) is True assert utils.parse.parse_bool("Yes", None) is True assert utils.parse.parse_bool(1, None) is True assert utils.parse.parse_bool(0, None) is False assert utils.parse.parse_bool(True, None) is True assert utils.parse.parse_bool(False, None) is False # only the int of 0 will return False since the function # casts this to a boolean assert utils.parse.parse_bool(2, None) is True # An empty list is still false assert utils.parse.parse_bool([], None) is False # But a list that contains something is True assert ( utils.parse.parse_bool( [ "value", ], None, ) is True ) # Use Default (which is False) assert utils.parse.parse_bool("OhYeah") is False # Adjust Default and get a different result assert utils.parse.parse_bool("OhYeah", True) is True def test_is_uuid(): """ API: is_uuid() function """ # Invalid Entries assert utils.parse.is_uuid("invalid") is False assert utils.parse.is_uuid(None) is False assert utils.parse.is_uuid(5) is False assert utils.parse.is_uuid(object) is False # A slightly invalid uuid4 entry assert utils.parse.is_uuid("591ed387-fa65-ac97-9712-b9d2a15e42a9") is False assert utils.parse.is_uuid("591ed387-fa65-Jc97-9712-b9d2a15e42a9") is False # Valid UUID4 Entries assert utils.parse.is_uuid("591ed387-fa65-4c97-9712-b9d2a15e42a9") is True assert utils.parse.is_uuid("32b0b447-fe84-4df1-8368-81925e729265") is True def test_is_hostname(): """ API: is_hostname() function """ # Valid Hostnames assert utils.parse.is_hostname("yahoo.ca") == "yahoo.ca" assert utils.parse.is_hostname("yahoo.ca.") == "yahoo.ca" assert ( utils.parse.is_hostname("valid-dashes-in-host.ca") == "valid-dashes-in-host.ca" ) assert ( utils.parse.is_hostname("valid-underscores_in_host.ca") == "valid-underscores_in_host.ca" ) # Underscores are supported by default assert ( utils.parse.is_hostname("valid_dashes_in_host.ca") == "valid_dashes_in_host.ca" ) # However they are not if specified otherwise: assert ( utils.parse.is_hostname("valid_dashes_in_host.ca", underscore=False) is False ) # Invalid Hostnames assert ( utils.parse.is_hostname("-hostname.that.starts.with.a.dash") is False ) assert utils.parse.is_hostname("invalid-characters_#^.ca") is False assert utils.parse.is_hostname(" spaces ") is False assert utils.parse.is_hostname(" ") is False assert utils.parse.is_hostname("") is False # Valid IPv4 Addresses assert utils.parse.is_hostname("127.0.0.1") == "127.0.0.1" assert utils.parse.is_hostname("0.0.0.0") == "0.0.0.0" assert utils.parse.is_hostname("255.255.255.255") == "255.255.255.255" # But not if we're not checking for this: assert utils.parse.is_hostname("127.0.0.1", ipv4=False) is False assert utils.parse.is_hostname("0.0.0.0", ipv4=False) is False assert utils.parse.is_hostname("255.255.255.255", ipv4=False) is False # Invalid IPv4 Addresses assert utils.parse.is_hostname("1.2.3") is False assert utils.parse.is_hostname("256.256.256.256") is False assert utils.parse.is_hostname("999.0.0.0") is False assert utils.parse.is_hostname("1.2.3.4.5") is False assert utils.parse.is_hostname(" 127.0.0.1 ") is False assert utils.parse.is_hostname(" ") is False assert utils.parse.is_hostname("") is False # Valid IPv6 Addresses (square brakets supported for URL construction) assert ( utils.parse.is_hostname("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]") == "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]" ) assert ( utils.parse.is_hostname("2001:0db8:85a3:0000:0000:8a2e:0370:7334") == "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]" ) assert ( utils.parse.is_hostname("[2001:db8:002a:3256:adfe:05c0:0003:0006]") == "[2001:db8:002a:3256:adfe:05c0:0003:0006]" ) # localhost assert utils.parse.is_hostname("::1") == "[::1]" assert utils.parse.is_hostname("0:0:0:0:0:0:0:1") == "[0:0:0:0:0:0:0:1]" # But not if we're not checking for this: assert ( utils.parse.is_hostname( "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]", ipv6=False ) is False ) assert ( utils.parse.is_hostname( "2001:0db8:85a3:0000:0000:8a2e:0370:7334", ipv6=False ) is False ) # Test hostnames with a single character hostname assert ( utils.parse.is_hostname("cloud.a.example.com", ipv4=False, ipv6=False) == "cloud.a.example.com" ) def test_is_ipaddr(): """ API: is_ipaddr() function """ # Valid IPv4 Addresses assert utils.parse.is_ipaddr("127.0.0.1") == "127.0.0.1" assert utils.parse.is_ipaddr("0.0.0.0") == "0.0.0.0" assert utils.parse.is_ipaddr("255.255.255.255") == "255.255.255.255" # Invalid IPv4 Addresses assert utils.parse.is_ipaddr("1.2.3") is False assert utils.parse.is_ipaddr("256.256.256.256") is False assert utils.parse.is_ipaddr("999.0.0.0") is False assert utils.parse.is_ipaddr("1.2.3.4.5") is False assert utils.parse.is_ipaddr(" 127.0.0.1 ") is False assert utils.parse.is_ipaddr(" ") is False assert utils.parse.is_ipaddr("") is False # Valid IPv6 Addresses (square brakets supported for URL construction) assert ( utils.parse.is_ipaddr("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]") == "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]" ) assert ( utils.parse.is_ipaddr("2001:0db8:85a3:0000:0000:8a2e:0370:7334") == "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]" ) assert ( utils.parse.is_ipaddr("[2001:db8:002a:3256:adfe:05c0:0003:0006]") == "[2001:db8:002a:3256:adfe:05c0:0003:0006]" ) # localhost assert utils.parse.is_ipaddr("::1") == "[::1]" assert utils.parse.is_ipaddr("0:0:0:0:0:0:0:1") == "[0:0:0:0:0:0:0:1]" def test_is_email(): """ API: is_email() function """ # Valid Emails results = utils.parse.is_email("test@gmail.com") assert results["name"] == "" assert results["email"] == "test@gmail.com" assert results["full_email"] == "test@gmail.com" assert results["domain"] == "gmail.com" assert results["user"] == "test" assert results["label"] == "" results = utils.parse.is_email("test@my-valid_host.com") assert results["name"] == "" assert results["email"] == "test@my-valid_host.com" assert results["full_email"] == "test@my-valid_host.com" assert results["domain"] == "my-valid_host.com" assert results["user"] == "test" assert results["label"] == "" results = utils.parse.is_email("tag+test@gmail.com") assert results["name"] == "" assert results["email"] == "test@gmail.com" assert results["full_email"] == "tag+test@gmail.com" assert results["domain"] == "gmail.com" assert results["user"] == "test" assert results["label"] == "tag" # Support Full Names as well results = utils.parse.is_email("Bill Gates: bgates@microsoft.com") assert results["name"] == "Bill Gates" assert results["email"] == "bgates@microsoft.com" assert results["full_email"] == "bgates@microsoft.com" assert results["domain"] == "microsoft.com" assert results["user"] == "bgates" assert results["label"] == "" results = utils.parse.is_email("Bill Gates ") assert results["name"] == "Bill Gates" assert results["email"] == "bgates@microsoft.com" assert results["full_email"] == "bgates@microsoft.com" assert results["domain"] == "microsoft.com" assert results["user"] == "bgates" assert results["label"] == "" results = utils.parse.is_email("Bill Gates: ") assert results["name"] == "Bill Gates" assert results["email"] == "bgates@microsoft.com" assert results["full_email"] == "bgates@microsoft.com" assert results["domain"] == "microsoft.com" assert results["user"] == "bgates" assert results["label"] == "" results = utils.parse.is_email("Sundar Pichai ") assert results["name"] == "Sundar Pichai" assert results["email"] == "spichai@gmail.com" assert results["full_email"] == "ceo+spichai@gmail.com" assert results["domain"] == "gmail.com" assert results["user"] == "spichai" assert results["label"] == "ceo" # Support Quotes results = utils.parse.is_email('"Chris Hemsworth" ') assert results["name"] == "Chris Hemsworth" assert results["email"] == "ch@test.com" assert results["full_email"] == "ch@test.com" assert results["domain"] == "test.com" assert results["user"] == "ch" assert results["label"] == "" # An email without name, but contains delimiters results = utils.parse.is_email(" ") assert results["name"] == "" assert results["email"] == "spichai@gmail.com" assert results["full_email"] == "spichai@gmail.com" assert results["domain"] == "gmail.com" assert results["user"] == "spichai" assert results["label"] == "" # a valid email not properly delimited with a colon or angle bracket # We do a best guess and still parse it correctly results = utils.parse.is_email("Name valid@example.com") assert results["name"] == "Name" assert results["email"] == "valid@example.com" assert results["full_email"] == "valid@example.com" assert results["domain"] == "example.com" assert results["user"] == "valid" assert results["label"] == "" # a valid email not properly delimited with a colon or angle bracket # We do a best guess and still parse it correctly results = utils.parse.is_email("Руслан Эра russian+russia@example.ru") assert results["name"] == "Руслан Эра" assert results["email"] == "russia@example.ru" assert results["full_email"] == "russian+russia@example.ru" assert results["domain"] == "example.ru" assert results["user"] == "russia" assert results["label"] == "russian" # Invalid Emails assert utils.parse.is_email("invalid.com") is False assert utils.parse.is_email(object()) is False assert utils.parse.is_email(None) is False assert utils.parse.is_email("Just A Name") is False assert utils.parse.is_email("Name ") is False # Extended valid emails # # The first + denotes our label, so this test really validates # that there is a correct split and parsing of our email results = utils.parse.is_email("a-z0-9_!#$%&*+/=?%`{|}~^.-@gmail.com") assert results["name"] == "" assert results["label"] == "a-z0-9_!#$%&*" assert results["email"] == "/=?%`{|}~^.-@gmail.com" assert results["full_email"] == "a-z0-9_!#$%&*+/=?%`{|}~^.-@gmail.com" assert results["domain"] == "gmail.com" assert results["user"] == "/=?%`{|}~^.-" # A similar test without '+' (use of a label) # The first + denotes our label, so this test really validates # that there is a correct split and parsing of our email results = utils.parse.is_email("a-z0-9_!#$%&*/=?%`{|}~^.-@gmail.com") assert results["name"] == "" assert results["label"] == "" assert results["email"] == "a-z0-9_!#$%&*/=?%`{|}~^.-@gmail.com" assert results["full_email"] == "a-z0-9_!#$%&*/=?%`{|}~^.-@gmail.com" assert results["domain"] == "gmail.com" assert results["user"] == "a-z0-9_!#$%&*/=?%`{|}~^.-" def test_is_call_sign_no(): """ API: is_call_sign() function """ # Invalid numbers assert utils.parse.is_call_sign(None) is False assert utils.parse.is_call_sign(42) is False assert utils.parse.is_call_sign(object) is False assert utils.parse.is_call_sign("") is False assert utils.parse.is_call_sign("1") is False assert utils.parse.is_call_sign("12") is False assert utils.parse.is_call_sign("abc") is False assert utils.parse.is_call_sign("+()") is False assert utils.parse.is_call_sign("+") is False assert utils.parse.is_call_sign(None) is False assert utils.parse.is_call_sign(42) is False # To short or 2 long assert utils.parse.is_call_sign("DF1AB") is False assert utils.parse.is_call_sign("DF1ABCX") is False assert utils.parse.is_call_sign("DF1ABCEFG") is False assert utils.parse.is_call_sign("1ABCX") is False # 4th character is not an number assert utils.parse.is_call_sign("XXXXXX") is False # Some valid checks result = utils.parse.is_call_sign("DF1ABC") assert isinstance(result, dict) assert result["callsign"] == "DF1ABC" assert result["ssid"] == "" # Get our SSID result = utils.parse.is_call_sign("DF1ABC-14") assert result["callsign"] == "DF1ABC" assert result["ssid"] == "-14" def test_is_phone_no(): """ API: is_phone_no() function """ # Invalid numbers assert utils.parse.is_phone_no(None) is False assert utils.parse.is_phone_no(42) is False assert utils.parse.is_phone_no(object) is False assert utils.parse.is_phone_no("") is False assert utils.parse.is_phone_no("1") is False assert utils.parse.is_phone_no("12") is False assert utils.parse.is_phone_no("abc") is False assert utils.parse.is_phone_no("+()") is False assert utils.parse.is_phone_no("+") is False assert utils.parse.is_phone_no(None) is False assert utils.parse.is_phone_no(42) is False assert utils.parse.is_phone_no(object, min_len=0) is False assert utils.parse.is_phone_no("", min_len=1) is False assert utils.parse.is_phone_no("abc", min_len=0) is False assert utils.parse.is_phone_no("", min_len=0) is False # Ambigious, but will document it here in this test as such results = utils.parse.is_phone_no("+((()))--+", min_len=0) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "" assert results["pretty"] == "" assert results["full"] == "" # Valid phone numbers assert utils.parse.is_phone_no("+(0)") is False results = utils.parse.is_phone_no("+(0)", min_len=1) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "0" assert results["pretty"] == "0" assert results["full"] == "0" assert utils.parse.is_phone_no("1") is False results = utils.parse.is_phone_no("1", min_len=1) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "1" assert results["pretty"] == "1" assert results["full"] == "1" assert utils.parse.is_phone_no("12") is False results = utils.parse.is_phone_no("12", min_len=2) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "12" assert results["pretty"] == "12" assert results["full"] == "12" assert utils.parse.is_phone_no("911") is False results = utils.parse.is_phone_no("911", min_len=3) assert isinstance(results, dict) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "911" assert results["pretty"] == "911" assert results["full"] == "911" assert utils.parse.is_phone_no("1234") is False results = utils.parse.is_phone_no("1234", min_len=4) assert isinstance(results, dict) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "1234" assert results["pretty"] == "1234" assert results["full"] == "1234" assert utils.parse.is_phone_no("12345") is False results = utils.parse.is_phone_no("12345", min_len=5) assert isinstance(results, dict) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "12345" assert results["pretty"] == "12345" assert results["full"] == "12345" assert utils.parse.is_phone_no("123456") is False results = utils.parse.is_phone_no("123456", min_len=6) assert isinstance(results, dict) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "123456" assert results["pretty"] == "123456" assert results["full"] == "123456" # at 7 digits, the format hyphenates in the `pretty` section assert utils.parse.is_phone_no("1234567") is False results = utils.parse.is_phone_no("1234567", min_len=7) assert isinstance(results, dict) assert results["country"] == "" assert results["area"] == "" assert results["line"] == "1234567" assert results["pretty"] == "123-4567" assert results["full"] == "1234567" results = utils.parse.is_phone_no("1(800) 123-4567") assert isinstance(results, dict) assert results["country"] == "1" assert results["area"] == "800" assert results["line"] == "1234567" assert results["pretty"] == "+1 800-123-4567" assert results["full"] == "18001234567" def test_parse_call_sign(): """utils: parse_call_sign() testing""" # A simple single array entry (As str) results = utils.parse.parse_call_sign("") assert isinstance(results, list) assert len(results) == 0 # just delimeters results = utils.parse.parse_call_sign(", ,, , ,,, ") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_call_sign(None) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_call_sign(42) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_call_sign( "this is not a parseable call sign at all" ) assert isinstance(results, list) assert len(results) == 9 results = utils.parse.parse_call_sign( "this is not a parseable call sign at all", store_unparseable=False ) assert isinstance(results, list) assert len(results) == 0 # Now test valid call signs results = utils.parse.parse_call_sign("0A1DEF") assert isinstance(results, list) assert len(results) == 1 assert "0A1DEF" in results results = utils.parse.parse_call_sign("0A1DEF, DF1ABC") assert isinstance(results, list) assert len(results) == 2 assert "0A1DEF" in results assert "DF1ABC" in results def test_parse_phone_no(): """utils: parse_phone_no() testing""" # A simple single array entry (As str) results = utils.parse.parse_phone_no("") assert isinstance(results, list) assert len(results) == 0 # just delimeters results = utils.parse.parse_phone_no(", ,, , ,,, ") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no(",") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no(None) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no(42) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no( "this is not a parseable phoneno at all" ) assert isinstance(results, list) assert len(results) == 8 # Now we do it again with the store_unparsable flag set to False results = utils.parse.parse_phone_no( "this is not a parseable email at all", store_unparseable=False ) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no("+", store_unparseable=False) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no("(", store_unparseable=False) assert isinstance(results, list) assert len(results) == 0 # Number is too short results = utils.parse.parse_phone_no("0", store_unparseable=False) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_phone_no("12", store_unparseable=False) assert isinstance(results, list) assert len(results) == 0 # Now test valid phone numbers results = utils.parse.parse_phone_no("+1 (124) 245 2345") assert isinstance(results, list) assert len(results) == 1 assert "+1 (124) 245 2345" in results results = utils.parse.parse_phone_no("911", store_unparseable=False) assert isinstance(results, list) assert len(results) == 1 assert "911" in results results = utils.parse.parse_phone_no( "911, 123-123-1234", store_unparseable=False ) assert isinstance(results, list) assert len(results) == 2 assert "911" in results assert "123-123-1234" in results # Space variations results = utils.parse.parse_phone_no(" 911 , +1 (123) 123-1234") assert isinstance(results, list) assert len(results) == 2 assert "911" in results assert "+1 (123) 123-1234" in results results = utils.parse.parse_phone_no(" 911 , + 1 ( 123 ) 123-1234") assert isinstance(results, list) assert len(results) == 2 assert "911" in results assert "+ 1 ( 123 ) 123-1234" in results def test_parse_emails(): """utils: parse_emails() testing""" # A simple single array entry (As str) results = utils.parse.parse_emails("") assert isinstance(results, list) assert len(results) == 0 # just delimeters results = utils.parse.parse_emails(", ,, , ,,, ") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_emails(",") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_emails(None) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_emails(42) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_emails("this is not a parseable email at all") assert isinstance(results, list) assert len(results) == 8 # Now we do it again with the store_unparsable flag set to False results = utils.parse.parse_emails( "this is not a parseable email at all", store_unparseable=False ) assert isinstance(results, list) assert len(results) == 0 # Now test valid emails results = utils.parse.parse_emails("user@example.com") assert isinstance(results, list) assert len(results) == 1 assert "user@example.com" in results results = utils.parse.parse_emails("a@") assert isinstance(results, list) assert len(results) == 1 assert "a@" in results results = utils.parse.parse_emails("user1@example.com user2@example.com") assert isinstance(results, list) assert len(results) == 2 assert "user1@example.com" in results assert "user2@example.com" in results # Commas and spaces found inside URLs are ignored emails = [ "user1@example.com,", "test1@example.com,,, abcd@example.com", "Chuck Norris roundhouse@kick.com", "David Spade dspade@example.com, Yours Truly yours@truly.com", ] results = utils.parse.parse_emails(", ".join(emails)) assert isinstance(results, list) assert len(results) == 6 assert "user1@example.com" in results assert "test1@example.com" in results assert "abcd@example.com" in results assert "Chuck Norris roundhouse@kick.com" in results assert "David Spade dspade@example.com" in results assert "Yours Truly yours@truly.com" in results # Test triangle bracket parsing # Commas and spaces found inside URLs are ignored emails = [ "User1 user1@example.com", "User 2 user2@example.com", "User Three ", "The Forth User: ", "5th User: user4@example.com", ] results = utils.parse.parse_emails(", ".join(emails)) assert isinstance(results, list) assert len(results) == len(emails) for email in emails: assert email in results is_email = utils.parse.is_email(email) assert is_email assert is_email.get("name") # pass the entries in as a list results = utils.parse.parse_emails(emails) assert isinstance(results, list) assert len(results) == len(emails) for email in emails: assert email in results # Pass in some unparseables results = utils.parse.parse_emails("garbage") assert isinstance(results, list) assert len(results) == 1 results = utils.parse.parse_emails("garbage", store_unparseable=False) assert isinstance(results, list) assert len(results) == 0 # Pass in garbage results = utils.parse.parse_emails(object) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_emails(42) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_emails([None, object, 42]) assert isinstance(results, list) assert len(results) == 0 def test_parse_urls(): """utils: parse_urls() testing""" # A simple single array entry (As str) results = utils.parse.parse_urls("") assert isinstance(results, list) assert len(results) == 0 # just delimeters results = utils.parse.parse_urls(", ,, , ,,, ") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_urls(",") assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_urls(None) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_urls(42) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_urls("this is not a parseable url at all") assert isinstance(results, list) # we still end up returning this assert len(results) == 8 results = utils.parse.parse_urls( "this is not a parseable url at all", store_unparseable=False ) assert isinstance(results, list) assert len(results) == 0 # Now test valid URLs results = utils.parse.parse_urls("windows://") assert isinstance(results, list) assert len(results) == 1 assert "windows://" in results results = utils.parse.parse_urls("windows:// gnome://") assert isinstance(results, list) assert len(results) == 2 assert "windows://" in results assert "gnome://" in results # We don't want to parse out URLs that are part of another URL's arguments results = utils.parse.parse_urls("discord://host?url=https://localhost") assert isinstance(results, list) assert len(results) == 1 assert "discord://host?url=https://localhost" in results # Commas and spaces found inside URLs are ignored urls = [ ( "mailgun://noreply@sandbox.mailgun.org/apikey/" "?to=test@example.com,test2@example.com,," " abcd@example.com" ), ( "mailgun://noreply@sandbox.another.mailgun.org/apikey/" "?to=hello@example.com,,hmmm@example.com,, abcd@example.com, ," ), "windows://", ] # Since comma's and whitespace are the delimiters; they won't be # present at the end of the URL; so we just need to write a special # rstrip() as a regular exression to handle whitespace (\s) and comma # delimiter rstrip_re = re.compile(r"[\s,]+$") # Since a comma acts as a delimiter, we run a risk of a problem where the # comma exists as part of the URL and is therefore lost if it was found # at the end of it. results = utils.parse.parse_urls(", ".join(urls)) assert isinstance(results, list) assert len(results) == len(urls) for url in urls: assert rstrip_re.sub("", url) in results # However if a comma is found at the end of a single url without a new # match to hit, it is saved and not lost # The comma at the end of the password will not be lost if we're # dealing with a single entry: url = "http://hostname?password=,abcd," results = utils.parse.parse_urls(url) assert isinstance(results, list) assert len(results) == 1 assert url in results # however if we have multiple entries, commas and spaces between # URLs will be lost, however the last URL will not lose the comma urls = [ "schema1://hostname?password=,abcd,", "schema2://hostname?password=,abcd,", ] results = utils.parse.parse_urls(", ".join(urls)) assert isinstance(results, list) assert len(results) == len(urls) # No match because the comma is gone in the results entry # schema1://hostname?password=,abcd assert urls[0] not in results assert urls[0][:-1] in results # However we wouldn't have lost the comma in the second one: # schema2://hostname?password=,abcd, assert urls[1] in results # Pass the list in (as a list); results are the same results = utils.parse.parse_urls(urls) assert isinstance(results, list) assert len(results) == len(urls) # Pass in garbage results = utils.parse.parse_urls(object) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_urls(42) assert isinstance(results, list) assert len(results) == 0 results = utils.parse.parse_urls([None, object, 42]) assert isinstance(results, list) assert len(results) == 0 def test_dict_full_update(): """utils: dict_full_update() testing""" dict_1 = { "a": 1, "b": 2, "c": 3, "d": { "z": 27, "y": 26, "x": 25, }, } dict_2 = { "d": { "x": "updated", "w": 24, }, "c": "updated", "e": 5, } utils.logic.dict_full_update(dict_1, dict_2) # Dictionary 2 is untouched assert len(dict_2) == 3 assert dict_2["c"] == "updated" assert dict_2["d"]["w"] == 24 assert dict_2["d"]["x"] == "updated" assert dict_2["e"] == 5 # Dictionary 3 however has entries from Dict 2 applied # without disrupting entries that were not matched. assert len(dict_1) == 5 assert dict_1["a"] == 1 assert dict_1["b"] == 2 assert dict_1["c"] == "updated" assert dict_1["d"]["w"] == 24 assert dict_1["d"]["x"] == "updated" assert dict_1["d"]["y"] == 26 assert dict_1["d"]["z"] == 27 assert dict_1["e"] == 5 def test_parse_list(): """utils: parse_list() testing""" # A simple single array entry (As str) results = utils.parse.parse_list( ".mkv,.avi,.divx,.xvid,.mov,.wmv,.mp4,.mpg,.mpeg,.vob,.iso" ) assert results == sorted([ ".divx", ".iso", ".mkv", ".mov", ".mpg", ".avi", ".mpeg", ".vob", ".xvid", ".wmv", ".mp4", ]) class StrangeObject: def __str__(self): return ".avi" # Now 2 lists with lots of duplicates and other delimiters results = utils.parse.parse_list( ".mkv,.avi,.divx,.xvid,.mov,.wmv,.mp4,.mpg .mpeg,.vob,,; ;", (".mkv,.avi,.divx,.xvid,.mov ", " .wmv,.mp4;.mpg,.mpeg,"), ".vob,.iso", [ ".vob", [ ".vob", ".mkv", StrangeObject(), ], ], StrangeObject(), ) assert results == sorted([ ".divx", ".iso", ".mkv", ".mov", ".mpg", ".avi", ".mpeg", ".vob", ".xvid", ".wmv", ".mp4", ]) # Garbage in is removed assert utils.parse.parse_list(object(), 42, None) == [] # Now a list with extras we want to add as strings # empty entries are removed results = utils.parse.parse_list( [ ".divx", ".iso", ".mkv", ".mov", "", " ", ".avi", ".mpeg", ".vob", ".xvid", ".mp4", ], ".mov,.wmv,.mp4,.mpg", ) assert results == sorted([ ".divx", ".wmv", ".iso", ".mkv", ".mov", ".mpg", ".avi", ".vob", ".xvid", ".mpeg", ".mp4", ]) def test_import_module(tmpdir): """utils: import_module testing""" # Prepare ourselves a file to work with bad_file_base = tmpdir.mkdir("a") bad_file = bad_file_base.join("README.md") bad_file.write(cleandoc(""" I'm a README file, not a Python one. I can't be loaded """)) assert utils.module.import_module(str(bad_file), "invalidfile1") is None assert ( utils.module.import_module(str(bad_file_base), "invalidfile2") is None ) def test_module_detection(tmpdir): """utils: test_module_detection() testing""" # Clear our working variables so they don't obstruct with the tests here N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() # Test case where we load invalid data N_MGR.module_detection(None) # Invalid data does not load anything assert len(N_MGR._paths_previously_scanned) == 0 assert len(N_MGR._custom_module_map) == 0 # Prepare ourselves a file to work with notify_hook_a_base = tmpdir.mkdir("a") notify_hook_a = notify_hook_a_base.join("hook.py") notify_hook_a.write(cleandoc(""" from apprise.decorators import notify @notify(on="clihook") def mywrapper(body, title, notify_type, *args, **kwargs): pass """)) notify_ignore = notify_hook_a_base.join("README.md") notify_ignore.write(cleandoc(""" We're not a .py file, so this file gets gracefully skipped """)) # Not previously loaded assert "clihook" not in N_MGR # load entry by string N_MGR.module_detection(str(notify_hook_a)) N_MGR.module_detection(str(notify_ignore)) N_MGR.module_detection(str(notify_hook_a_base)) assert len(N_MGR._paths_previously_scanned) == 3 assert len(N_MGR._custom_module_map) == 1 # Now loaded assert "clihook" in N_MGR # load entry by array N_MGR.module_detection([str(notify_hook_a)]) # No changes to our path assert len(N_MGR._paths_previously_scanned) == 3 assert len(N_MGR._custom_module_map) == 1 # Reset our variables for the next test N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() # Hidden files are ignored notify_hook_b_base = tmpdir.mkdir("b") notify_hook_b = notify_hook_b_base.join(".hook.py") notify_hook_b.write(cleandoc(""" from apprise.decorators import notify # this is in a hidden file so it will not load @notify(on="hidden") def mywrapper(body, title, notify_type, *args, **kwargs): pass """)) assert "hidden" not in N_MGR N_MGR.module_detection([str(notify_hook_b)]) # Verify that it did not load assert "hidden" not in N_MGR # Path was scanned; nothing loaded assert len(N_MGR._paths_previously_scanned) == 1 assert len(N_MGR._custom_module_map) == 0 # Reset our variables for the next test N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() # modules with no hooks found are ignored notify_hook_c_base = tmpdir.mkdir("c") notify_hook_c = notify_hook_c_base.join("empty.py") notify_hook_c.write("") N_MGR.module_detection([str(notify_hook_c)]) # File was found, no custom modules assert len(N_MGR._paths_previously_scanned) == 1 assert len(N_MGR._custom_module_map) == 0 # A new path scanned N_MGR.module_detection([str(notify_hook_c_base)]) assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 0 def create_hook(tdir, cache=True, on="valid1"): """Just a temporary hook creation tool for writing a working notify hook.""" tdir.write(cleandoc(f""" from apprise.decorators import notify # this is a good hook but burried in hidden directory which won't # be accessed unless the file is pointed to via absolute path @notify(on="{on}") def mywrapper(body, title, notify_type, *args, **kwargs): pass """)) N_MGR.module_detection([str(tdir)], cache=cache) create_hook(notify_hook_c, on="valid1") assert "valid1" not in N_MGR # Even if we correct our empty file; the fact the directory has been # scanned and failed to load (same with file), it won't be loaded # a second time. This is intentional since module_detection() get's # called for every AppriseAsset() object creation. This prevents us # from reloading conent over and over again wasting resources assert "valid1" not in N_MGR N_MGR.module_detection([str(notify_hook_c)]) assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 0 # Even by absolute path... N_MGR.module_detection([str(notify_hook_c)]) assert "valid1" not in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 0 # However we can bypass the cache if we really want to N_MGR.module_detection([str(notify_hook_c_base)], cache=False) assert "valid1" in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 1 # Bypassing it twice causes the module to load twice (not very efficient) # However we can bypass the cache if we really want to N_MGR.module_detection([str(notify_hook_c_base)], cache=False) assert "valid1" in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 1 # If you update the module (corrupting it in the process and reload) notify_hook_c.write(cleandoc(""" raise ValueError """)) # Force no cache to cause the file to be replaced N_MGR.module_detection([str(notify_hook_c_base)], cache=False) # Our valid entry is no longer loaded assert "valid1" not in N_MGR # No change to scanned paths assert len(N_MGR._paths_previously_scanned) == 2 # The previously loaded module is now gone assert len(N_MGR._custom_module_map) == 0 # Reload our valid1 entry create_hook(notify_hook_c, on="valid1", cache=False) assert "valid1" in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 1 # Prepare an empty file notify_hook_c.write("") N_MGR.module_detection([str(notify_hook_c_base)], cache=False) # Our valid entry is no longer loaded assert "valid1" not in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 0 # Now reload our module again (this time rather then an exception, the # module is read back and swaps `valid1` for `valid2` create_hook(notify_hook_c, on="valid1", cache=False) assert "valid1" in N_MGR assert "valid2" not in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 1 create_hook(notify_hook_c, on="valid2", cache=False) assert "valid1" not in N_MGR assert "valid2" in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 1 # Reset our variables for the next test create_hook(notify_hook_c, on="valid1", cache=False) del N_MGR["valid1"] N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() notify_hook_d = notify_hook_c_base.join(".ignore.py") notify_hook_d.write("") notify_hook_e_base = notify_hook_c_base.mkdir(".ignore") notify_hook_e = notify_hook_e_base.join("__init__.py") notify_hook_e.write(cleandoc(""" from apprise.decorators import notify # this is a good hook but burried in hidden directory which won't # be accessed unless the file is pointed to via absolute path @notify(on="valid2") def mywrapper(body, title, notify_type, *args, **kwargs): pass """)) # Try to load our base directory again; this time we search by the # directory; the only edge case we're testing here is it will not # even look at the .ignore.py file found since it is invalid N_MGR.module_detection([str(notify_hook_c_base)]) assert "valid1" in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert len(N_MGR._custom_module_map) == 1 # Reset our variables for the next test del N_MGR._schema_map["valid1"] N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() # Try to load our base directory again N_MGR.module_detection([str(notify_hook_c)]) assert "valid1" in N_MGR # Hidden directories are not scanned assert "valid2" not in N_MGR assert len(N_MGR._paths_previously_scanned) == 1 assert str(notify_hook_c) in N_MGR._paths_previously_scanned assert len(N_MGR._custom_module_map) == 1 # However a direct reference to the hidden directory is okay N_MGR.module_detection([str(notify_hook_e_base)]) # We loaded our module assert "valid2" in N_MGR assert len(N_MGR._paths_previously_scanned) == 3 assert str(notify_hook_c) in N_MGR._paths_previously_scanned assert str(notify_hook_e) in N_MGR._paths_previously_scanned assert str(notify_hook_e_base) in N_MGR._paths_previously_scanned assert len(N_MGR._custom_module_map) == 2 # Reset our variables for the next test del N_MGR._schema_map["valid1"] del N_MGR._schema_map["valid2"] N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() # Load our file directly assert "valid2" not in N_MGR N_MGR.module_detection([str(notify_hook_e)]) # Now we have it loaded as expected assert "valid2" in N_MGR assert len(N_MGR._paths_previously_scanned) == 1 assert str(notify_hook_e) in N_MGR._paths_previously_scanned assert len(N_MGR._custom_module_map) == 1 # however if we try to load the base directory where the __init__.py # was already loaded from, it will not change anything N_MGR.module_detection([str(notify_hook_e_base)]) assert "valid2" in N_MGR assert len(N_MGR._paths_previously_scanned) == 2 assert str(notify_hook_e) in N_MGR._paths_previously_scanned assert str(notify_hook_e_base) in N_MGR._paths_previously_scanned assert len(N_MGR._custom_module_map) == 1 # Tidy up for the next test del N_MGR._schema_map["valid2"] N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() assert "valid1" not in N_MGR assert "valid2" not in N_MGR assert "valid3" not in N_MGR notify_hook_f_base = tmpdir.mkdir("f") notify_hook_f = notify_hook_f_base.join("invalid.py") notify_hook_f.write(cleandoc(""" from apprise.decorators import notify # A very invalid hook type... on should not be None @notify(on=None) def mywrapper(body, title, notify_type, *args, **kwargs): pass # An invalid name @notify(on='valid1', name=None) def mywrapper(body, title, notify_type, *args, **kwargs): pass # Another invalid name (so it's ignored) @notify(on='valid2', name=object) def mywrapper(body, title, notify_type, *args, **kwargs): pass # Simply put... the name has to be a string to be referenced # however this will still be loaded @notify(on='valid3', name=4) def mywrapper(body, title, notify_type, *args, **kwargs): pass """)) N_MGR.module_detection([str(notify_hook_f)]) assert len(N_MGR._paths_previously_scanned) == 1 assert len(N_MGR._custom_module_map) == 1 assert "valid1" in N_MGR assert "valid2" in N_MGR assert "valid3" in N_MGR # Reset our variables for the next test del N_MGR._schema_map["valid1"] del N_MGR._schema_map["valid2"] del N_MGR._schema_map["valid3"] N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() # Now test the handling of just bad data entirely notify_hook_g_base = tmpdir.mkdir("g") notify_hook_g = notify_hook_g_base.join("binary.py") with open(str(notify_hook_g), "wb") as fout: fout.write(os.urandom(512)) N_MGR.module_detection([str(notify_hook_g)]) assert len(N_MGR._paths_previously_scanned) == 1 assert len(N_MGR._custom_module_map) == 0 # Reset our variables before we leave N_MGR._paths_previously_scanned.clear() N_MGR._custom_module_map.clear() def test_exclusive_match(): """utils: is_exclusive_match() testing""" # No Logic always returns True if there is also no data assert utils.logic.is_exclusive_match(data=None, logic=None) is True assert utils.logic.is_exclusive_match(data=None, logic=set()) is True assert utils.logic.is_exclusive_match(data="", logic=set()) is True assert utils.logic.is_exclusive_match(data="", logic=set()) is True # however, once data is introduced, True is no longer returned # if no logic has been specified assert utils.logic.is_exclusive_match(data="check", logic=set()) is False assert ( utils.logic.is_exclusive_match(data=["check", "checkb"], logic=set()) is False ) # String delimters are stripped out so that a list can be formed # the below is just an empty token list assert utils.logic.is_exclusive_match(data=set(), logic=",; ,") is True # garbage logic is never an exclusive match assert utils.logic.is_exclusive_match(data=set(), logic=object()) is False assert ( utils.logic.is_exclusive_match( data=set(), logic=[ object(), ], ) is False ) # # Test with logic: # data = {"abc"} # def in data assert utils.logic.is_exclusive_match(logic="def", data=data) is False # def in data assert ( utils.logic.is_exclusive_match( logic=[ "def", ], data=data, ) is False ) # def in data assert utils.logic.is_exclusive_match(logic=("def",), data=data) is False # def in data assert ( utils.logic.is_exclusive_match( logic={ "def", }, data=data, ) is False ) # abc in data assert ( utils.logic.is_exclusive_match( logic=[ "abc", ], data=data, ) is True ) # abc in data assert utils.logic.is_exclusive_match(logic=("abc",), data=data) is True # abc in data assert ( utils.logic.is_exclusive_match( logic={ "abc", }, data=data, ) is True ) # abc or def in data assert utils.logic.is_exclusive_match(logic="abc, def", data=data) is True # # Update our data set so we can do more advance checks # data = {"abc", "def", "efg", "xyz"} # match_all matches everything assert utils.logic.is_exclusive_match(logic="all", data=data) is True assert utils.logic.is_exclusive_match(logic=["all"], data=data) is True # def and abc in data assert ( utils.logic.is_exclusive_match(logic=[("abc", "def")], data=data) is True ) # cba and abc in data assert ( utils.logic.is_exclusive_match(logic=[("cba", "abc")], data=data) is False ) # www or zzz or abc and xyz assert ( utils.logic.is_exclusive_match( logic=["www", "zzz", ("abc", "xyz")], data=data ) is True ) # www or zzz or abc and xyz (strings are valid too) assert ( utils.logic.is_exclusive_match( logic=["www", "zzz", "abc, xyz"], data=data ) is True ) # www or zzz or abc and jjj assert ( utils.logic.is_exclusive_match( logic=["www", "zzz", ("abc", "jjj")], data=data ) is False ) # # Empty data set # data = set() assert utils.logic.is_exclusive_match(logic=["www"], data=data) is False assert utils.logic.is_exclusive_match(logic="all", data=data) is True # # Update our data set so we can do more advance checks # data = {"always", "entry1"} # We'll always match on the with keyword always assert utils.logic.is_exclusive_match(logic="always", data=data) is True assert utils.logic.is_exclusive_match(logic="garbage", data=data) is True # However we will not match if we turn this feature off assert ( utils.logic.is_exclusive_match( logic="garbage", data=data, match_always=False ) is False ) # Change default value from 'all' to 'match_me'. Logic matches # so we pass assert ( utils.logic.is_exclusive_match( logic="match_me", data=data, match_all="match_me" ) is True ) def test_apprise_validate_regex(): """ API: Apprise() Validate Regex tests """ assert utils.parse.validate_regex(None) is None assert utils.parse.validate_regex(object) is None assert utils.parse.validate_regex(42) is None assert utils.parse.validate_regex("") is None assert utils.parse.validate_regex(" ") is None assert utils.parse.validate_regex("abc") == "abc" # value is a keyword that is extracted (if found) assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[^-]+)-", fmt="{value}" ) == "abcd" ) assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[^-]+)-", strip=False, fmt="{value}" ) == " abcd " ) # String flags supported in addition to numeric assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[^-]+)-", "i", fmt="{value}" ) == "abcd" ) assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[^-]+)-", re.I, fmt="{value}" ) == "abcd" ) # Test multiple flag settings assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[^-]+)-", "isax", fmt="{value}" ) == "abcd" ) # Invalid flags are just ignored. The below fails to match # because the default value of 'i' is over-ridden by what is # identfied below, and no flag is set at the end of the day assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[ABCD]+)-", "-%2gb", fmt="{value}" ) is None ) assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[ABCD]+)-", "", fmt="{value}" ) is None ) assert ( utils.parse.validate_regex( "- abcd -", r"-(?P[ABCD]+)-", None, fmt="{value}" ) is None ) def test_apply_templating(): """utils: apply_template() testing""" template = "Hello {{fname}}, How are you {{whence}}?" result = utils.templates.apply_template( template, **{"fname": "Chris", "whence": "this morning"} ) assert isinstance(result, str) assert result == "Hello Chris, How are you this morning?" # In this example 'whence' isn't provided, so it isn't swapped result = utils.templates.apply_template(template, **{"fname": "Chris"}) assert isinstance(result, str) assert result == "Hello Chris, How are you {{whence}}?" # white space won't cause any ill affects: template = "Hello {{ fname }}, How are you {{ whence}}?" result = utils.templates.apply_template( template, **{"fname": "Chris", "whence": "this morning"} ) assert isinstance(result, str) assert result == "Hello Chris, How are you this morning?" # No arguments won't cause any problems template = "Hello {{fname}}, How are you {{whence}}?" result = utils.templates.apply_template(template) assert isinstance(result, str) assert result == template # Wrong elements are simply ignored result = utils.templates.apply_template( template, **{"fname": "l2g", "whence": "this evening", "ignore": "me"} ) assert isinstance(result, str) assert result == "Hello l2g, How are you this evening?" # Empty template makes things easy result = utils.templates.apply_template( "", **{"fname": "l2g", "whence": "this evening"} ) assert isinstance(result, str) assert result == "" # Regular expressions are safely escapped and act as normal # tokens: template = "Hello {{.*}}, How are you {{[A-Z0-9]+}}?" result = utils.templates.apply_template( template, **{".*": "l2g", "[A-Z0-9]+": "this afternoon"} ) assert result == "Hello l2g, How are you this afternoon?" # JSON is handled too such as escaping quotes template = '{value: "{{ value }}"}' result = utils.templates.apply_template( template, app_mode=utils.templates.TemplateType.JSON, **{"value": '"quotes are escaped"'}, ) assert result == '{value: "\\"quotes are escaped\\""}' def test_cwe312_word(): """utils: cwe312_word() testing""" assert utils.cwe312.cwe312_word(None) is None assert utils.cwe312.cwe312_word(42) == 42 assert utils.cwe312.cwe312_word("") == "" assert utils.cwe312.cwe312_word(" ") == " " assert utils.cwe312.cwe312_word("!") == "!" assert utils.cwe312.cwe312_word("a") == "a" assert utils.cwe312.cwe312_word("ab") == "ab" assert utils.cwe312.cwe312_word("abc") == "abc" assert utils.cwe312.cwe312_word("abcd") == "abcd" assert utils.cwe312.cwe312_word("abcd", force=True) == "a...d" assert utils.cwe312.cwe312_word("abc--d") == "abc--d" assert utils.cwe312.cwe312_word("a-domain.ca") == "a...a" # Variances to still catch domain assert ( utils.cwe312.cwe312_word("a-domain.ca", advanced=False) == "a-domain.ca" ) assert ( utils.cwe312.cwe312_word("a-domain.ca", threshold=6) == "a-domain.ca" ) def test_cwe312_url(): """utils: cwe312_url() testing""" assert utils.cwe312.cwe312_url(None) is None assert utils.cwe312.cwe312_url(42) == 42 assert utils.cwe312.cwe312_url("http://") == "http://" assert utils.cwe312.cwe312_url("discord://") == "discord://" assert utils.cwe312.cwe312_url("path") == "http://path" assert utils.cwe312.cwe312_url("path/") == "http://path/" # Now test http:// private data assert ( utils.cwe312.cwe312_url("http://user:pass123@localhost") == "http://user:p...3@localhost" ) assert ( utils.cwe312.cwe312_url("http://user@localhost") == "http://user@localhost" ) assert ( utils.cwe312.cwe312_url("http://user@localhost?password=abc123") == "http://user@localhost?password=a...3" ) assert ( utils.cwe312.cwe312_url("http://user@localhost?secret=secret-.12345") == "http://user@localhost?secret=s...5" ) assert ( utils.cwe312.cwe312_url( "slack://mybot@xoxb-43598234231-3248932482278" "-BZK5Wj15B9mPh1RkShJoCZ44" "/lead2gold@gmail.com" ) == "slack://mybot@x...4/l...m" ) assert ( utils.cwe312.cwe312_url( "slack://test@B4QP3WWB4/J3QWT41JM/XIl2ffpqXkzkwMXrJdevi7W3/#random" ) == "slack://test@B...4/J...M/X...3/" ) def test_base64_encode_decode(): """Utils:Base64:URLEncode & Decode.""" assert utils.base64.base64_urlencode(None) is None assert utils.base64.base64_urlencode(42) is None assert utils.base64.base64_urlencode(object) is None assert utils.base64.base64_urlencode({}) is None assert utils.base64.base64_urlencode("") is None assert utils.base64.base64_urlencode("abc") is None assert utils.base64.base64_urlencode(b"") == "" assert utils.base64.base64_urlencode(b"abc") == "YWJj" assert utils.base64.base64_urldecode(None) is None assert utils.base64.base64_urldecode(42) is None assert utils.base64.base64_urldecode(object) is None assert utils.base64.base64_urldecode({}) is None assert utils.base64.base64_urldecode("abc") == b"i\xb7" assert utils.base64.base64_urldecode("") == b"" assert utils.base64.base64_urldecode("YWJj") == b"abc" def test_dict_base64_codec(tmpdir): """Test encoding/decoding of base64 content.""" original = { "int": 1, "float": 2.3, } encoded, needs_decoding = utils.base64.encode_b64_dict(original) assert encoded == {"int": "b64:MQ==", "float": "b64:Mi4z"} assert needs_decoding is True decoded = utils.base64.decode_b64_dict(encoded) assert decoded == original with mock.patch("json.dumps", side_effect=TypeError()): encoded, needs_decoding = utils.base64.encode_b64_dict(original) # we failed assert needs_decoding is False assert encoded == { "int": "1", "float": "2.3", } def test_dir_size(tmpdir): """Test dir size tool.""" # Nothing to find/see size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 0 assert len(_errors) == 0 # Write a file in our root directory tmpdir.join("root.psdata").write("0" * 1024 * 1024) # Prepare some more directories namespace_1 = tmpdir.mkdir("abcdefg") namespace_2 = tmpdir.mkdir("defghij") namespace_2.join("cache.psdata").write("0" * 1024 * 1024) size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 1024 * 1024 * 2 assert len(_errors) == 0 # Write another file namespace_1.join("cache.psdata").write("0" * 1024 * 1024) size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 1024 * 1024 * 3 assert len(_errors) == 0 size, _errors = utils.disk.dir_size(str(namespace_1)) assert size == 1024 * 1024 assert len(_errors) == 0 # Create a directory insde one of our namespaces subspace_1 = namespace_1.mkdir("zyx") size, _errors = utils.disk.dir_size(str(namespace_1)) assert size == 1024 * 1024 subspace_1.join("cache.psdata").write("0" * 1024 * 1024) size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 1024 * 1024 * 4 assert len(_errors) == 0 # Recursion limit reduced... no change at 2 as we can go 2 # diretories deep no problem size, _errors = utils.disk.dir_size(str(tmpdir), max_depth=2) assert size == 1024 * 1024 * 4 assert len(_errors) == 0 size, _errors = utils.disk.dir_size(str(tmpdir), max_depth=1) assert size == 1024 * 1024 * 3 # we can't get into our subspace_1 assert len(_errors) == 1 assert str(subspace_1) in _errors size, _errors = utils.disk.dir_size(str(tmpdir), max_depth=0) assert size == 1024 * 1024 # we can't get into our namespace directories assert len(_errors) == 2 assert str(namespace_1) in _errors assert str(namespace_2) in _errors # Let's cause problems now and test the output size, _errors = utils.disk.dir_size("invalid-directory", missing_okay=True) assert size == 0 assert len(_errors) == 0 size, _errors = utils.disk.dir_size( "invalid-directory", missing_okay=False ) assert size == 0 assert len(_errors) == 1 assert "invalid-directory" in _errors with mock.patch("os.scandir", side_effect=OSError()): size, _errors = utils.disk.dir_size(str(tmpdir), missing_okay=True) assert size == 0 assert len(_errors) == 1 assert str(tmpdir) in _errors with mock.patch("os.scandir") as mock_scandir: mock_entry = mock.MagicMock() mock_entry.is_file.side_effect = OSError() mock_entry.path = "/test/path" # Mock the scandir return value to yield the mock entry mock_scandir.return_value.__enter__.return_value = [mock_entry] size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 0 assert len(_errors) == 1 assert mock_entry.path in _errors with mock.patch("os.scandir") as mock_scandir: mock_entry = mock.MagicMock() mock_entry.is_file.return_value = False mock_entry.is_dir.side_effect = OSError() mock_entry.path = "/test/path" # Mock the scandir return value to yield the mock entry mock_scandir.return_value.__enter__.return_value = [mock_entry] size, _errors = utils.disk.dir_size(str(tmpdir)) assert len(_errors) == 1 assert mock_entry.path in _errors with mock.patch("os.scandir") as mock_scandir: mock_entry = mock.MagicMock() mock_entry.is_file.return_value = False mock_entry.is_dir.return_value = False # Mock the scandir return value to yield the mock entry mock_scandir.return_value.__enter__.return_value = [mock_entry] size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 0 assert len(_errors) == 0 with mock.patch("os.scandir") as mock_scandir: mock_entry = mock.MagicMock() mock_entry.is_file.side_effect = FileNotFoundError() mock_entry.path = "/test/path" # Mock the scandir return value to yield the mock entry mock_scandir.return_value.__enter__.return_value = [mock_entry] size, _errors = utils.disk.dir_size(str(tmpdir)) assert size == 0 # No file isn't a problem, we're calculating disksize anyway, # one less thing to calculate assert len(_errors) == 0 def test_bytes_to_str(): """Test Bytes to String representation.""" # Garbage Entry assert utils.disk.bytes_to_str(None) is None assert utils.disk.bytes_to_str("") is None assert utils.disk.bytes_to_str("GARBAGE") is None # Good Entries assert utils.disk.bytes_to_str(0) == "0.00B" assert utils.disk.bytes_to_str(1) == "1.00B" assert utils.disk.bytes_to_str(1.1) == "1.10B" assert utils.disk.bytes_to_str(1024) == "1.00KB" assert utils.disk.bytes_to_str(1024 * 1024) == "1.00MB" assert utils.disk.bytes_to_str(1024 * 1024 * 1024) == "1.00GB" assert utils.disk.bytes_to_str(1024 * 1024 * 1024 * 1024) == "1.00TB" # Support strings too assert utils.disk.bytes_to_str("0") == "0.00B" assert utils.disk.bytes_to_str("1024") == "1.00KB"