refactored and drastically improved throttling functionality

pull/74/head
Chris Caron 2019-02-17 16:35:09 -05:00
parent 00d1c9b958
commit 7d9715aa5a
32 changed files with 298 additions and 103 deletions

View File

@ -26,6 +26,8 @@
import re
import logging
from time import sleep
from datetime import datetime
try:
# Python 2.7
from urllib import unquote as _unquote
@ -115,8 +117,8 @@ class NotifyBase(object):
setup_url = None
# Most Servers do not like more then 1 request per 5 seconds, so 5.5 gives
# us a safe play range...
throttle_attempt = 5.5
# us a safe play range.
request_rate_per_sec = 5.5
# Allows the user to specify the NotifyImageSize object
image_size = None
@ -209,19 +211,45 @@ class NotifyBase(object):
# it just falls back to whatever was already defined globally
self.tags = set(parse_list(kwargs.get('tag', self.tags)))
def throttle(self, throttle_time=None):
# Tracks the time any i/o was made to the remote server. This value
# is automatically set and controlled through the throttle() call.
self._last_io_datetime = None
def throttle(self, last_io=None):
"""
A common throttle control
"""
self.logger.debug('Throttling...')
throttle_time = throttle_time \
if throttle_time is not None else self.throttle_attempt
if last_io is not None:
# Assume specified last_io
self._last_io_datetime = last_io
# Perform throttle
if throttle_time > 0:
sleep(throttle_time)
# Get ourselves a reference time of 'now'
reference = datetime.now()
if self._last_io_datetime is None:
# Set time to 'now' and no need to throttle
self._last_io_datetime = reference
return
if self.request_rate_per_sec <= 0.0:
# We're done if there is no throttle limit set
return
# If we reach here, we need to do additional logic.
# If the difference between the reference time and 'now' is less than
# the defined request_rate_per_sec then we need to throttle for the
# remaining balance of this time.
elapsed = (reference - self._last_io_datetime).total_seconds()
if elapsed < self.request_rate_per_sec:
self.logger.debug('Throttling for {}s...'.format(
self.request_rate_per_sec - elapsed))
sleep(self.request_rate_per_sec - elapsed)
# Update our timestamp before we leave
self._last_io_datetime = reference
return
def image_url(self, notify_type, logo=False, extension=None):

View File

@ -230,6 +230,9 @@ class NotifyBoxcar(NotifyBase):
))
self.logger.debug('Boxcar Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,

View File

@ -142,6 +142,9 @@ class NotifyDBus(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_dbus'
# No throttling required for DBus queries
request_rate_per_sec = 0
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
@ -258,6 +261,9 @@ class NotifyDBus(NotifyBase):
.format(icon_path, e))
try:
# Always call throttle() before any remote execution is made
self.throttle()
dbus_iface.Notify(
# Application Identifier
self.app_id,

View File

@ -201,6 +201,10 @@ class NotifyDiscord(NotifyBase):
notify_url, self.verify_certificate,
))
self.logger.debug('Discord Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,

View File

@ -375,6 +375,10 @@ class NotifyEmail(NotifyBase):
# bind the socket variable to the current namespace
socket = None
# Always call throttle before any remote server i/o is made
self.throttle()
try:
self.logger.debug('Connecting to remote SMTP server...')
socket_func = smtplib.SMTP

View File

@ -494,6 +494,10 @@ class NotifyEmby(NotifyBase):
session_url, self.verify_certificate,
))
self.logger.debug('Emby Payload: %s' % str(payload))
# Always call throttle before the requests are made
self.throttle()
try:
r = requests.post(
session_url,

View File

@ -85,6 +85,10 @@ class NotifyFaast(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Faast Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,

View File

@ -84,6 +84,10 @@ class NotifyGnome(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# Disable throttle rate for Gnome requests since they are normally
# local anyway
request_rate_per_sec = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 10
@ -138,6 +142,9 @@ class NotifyGnome(NotifyBase):
# Assign urgency
notification.set_urgency(self.urgency)
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Use Pixbuf to create the proper image type
image = GdkPixbuf.Pixbuf.new_from_file(icon_path)

View File

@ -69,6 +69,10 @@ class NotifyGrowl(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_growl'
# Disable throttle rate for Growl requests since they are normally
# local anyway
request_rate_per_sec = 0
# Default Growl Port
default_port = 23053
@ -178,6 +182,9 @@ class NotifyGrowl(NotifyBase):
# print the binary contents of an image
payload['icon'] = icon
# Always call throttle before any remote server i/o is made
self.throttle()
try:
response = self.growl.notify(**payload)
if not isinstance(response, bool):

View File

@ -182,6 +182,10 @@ class NotifyIFTTT(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('IFTTT Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -225,11 +229,6 @@ class NotifyIFTTT(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
error_count += 1
finally:
if len(events):
# Prevent thrashing requests
self.throttle()
return (error_count == 0)
def url(self):

View File

@ -52,6 +52,10 @@ class NotifyJSON(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# Disable throttle rate for JSON requests since they are normally
# local anyway
request_rate_per_sec = 0
def __init__(self, headers, **kwargs):
"""
Initialize JSON Object
@ -154,6 +158,10 @@ class NotifyJSON(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('JSON Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,

View File

@ -181,6 +181,9 @@ class NotifyJoin(NotifyBase):
))
self.logger.debug('Join Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -220,10 +223,6 @@ class NotifyJoin(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
return_status = False
if len(devices):
# Prevent thrashing requests
self.throttle()
return return_status
def url(self):

View File

@ -169,6 +169,10 @@ class NotifyMatrix(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('Matrix Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,

View File

@ -68,6 +68,9 @@ class NotifyMatterMost(NotifyBase):
# The maximum allowable characters allowed in the body per message
body_maxlen = 4000
# Mattermost does not have a title
title_maxlen = 0
def __init__(self, authtoken, channel=None, **kwargs):
"""
Initialize MatterMost Object
@ -120,7 +123,7 @@ class NotifyMatterMost(NotifyBase):
# prepare JSON Object
payload = {
'text': '###### %s\n%s' % (title, body),
'text': body,
'icon_url': self.image_url(notify_type),
}
@ -140,6 +143,10 @@ class NotifyMatterMost(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('MatterMost Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,

View File

@ -81,6 +81,10 @@ class NotifyProwl(NotifyBase):
# Prowl uses the http protocol with JSON requests
notify_url = 'https://api.prowlapp.com/publicapi/add'
# Disable throttle rate for Prowl requests since they are normally
# local anyway
request_rate_per_sec = 0
# The maximum allowable characters allowed in the body per message
body_maxlen = 10000
@ -150,6 +154,10 @@ class NotifyProwl(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Prowl Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,

View File

@ -135,6 +135,10 @@ class NotifyPushBullet(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('PushBullet Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -176,10 +180,6 @@ class NotifyPushBullet(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
if len(recipients):
# Prevent thrashing requests
self.throttle()
return not has_error
def url(self):

View File

@ -177,10 +177,6 @@ class NotifyPushed(NotifyBase):
# toggle flag
has_error = True
if len(channels) + len(users) > 0:
# Prevent thrashing requests
self.throttle()
# Copy our payload
_payload = dict(payload)
_payload['target_type'] = 'pushed_id'
@ -189,16 +185,13 @@ class NotifyPushed(NotifyBase):
while len(users):
# Get User's Pushed ID
_payload['pushed_id'] = users.pop(0)
if not self.send_notification(
payload=_payload, notify_type=notify_type, **kwargs):
# toggle flag
has_error = True
if len(users) > 0:
# Prevent thrashing requests
self.throttle()
return not has_error
def send_notification(self, payload, notify_type, **kwargs):
@ -217,6 +210,10 @@ class NotifyPushed(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushed Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,

View File

@ -52,6 +52,10 @@ class NotifyPushjet(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet'
# Disable throttle rate for Pushjet requests since they are normally
# local anyway (the remote/online service is no more)
request_rate_per_sec = 0
def __init__(self, secret_key, **kwargs):
"""
Initialize Pushjet Object
@ -65,15 +69,16 @@ class NotifyPushjet(NotifyBase):
"""
Perform Pushjet Notification
"""
# Always call throttle before any remote server i/o is made
self.throttle()
server = "https://" if self.secure else "http://"
server += self.host
if self.port:
server += ":" + str(self.port)
try:
server = "http://"
if self.secure:
server = "https://"
server += self.host
if self.port:
server += ":" + str(self.port)
api = pushjet.Api(server)
service = api.Service(secret_key=self.secret_key)

View File

@ -189,6 +189,10 @@ class NotifyPushover(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushover Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
@ -231,10 +235,6 @@ class NotifyPushover(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
if len(devices):
# Prevent thrashing requests
self.throttle()
return not has_error
def url(self):

View File

@ -67,8 +67,11 @@ class NotifyRocketChat(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_rocketchat'
# Defines the maximum allowable characters in the title
title_maxlen = 200
# The title is not used
title_maxlen = 0
# The maximum size of the message
body_maxlen = 200
def __init__(self, recipients=None, **kwargs):
"""
@ -185,8 +188,8 @@ class NotifyRocketChat(NotifyBase):
if not self.login():
return False
# Prepare our message
text = '*%s*\r\n%s' % (title.replace('*', '\\*'), body)
# Prepare our message using the body only
text = body
# Initiaize our error tracking
has_error = False
@ -208,10 +211,6 @@ class NotifyRocketChat(NotifyBase):
# toggle flag
has_error = True
if len(channels) + len(rooms) > 0:
# Prevent thrashing requests
self.throttle()
# Send all our defined room id's
while len(rooms):
# Get Room
@ -226,10 +225,6 @@ class NotifyRocketChat(NotifyBase):
# toggle flag
has_error = True
if len(rooms) > 0:
# Prevent thrashing requests
self.throttle()
# logout
self.logout()
@ -244,6 +239,10 @@ class NotifyRocketChat(NotifyBase):
self.api_url + 'chat.postMessage', self.verify_certificate,
))
self.logger.debug('Rocket.Chat Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.api_url + 'chat.postMessage',

View File

@ -178,6 +178,10 @@ class NotifyRyver(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('Ryver Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,

View File

@ -86,6 +86,10 @@ class NotifySNS(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_sns'
# AWS is pretty good for handling data load so request limits
# can occur in much shorter bursts
request_rate_per_sec = 2.5
# The maximum length of the body
# Source: https://docs.aws.amazon.com/sns/latest/api/API_Publish.html
body_maxlen = 140
@ -219,10 +223,6 @@ class NotifySNS(NotifyBase):
if not result:
error_count += 1
if len(phone) > 0:
# Prevent thrashing requests
self.throttle()
# Send all our defined topic id's
while len(topics):
@ -261,10 +261,6 @@ class NotifySNS(NotifyBase):
if not result:
error_count += 1
if len(topics) > 0:
# Prevent thrashing requests
self.throttle()
return error_count == 0
def _post(self, payload, to):
@ -276,6 +272,13 @@ class NotifySNS(NotifyBase):
if it wasn't.
"""
# Always call throttle before any remote server i/o is made; for AWS
# time plays a huge factor in the headers being sent with the payload.
# So for AWS (SNS) requests we must throttle before they're generated
# and not directly before the i/o call like other notification
# services do.
self.throttle()
# Convert our payload from a dict() into a urlencoded string
payload = self.urlencode(payload)
@ -287,6 +290,7 @@ class NotifySNS(NotifyBase):
self.notify_url, self.verify_certificate,
))
self.logger.debug('AWS Payload: %s' % str(payload))
try:
r = requests.post(
self.notify_url,

View File

@ -250,6 +250,9 @@ class NotifySlack(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('Slack Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
@ -274,7 +277,7 @@ class NotifySlack(NotifyBase):
channel,
r.status_code))
# self.logger.debug('Response Details: %s' % r.raw.read())
# self.logger.debug('Response Details: %s' % r.content)
# Return; we're done
notify_okay = False
@ -290,10 +293,6 @@ class NotifySlack(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
notify_okay = False
if len(channels):
# Prevent thrashing requests
self.throttle()
return notify_okay
def url(self):

View File

@ -413,14 +413,14 @@ class NotifyTelegram(NotifyBase):
# ID
payload['chat_id'] = int(chat_id.group('idno'))
# Always call throttle before any remote server i/o is made;
# Telegram throttles to occur before sending the image so that
# content can arrive together.
self.throttle()
if self.include_image is True:
# Send an image
if self.send_image(
payload['chat_id'], notify_type) is not None:
# We sent a post (whether we were successful or not)
# we still hit the remote server... just throttle
# before our next hit server query
self.throttle()
self.send_image(payload['chat_id'], notify_type)
self.logger.debug('Telegram POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
@ -483,11 +483,6 @@ class NotifyTelegram(NotifyBase):
self.logger.debug('Socket Exception: %s' % str(e))
has_error = True
finally:
if len(chat_ids):
# Prevent thrashing requests
self.throttle()
return not has_error
def url(self):

View File

@ -50,6 +50,9 @@ class NotifyTwitter(NotifyBase):
# which are limited to 240 characters)
body_maxlen = 4096
# Twitter does have titles when creating a message
title_maxlen = 0
def __init__(self, ckey, csecret, akey, asecret, **kwargs):
"""
Initialize Twitter Object
@ -109,15 +112,16 @@ class NotifyTwitter(NotifyBase):
)
return False
# Only set title if it was specified
text = body if not title else '%s\r\n%s' % (title, body)
# Always call throttle before any remote server i/o is made to avoid
# thrashing the remote server and risk being blocked.
self.throttle()
try:
# Get our API
api = tweepy.API(self.auth)
# Send our Direct Message
api.send_direct_message(self.user, text=text)
api.send_direct_message(self.user, text=body)
self.logger.info('Sent Twitter DM notification.')
except Exception as e:

View File

@ -63,6 +63,10 @@ class NotifyWindows(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_windows'
# Disable throttle rate for Windows requests since they are normally
# local anyway
request_rate_per_sec = 0
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
@ -113,6 +117,9 @@ class NotifyWindows(NotifyBase):
"Windows Notifications are not supported by this system.")
return False
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Register destruction callback
message_map = {win32con.WM_DESTROY: self._on_destroy, }

View File

@ -57,6 +57,10 @@ class NotifyXBMC(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_kodi'
# Disable throttle rate for XBMC/KODI requests since they are normally
# local anyway
request_rate_per_sec = 0
# Limit results to just the first 2 line otherwise there is just to much
# content to display
body_max_line_count = 2
@ -186,6 +190,10 @@ class NotifyXBMC(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('XBMC/KODI Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,

View File

@ -52,6 +52,10 @@ class NotifyXML(NotifyBase):
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# Disable throttle rate for JSON requests since they are normally
# local anyway
request_rate_per_sec = 0
def __init__(self, headers=None, **kwargs):
"""
Initialize XML Object
@ -172,6 +176,10 @@ class NotifyXML(NotifyBase):
url, self.verify_certificate,
))
self.logger.debug('XML Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,

View File

@ -166,6 +166,8 @@ def test_email_plugin(mock_smtp, mock_smtpssl):
API: NotifyEmail Plugin()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# iterate over our dictionary and test it out
for (url, meta) in TEST_URLS:
@ -342,6 +344,8 @@ def test_smtplib_init_fail(mock_smtplib):
API: Test exception handling when calling smtplib.SMTP()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
obj = Apprise.instantiate(
'mailto://user:pass@gmail.com', suppress_exceptions=False)
@ -378,6 +382,8 @@ def test_smtplib_send_okay(mock_smtplib):
API: Test a successfully sent email
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Defaults to HTML
obj = Apprise.instantiate(

View File

@ -23,6 +23,9 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from datetime import datetime
from datetime import timedelta
from apprise.plugins.NotifyBase import NotifyBase
from apprise import NotifyType
from apprise import NotifyImageSize
@ -64,7 +67,7 @@ def test_notify_base():
# Throttle overrides..
nb = NotifyBase()
nb.throttle_attempt = 0.0
nb.request_rate_per_sec = 0.0
start_time = default_timer()
nb.throttle()
elapsed = default_timer() - start_time
@ -73,13 +76,57 @@ def test_notify_base():
# then other
assert elapsed < 0.5
# Concurrent calls should achieve the same response
start_time = default_timer()
nb.throttle(1.0)
nb.throttle()
elapsed = default_timer() - start_time
assert elapsed < 0.5
nb = NotifyBase()
nb.request_rate_per_sec = 1.0
# Set our time to now
start_time = default_timer()
nb.throttle()
elapsed = default_timer() - start_time
# A first call to throttle (Without telling it a time previously ran) does
# not block for any length of time; it just merely sets us up for
# concurrent calls to block
assert elapsed < 0.5
# Concurrent calls could take up to the rate_per_sec though...
start_time = default_timer()
nb.throttle(last_io=datetime.now())
elapsed = default_timer() - start_time
assert elapsed > 0.5 and elapsed < 1.5
nb = NotifyBase()
nb.request_rate_per_sec = 1.0
# Set our time to now
start_time = default_timer()
nb.throttle(last_io=datetime.now())
elapsed = default_timer() - start_time
# because we told it that we had already done a previous action (now)
# the throttle holds out until the right time has passed
assert elapsed > 0.5 and elapsed < 1.5
# Concurrent calls could take up to the rate_per_sec though...
start_time = default_timer()
nb.throttle(last_io=datetime.now())
elapsed = default_timer() - start_time
assert elapsed > 0.5 and elapsed < 1.5
nb = NotifyBase()
start_time = default_timer()
nb.request_rate_per_sec = 1.0
# Force a time in the past
nb.throttle(last_io=(datetime.now() - timedelta(seconds=20)))
elapsed = default_timer() - start_time
# Should be a very fast response time since we set it to zero but we'll
# check for less then 500 to be fair as some testing systems may be slower
# then other
assert elapsed < 1.5
assert elapsed < 0.5
# our NotifyBase wasn't initialized with an ImageSize so this will fail
assert nb.image_url(notify_type=NotifyType.INFO) is None

View File

@ -1487,7 +1487,7 @@ def test_rest_plugins(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# iterate over our dictionary and test it out
for (url, meta) in TEST_URLS:
@ -1606,12 +1606,18 @@ def test_rest_plugins(mock_post, mock_get):
#
try:
if test_requests_exceptions is False:
# Disable throttling
obj.request_rate_per_sec = 0
# check that we're as expected
assert obj.notify(
title='test', body='body',
notify_type=notify_type) == response
else:
# Disable throttling
obj.request_rate_per_sec = 0
for _exception in REQUEST_EXCEPTIONS:
mock_post.side_effect = _exception
mock_get.side_effect = _exception
@ -1699,7 +1705,7 @@ def test_notify_boxcar_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Generate some generic message types
device = 'A' * 64
@ -1762,7 +1768,7 @@ def test_notify_discord_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
webhook_id = 'A' * 24
@ -1844,6 +1850,8 @@ def test_notify_emby_plugin_login(mock_post, mock_get):
API: NotifyEmby.login()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_get.return_value = requests.Request()
@ -1969,6 +1977,8 @@ def test_notify_emby_plugin_sessions(mock_post, mock_get, mock_logout,
API: NotifyEmby.sessions()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_get.return_value = requests.Request()
@ -2070,6 +2080,8 @@ def test_notify_emby_plugin_logout(mock_post, mock_get, mock_login):
API: NotifyEmby.sessions()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_get.return_value = requests.Request()
@ -2138,6 +2150,8 @@ def test_notify_emby_plugin_notify(mock_post, mock_get, mock_logout,
API: NotifyEmby.notify()
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Prepare Mock
mock_get.return_value = requests.Request()
@ -2214,7 +2228,7 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
webhook_id = 'webhookid'
@ -2243,11 +2257,10 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True
# Test the addition of tokens
obj = plugins.NotifyIFTTT(
webhook_id=webhook_id, events=events,
add_tokens={'Test':'ValueA', 'Test2': 'ValueB'})
add_tokens={'Test': 'ValueA', 'Test2': 'ValueB'})
assert(isinstance(obj, plugins.NotifyIFTTT))
@ -2282,14 +2295,14 @@ def test_notify_ifttt_plugin(mock_post, mock_get):
del_tokens=(
plugins.NotifyIFTTT.ifttt_default_title_key,
plugins.NotifyIFTTT.ifttt_default_body_key,
plugins.NotifyIFTTT.ifttt_default_type_key,
))
plugins.NotifyIFTTT.ifttt_default_type_key))
assert(isinstance(obj, plugins.NotifyIFTTT))
assert obj.notify(title='title', body='body',
notify_type=NotifyType.INFO) is True
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_join_plugin(mock_post, mock_get):
@ -2298,7 +2311,7 @@ def test_notify_join_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Generate some generic message types
device = 'A' * 32
@ -2333,7 +2346,7 @@ def test_notify_slack_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
token_a = 'A' * 9
@ -2381,6 +2394,8 @@ def test_notify_pushbullet_plugin(mock_post, mock_get):
API: NotifyPushBullet() Extra Checks
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
accesstoken = 'a' * 32
@ -2425,7 +2440,7 @@ def test_notify_pushed_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Chat ID
recipients = '@ABCDEFG, @DEFGHIJ, #channel, #channel2'
@ -2525,7 +2540,7 @@ def test_notify_pushover_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Initialize some generic (but valid) tokens
token = 'a' * 30
@ -2589,7 +2604,7 @@ def test_notify_rocketchat_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Chat ID
recipients = 'l2g, lead2gold, #channel, #channel2'
@ -2724,7 +2739,7 @@ def test_notify_telegram_plugin(mock_post, mock_get):
"""
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Bot Token
bot_token = '123456789:abcdefg_hijklmnop'
@ -2954,6 +2969,9 @@ def test_notify_overflow_truncate():
# A little preparation
#
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Number of characters per line
row = 24
@ -3119,6 +3137,9 @@ def test_notify_overflow_split():
# A little preparation
#
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.request_rate_per_sec = 0
# Number of characters per line
row = 24
@ -3280,5 +3301,5 @@ def test_notify_overflow_split():
assert chunk.get('title') == ''
_body = chunk.get('body')
assert bulk[offset:len(_body)+offset] == _body
assert bulk[offset: len(_body) + offset] == _body
offset += len(_body)

View File

@ -303,6 +303,8 @@ def test_aws_topic_handling(mock_post):
API: NotifySNS Plugin() AWS Topic Handling
"""
# Disable Throttling to speed testing
plugins.NotifySNS.request_rate_per_sec = 0
arn_response = \
"""
@ -336,9 +338,6 @@ def test_aws_topic_handling(mock_post):
# Assign ourselves a new function
mock_post.side_effect = post
# Disable Throttling to speed testing
plugins.NotifyBase.NotifyBase.throttle_attempt = 0
# Create our object
a = Apprise()