diff --git a/README.md b/README.md index 03258f01..5cbda9a2 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ The table below identifies the services this tool supports and some example serv | -------------------- | ---------- | ------------ | -------------- | | [AWS SNS](https://github.com/caronc/apprise/wiki/Notify_sns) | sns:// | (TCP) 443 | sns://AccessKeyID/AccessSecretKey/RegionName/+PhoneNo
sns://AccessKeyID/AccessSecretKey/RegionName/+PhoneNo1/+PhoneNo2/+PhoneNoN
sns://AccessKeyID/AccessSecretKey/RegionName/Topic
sns://AccessKeyID/AccessSecretKey/RegionName/Topic1/Topic2/TopicN | [ClickSend](https://github.com/caronc/apprise/wiki/Notify_clicksend) | clicksend:// | (TCP) 443 | clicksend://user:pass@PhoneNo
clicksend://user:pass@ToPhoneNo1/ToPhoneNo2/ToPhoneNoN +| [DAPNET](https://github.com/caronc/apprise/wiki/Notify_dapnet) | dapnet:// | (TCP) 80 | dapnet://user:pass@callsign
dapnet://user:pass@callsign1/callsign2/callsignN | [D7 Networks](https://github.com/caronc/apprise/wiki/Notify_d7networks) | d7sms:// | (TCP) 443 | d7sms://user:pass@PhoneNo
d7sms://user:pass@ToPhoneNo1/ToPhoneNo2/ToPhoneNoN | [DingTalk](https://github.com/caronc/apprise/wiki/Notify_dingtalk) | dingtalk:// | (TCP) 443 | dingtalk://token/
dingtalk://token/ToPhoneNo
dingtalk://token/ToPhoneNo1/ToPhoneNo2/ToPhoneNo1/ | [Kavenegar](https://github.com/caronc/apprise/wiki/Notify_kavenegar) | kavenegar:// | (TCP) 443 | kavenegar://ApiKey/ToPhoneNo
kavenegar://FromPhoneNo@ApiKey/ToPhoneNo
kavenegar://ApiKey/ToPhoneNo1/ToPhoneNo2/ToPhoneNoN diff --git a/apprise/plugins/NotifyDapnet.py b/apprise/plugins/NotifyDapnet.py new file mode 100644 index 00000000..ee5f0409 --- /dev/null +++ b/apprise/plugins/NotifyDapnet.py @@ -0,0 +1,393 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2019 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# To use this plugin, sign up with Hampager (you need to be a licensed +# ham radio operator +# http://www.hampager.de/ +# +# You're done at this point, you only need to know your user/pass that +# you signed up with. + +# The following URLs would be accepted by Apprise: +# - dapnet://{user}:{password}@{callsign} +# - dapnet://{user}:{password}@{callsign1}/{callsign2} + +# Optional parameters: +# - priority (NORMAL or EMERGENCY). Default: NORMAL +# - txgroups --> comma-separated list of DAPNET transmitter +# groups. Default: 'dl-all' +# https://hampager.de/#/transmitters/groups + +from json import dumps + +# The API reference used to build this plugin was documented here: +# https://hampager.de/dokuwiki/doku.php#dapnet_api +# +import requests +from requests.auth import HTTPBasicAuth + +from .NotifyBase import NotifyBase +from ..AppriseLocale import gettext_lazy as _ +from ..URLBase import PrivacyMode +from ..common import NotifyType +from ..utils import is_call_sign +from ..utils import parse_call_sign +from ..utils import parse_list +from ..utils import parse_bool + + +class DapnetPriority(object): + NORMAL = 0 + EMERGENCY = 1 + + +DAPNET_PRIORITIES = ( + DapnetPriority.NORMAL, + DapnetPriority.EMERGENCY, +) + + +class NotifyDapnet(NotifyBase): + """ + A wrapper for DAPNET / Hampager Notifications + """ + + # The default descriptive name associated with the Notification + service_name = 'Dapnet' + + # The services URL + service_url = 'https://hampager.de/' + + # The default secure protocol + secure_protocol = 'dapnet' + + # A URL that takes you to the setup/help of the specific protocol + setup_url = 'https://github.com/caronc/apprise/wiki/Notify_dapnet' + + # Dapnet uses the http protocol with JSON requests + notify_url = 'http://www.hampager.de:8080/calls' + + # The maximum length of the body + body_maxlen = 80 + + # A title can not be used for Dapnet Messages. Setting this to zero will + # cause any title (if defined) to get placed into the message body. + title_maxlen = 0 + + # The maximum amount of emails that can reside within a single transmission + default_batch_size = 50 + + # Define object templates + templates = ('{schema}://{user}:{password}@{targets}',) + + # Define our template tokens + template_tokens = dict( + NotifyBase.template_tokens, + **{ + 'user': { + 'name': _('User Name'), + 'type': 'string', + 'required': True, + }, + 'password': { + 'name': _('Password'), + 'type': 'string', + 'private': True, + 'required': True, + }, + 'target_callsign': { + 'name': _('Target Callsign'), + 'type': 'string', + 'regex': ( + r'^[a-z0-9]{2,5}(-[a-z0-9]{1,2})?$', 'i', + ), + 'map_to': 'targets', + }, + 'targets': { + 'name': _('Targets'), + 'type': 'list:string', + 'required': True, + }, + } + ) + + # Define our template arguments + template_args = dict( + NotifyBase.template_args, + **{ + 'to': { + 'name': _('Target Callsign'), + 'type': 'string', + 'map_to': 'targets', + }, + 'priority': { + 'name': _('Priority'), + 'type': 'choice:int', + 'values': DAPNET_PRIORITIES, + 'default': DapnetPriority.NORMAL, + }, + 'txgroups': { + 'name': _('Transmitter Groups'), + 'type': 'string', + 'default': 'dl-all', + 'private': True, + }, + 'batch': { + 'name': _('Batch Mode'), + 'type': 'bool', + 'default': False, + }, + } + ) + + def __init__(self, targets=None, priority=None, txgroups=None, + batch=False, **kwargs): + """ + Initialize Dapnet Object + """ + super(NotifyDapnet, self).__init__(**kwargs) + + # Parse our targets + self.targets = list() + + # get the emergency prio setting + if priority not in DAPNET_PRIORITIES: + self.priority = self.template_args['priority']['default'] + else: + self.priority = priority + + if not (self.user and self.password): + msg = 'A Dapnet user/pass was not provided.' + self.logger.warning(msg) + raise TypeError(msg) + + # Get the transmitter group + self.txgroups = parse_list( + NotifyDapnet.template_args['txgroups']['default'] + if not txgroups else txgroups) + + # Prepare Batch Mode Flag + self.batch = batch + + for target in parse_call_sign(targets): + # Validate targets and drop bad ones: + result = is_call_sign(target) + if not result: + self.logger.warning( + 'Dropping invalid Amateur radio call sign ({}).'.format( + target), + ) + continue + + # Store callsign + self.targets.append(result['callsign']) + + return + + def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): + """ + Perform Dapnet Notification + """ + + if not self.targets: + # There is no one to email; we're done + self.logger.warning( + 'There are no Amateur radio callsigns to notify') + return False + + # Send in batches if identified to do so + batch_size = 1 if not self.batch else self.default_batch_size + + headers = { + 'User-Agent': self.app_id, + 'Content-Type': 'application/json; charset=utf-8', + } + + # error tracking (used for function return) + has_error = False + + # prepare the emergency mode + emergency_mode = True \ + if self.priority == DapnetPriority.EMERGENCY else False + + # Create a copy of the targets list + targets = list(self.targets) + + for index in range(0, len(targets), batch_size): + + # prepare JSON payload + payload = { + 'text': body, + 'callSignNames': targets[index:index + batch_size], + 'transmitterGroupNames': self.txgroups, + 'emergency': emergency_mode, + } + + self.logger.debug('DAPNET POST URL: %s' % self.notify_url) + self.logger.debug('DAPNET Payload: %s' % dumps(payload)) + + # Always call throttle before any remote server i/o is made + self.throttle() + try: + r = requests.post( + self.notify_url, + data=dumps(payload), + headers=headers, + auth=HTTPBasicAuth( + username=self.user, password=self.password), + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + if r.status_code != requests.codes.created: + # We had a problem + + self.logger.warning( + 'Failed to send DAPNET notification {} to {}: ' + 'error={}.'.format( + payload['text'], + ' to {}'.format(self.targets), + r.status_code + ) + ) + + self.logger.debug( + 'Response Details:\r\n{}'.format(r.content)) + + # Mark our failure + has_error = True + + else: + self.logger.info( + 'Sent \'{}\' DAPNET notification {}'.format( + payload['text'], 'to {}'.format(self.targets) + ) + ) + + except requests.RequestException as e: + self.logger.warning( + 'A Connection error occurred sending DAPNET ' + 'notification to {}'.format(self.targets) + ) + self.logger.debug('Socket Exception: %s' % str(e)) + + # Mark our failure + has_error = True + + return not has_error + + def url(self, privacy=False, *args, **kwargs): + """ + Returns the URL built dynamically based on specified arguments. + """ + + # Define any URL parameters + _map = { + DapnetPriority.NORMAL: 'normal', + DapnetPriority.EMERGENCY: 'emergency', + } + + # Define any URL parameters + params = { + 'priority': 'normal' if self.priority not in _map + else _map[self.priority], + 'batch': 'yes' if self.batch else 'no', + 'txgroups': ','.join(self.txgroups), + } + + # Extend our parameters + params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) + + # Setup Authentication + auth = '{user}:{password}@'.format( + user=NotifyDapnet.quote(self.user, safe=""), + password=self.pprint( + self.password, privacy, mode=PrivacyMode.Secret, safe='' + ), + ) + + return '{schema}://{auth}{targets}?{params}'.format( + schema=self.secure_protocol, + auth=auth, + targets='/'.join([self.pprint(x, privacy, safe='') + for x in self.targets]), + params=NotifyDapnet.urlencode(params), + ) + + @staticmethod + def parse_url(url): + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ + results = NotifyBase.parse_url(url, verify_host=False) + if not results: + # We're done early as we couldn't load the results + return results + + # All elements are targets + results['targets'] = [NotifyDapnet.unquote(results['host'])] + + # All entries after the hostname are additional targets + results['targets'].extend(NotifyDapnet.split_path(results['fullpath'])) + + # Support the 'to' variable so that we can support rooms this way too + # The 'to' makes it easier to use yaml configuration + if 'to' in results['qsd'] and len(results['qsd']['to']): + results['targets'] += parse_call_sign(results['qsd']['to']) + + # Check for priority + if 'priority' in results['qsd'] and len(results['qsd']['priority']): + _map = { + # Letter Assignments + 'n': DapnetPriority.NORMAL, + 'e': DapnetPriority.EMERGENCY, + 'no': DapnetPriority.NORMAL, + 'em': DapnetPriority.EMERGENCY, + # Numeric assignments + '0': DapnetPriority.NORMAL, + '1': DapnetPriority.EMERGENCY, + } + try: + results['priority'] = \ + _map[results['qsd']['priority'][0:2].lower()] + + except KeyError: + # No priority was set + pass + + # Check for one or multiple transmitter groups (comma separated) + # and split them up, when necessary + if 'txgroups' in results['qsd']: + results['txgroups'] = \ + [x.lower() for x in + NotifyDapnet.parse_list(results['qsd']['txgroups'])] + + # Get Batch Mode Flag + results['batch'] = \ + parse_bool(results['qsd'].get( + 'batch', NotifyDapnet.template_args['batch']['default'])) + + return results diff --git a/apprise/utils.py b/apprise/utils.py index 27b263c3..0c525d03 100644 --- a/apprise/utils.py +++ b/apprise/utils.py @@ -133,6 +133,17 @@ IS_PHONE_NO = re.compile(r'^\+?(?P[0-9\s)(+-]+)\s*$') PHONE_NO_DETECTION_RE = re.compile( r'\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])', re.I) +# A simple verification check to make sure the content specified +# rougly conforms to a ham radio call sign before we parse it further +IS_CALL_SIGN = re.compile( + r'^(?P[a-z0-9]{2,3}[0-9][a-z0-9]{3})' + r'(?P-[a-z0-9]{1,2})?\s*$', re.I) + +# Regular expression used to destinguish between multiple ham radio call signs +CALL_SIGN_DETECTION_RE = re.compile( + r'\s*([a-z0-9]{2,3}[0-9][a-z0-9]{3}(?:-[a-z0-9]{1,2})?)' + r'(?=$|[\s,]+[a-z0-9]{4,6})', re.I) + # Regular expression used to destinguish between multiple URLs URL_DETECTION_RE = re.compile( r'([a-z0-9]+?:\/\/.*?)(?=$|[\s,]+[a-z0-9]{2,9}?:\/\/)', re.I) @@ -372,6 +383,37 @@ def is_phone_no(phone, min_len=11): } +def is_call_sign(callsign): + """Determine if the specified entry is a ham radio call sign + + Args: + callsign (str): The string you want to check. + + Returns: + bool: Returns False if the address specified is not a phone number + """ + + try: + result = IS_CALL_SIGN.match(callsign) + if not result: + # not parseable content as it does not even conform closely to a + # callsign + return False + + except TypeError: + # not parseable content + return False + + ssid = result.group('ssid') + return { + # always treat call signs as uppercase content + 'callsign': result.group('callsign').upper(), + # Prevent the storing of the None keyword in the event the SSID was + # not detected + 'ssid': ssid if ssid else '', + } + + def is_email(address): """Determine if the specified entry is an email address @@ -766,6 +808,43 @@ def parse_phone_no(*args, **kwargs): return result +def parse_call_sign(*args, **kwargs): + """ + Takes a string containing ham radio call signs separated by + comma and/or spacesand returns a list. + """ + + # for Python 2.7 support, store_unparsable is not in the url above + # as just parse_emails(*args, store_unparseable=True) since it is + # an invalid syntax. This is the workaround to be backards compatible: + store_unparseable = kwargs.get('store_unparseable', True) + + result = [] + for arg in args: + if isinstance(arg, six.string_types) and arg: + _result = CALL_SIGN_DETECTION_RE.findall(arg) + if _result: + result += _result + + elif not _result and store_unparseable: + # we had content passed into us that was lost because it was + # so poorly formatted that it didn't even come close to + # meeting the regular expression we defined. We intentially + # keep it as part of our result set so that parsing done + # at a higher level can at least report this to the end user + # and hopefully give them some indication as to what they + # may have done wrong. + result += \ + [x for x in filter(bool, re.split(STRING_DELIMITERS, arg))] + + elif isinstance(arg, (set, list, tuple)): + # Use recursion to handle the list of call signs + result += parse_call_sign( + *arg, store_unparseable=store_unparseable) + + return result + + def parse_emails(*args, **kwargs): """ Takes a string containing emails separated by comma's and/or spaces and diff --git a/packaging/redhat/python-apprise.spec b/packaging/redhat/python-apprise.spec index aedd3017..3030ea58 100644 --- a/packaging/redhat/python-apprise.spec +++ b/packaging/redhat/python-apprise.spec @@ -47,7 +47,7 @@ Apprise is a Python package for simplifying access to all of the different notification services that are out there. Apprise opens the door and makes it easy to access: -Apprise API, AWS SES, AWS SNS, Boxcar, ClickSend, DingTalk, Discord, E-Mail, +Apprise API, AWS SES, AWS SNS, Boxcar, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby, Faast, FCM, Flock, Gitter, Google Chat, Gotify, Growl, Home Assistant, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, MacOSX, Mailgun, Mattermost, Matrix, Microsoft Windows, Microsoft Teams, MessageBird, MQTT, MSG91, MyAndroid, diff --git a/setup.py b/setup.py index 92a4cba4..a6838132 100755 --- a/setup.py +++ b/setup.py @@ -70,10 +70,10 @@ setup( cmdclass=cmdclass, url='https://github.com/caronc/apprise', keywords='Push Notifications Alerts Email AWS SES SNS Boxcar ClickSend ' - 'Dingtalk Discord Dbus Emby Faast FCM Flock Gitter Gnome Google Chat ' - 'Gotify Growl Home Assistant IFTTT Join Kavenegar KODI Kumulos ' - 'LaMetric MacOS Mailgun Matrix Mattermost MessageBird MQTT MSG91 ' - 'Nexmo Nextcloud Notica Notifico Office365 OneSignal Opsgenie ' + 'DAPNET Dingtalk Discord Dbus Emby Faast FCM Flock Gitter Gnome ' + 'Google Chat Gotify Growl Home Assistant IFTTT Join Kavenegar KODI ' + 'Kumulos LaMetric MacOS Mailgun Matrix Mattermost MessageBird MQTT ' + 'MSG91 Nexmo Nextcloud Notica Notifico Office365 OneSignal Opsgenie ' 'ParsePlatform PopcornNotify Prowl PushBullet Pushjet Pushed ' 'Pushover PushSafer Reddit Rocket.Chat Ryver SendGrid ServerChan ' 'SimplePush Sinch Slack SMTP2Go SparkPost Spontit Streamlabs ' diff --git a/test/helpers/rest.py b/test/helpers/rest.py index 71cede28..6f039d8d 100644 --- a/test/helpers/rest.py +++ b/test/helpers/rest.py @@ -230,6 +230,9 @@ class AppriseURLTester(object): assert False if isinstance(obj, plugins.NotifyBase): + # Ensure we are not performing any type of thorttling + obj.request_rate_per_sec = 0 + # We loaded okay; now lets make sure we can reverse # this url assert isinstance(obj.url(), six.string_types) is True diff --git a/test/test_plugin_dapnet.py b/test/test_plugin_dapnet.py new file mode 100644 index 00000000..9461238b --- /dev/null +++ b/test/test_plugin_dapnet.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021 Chris Caron +# All rights reserved. +# +# This code is licensed under the MIT License. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files(the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions : +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# Disable logging for a cleaner testing output +import logging +import requests + +from apprise import plugins +from helpers import AppriseURLTester + +logging.disable(logging.CRITICAL) + +# Our Testing URLs +apprise_url_tests = ( + ('dapnet://', { + # We failed to identify any valid authentication + 'instance': TypeError, + }), + ('dapnet://:@/', { + # We failed to identify any valid authentication + 'instance': TypeError, + }), + ('dapnet://user:pass', { + # No call-sign specified + 'instance': TypeError, + }), + ('dapnet://user@host', { + # No password specified + 'instance': TypeError, + }), + ('dapnet://user:pass@{}'.format('DF1ABC'), { + # valid call sign + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}/{}'.format('DF1ABC', 'DF1DEF'), { + # valid call signs + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@?to={},{}'.format('DF1ABC', 'DF1DEF'), { + # support the to= argument + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}?priority=normal'.format('DF1ABC'), { + # valid call sign with priority + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}?priority=em&batch=false'.format( + '/'.join(['DF1ABC', '0A1DEF'])), { + # valid call sign with priority (emergency) + no batch + # transmissions + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}?priority=invalid'.format('DF1ABC'), { + # invalid priority + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}?txgroups=dl-all,all'.format('DF1ABC'), { + # valid call sign with two transmitter groups + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}?txgroups=invalid'.format('DF1ABC'), { + # valid call sign with invalid transmitter group + 'instance': plugins.NotifyDapnet, + 'requests_response_code': requests.codes.created, + }), + ('dapnet://user:pass@{}/{}'.format('abcdefghi', 'a'), { + # invalid call signs + 'instance': plugins.NotifyDapnet, + 'notify_response': False, + }), + # Edge cases + ('dapnet://user:pass@{}'.format('DF1ABC'), { + 'instance': plugins.NotifyDapnet, + # throw a bizzare code forcing us to fail to look it up + 'response': False, + 'requests_response_code': 999, + }), + ('dapnet://user:pass@{}'.format('DF1ABC'), { + 'instance': plugins.NotifyDapnet, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), +) + + +def test_plugin_dapnet_urls(): + """ + NotifyDapnet() Apprise URLs + + """ + + # Run our general tests + AppriseURLTester(tests=apprise_url_tests).run_all() diff --git a/test/test_utils.py b/test/test_utils.py index f30dadb4..5d0383e1 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -751,6 +751,44 @@ def test_is_email(): assert utils.is_email("Name ") is False +def test_is_call_sign_no(): + """ + API: is_call_sign() function + + """ + # Invalid numbers + assert utils.is_call_sign(None) is False + assert utils.is_call_sign(42) is False + assert utils.is_call_sign(object) is False + assert utils.is_call_sign('') is False + assert utils.is_call_sign('1') is False + assert utils.is_call_sign('12') is False + assert utils.is_call_sign('abc') is False + assert utils.is_call_sign('+()') is False + assert utils.is_call_sign('+') is False + assert utils.is_call_sign(None) is False + assert utils.is_call_sign(42) is False + + # To short or 2 long + assert utils.is_call_sign('DF1AB') is False + assert utils.is_call_sign('DF1ABCX') is False + assert utils.is_call_sign('DF1ABCEFG') is False + assert utils.is_call_sign('1ABCX') is False + # 4th character is not an number + assert utils.is_call_sign('XXXXXX') is False + + # Some valid checks + result = utils.is_call_sign('DF1ABC') + assert isinstance(result, dict) + assert 'DF1ABC' == result['callsign'] + assert '' == result['ssid'] + + # Get our SSID + result = utils.is_call_sign('DF1ABC-14') + assert 'DF1ABC' == result['callsign'] + assert '-14' == result['ssid'] + + def test_is_phone_no(): """ API: is_phone_no() function @@ -861,6 +899,48 @@ def test_is_phone_no(): assert '18001234567' == results['full'] +def test_parse_call_sign(): + """utils: parse_call_sign() testing """ + # A simple single array entry (As str) + results = utils.parse_call_sign('') + assert isinstance(results, list) + assert len(results) == 0 + + # just delimeters + results = utils.parse_call_sign(', ,, , ,,, ') + assert isinstance(results, list) + assert len(results) == 0 + + results = utils.parse_call_sign(None) + assert isinstance(results, list) + assert len(results) == 0 + + results = utils.parse_call_sign(42) + assert isinstance(results, list) + assert len(results) == 0 + + results = utils.parse_call_sign('this is not a parseable call sign at all') + assert isinstance(results, list) + assert len(results) == 9 + + results = utils.parse_call_sign( + 'this is not a parseable call sign at all', store_unparseable=False) + assert isinstance(results, list) + assert len(results) == 0 + + # Now test valid call signs + results = utils.parse_call_sign('0A1DEF') + assert isinstance(results, list) + assert len(results) == 1 + assert '0A1DEF' in results + + results = utils.parse_call_sign('0A1DEF, DF1ABC') + assert isinstance(results, list) + assert len(results) == 2 + assert '0A1DEF' in results + assert 'DF1ABC' in results + + def test_parse_phone_no(): """utils: parse_phone_no() testing """ # A simple single array entry (As str)