# BSD 2-Clause License # # Apprise - Push Notification Library. # Copyright (c) 2025, Chris Caron # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # Disable logging for a cleaner testing output import logging import sys from unittest import mock import pytest from apprise import ( Apprise, AppriseAsset, AppriseConfig, ConfigFormat, ConfigurationManager, ContentIncludeMode, NotificationManager, NotifyFormat, ) from apprise.config import ConfigBase from apprise.config.file import ConfigFile from apprise.plugins import NotifyBase logging.disable(logging.CRITICAL) # Grant access to our Notification Manager Singleton N_MGR = NotificationManager() # Grant access to our Configuration Manager Singleton C_MGR = ConfigurationManager() def test_apprise_config(tmpdir): """ API: AppriseConfig basic testing """ # Create ourselves a config object ac = AppriseConfig() # There are no servers loaded assert len(ac) == 0 # Object can be directly checked as a boolean; response is False # when there are no entries loaded assert not ac # lets try anyway assert len(ac.servers()) == 0 t = tmpdir.mkdir("simple-formatting").join("apprise") t.write(""" # A comment line over top of a URL mailto://usera:pass@gmail.com # A line with mulitiple tag assignments to it taga,tagb=gnome:// # Event if there is accidental leading spaces, this configuation # is accepting of htat and will not exclude them tagc=kde:// # A very poorly structured url sns://:@/ # Just 1 token provided causes exception sns://T1JJ3T3L2/ # XML xml://localhost/?+HeaderEntry=Test&:IgnoredEntry=Ignored """) # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # One configuration file should have been found assert len(ac) == 1 # Object can be directly checked as a boolean; response is True # when there is at least one entry assert ac # We should be able to read our 4 servers from that assert len(ac.servers()) == 4 # Get our URL back assert isinstance(ac[0].url(), str) # Test cases where our URL is invalid t = tmpdir.mkdir("strange-lines").join("apprise") t.write(""" # basicly this consists of defined tags and no url tag= """) # Create ourselves a config object ac = AppriseConfig(paths=str(t), asset=AppriseAsset()) # One configuration file should have been found assert len(ac) == 1 # No urls were set assert len(ac.servers()) == 0 # Create a ConfigBase object cb = ConfigBase() # Test adding of all entries assert ac.add(configs=cb, asset=AppriseAsset(), tag="test") is True # Test adding of all entries assert ( ac.add( configs=[ "file://?", ], asset=AppriseAsset(), tag="test", ) is False ) # Test the adding of garbage assert ac.add(configs=object()) is False # Try again but enforce our format ac = AppriseConfig(paths=f"file://{t!s}?format=text") # One configuration file should have been found assert len(ac) == 1 # No urls were set assert len(ac.servers()) == 0 # # Test Internatialization and the handling of unicode characters # istr = """ # Iñtërnâtiônàlization Testing windows://""" # Write our content to our file t = tmpdir.mkdir("internationalization").join("apprise") with open(str(t), "wb") as f: f.write(istr.encode("latin-1")) # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # One configuration file should have been found assert len(ac) == 1 # This will fail because our default encoding is utf-8; however the file # we opened was not; it was latin-1 and could not be parsed. assert len(ac.servers()) == 0 # Test iterator count = 0 for _entry in ac: count += 1 assert len(ac) == count # We can fix this though; set our encoding to latin-1 ac = AppriseConfig(paths=f"file://{t!s}?encoding=latin-1") # One configuration file should have been found assert len(ac) == 1 # Our URL should be found assert len(ac.servers()) == 1 # Get our URL back assert isinstance(ac[0].url(), str) # pop an entry from our list assert isinstance(ac.pop(0), ConfigBase) # Determine we have no more configuration entries loaded assert len(ac) == 0 # # Test buffer handling (and overflow) t = tmpdir.mkdir("buffer-handling").join("apprise") buf = "gnome://" t.write(buf) # Reset our config object ac.clear() # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # update our length to be the size of our actual file ac[0].max_buffer_size = len(buf) # One configuration file should have been found assert len(ac) == 1 assert len(ac.servers()) == 1 # update our buffer size to be slightly smaller then what we allow ac[0].max_buffer_size = len(buf) - 1 # Content is automatically cached; so even though we adjusted the buffer # above, our results have been cached so we get a 1 response. assert len(ac.servers()) == 1 def test_apprise_multi_config_entries(tmpdir): """ API: AppriseConfig basic multi-adding functionality """ # temporary file to work with t = tmpdir.mkdir("apprise-multi-add").join("apprise") buf = """ good://hostname """ t.write(buf) # temporary empty file to work with te = tmpdir.join("apprise-multi-add", "apprise-empty") te.write("") # Define our good:// url class GoodNotification(NotifyBase): def __init__(self, **kwargs): super().__init__(notify_format=NotifyFormat.HTML, **kwargs) def notify(self, **kwargs): # Pretend everything is okay return True def url(self, **kwargs): # support url() return "" # Store our good notification in our schema map N_MGR._schema_map["good"] = GoodNotification # Create ourselves a config object ac = AppriseConfig() # There are no servers loaded assert len(ac) == 0 # Support adding of muilt strings and objects: assert ac.add(configs=(str(t), str(t))) is True assert ( ac.add(configs=(ConfigFile(path=str(te)), ConfigFile(path=str(t)))) is True ) # don't support the adding of invalid content assert ac.add(configs=(object(), object())) is False assert ac.add(configs=object()) is False # Try to pop an element out of range with pytest.raises(IndexError): ac.server_pop(len(ac.servers())) # Pop our elements while len(ac.servers()) > 0: assert isinstance(ac.server_pop(len(ac.servers()) - 1), NotifyBase) def test_apprise_add_config(): """API AppriseConfig.add_config()""" content = """ # A comment line over top of a URL mailto://usera:pass@gmail.com # A line with mulitiple tag assignments to it taga,tagb=gnome:// # Event if there is accidental leading spaces, this configuation # is accepting of htat and will not exclude them tagc=kde:// # A very poorly structured url sns://:@/ # Just 1 token provided causes exception sns://T1JJ3T3L2/ """ # Create ourselves a config object ac = AppriseConfig() assert ac.add_config(content=content) is True # One configuration file should have been found assert len(ac) == 1 assert ac[0].config_format is ConfigFormat.TEXT # Object can be directly checked as a boolean; response is True # when there is at least one entry assert ac # We should be able to read our 3 servers from that assert len(ac.servers()) == 3 # Get our URL back assert isinstance(ac[0].url(), str) # Test invalid content assert ac.add_config(content=object()) is False assert ac.add_config(content=42) is False assert ac.add_config(content=None) is False # Still only one server loaded assert len(ac) == 1 # Test having a pre-defined asset object and tag created assert ( ac.add_config(content=content, asset=AppriseAsset(), tag="a") is True ) # Now there are 2 servers loaded assert len(ac) == 2 # and 6 urls.. (as we've doubled up) assert len(ac.servers()) == 6 content = """ # A YAML File urls: - mailto://usera:pass@gmail.com - gnome://: tag: taga,tagb - json://localhost: +HeaderEntry1: 'a header entry' -HeaderEntryDepricated: 'a deprecated entry' :HeaderEntryIgnored: 'an ignored header entry' - xml://localhost: +HeaderEntry1: 'a header entry' -HeaderEntryDepricated: 'a deprecated entry' :HeaderEntryIgnored: 'an ignored header entry' """ # Create ourselves a config object ac = AppriseConfig() assert ac.add_config(content=content) is True # One configuration file should have been found assert len(ac) == 1 assert ac[0].config_format is ConfigFormat.YAML # Object can be directly checked as a boolean; response is True # when there is at least one entry assert ac # We should be able to read our 4 servers from that assert len(ac.servers()) == 4 # Now an invalid configuration file content = "invalid" # Create ourselves a config object ac = AppriseConfig() assert ac.add_config(content=content) is False # Nothing is loaded assert len(ac.servers()) == 0 def test_apprise_config_tagging(tmpdir): """ API: AppriseConfig tagging """ # temporary file to work with t = tmpdir.mkdir("tagging").join("apprise") buf = "gnome://" t.write(buf) # Create ourselves a config object ac = AppriseConfig() # Add an item associated with tag a assert ac.add(configs=str(t), asset=AppriseAsset(), tag="a") is True # Add an item associated with tag b assert ac.add(configs=str(t), asset=AppriseAsset(), tag="b") is True # Add an item associated with tag a or b assert ac.add(configs=str(t), asset=AppriseAsset(), tag="a,b") is True # Now filter: a: assert len(ac.servers(tag="a")) == 2 # Now filter: a or b: assert len(ac.servers(tag="a,b")) == 3 # Now filter: a and b assert len(ac.servers(tag=[("a", "b")])) == 1 # all matches everything assert len(ac.servers(tag="all")) == 3 # Test cases using the `always` keyword # Create ourselves a config object ac = AppriseConfig() # Add an item associated with tag a assert ac.add(configs=str(t), asset=AppriseAsset(), tag="a,always") is True # Add an item associated with tag b assert ac.add(configs=str(t), asset=AppriseAsset(), tag="b") is True # Add an item associated with tag a or b assert ac.add(configs=str(t), asset=AppriseAsset(), tag="c,d") is True # Now filter: a: assert len(ac.servers(tag="a")) == 1 # Now filter: a or b: assert len(ac.servers(tag="a,b")) == 2 # Now filter: e # we'll match the `always' assert len(ac.servers(tag="e")) == 1 assert len(ac.servers(tag="e", match_always=False)) == 0 # all matches everything assert len(ac.servers(tag="all")) == 3 # Now filter: d # we'll match the `always' tag assert len(ac.servers(tag="d")) == 2 assert len(ac.servers(tag="d", match_always=False)) == 1 def test_apprise_config_instantiate(): """ API: AppriseConfig.instantiate() """ assert ( AppriseConfig.instantiate("file://?", suppress_exceptions=True) is None ) assert ( AppriseConfig.instantiate("invalid://?", suppress_exceptions=True) is None ) class BadConfig(ConfigBase): # always allow incusion allow_cross_includes = ContentIncludeMode.ALWAYS def __init__(self, **kwargs): super().__init__(**kwargs) # We fail whenever we're initialized raise TypeError() @staticmethod def parse_url(url, *args, **kwargs): # always parseable return ConfigBase.parse_url(url, verify_host=False) # Store our bad configuration in our schema map C_MGR["bad"] = BadConfig with pytest.raises(TypeError): AppriseConfig.instantiate("bad://path", suppress_exceptions=False) # Same call but exceptions suppressed assert ( AppriseConfig.instantiate("bad://path", suppress_exceptions=True) is None ) def test_invalid_apprise_config(tmpdir): """Parse invalid configuration includes.""" class BadConfig(ConfigBase): # always allow incusion allow_cross_includes = ContentIncludeMode.ALWAYS def __init__(self, **kwargs): super().__init__(**kwargs) # We intentionally fail whenever we're initialized raise TypeError() @staticmethod def parse_url(url, *args, **kwargs): # always parseable return ConfigBase.parse_url(url, verify_host=False) # Store our bad configuration in our schema map C_MGR["bad"] = BadConfig # temporary file to work with t = tmpdir.mkdir("apprise-bad-obj").join("invalid") buf = f""" # Include an invalid schema include invalid:// # An unparsable valid schema include https:// # A valid configuration that will throw an exception include bad:// # Include ourselves (So our recursive includes fails as well) include {t!s} """ t.write(buf) # Create ourselves a config object with caching disbled ac = AppriseConfig(recursion=2, insecure_includes=True, cache=False) # Nothing loaded yet assert len(ac) == 0 # Add our config assert ac.add(configs=str(t), asset=AppriseAsset()) is True # One configuration file assert len(ac) == 1 # All of the servers were invalid and would not load assert len(ac.servers()) == 0 def test_apprise_config_with_apprise_obj(tmpdir): """ API: ConfigBase - parse valid config """ # temporary file to work with t = tmpdir.mkdir("apprise-obj").join("apprise") buf = """ good://hostname localhost=good://localhost """ t.write(buf) # Define our good:// url class GoodNotification(NotifyBase): def __init__(self, **kwargs): super().__init__(notify_format=NotifyFormat.HTML, **kwargs) def notify(self, **kwargs): # Pretend everything is okay return True def url(self, **kwargs): # support url() return "" # Store our good notification in our schema map N_MGR._schema_map["good"] = GoodNotification # Create ourselves a config object ac = AppriseConfig(cache=False) # Nothing loaded yet assert len(ac) == 0 # Add an item associated with tag a assert ac.add(configs=str(t), asset=AppriseAsset(), tag="a") is True # One configuration file assert len(ac) == 1 # 2 services found in it assert len(ac.servers()) == 2 # Pop one of them (at index 0) ac.server_pop(0) # Verify that it no longer listed assert len(ac.servers()) == 1 # Test our ability to add Config objects to our apprise object a = Apprise() # Add our configuration object assert a.add(servers=ac) is True # Detect our 1 entry (originally there were 2 but we deleted one) assert len(a) == 1 # Notify our service assert a.notify(body="apprise configuration power!") is True # Add our configuration object assert ( a.add(servers=[AppriseConfig(str(t)), AppriseConfig(str(t))]) is True ) # Detect our 5 loaded entries now; 1 from first config, and another # 2x2 based on adding our list above assert len(a) == 5 # We can't add garbage assert a.add(servers=object()) is False assert a.add(servers=[object(), object()]) is False # Our length is unchanged assert len(a) == 5 # reference index 0 of our list ref = a[0] assert isinstance(ref, NotifyBase) # Our length is unchanged assert len(a) == 5 # pop the index ref_popped = a.pop(0) # Verify our response assert isinstance(ref_popped, NotifyBase) # Our length drops by 1 assert len(a) == 4 # Content popped is the same as one referenced by index # earlier assert ref == ref_popped # pop an index out of range with pytest.raises(IndexError): a.pop(len(a)) # Our length remains unchanged assert len(a) == 4 # Reference content out of range with pytest.raises(IndexError): a[len(a)] # reference index at the end of our list ref = a[len(a) - 1] # Verify our response assert isinstance(ref, NotifyBase) # Our length stays the same assert len(a) == 4 # We can pop from the back of the list without a problem too ref_popped = a.pop(len(a) - 1) # Verify our response assert isinstance(ref_popped, NotifyBase) # Content popped is the same as one referenced by index # earlier assert ref == ref_popped # Our length drops by 1 assert len(a) == 3 # Now we'll test adding another element to the list so that it mixes up # our response object. # Below we add 3 different types, a ConfigBase, NotifyBase, and URL assert ( a.add( servers=[ ConfigFile(path=(str(t))), "good://another.host", GoodNotification(**{"host": "nuxref.com"}), ] ) is True ) # Our length increases by 4 (2 entries in the config file, + 2 others) assert len(a) == 7 # reference index at the end of our list ref = a[len(a) - 1] # Verify our response assert isinstance(ref, NotifyBase) # We can pop from the back of the list without a problem too ref_popped = a.pop(len(a) - 1) # Verify our response assert isinstance(ref_popped, NotifyBase) # Content popped is the same as one referenced by index # earlier assert ref == ref_popped # Our length drops by 1 assert len(a) == 6 # pop our list while len(a) > 0: assert isinstance(a.pop(len(a) - 1), NotifyBase) def test_recursive_config_inclusion(tmpdir): """ API: Apprise() Recursive Config Inclusion """ # To test our config classes, we make three dummy configs class ConfigCrossPostAlways(ConfigFile): """A dummy config that is set to always allow inclusion.""" service_name = "always" # protocol protocol = "always" # Always type allow_cross_includes = ContentIncludeMode.ALWAYS class ConfigCrossPostStrict(ConfigFile): """A dummy config that is set to strict inclusion.""" service_name = "strict" # protocol protocol = "strict" # Always type allow_cross_includes = ContentIncludeMode.STRICT class ConfigCrossPostNever(ConfigFile): """A dummy config that is set to never allow inclusion.""" service_name = "never" # protocol protocol = "never" # Always type allow_cross_includes = ContentIncludeMode.NEVER # store our entries C_MGR["never"] = ConfigCrossPostNever C_MGR["strict"] = ConfigCrossPostStrict C_MGR["always"] = ConfigCrossPostAlways # Make our new path valid suite = tmpdir.mkdir("apprise_config_recursion") cfg01 = suite.join("cfg01.cfg") cfg02 = suite.mkdir("dir1").join("cfg02.cfg") cfg03 = suite.mkdir("dir2").join("cfg03.cfg") cfg04 = suite.mkdir("dir3").join("cfg04.cfg") # Populate our files with valid configuration include lines cfg01.write(f""" # json entry json://localhost:8080 # absolute path inclusion to ourselves include {cfg01!s}""") cfg02.write(""" # json entry json://localhost:8080 # recursively include ourselves include cfg02.cfg""") cfg03.write(""" # xml entry xml://localhost:8080 # relative path inclusion include ../dir1/cfg02.cfg # test that we can't include invalid entries include invalid://entry # Include non includable type include memory://""") cfg04.write(f""" # xml entry xml://localhost:8080 # always include of our file include always://{cfg04!s} # never include of our file include never://{cfg04!s} # strict include of our file include strict://{cfg04!s}""") # Create ourselves a config object ac = AppriseConfig() # There are no servers loaded assert len(ac) == 0 # load our configuration assert ac.add(configs=str(cfg01)) is True # verify it loaded assert len(ac) == 1 # 1 service will be loaded as there is no recursion at this point assert len(ac.servers()) == 1 # Create ourselves a config object ac = AppriseConfig(recursion=1) # load our configuration assert ac.add(configs=str(cfg01)) is True # verify one configuration file loaded however since it recursively # loaded itself 1 more time, it still doesn't impact the load count: assert len(ac) == 1 # 2 services loaded now that we loaded the same file twice assert len(ac.servers()) == 2 # # Now we test relative file inclusion # # Create ourselves a config object ac = AppriseConfig(recursion=10) # There are no servers loaded assert len(ac) == 0 # load our configuration assert ac.add(configs=str(cfg02)) is True # verify it loaded assert len(ac) == 1 # 11 services loaded because we reloaded ourselves 10 times # after loading the first entry assert len(ac.servers()) == 11 # Test our include modes (strict, always, and never) # Create ourselves a config object ac = AppriseConfig(recursion=1) # There are no servers loaded assert len(ac) == 0 # load our configuration assert ac.add(configs=str(cfg04)) is True # verify it loaded assert len(ac) == 1 # 2 servers loaded # 1 - from the file read (which is set at mode STRICT # 1 - from the always:// # # The never:// can ever be includeed, and the strict:// is ot of type # file:// (the one doing the include) so it is also ignored. # # By turning on the insecure_includes, we can include the strict files too assert len(ac.servers()) == 2 # Create ourselves a config object ac = AppriseConfig(recursion=1, insecure_includes=True) # There are no servers loaded assert len(ac) == 0 # load our configuration assert ac.add(configs=str(cfg04)) is True # verify it loaded assert len(ac) == 1 # 3 servers loaded # 1 - from the file read (which is set at mode STRICT # 1 - from the always:// # 1 - from the strict:// (due to insecure_includes set) assert len(ac.servers()) == 3 def test_apprise_config_file_loading(tmpdir): """ API: AppriseConfig() URL Testing """ config_path = tmpdir / "apprise.yml" # Create a temporary config file config_path.write("urls:\n - json://localhost") # Flow from README.md ap = Apprise() ap.add("xml://localhost") config = AppriseConfig() config.add(str(config_path)) ap.add(config) # Using urls() assert len(ap.urls()) == 2 def test_apprise_config_matrix_load(): """ API: AppriseConfig() matrix initialization """ import apprise class ConfigDummy(ConfigBase): """A dummy wrapper for testing the different options in the load_matrix function.""" # The default descriptive name associated with the Notification service_name = "dummy" # protocol as tuple protocol = ("uh", "oh") # secure protocol as tuple secure_protocol = ("no", "yes") class ConfigDummy2(ConfigBase): """A dummy wrapper for testing the different options in the load_matrix function.""" # The default descriptive name associated with the Notification service_name = "dummy2" # secure protocol as tuple secure_protocol = ("true", "false") class ConfigDummy3(ConfigBase): """A dummy wrapper for testing the different options in the load_matrix function.""" # The default descriptive name associated with the Notification service_name = "dummy3" # secure protocol as string secure_protocol = "true" class ConfigDummy4(ConfigBase): """A dummy wrapper for testing the different options in the load_matrix function.""" # The default descriptive name associated with the Notification service_name = "dummy4" # protocol as string protocol = "true" # Generate ourselves a fake entry apprise.config.ConfigDummy = ConfigDummy apprise.config.ConfigDummy2 = ConfigDummy2 apprise.config.ConfigDummy3 = ConfigDummy3 apprise.config.ConfigDummy4 = ConfigDummy4 def test_configmatrix_dynamic_importing(tmpdir): """ API: Apprise() Config Matrix Importing """ # Make our new path valid suite = tmpdir.mkdir("apprise_config_test_suite") suite.join("__init__.py").write("") module_name = "badconfig" # Update our path to point to our new test suite sys.path.insert(0, str(suite)) # Create a base area to work within base = suite.mkdir(module_name) base.join("__init__.py").write("") # Test no app_id base.join("ConfigBadFile1.py").write(""" class ConfigBadFile1: pass""") # No class of the same name base.join("ConfigBadFile2.py").write(""" class BadClassName: pass""") # Exception thrown base.join("ConfigBadFile3.py").write("""raise ImportError()""") # Utilizes a schema:// already occupied (as string) base.join("ConfigGoober.py").write(""" from apprise.config import ConfigBase class ConfigGoober(ConfigBase): # This class tests the fact we have a new class name, but we're # trying to over-ride items previously used # The default simple (insecure) protocol (used by ConfigHTTP) protocol = ('http', 'goober') # The default secure protocol (used by ConfigHTTP) secure_protocol = 'https' @staticmethod def parse_url(url, *args, **kwargs): # always parseable return ConfigBase.parse_url(url, verify_host=False)""") # Utilizes a schema:// already occupied (as tuple) base.join("ConfigBugger.py").write(""" from apprise.config import ConfigBase class ConfigBugger(ConfigBase): # This class tests the fact we have a new class name, but we're # trying to over-ride items previously used # The default simple (insecure) protocol (used by ConfigHTTP), the other # isn't protocol = ('http', 'bugger-test' ) # The default secure protocol (used by ConfigHTTP), the other isn't secure_protocol = ('https', ['garbage']) @staticmethod def parse_url(url, *args, **kwargs): # always parseable return ConfigBase.parse_url(url, verify_host=False)""") @mock.patch("os.path.getsize") def test_config_base_parse_inaccessible_text_file(mock_getsize, tmpdir): """ API: ConfigBase.parse_inaccessible_text_file """ # temporary file to work with t = tmpdir.mkdir("inaccessible").join("apprise") buf = "gnome://" t.write(buf) # Set getsize return value mock_getsize.return_value = None mock_getsize.side_effect = OSError # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # The following internally throws an exception but still counts # as a loaded configuration file assert len(ac) == 1 # Thus no notifications are loaded assert len(ac.servers()) == 0 def test_config_base_parse_yaml_file01(tmpdir): """ API: ConfigBase.parse_yaml_file (#1) """ t = tmpdir.mkdir("empty-file").join("apprise.yml") t.write("") # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # The number of configuration files that exist assert len(ac) == 1 # no notifications are loaded assert len(ac.servers()) == 0 def test_config_base_parse_yaml_file02(tmpdir): """ API: ConfigBase.parse_yaml_file (#2) """ t = tmpdir.mkdir("matching-tags").join("apprise.yml") t.write("""urls: - pover://nsisxnvnqixq39t0cw54pxieyvtdd9@2jevtmstfg5a7hfxndiybasttxxfku: - tag: test1 - pover://rg8ta87qngcrkc6t4qbykxktou0uug@tqs3i88xlufexwl8t4asglt4zp5wfn: - tag: test2 - pover://jcqgnlyq2oetea4qg3iunahj8d5ijm@evalvutkhc8ipmz2lcgc70wtsm0qpb: - tag: test3""") # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # The number of configuration files that exist assert len(ac) == 1 # no notifications are loaded assert len(ac.servers()) == 3 # Test our ability to add Config objects to our apprise object a = Apprise() # Add our configuration object assert a.add(servers=ac) is True # Detect our 3 entry as they should have loaded successfully assert len(a) == 3 # No match assert sum(1 for _ in a.find("no-match")) == 0 # Match everything assert sum(1 for _ in a.find("all")) == 3 # Match test1 entry assert sum(1 for _ in a.find("test1")) == 1 # Match test2 entry assert sum(1 for _ in a.find("test2")) == 1 # Match test3 entry assert sum(1 for _ in a.find("test3")) == 1 # Match test1 or test3 entry assert sum(1 for _ in a.find("test1, test3")) == 2 def test_config_base_parse_yaml_file03(tmpdir): """ API: ConfigBase.parse_yaml_file (#3) """ t = tmpdir.mkdir("bad-first-entry").join("apprise.yml") # The first entry is -tag and not tag # The element is therefore not picked up; This causes us to display # some warning messages to the screen complaining of this typo yet # still allowing us to load the URL since it is valid t.write("""urls: - pover://nsisxnvnqixq39t0cw54pxieyvtdd9@2jevtmstfg5a7hfxndiybasttxxfku: -tag: test1 - pover://rg8ta87qngcrkc6t4qbykxktou0uug@tqs3i88xlufexwl8t4asglt4zp5wfn: - tag: test2 - pover://jcqgnlyq2oetea4qg3iunahj8d5ijm@evalvutkhc8ipmz2lcgc70wtsm0qpb: - tag: test3""") # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # The number of configuration files that exist assert len(ac) == 1 # no notifications lines processed is 3 assert len(ac.servers()) == 3 # Test our ability to add Config objects to our apprise object a = Apprise() # Add our configuration object assert a.add(servers=ac) is True # Detect our 3 entry as they should have loaded successfully assert len(a) == 3 # No match assert sum(1 for _ in a.find("no-match")) == 0 # Match everything assert sum(1 for _ in a.find("all")) == 3 # No match for bad entry assert sum(1 for _ in a.find("test1")) == 0 # Match test2 entry assert sum(1 for _ in a.find("test2")) == 1 # Match test3 entry assert sum(1 for _ in a.find("test3")) == 1 # Match test1 or test3 entry; (only matches test3) assert sum(1 for _ in a.find("test1, test3")) == 1 def test_config_base_parse_yaml_file04(tmpdir): """ API: ConfigBase.parse_yaml_file (#4) Test the always keyword """ t = tmpdir.mkdir("always-keyword").join("apprise.yml") t.write("""urls: - pover://nsisxnvnqixq39t0cw54pxieyvtdd9@2jevtmstfg5a7hfxndiybasttxxfku: - tag: test1,always - pover://rg8ta87qngcrkc6t4qbykxktou0uug@tqs3i88xlufexwl8t4asglt4zp5wfn: - tag: test2 - pover://jcqgnlyq2oetea4qg3iunahj8d5ijm@evalvutkhc8ipmz2lcgc70wtsm0qpb: - tag: test3""") # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # The number of configuration files that exist assert len(ac) == 1 # no notifications are loaded assert len(ac.servers()) == 3 # Test our ability to add Config objects to our apprise object a = Apprise() # Add our configuration object assert a.add(servers=ac) is True # Detect our 3 entry as they should have loaded successfully assert len(a) == 3 # No match still matches `always` keyword assert sum(1 for _ in a.find("no-match")) == 1 # Unless we explicitly do not look for that file assert sum(1 for _ in a.find("no-match", match_always=False)) == 0 # Match everything assert sum(1 for _ in a.find("all")) == 3 # Match test1 entry (also has `always` keyword assert sum(1 for _ in a.find("test1")) == 1 assert sum(1 for _ in a.find("test1", match_always=False)) == 1 # Match test2 entry (and test1 due to always keyword) assert sum(1 for _ in a.find("test2")) == 2 assert sum(1 for _ in a.find("test2", match_always=False)) == 1 # Match test3 entry (and test1 due to always keyword) assert sum(1 for _ in a.find("test3")) == 2 assert sum(1 for _ in a.find("test3", match_always=False)) == 1 # Match test1 or test3 entry assert sum(1 for _ in a.find("test1, test3")) == 2 def test_apprise_config_template_parse(tmpdir): """ API: AppriseConfig parsing of templates """ # Create ourselves a config object ac = AppriseConfig() t = tmpdir.mkdir("template-testing").join("apprise.yml") t.write(""" tag: - company # A comment line over top of a URL urls: - mailto://user:pass@example.com: - to: user1@gmail.com cc: test@hotmail.com - to: user2@gmail.com tag: co-worker """) # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # 2 emails to be sent assert len(ac.servers()) == 2 # The below checks are very customized for NotifyMail but just # test that the content got passed correctly assert (False, "user1@gmail.com") in ac[0][0].targets assert "test@hotmail.com" in ac[0][0].cc assert "company" in ac[0][1].tags assert (False, "user2@gmail.com") in ac[0][1].targets assert "company" in ac[0][1].tags assert "co-worker" in ac[0][1].tags # # Specifically test _special_token_handler() # tokens = { # This maps to itself (bcc); no change here "bcc": "user@test.com", # This should get mapped to 'targets' "to": "user1@abc.com", # white space and tab is intentionally added to the end to verify we # do not play/tamper with information "targets": "user2@abc.com, user3@abc.com \t", # If the end user provides a configuration for data we simply don't use # this isn't a proble... we simply don't touch it either; we leave it # as is. "ignore": "not-used", } result = ConfigBase._special_token_handler("mailto", tokens) # to gets mapped to targets assert "to" not in result # bcc is allowed here assert "bcc" in result assert "targets" in result # Not used, but also not touched; this entry should still be in our result # set assert "ignore" in result # We'll concatinate all of our targets together assert len(result["targets"]) == 2 assert "user1@abc.com" in result["targets"] # Content is passed as is assert "user2@abc.com, user3@abc.com \t" in result["targets"] # We re-do the simmiar test above. The very key difference is the # `targets` is a list already (it's expected type) so `to` can properly be # concatinated into the list vs the above (which tries to correct the # situation) tokens = { # This maps to itself (bcc); no change here "bcc": "user@test.com", # This should get mapped to 'targets' "to": "user1@abc.com", # similar to the above test except targets is now a proper # dictionary allowing the `to` (when translated to `targets`) to get # appended to it "targets": ["user2@abc.com", "user3@abc.com"], # If the end user provides a configuration for data we simply don't use # this isn't a proble... we simply don't touch it either; we leave it # as is. "ignore": "not-used", } result = ConfigBase._special_token_handler("mailto", tokens) # to gets mapped to targets assert "to" not in result # bcc is allowed here assert "bcc" in result assert "targets" in result # Not used, but also not touched; this entry should still be in our result # set assert "ignore" in result # Now we'll see the new user added as expected (concatinated into our list) assert len(result["targets"]) == 3 assert "user1@abc.com" in result["targets"] assert "user2@abc.com" in result["targets"] assert "user3@abc.com" in result["targets"] # Test providing a list t.write(""" # A comment line over top of a URL urls: - mailtos://user:pass@example.com: - smtp: smtp3-dev.google.gmail.com to: - John Smith - Jason Tater - user3@gmail.com - to: Henry Fisher , Jason Archie smtp_host: smtp5-dev.google.gmail.com tag: drinking-buddy # provide case where the URL includes some input too # In both of these cases, the cc and targets (to) get over-ridden # by values below - mailtos://user:pass@example.com/arnold@imdb.com/?cc=bill@micro.com/: to: - override01@gmail.com cc: - override02@gmail.com - sinch://: - spi: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa token: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb # Test a case where we expect a string, but yaml reads it in as # a number from: 10005243890 to: +1(123)555-1234 """) # Create ourselves a config object ac = AppriseConfig(paths=str(t)) # 2 emails to be sent and 1 Sinch service call assert len(ac.servers()) == 4 # Verify our users got placed into the to assert len(ac[0][0].targets) == 3 assert ("John Smith", "user1@gmail.com") in ac[0][0].targets assert ("Jason Tater", "user2@gmail.com") in ac[0][0].targets assert (False, "user3@gmail.com") in ac[0][0].targets assert ac[0][0].smtp_host == "smtp3-dev.google.gmail.com" assert len(ac[0][1].targets) == 2 assert ("Henry Fisher", "user4@gmail.com") in ac[0][1].targets assert ("Jason Archie", "user5@gmail.com") in ac[0][1].targets assert "drinking-buddy" in ac[0][1].tags assert ac[0][1].smtp_host == "smtp5-dev.google.gmail.com" # Our third test tests cases where some variables are defined inline # and additional ones are defined below that share the same token space assert len(ac[0][2].targets) == 1 assert len(ac[0][2].cc) == 1 assert (False, "override01@gmail.com") in ac[0][2].targets assert "override02@gmail.com" in ac[0][2].cc # Test our Since configuration now: assert len(ac[0][3].targets) == 1 assert ac[0][3].service_plan_id == "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" assert ac[0][3].source == "+10005243890" assert ac[0][3].targets[0] == "+11235551234"