Twitter Notification Rewrite; dropped tweepy backend (#133)

pull/134/head
Chris Caron 2019-06-29 18:27:59 -04:00 committed by GitHub
parent 8fce99a006
commit 7a89b08a83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 977 additions and 4124 deletions

View File

@ -58,7 +58,7 @@ The table below identifies the services this tool supports and some example serv
| [Ryver](https://github.com/caronc/apprise/wiki/Notify_ryver) | ryver:// | (TCP) 443 | ryver://Organization/Token<br />ryver://botname@Organization/Token
| [Slack](https://github.com/caronc/apprise/wiki/Notify_slack) | slack:// | (TCP) 443 | slack://TokenA/TokenB/TokenC/<br />slack://TokenA/TokenB/TokenC/Channel<br />slack://botname@TokenA/TokenB/TokenC/Channel<br />slack://user@TokenA/TokenB/TokenC/Channel1/Channel2/ChannelN
| [Telegram](https://github.com/caronc/apprise/wiki/Notify_telegram) | tgram:// | (TCP) 443 | tgram://bottoken/ChatID<br />tgram://bottoken/ChatID1/ChatID2/ChatIDN
| [Twitter](https://github.com/caronc/apprise/wiki/Notify_twitter) | tweet:// | (TCP) 443 | tweet://user@CKey/CSecret/AKey/ASecret
| [Twitter](https://github.com/caronc/apprise/wiki/Notify_twitter) | twitter:// | (TCP) 443 | twitter://CKey/CSecret/AKey/ASecret<br/>twitter://user@CKey/CSecret/AKey/ASecret<br/>twitter://CKey/CSecret/AKey/ASecret/User1/User2/User2<br/>twitter://CKey/CSecret/AKey/ASecret?mode=tweet
| [XBMC](https://github.com/caronc/apprise/wiki/Notify_xbmc) | xbmc:// or xbmcs:// | (TCP) 8080 or 443 | xbmc://hostname<br />xbmc://user@hostname<br />xbmc://user:password@hostname:port
| [XMPP](https://github.com/caronc/apprise/wiki/Notify_xmpp) | xmpp:// or xmpps:// | (TCP) 5222 or 5223 | xmpp://password@hostname<br />xmpp://user:password@hostname<br />xmpps://user:password@hostname:port?jid=user@hostname/resource<br/>xmpps://password@hostname/target@myhost, target2@myhost/resource
| [Windows Notification](https://github.com/caronc/apprise/wiki/Notify_windows) | windows:// | n/a | windows://

View File

@ -0,0 +1,654 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# See https://developer.twitter.com/en/docs/direct-messages/\
# sending-and-receiving/api-reference/new-event.html
import re
import six
import requests
from datetime import datetime
from requests_oauthlib import OAuth1
from json import dumps
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
IS_USER = re.compile(r'^\s*@?(?P<user>[A-Z0-9_]+)$', re.I)
class TwitterMessageMode(object):
"""
Twitter Message Mode
"""
# DM (a Direct Message)
DM = 'dm'
# A Public Tweet
TWEET = 'tweet'
# Define the types in a list for validation purposes
TWITTER_MESSAGE_MODES = (
TwitterMessageMode.DM,
TwitterMessageMode.TWEET,
)
class NotifyTwitter(NotifyBase):
"""
A wrapper to Twitter Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Twitter'
# The services URL
service_url = 'https://twitter.com/'
# The default secure protocol is twitter. 'tweet' is left behind
# for backwards compatibility of older apprise usage
secure_protocol = ('twitter', 'tweet')
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twitter'
# Do not set body_maxlen as it is set in a property value below
# since the length varies depending if we are doing a direct message
# or a tweet
# body_maxlen = see below @propery defined
# Twitter does have titles when creating a message
title_maxlen = 0
# Twitter API
twitter_api = 'api.twitter.com'
# Twitter API Reference To Acquire Someone's Twitter ID
twitter_lookup = 'https://api.twitter.com/1.1/users/lookup.json'
# Twitter API Reference To Acquire Current Users Information
twitter_whoami = \
'https://api.twitter.com/1.1/account/verify_credentials.json'
# Twitter API Reference To Send A Private DM
twitter_dm = 'https://api.twitter.com/1.1/direct_messages/events/new.json'
# Twitter API Reference To Send A Public Tweet
twitter_tweet = 'https://api.twitter.com/1.1/statuses/update.json'
# Twitter is kind enough to return how many more requests we're allowed to
# continue to make within it's header response as:
# X-Rate-Limit-Reset: The epoc time (in seconds) we can expect our
# rate-limit to be reset.
# X-Rate-Limit-Remaining: an integer identifying how many requests we're
# still allow to make.
request_rate_per_sec = 0
# For Tracking Purposes
ratelimit_reset = datetime.utcnow()
# Default to 1000; users can send up to 1000 DM's and 2400 tweets a day
# This value only get's adjusted if the server sets it that way
ratelimit_remaining = 1
templates = (
'{schema}://{ckey}/{csecret}/{akey}/{asecret}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'ckey': {
'name': _('Consumer Key'),
'type': 'string',
'private': True,
'required': True,
},
'csecret': {
'name': _('Consumer Secret'),
'type': 'string',
'private': True,
'required': True,
},
'akey': {
'name': _('Access Key'),
'type': 'string',
'private': True,
'required': True,
},
'asecret': {
'name': _('Access Secret'),
'type': 'string',
'private': True,
'required': True,
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'prefix': '@',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'mode': {
'name': _('Message Mode'),
'type': 'choice:string',
'values': TWITTER_MESSAGE_MODES,
'default': TwitterMessageMode.DM,
},
'cache': {
'name': _('Cache Results'),
'type': 'bool',
'default': True,
},
'to': {
'alias_of': 'targets',
},
})
def __init__(self, ckey, csecret, akey, asecret, targets=None,
mode=TwitterMessageMode.DM, cache=True, **kwargs):
"""
Initialize Twitter Object
"""
super(NotifyTwitter, self).__init__(**kwargs)
if not ckey:
msg = 'An invalid Consumer API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not csecret:
msg = 'An invalid Consumer Secret API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not akey:
msg = 'An invalid Access Token API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not asecret:
msg = 'An invalid Access Token Secret API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Store our webhook mode
self.mode = None \
if not isinstance(mode, six.string_types) else mode.lower()
# Set Cache Flag
self.cache = cache
if self.mode not in TWITTER_MESSAGE_MODES:
msg = 'The Twitter message mode specified ({}) is invalid.' \
.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Identify our targets
self.targets = []
for target in parse_list(targets):
match = IS_USER.match(target)
if match and match.group('user'):
self.targets.append(match.group('user'))
continue
self.logger.warning(
'Dropped invalid user ({}) specified.'.format(target),
)
# Store our data
self.ckey = ckey
self.csecret = csecret
self.akey = akey
self.asecret = asecret
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twitter Notification
"""
# Call the _send_ function applicable to whatever mode we're in
# - calls _send_tweet if the mode is set so
# - calls _send_dm (direct message) otherwise
return getattr(self, '_send_{}'.format(self.mode))(
body=body, title=title, notify_type=notify_type, **kwargs)
def _send_tweet(self, body, title='', notify_type=NotifyType.INFO,
**kwargs):
"""
Twitter Public Tweet
"""
payload = {
'status': body,
}
# Send Tweet
postokay, response = self._fetch(
self.twitter_tweet,
payload=payload,
json=False,
)
if postokay:
self.logger.info(
'Sent Twitter notification as public tweet.')
return postokay
def _send_dm(self, body, title='', notify_type=NotifyType.INFO,
**kwargs):
"""
Twitter Direct Message
"""
# Error Tracking
has_error = False
payload = {
'event': {
'type': 'message_create',
'message_create': {
'target': {
# This gets assigned
'recipient_id': None,
},
'message_data': {
'text': body,
}
}
}
}
# Lookup our users
targets = self._whoami(lazy=self.cache) if not len(self.targets) \
else self._user_lookup(self.targets, lazy=self.cache)
if not targets:
# We failed to lookup any users
self.logger.warning(
'Failed to acquire user(s) to Direct Message via Twitter')
return False
for screen_name, user_id in targets.items():
# Assign our user
payload['event']['message_create']['target']['recipient_id'] = \
user_id
# Send Twitter DM
postokay, response = self._fetch(
self.twitter_dm,
payload=payload,
)
if not postokay:
# Track our error
has_error = True
continue
self.logger.info(
'Sent Twitter DM notification to @{}.'.format(screen_name))
return not has_error
def _whoami(self, lazy=True):
"""
Looks details of current authenticated user
"""
# Prepare a whoami key; this is to prevent conflict with other
# NotifyTwitter declarations that may or may not use a different
# set of authentication keys
whoami_key = '{}{}{}{}'.format(
self.ckey, self.csecret, self.akey, self.asecret)
if lazy and hasattr(NotifyTwitter, '_whoami_cache') \
and whoami_key in getattr(NotifyTwitter, '_whoami_cache'):
# Use cached response
return getattr(NotifyTwitter, '_whoami_cache')[whoami_key]
# Contains a mapping of screen_name to id
results = {}
# Send Twitter DM
postokay, response = self._fetch(
self.twitter_whoami,
method='GET',
json=False,
)
if postokay:
try:
results[response['screen_name']] = response['id']
if lazy:
# Cache our response for future references
if not hasattr(NotifyTwitter, '_whoami_cache'):
setattr(
NotifyTwitter, '_whoami_cache',
{whoami_key: results})
else:
getattr(NotifyTwitter, '_whoami_cache')\
.update({whoami_key: results})
# Update our user cache as well
if not hasattr(NotifyTwitter, '_user_cache'):
setattr(NotifyTwitter, '_user_cache', results)
else:
getattr(NotifyTwitter, '_user_cache').update(results)
except (TypeError, KeyError):
pass
return results
def _user_lookup(self, screen_name, lazy=True):
"""
Looks up a screen name and returns the user id
the screen_name can be a list/set/tuple as well
"""
# Contains a mapping of screen_name to id
results = {}
# Build a unique set of names
names = parse_list(screen_name)
if lazy and hasattr(NotifyTwitter, '_user_cache'):
# Use cached response
results = {k: v for k, v in getattr(
NotifyTwitter, '_user_cache').items() if k in names}
# limit our names if they already exist in our cache
names = [name for name in names if name not in results]
if not len(names):
# They're is nothing further to do
return results
# Twitters API documents that it can lookup to 100
# results at a time.
# https://developer.twitter.com/en/docs/accounts-and-users/\
# follow-search-get-users/api-reference/get-users-lookup
for i in range(0, len(names), 100):
# Send Twitter DM
postokay, response = self._fetch(
self.twitter_lookup,
payload={
'screen_name': names[i:i + 100],
},
json=False,
)
if not postokay or not isinstance(response, list):
# Track our error
continue
# Update our user index
for entry in response:
try:
results[entry['screen_name']] = entry['id']
except (TypeError, KeyError):
pass
# Cache our response for future use; this saves on un-nessisary extra
# hits against the Twitter API when we already know the answer
if lazy:
if not hasattr(NotifyTwitter, '_user_cache'):
setattr(NotifyTwitter, '_user_cache', results)
else:
getattr(NotifyTwitter, '_user_cache').update(results)
return results
def _fetch(self, url, payload=None, method='POST', json=True):
"""
Wrapper to Twitter API requests object
"""
headers = {
'Host': self.twitter_api,
'User-Agent': self.app_id,
}
if json:
headers['Content-Type'] = 'application/json'
payload = dumps(payload)
auth = OAuth1(
self.ckey,
client_secret=self.csecret,
resource_owner_key=self.akey,
resource_owner_secret=self.asecret,
)
# Some Debug Logging
self.logger.debug('Twitter {} URL: {} (cert_verify={})'.format(
method, url, self.verify_certificate))
self.logger.debug('Twitter Payload: %s' % str(payload))
# By default set wait to None
wait = None
if self.ratelimit_remaining == 0:
# Determine how long we should wait for or if we should wait at
# all. This isn't fool-proof because we can't be sure the client
# time (calling this script) is completely synced up with the
# Gitter server. One would hope we're on NTP and our clocks are
# the same allowing this to role smoothly:
now = datetime.utcnow()
if now < self.ratelimit_reset:
# We need to throttle for the difference in seconds
# We add 0.5 seconds to the end just to allow a grace
# period.
wait = (self.ratelimit_reset - now).total_seconds() + 0.5
# Default content response object
content = {}
# Always call throttle before any remote server i/o is made;
self.throttle(wait=wait)
# acquire our request mode
fn = requests.post if method == 'POST' else requests.get
try:
r = fn(
url,
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyTwitter.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Twitter {} to {}: '
'{}error={}.'.format(
method,
url,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
return (False, content)
try:
content = loads(r.content)
except (TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
content = {}
try:
# Capture rate limiting if possible
self.ratelimit_remaining = \
int(r.headers.get('x-rate-limit-remaining'))
self.ratelimit_reset = datetime.utcfromtimestamp(
int(r.headers.get('x-rate-limit-reset')))
except (TypeError, ValueError):
# This is returned if we could not retrieve this information
# gracefully accept this state and move on
pass
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending Twitter {} to {}: '.
format(method, url))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
return (False, content)
return (True, content)
@property
def body_maxlen(self):
"""
The maximum allowable characters allowed in the body per message
This is used during a Private DM Message Size (not Public Tweets
which are limited to 280 characters)
"""
return 10000 if self.mode == TwitterMessageMode.DM else 280
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'mode': self.mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if len(self.targets) > 0:
args['to'] = ','.join([NotifyTwitter.quote(x, safe='')
for x in self.targets])
return '{schema}://{ckey}/{csecret}/{akey}/{asecret}' \
'/{targets}/?{args}'.format(
schema=self.secure_protocol[0],
ckey=NotifyTwitter.quote(self.ckey, safe=''),
asecret=NotifyTwitter.quote(self.csecret, safe=''),
akey=NotifyTwitter.quote(self.akey, safe=''),
csecret=NotifyTwitter.quote(self.asecret, safe=''),
targets='/'.join(
[NotifyTwitter.quote('@{}'.format(target), safe='')
for target in self.targets]),
args=NotifyTwitter.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# The first token is stored in the hostname
consumer_key = NotifyTwitter.unquote(results['host'])
# Acquire remaining tokens
tokens = NotifyTwitter.split_path(results['fullpath'])
# Now fetch the remaining tokens
try:
consumer_secret, access_token_key, access_token_secret = \
tokens[0:3]
except (ValueError, AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
consumer_secret = None
access_token_key = None
access_token_secret = None
results['ckey'] = consumer_key
results['csecret'] = consumer_secret
results['akey'] = access_token_key
results['asecret'] = access_token_secret
# The defined twitter mode
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = \
NotifyTwitter.unquote(results['qsd']['mode'])
results['targets'] = []
# if a user has been defined, add it to the list of targets
if results.get('user'):
results['targets'].append(results.get('user'))
# Store any remaining items as potential targets
results['targets'].extend(tokens[3:])
if 'cache' in results['qsd'] and len(results['qsd']['cache']):
results['cache'] = \
parse_bool(results['qsd']['cache'], True)
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyTwitter.parse_list(results['qsd']['to'])
if results.get('schema', 'twitter').lower() == 'tweet':
# Deprication Notice issued for v0.7.9
NotifyTwitter.logger.deprecate(
'tweet:// has been replaced by twitter://')
return results

View File

@ -1,272 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from . import tweepy
from ..NotifyBase import NotifyBase
from ...common import NotifyType
from ...utils import parse_list
from ...AppriseLocale import gettext_lazy as _
class NotifyTwitter(NotifyBase):
"""
A wrapper to Twitter Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Twitter'
# The services URL
service_url = 'https://twitter.com/'
# The default secure protocol
secure_protocol = 'tweet'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twitter'
# The maximum allowable characters allowed in the body per message
# This is used during a Private DM Message Size (not Public Tweets
# which are limited to 240 characters)
body_maxlen = 4096
# Twitter does have titles when creating a message
title_maxlen = 0
templates = (
'{schema}://{user}@{ckey}{csecret}/{akey}/{asecret}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'ckey': {
'name': _('Consumer Key'),
'type': 'string',
'private': True,
'required': True,
},
'csecret': {
'name': _('Consumer Secret'),
'type': 'string',
'private': True,
'required': True,
},
'akey': {
'name': _('Access Key'),
'type': 'string',
'private': True,
'required': True,
},
'asecret': {
'name': _('Access Secret'),
'type': 'string',
'private': True,
'required': True,
},
'user': {
'name': _('User'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
})
def __init__(self, ckey, csecret, akey, asecret, targets=None, **kwargs):
"""
Initialize Twitter Object
"""
super(NotifyTwitter, self).__init__(**kwargs)
if not ckey:
msg = 'An invalid Consumer API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not csecret:
msg = 'An invalid Consumer Secret API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not akey:
msg = 'An invalid Access Token API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
if not asecret:
msg = 'An invalid Access Token Secret API Key was specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Identify our targets
self.targets = parse_list(targets)
if len(self.targets) == 0 and not self.user:
msg = 'No user(s) were specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Store our data
self.ckey = ckey
self.csecret = csecret
self.akey = akey
self.asecret = asecret
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Twitter Notification
"""
try:
# Attempt to Establish a connection to Twitter
self.auth = tweepy.OAuthHandler(self.ckey, self.csecret)
# Apply our Access Tokens
self.auth.set_access_token(self.akey, self.asecret)
except Exception:
self.logger.warning(
'Twitter authentication failed; '
'please verify your configuration.'
)
return False
# Get ourselves a list of targets
users = list(self.targets)
if not users:
# notify ourselves
users.append(self.user)
# Error Tracking
has_error = False
while len(users) > 0:
# Get our user
user = users.pop(0)
# Always call throttle before any remote server i/o is made to
# avoid thrashing the remote server and risk being blocked.
self.throttle()
try:
# Get our API
api = tweepy.API(self.auth)
# Send our Direct Message
api.send_direct_message(user, text=body)
self.logger.info(
'Sent Twitter DM notification to {}.'.format(user))
except Exception as e:
self.logger.warning(
'A Connection error occured sending Twitter '
'direct message to %s.' % user)
self.logger.debug('Twitter Exception: %s' % str(e))
# Track our error
has_error = True
return not has_error
def url(self):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if len(self.targets) > 0:
args['to'] = ','.join([NotifyTwitter.quote(x, safe='')
for x in self.targets])
return '{schema}://{auth}{ckey}/{csecret}/{akey}/{asecret}' \
'/?{args}'.format(
auth='' if not self.user else '{user}@'.format(
user=NotifyTwitter.quote(self.user, safe='')),
schema=self.secure_protocol,
ckey=NotifyTwitter.quote(self.ckey, safe=''),
asecret=NotifyTwitter.quote(self.csecret, safe=''),
akey=NotifyTwitter.quote(self.akey, safe=''),
csecret=NotifyTwitter.quote(self.asecret, safe=''),
args=NotifyTwitter.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Apply our settings now
# The first token is stored in the hostname
consumer_key = NotifyTwitter.unquote(results['host'])
# Now fetch the remaining tokens
try:
consumer_secret, access_token_key, access_token_secret = \
NotifyTwitter.split_path(results['fullpath'])[0:3]
except (ValueError, AttributeError, IndexError):
# Force some bad values that will get caught
# in parsing later
consumer_secret = None
access_token_key = None
access_token_secret = None
results['ckey'] = consumer_key
results['csecret'] = consumer_secret
results['akey'] = access_token_key
results['asecret'] = access_token_secret
# Support the to= allowing one to identify more then one user to tweet
# too
results['targets'] = NotifyTwitter.parse_list(results['qsd'].get('to'))
return results

View File

@ -1,25 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
"""
Tweepy Twitter API library
"""
__version__ = '3.6.0'
__author__ = 'Joshua Roesslein'
__license__ = 'MIT'
from .models import Status, User, DirectMessage, Friendship, SavedSearch, SearchResults, ModelFactory, Category
from .error import TweepError, RateLimitError
from .api import API
from .cache import Cache, MemoryCache, FileCache
from .auth import OAuthHandler, AppAuthHandler
from .streaming import Stream, StreamListener
from .cursor import Cursor
# Global, unauthenticated instance of API
api = API()
def debug(enable=True, level=1):
from six.moves.http_client import HTTPConnection
HTTPConnection.debuglevel = level

File diff suppressed because it is too large Load Diff

View File

@ -1,178 +0,0 @@
from __future__ import print_function
import six
import logging
from .error import TweepError
from .api import API
import requests
from requests_oauthlib import OAuth1Session, OAuth1
from requests.auth import AuthBase
from six.moves.urllib.parse import parse_qs
WARNING_MESSAGE = """Warning! Due to a Twitter API bug, signin_with_twitter
and access_type don't always play nice together. Details
https://dev.twitter.com/discussions/21281"""
class AuthHandler(object):
def apply_auth(self, url, method, headers, parameters):
"""Apply authentication headers to request"""
raise NotImplementedError
def get_username(self):
"""Return the username of the authenticated user"""
raise NotImplementedError
class OAuthHandler(AuthHandler):
"""OAuth authentication handler"""
OAUTH_HOST = 'api.twitter.com'
OAUTH_ROOT = '/oauth/'
def __init__(self, consumer_key, consumer_secret, callback=None):
if type(consumer_key) == six.text_type:
consumer_key = consumer_key.encode('ascii')
if type(consumer_secret) == six.text_type:
consumer_secret = consumer_secret.encode('ascii')
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = None
self.access_token_secret = None
self.callback = callback
self.username = None
self.oauth = OAuth1Session(consumer_key,
client_secret=consumer_secret,
callback_uri=self.callback)
def _get_oauth_url(self, endpoint):
return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
def apply_auth(self):
return OAuth1(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.access_token,
resource_owner_secret=self.access_token_secret,
decoding=None)
def _get_request_token(self, access_type=None):
try:
url = self._get_oauth_url('request_token')
if access_type:
url += '?x_auth_access_type=%s' % access_type
return self.oauth.fetch_request_token(url)
except Exception as e:
raise TweepError(e)
def set_access_token(self, key, secret):
self.access_token = key
self.access_token_secret = secret
def get_authorization_url(self,
signin_with_twitter=False,
access_type=None):
"""Get the authorization URL to redirect the user"""
try:
if signin_with_twitter:
url = self._get_oauth_url('authenticate')
if access_type:
logging.warning(WARNING_MESSAGE)
else:
url = self._get_oauth_url('authorize')
self.request_token = self._get_request_token(access_type=access_type)
return self.oauth.authorization_url(url)
except Exception as e:
raise TweepError(e)
def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
self.oauth = OAuth1Session(self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.request_token['oauth_token'],
resource_owner_secret=self.request_token['oauth_token_secret'],
verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
def get_xauth_access_token(self, username, password):
"""
Get an access token from an username and password combination.
In order to get this working you need to create an app at
http://twitter.com/apps, after that send a mail to api@twitter.com
and request activation of xAuth for it.
"""
try:
url = self._get_oauth_url('access_token')
oauth = OAuth1(self.consumer_key,
client_secret=self.consumer_secret)
r = requests.post(url=url,
auth=oauth,
headers={'x_auth_mode': 'client_auth',
'x_auth_username': username,
'x_auth_password': password})
credentials = parse_qs(r.content)
return credentials.get('oauth_token')[0], credentials.get('oauth_token_secret')[0]
except Exception as e:
raise TweepError(e)
def get_username(self):
if self.username is None:
api = API(self)
user = api.verify_credentials()
if user:
self.username = user.screen_name
else:
raise TweepError('Unable to get username,'
' invalid oauth token!')
return self.username
class OAuth2Bearer(AuthBase):
def __init__(self, bearer_token):
self.bearer_token = bearer_token
def __call__(self, request):
request.headers['Authorization'] = 'Bearer ' + self.bearer_token
return request
class AppAuthHandler(AuthHandler):
"""Application-only authentication handler"""
OAUTH_HOST = 'api.twitter.com'
OAUTH_ROOT = '/oauth2/'
def __init__(self, consumer_key, consumer_secret):
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self._bearer_token = ''
resp = requests.post(self._get_oauth_url('token'),
auth=(self.consumer_key,
self.consumer_secret),
data={'grant_type': 'client_credentials'})
data = resp.json()
if data.get('token_type') != 'bearer':
raise TweepError('Expected token_type to equal "bearer", '
'but got %s instead' % data.get('token_type'))
self._bearer_token = data['access_token']
def _get_oauth_url(self, endpoint):
return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
def apply_auth(self):
return OAuth2Bearer(self._bearer_token)

View File

@ -1,261 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import print_function
import time
import re
from six.moves.urllib.parse import quote, urlencode
import requests
import logging
from .error import TweepError, RateLimitError, is_rate_limit_error_message
from .utils import convert_to_utf8_str
from .models import Model
import six
import sys
re_path_template = re.compile(r'{\w+}')
log = logging.getLogger('tweepy.binder')
def bind_api(**config):
class APIMethod(object):
api = config['api']
path = config['path']
payload_type = config.get('payload_type', None)
payload_list = config.get('payload_list', False)
allowed_param = config.get('allowed_param', [])
method = config.get('method', 'GET')
require_auth = config.get('require_auth', False)
search_api = config.get('search_api', False)
upload_api = config.get('upload_api', False)
use_cache = config.get('use_cache', True)
session = requests.Session()
def __init__(self, args, kwargs):
api = self.api
# If authentication is required and no credentials
# are provided, throw an error.
if self.require_auth and not api.auth:
raise TweepError('Authentication required!')
self.post_data = kwargs.pop('post_data', None)
self.retry_count = kwargs.pop('retry_count',
api.retry_count)
self.retry_delay = kwargs.pop('retry_delay',
api.retry_delay)
self.retry_errors = kwargs.pop('retry_errors',
api.retry_errors)
self.wait_on_rate_limit = kwargs.pop('wait_on_rate_limit',
api.wait_on_rate_limit)
self.wait_on_rate_limit_notify = kwargs.pop('wait_on_rate_limit_notify',
api.wait_on_rate_limit_notify)
self.parser = kwargs.pop('parser', api.parser)
self.session.headers = kwargs.pop('headers', {})
self.build_parameters(args, kwargs)
# Pick correct URL root to use
if self.search_api:
self.api_root = api.search_root
elif self.upload_api:
self.api_root = api.upload_root
else:
self.api_root = api.api_root
# Perform any path variable substitution
self.build_path()
if self.search_api:
self.host = api.search_host
elif self.upload_api:
self.host = api.upload_host
else:
self.host = api.host
# Manually set Host header to fix an issue in python 2.5
# or older where Host is set including the 443 port.
# This causes Twitter to issue 301 redirect.
# See Issue https://github.com/tweepy/tweepy/issues/12
self.session.headers['Host'] = self.host
# Monitoring rate limits
self._remaining_calls = None
self._reset_time = None
def build_parameters(self, args, kwargs):
self.session.params = {}
for idx, arg in enumerate(args):
if arg is None:
continue
try:
self.session.params[self.allowed_param[idx]] = convert_to_utf8_str(arg)
except IndexError:
raise TweepError('Too many parameters supplied!')
for k, arg in kwargs.items():
if arg is None:
continue
if k in self.session.params:
raise TweepError('Multiple values for parameter %s supplied!' % k)
self.session.params[k] = convert_to_utf8_str(arg)
log.debug("PARAMS: %r", self.session.params)
def build_path(self):
for variable in re_path_template.findall(self.path):
name = variable.strip('{}')
if name == 'user' and 'user' not in self.session.params and self.api.auth:
# No 'user' parameter provided, fetch it from Auth instead.
value = self.api.auth.get_username()
else:
try:
value = quote(self.session.params[name])
except KeyError:
raise TweepError('No parameter value found for path variable: %s' % name)
del self.session.params[name]
self.path = self.path.replace(variable, value)
def execute(self):
self.api.cached_result = False
# Build the request URL
url = self.api_root + self.path
full_url = 'https://' + self.host + url
# Query the cache if one is available
# and this request uses a GET method.
if self.use_cache and self.api.cache and self.method == 'GET':
cache_result = self.api.cache.get('%s?%s' % (url, urlencode(self.session.params)))
# if cache result found and not expired, return it
if cache_result:
# must restore api reference
if isinstance(cache_result, list):
for result in cache_result:
if isinstance(result, Model):
result._api = self.api
else:
if isinstance(cache_result, Model):
cache_result._api = self.api
self.api.cached_result = True
return cache_result
# Continue attempting request until successful
# or maximum number of retries is reached.
retries_performed = 0
while retries_performed < self.retry_count + 1:
# handle running out of api calls
if self.wait_on_rate_limit:
if self._reset_time is not None:
if self._remaining_calls is not None:
if self._remaining_calls < 1:
sleep_time = self._reset_time - int(time.time())
if sleep_time > 0:
if self.wait_on_rate_limit_notify:
log.warning("Rate limit reached. Sleeping for: %d" % sleep_time)
time.sleep(sleep_time + 5) # sleep for few extra sec
# if self.wait_on_rate_limit and self._reset_time is not None and \
# self._remaining_calls is not None and self._remaining_calls < 1:
# sleep_time = self._reset_time - int(time.time())
# if sleep_time > 0:
# if self.wait_on_rate_limit_notify:
# log.warning("Rate limit reached. Sleeping for: %d" % sleep_time)
# time.sleep(sleep_time + 5) # sleep for few extra sec
# Apply authentication
auth = None
if self.api.auth:
auth = self.api.auth.apply_auth()
# Request compression if configured
if self.api.compression:
self.session.headers['Accept-encoding'] = 'gzip'
# Execute request
try:
resp = self.session.request(self.method,
full_url,
data=self.post_data,
timeout=self.api.timeout,
auth=auth,
proxies=self.api.proxy)
except Exception as e:
six.reraise(TweepError, TweepError('Failed to send request: %s' % e), sys.exc_info()[2])
rem_calls = resp.headers.get('x-rate-limit-remaining')
if rem_calls is not None:
self._remaining_calls = int(rem_calls)
elif isinstance(self._remaining_calls, int):
self._remaining_calls -= 1
reset_time = resp.headers.get('x-rate-limit-reset')
if reset_time is not None:
self._reset_time = int(reset_time)
if self.wait_on_rate_limit and self._remaining_calls == 0 and (
# if ran out of calls before waiting switching retry last call
resp.status_code == 429 or resp.status_code == 420):
continue
retry_delay = self.retry_delay
# Exit request loop if non-retry error code
if resp.status_code == 200:
break
elif (resp.status_code == 429 or resp.status_code == 420) and self.wait_on_rate_limit:
if 'retry-after' in resp.headers:
retry_delay = float(resp.headers['retry-after'])
elif self.retry_errors and resp.status_code not in self.retry_errors:
break
# Sleep before retrying request again
time.sleep(retry_delay)
retries_performed += 1
# If an error was returned, throw an exception
self.api.last_response = resp
if resp.status_code and not 200 <= resp.status_code < 300:
try:
error_msg, api_error_code = \
self.parser.parse_error(resp.text)
except Exception:
error_msg = "Twitter error response: status code = %s" % resp.status_code
api_error_code = None
if is_rate_limit_error_message(error_msg):
raise RateLimitError(error_msg, resp)
else:
raise TweepError(error_msg, resp, api_code=api_error_code)
# Parse the response payload
result = self.parser.parse(self, resp.text)
# Store result into cache if one is available.
if self.use_cache and self.api.cache and self.method == 'GET' and result:
self.api.cache.store('%s?%s' % (url, urlencode(self.session.params)), result)
return result
def _call(*args, **kwargs):
method = APIMethod(args, kwargs)
if kwargs.get('create'):
return method
else:
return method.execute()
# Set pagination mode
if 'cursor' in APIMethod.allowed_param:
_call.pagination_mode = 'cursor'
elif 'max_id' in APIMethod.allowed_param:
if 'since_id' in APIMethod.allowed_param:
_call.pagination_mode = 'id'
elif 'page' in APIMethod.allowed_param:
_call.pagination_mode = 'page'
return _call

View File

@ -1,432 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import print_function
import time
import datetime
import hashlib
import threading
import os
import logging
try:
import cPickle as pickle
except ImportError:
import pickle
try:
import fcntl
except ImportError:
# Probably on a windows system
# TODO: use win32file
pass
log = logging.getLogger('tweepy.cache')
class Cache(object):
"""Cache interface"""
def __init__(self, timeout=60):
"""Initialize the cache
timeout: number of seconds to keep a cached entry
"""
self.timeout = timeout
def store(self, key, value):
"""Add new record to cache
key: entry key
value: data of entry
"""
raise NotImplementedError
def get(self, key, timeout=None):
"""Get cached entry if exists and not expired
key: which entry to get
timeout: override timeout with this value [optional]
"""
raise NotImplementedError
def count(self):
"""Get count of entries currently stored in cache"""
raise NotImplementedError
def cleanup(self):
"""Delete any expired entries in cache."""
raise NotImplementedError
def flush(self):
"""Delete all cached entries"""
raise NotImplementedError
class MemoryCache(Cache):
"""In-memory cache"""
def __init__(self, timeout=60):
Cache.__init__(self, timeout)
self._entries = {}
self.lock = threading.Lock()
def __getstate__(self):
# pickle
return {'entries': self._entries, 'timeout': self.timeout}
def __setstate__(self, state):
# unpickle
self.lock = threading.Lock()
self._entries = state['entries']
self.timeout = state['timeout']
def _is_expired(self, entry, timeout):
return timeout > 0 and (time.time() - entry[0]) >= timeout
def store(self, key, value):
self.lock.acquire()
self._entries[key] = (time.time(), value)
self.lock.release()
def get(self, key, timeout=None):
self.lock.acquire()
try:
# check to see if we have this key
entry = self._entries.get(key)
if not entry:
# no hit, return nothing
return None
# use provided timeout in arguments if provided
# otherwise use the one provided during init.
if timeout is None:
timeout = self.timeout
# make sure entry is not expired
if self._is_expired(entry, timeout):
# entry expired, delete and return nothing
del self._entries[key]
return None
# entry found and not expired, return it
return entry[1]
finally:
self.lock.release()
def count(self):
return len(self._entries)
def cleanup(self):
self.lock.acquire()
try:
for k, v in dict(self._entries).items():
if self._is_expired(v, self.timeout):
del self._entries[k]
finally:
self.lock.release()
def flush(self):
self.lock.acquire()
self._entries.clear()
self.lock.release()
class FileCache(Cache):
"""File-based cache"""
# locks used to make cache thread-safe
cache_locks = {}
def __init__(self, cache_dir, timeout=60):
Cache.__init__(self, timeout)
if os.path.exists(cache_dir) is False:
os.mkdir(cache_dir)
self.cache_dir = cache_dir
if cache_dir in FileCache.cache_locks:
self.lock = FileCache.cache_locks[cache_dir]
else:
self.lock = threading.Lock()
FileCache.cache_locks[cache_dir] = self.lock
if os.name == 'posix':
self._lock_file = self._lock_file_posix
self._unlock_file = self._unlock_file_posix
elif os.name == 'nt':
self._lock_file = self._lock_file_win32
self._unlock_file = self._unlock_file_win32
else:
log.warning('FileCache locking not supported on this system!')
self._lock_file = self._lock_file_dummy
self._unlock_file = self._unlock_file_dummy
def _get_path(self, key):
md5 = hashlib.md5()
md5.update(key.encode('utf-8'))
return os.path.join(self.cache_dir, md5.hexdigest())
def _lock_file_dummy(self, path, exclusive=True):
return None
def _unlock_file_dummy(self, lock):
return
def _lock_file_posix(self, path, exclusive=True):
lock_path = path + '.lock'
if exclusive is True:
f_lock = open(lock_path, 'w')
fcntl.lockf(f_lock, fcntl.LOCK_EX)
else:
f_lock = open(lock_path, 'r')
fcntl.lockf(f_lock, fcntl.LOCK_SH)
if os.path.exists(lock_path) is False:
f_lock.close()
return None
return f_lock
def _unlock_file_posix(self, lock):
lock.close()
def _lock_file_win32(self, path, exclusive=True):
# TODO: implement
return None
def _unlock_file_win32(self, lock):
# TODO: implement
return
def _delete_file(self, path):
os.remove(path)
if os.path.exists(path + '.lock'):
os.remove(path + '.lock')
def store(self, key, value):
path = self._get_path(key)
self.lock.acquire()
try:
# acquire lock and open file
f_lock = self._lock_file(path)
datafile = open(path, 'wb')
# write data
pickle.dump((time.time(), value), datafile)
# close and unlock file
datafile.close()
self._unlock_file(f_lock)
finally:
self.lock.release()
def get(self, key, timeout=None):
return self._get(self._get_path(key), timeout)
def _get(self, path, timeout):
if os.path.exists(path) is False:
# no record
return None
self.lock.acquire()
try:
# acquire lock and open
f_lock = self._lock_file(path, False)
datafile = open(path, 'rb')
# read pickled object
created_time, value = pickle.load(datafile)
datafile.close()
# check if value is expired
if timeout is None:
timeout = self.timeout
if timeout > 0:
if (time.time() - created_time) >= timeout:
# expired! delete from cache
value = None
self._delete_file(path)
# unlock and return result
self._unlock_file(f_lock)
return value
finally:
self.lock.release()
def count(self):
c = 0
for entry in os.listdir(self.cache_dir):
if entry.endswith('.lock'):
continue
c += 1
return c
def cleanup(self):
for entry in os.listdir(self.cache_dir):
if entry.endswith('.lock'):
continue
self._get(os.path.join(self.cache_dir, entry), None)
def flush(self):
for entry in os.listdir(self.cache_dir):
if entry.endswith('.lock'):
continue
self._delete_file(os.path.join(self.cache_dir, entry))
class MemCacheCache(Cache):
"""Cache interface"""
def __init__(self, client, timeout=60):
"""Initialize the cache
client: The memcache client
timeout: number of seconds to keep a cached entry
"""
self.client = client
self.timeout = timeout
def store(self, key, value):
"""Add new record to cache
key: entry key
value: data of entry
"""
self.client.set(key, value, time=self.timeout)
def get(self, key, timeout=None):
"""Get cached entry if exists and not expired
key: which entry to get
timeout: override timeout with this value [optional].
DOES NOT WORK HERE
"""
return self.client.get(key)
def count(self):
"""Get count of entries currently stored in cache. RETURN 0"""
raise NotImplementedError
def cleanup(self):
"""Delete any expired entries in cache. NO-OP"""
raise NotImplementedError
def flush(self):
"""Delete all cached entries. NO-OP"""
raise NotImplementedError
class RedisCache(Cache):
"""Cache running in a redis server"""
def __init__(self, client,
timeout=60,
keys_container='tweepy:keys',
pre_identifier='tweepy:'):
Cache.__init__(self, timeout)
self.client = client
self.keys_container = keys_container
self.pre_identifier = pre_identifier
def _is_expired(self, entry, timeout):
# Returns true if the entry has expired
return timeout > 0 and (time.time() - entry[0]) >= timeout
def store(self, key, value):
"""Store the key, value pair in our redis server"""
# Prepend tweepy to our key,
# this makes it easier to identify tweepy keys in our redis server
key = self.pre_identifier + key
# Get a pipe (to execute several redis commands in one step)
pipe = self.client.pipeline()
# Set our values in a redis hash (similar to python dict)
pipe.set(key, pickle.dumps((time.time(), value)))
# Set the expiration
pipe.expire(key, self.timeout)
# Add the key to a set containing all the keys
pipe.sadd(self.keys_container, key)
# Execute the instructions in the redis server
pipe.execute()
def get(self, key, timeout=None):
"""Given a key, returns an element from the redis table"""
key = self.pre_identifier + key
# Check to see if we have this key
unpickled_entry = self.client.get(key)
if not unpickled_entry:
# No hit, return nothing
return None
entry = pickle.loads(unpickled_entry)
# Use provided timeout in arguments if provided
# otherwise use the one provided during init.
if timeout is None:
timeout = self.timeout
# Make sure entry is not expired
if self._is_expired(entry, timeout):
# entry expired, delete and return nothing
self.delete_entry(key)
return None
# entry found and not expired, return it
return entry[1]
def count(self):
"""Note: This is not very efficient,
since it retreives all the keys from the redis
server to know how many keys we have"""
return len(self.client.smembers(self.keys_container))
def delete_entry(self, key):
"""Delete an object from the redis table"""
pipe = self.client.pipeline()
pipe.srem(self.keys_container, key)
pipe.delete(key)
pipe.execute()
def cleanup(self):
"""Cleanup all the expired keys"""
keys = self.client.smembers(self.keys_container)
for key in keys:
entry = self.client.get(key)
if entry:
entry = pickle.loads(entry)
if self._is_expired(entry, self.timeout):
self.delete_entry(key)
def flush(self):
"""Delete all entries from the cache"""
keys = self.client.smembers(self.keys_container)
for key in keys:
self.delete_entry(key)
class MongodbCache(Cache):
"""A simple pickle-based MongoDB cache sytem."""
def __init__(self, db, timeout=3600, collection='tweepy_cache'):
"""Should receive a "database" cursor from pymongo."""
Cache.__init__(self, timeout)
self.timeout = timeout
self.col = db[collection]
self.col.create_index('created', expireAfterSeconds=timeout)
def store(self, key, value):
from bson.binary import Binary
now = datetime.datetime.utcnow()
blob = Binary(pickle.dumps(value))
self.col.insert({'created': now, '_id': key, 'value': blob})
def get(self, key, timeout=None):
if timeout:
raise NotImplementedError
obj = self.col.find_one({'_id': key})
if obj:
return pickle.loads(obj['value'])
def count(self):
return self.col.find({}).count()
def delete_entry(self, key):
return self.col.remove({'_id': key})
def cleanup(self):
"""MongoDB will automatically clear expired keys."""
pass
def flush(self):
self.col.drop()
self.col.create_index('created', expireAfterSeconds=self.timeout)

View File

@ -1,214 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import print_function
from .error import TweepError
from .parsers import ModelParser, RawParser
class Cursor(object):
"""Pagination helper class"""
def __init__(self, method, *args, **kargs):
if hasattr(method, 'pagination_mode'):
if method.pagination_mode == 'cursor':
self.iterator = CursorIterator(method, args, kargs)
elif method.pagination_mode == 'id':
self.iterator = IdIterator(method, args, kargs)
elif method.pagination_mode == 'page':
self.iterator = PageIterator(method, args, kargs)
else:
raise TweepError('Invalid pagination mode.')
else:
raise TweepError('This method does not perform pagination')
def pages(self, limit=0):
"""Return iterator for pages"""
if limit > 0:
self.iterator.limit = limit
return self.iterator
def items(self, limit=0):
"""Return iterator for items in each page"""
i = ItemIterator(self.iterator)
i.limit = limit
return i
class BaseIterator(object):
def __init__(self, method, args, kargs):
self.method = method
self.args = args
self.kargs = kargs
self.limit = 0
def __next__(self):
return self.next()
def next(self):
raise NotImplementedError
def prev(self):
raise NotImplementedError
def __iter__(self):
return self
class CursorIterator(BaseIterator):
def __init__(self, method, args, kargs):
BaseIterator.__init__(self, method, args, kargs)
start_cursor = kargs.pop('cursor', None)
self.next_cursor = start_cursor or -1
self.prev_cursor = start_cursor or 0
self.num_tweets = 0
def next(self):
if self.next_cursor == 0 or (self.limit and self.num_tweets == self.limit):
raise StopIteration
data, cursors = self.method(cursor=self.next_cursor,
*self.args,
**self.kargs)
self.prev_cursor, self.next_cursor = cursors
if len(data) == 0:
raise StopIteration
self.num_tweets += 1
return data
def prev(self):
if self.prev_cursor == 0:
raise TweepError('Can not page back more, at first page')
data, self.next_cursor, self.prev_cursor = self.method(cursor=self.prev_cursor,
*self.args,
**self.kargs)
self.num_tweets -= 1
return data
class IdIterator(BaseIterator):
def __init__(self, method, args, kargs):
BaseIterator.__init__(self, method, args, kargs)
self.max_id = kargs.pop('max_id', None)
self.num_tweets = 0
self.results = []
self.model_results = []
self.index = 0
def next(self):
"""Fetch a set of items with IDs less than current set."""
if self.limit and self.limit == self.num_tweets:
raise StopIteration
if self.index >= len(self.results) - 1:
data = self.method(max_id=self.max_id, parser=RawParser(), *self.args, **self.kargs)
if hasattr(self.method, '__self__'):
old_parser = self.method.__self__.parser
# Hack for models which expect ModelParser to be set
self.method.__self__.parser = ModelParser()
# This is a special invocation that returns the underlying
# APIMethod class
model = ModelParser().parse(self.method(create=True), data)
if hasattr(self.method, '__self__'):
self.method.__self__.parser = old_parser
result = self.method.__self__.parser.parse(self.method(create=True), data)
else:
result = model
if len(self.results) != 0:
self.index += 1
self.results.append(result)
self.model_results.append(model)
else:
self.index += 1
result = self.results[self.index]
model = self.model_results[self.index]
if len(result) == 0:
raise StopIteration
# TODO: Make this not dependant on the parser making max_id and
# since_id available
self.max_id = model.max_id
self.num_tweets += 1
return result
def prev(self):
"""Fetch a set of items with IDs greater than current set."""
if self.limit and self.limit == self.num_tweets:
raise StopIteration
self.index -= 1
if self.index < 0:
# There's no way to fetch a set of tweets directly 'above' the
# current set
raise StopIteration
data = self.results[self.index]
self.max_id = self.model_results[self.index].max_id
self.num_tweets += 1
return data
class PageIterator(BaseIterator):
def __init__(self, method, args, kargs):
BaseIterator.__init__(self, method, args, kargs)
self.current_page = 0
def next(self):
if self.limit > 0:
if self.current_page > self.limit:
raise StopIteration
items = self.method(page=self.current_page, *self.args, **self.kargs)
if len(items) == 0:
raise StopIteration
self.current_page += 1
return items
def prev(self):
if self.current_page == 1:
raise TweepError('Can not page back more, at first page')
self.current_page -= 1
return self.method(page=self.current_page, *self.args, **self.kargs)
class ItemIterator(BaseIterator):
def __init__(self, page_iterator):
self.page_iterator = page_iterator
self.limit = 0
self.current_page = None
self.page_index = -1
self.num_tweets = 0
def next(self):
if self.limit > 0:
if self.num_tweets == self.limit:
raise StopIteration
if self.current_page is None or self.page_index == len(self.current_page) - 1:
# Reached end of current page, get the next page...
self.current_page = self.page_iterator.next()
self.page_index = -1
self.page_index += 1
self.num_tweets += 1
return self.current_page[self.page_index]
def prev(self):
if self.current_page is None:
raise TweepError('Can not go back more, at first page')
if self.page_index == 0:
# At the beginning of the current page, move to next...
self.current_page = self.page_iterator.prev()
self.page_index = len(self.current_page)
if self.page_index == 0:
raise TweepError('No more items')
self.page_index -= 1
self.num_tweets -= 1
return self.current_page[self.page_index]

View File

@ -1,34 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import print_function
import six
class TweepError(Exception):
"""Tweepy exception"""
def __init__(self, reason, response=None, api_code=None):
self.reason = six.text_type(reason)
self.response = response
self.api_code = api_code
Exception.__init__(self, reason)
def __str__(self):
return self.reason
def is_rate_limit_error_message(message):
"""Check if the supplied error message belongs to a rate limit error."""
return isinstance(message, list) \
and len(message) > 0 \
and 'code' in message[0] \
and message[0]['code'] == 88
class RateLimitError(TweepError):
"""Exception for Tweepy hitting the rate limit."""
# RateLimitError has the exact same properties and inner workings
# as TweepError for backwards compatibility reasons.
pass

View File

@ -1,495 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import absolute_import, print_function
from .utils import parse_datetime, parse_html_value, parse_a_href
class ResultSet(list):
"""A list like object that holds results from a Twitter API query."""
def __init__(self, max_id=None, since_id=None):
super(ResultSet, self).__init__()
self._max_id = max_id
self._since_id = since_id
@property
def max_id(self):
if self._max_id:
return self._max_id
ids = self.ids()
# Max_id is always set to the *smallest* id, minus one, in the set
return (min(ids) - 1) if ids else None
@property
def since_id(self):
if self._since_id:
return self._since_id
ids = self.ids()
# Since_id is always set to the *greatest* id in the set
return max(ids) if ids else None
def ids(self):
return [item.id for item in self if hasattr(item, 'id')]
class Model(object):
def __init__(self, api=None):
self._api = api
def __getstate__(self):
# pickle
pickle = dict(self.__dict__)
try:
del pickle['_api'] # do not pickle the API reference
except KeyError:
pass
return pickle
@classmethod
def parse(cls, api, json):
"""Parse a JSON object into a model instance."""
raise NotImplementedError
@classmethod
def parse_list(cls, api, json_list):
"""
Parse a list of JSON objects into
a result set of model instances.
"""
results = ResultSet()
for obj in json_list:
if obj:
results.append(cls.parse(api, obj))
return results
def __repr__(self):
state = ['%s=%s' % (k, repr(v)) for (k, v) in vars(self).items()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(state))
class Status(Model):
@classmethod
def parse(cls, api, json):
status = cls(api)
setattr(status, '_json', json)
for k, v in json.items():
if k == 'user':
user_model = getattr(api.parser.model_factory, 'user') if api else User
user = user_model.parse(api, v)
setattr(status, 'author', user)
setattr(status, 'user', user) # DEPRECIATED
elif k == 'created_at':
setattr(status, k, parse_datetime(v))
elif k == 'source':
if '<' in v:
setattr(status, k, parse_html_value(v))
setattr(status, 'source_url', parse_a_href(v))
else:
setattr(status, k, v)
setattr(status, 'source_url', None)
elif k == 'retweeted_status':
setattr(status, k, Status.parse(api, v))
elif k == 'quoted_status':
setattr(status, k, Status.parse(api, v))
elif k == 'place':
if v is not None:
setattr(status, k, Place.parse(api, v))
else:
setattr(status, k, None)
else:
setattr(status, k, v)
return status
def destroy(self):
return self._api.destroy_status(self.id)
def retweet(self):
return self._api.retweet(self.id)
def retweets(self):
return self._api.retweets(self.id)
def favorite(self):
return self._api.create_favorite(self.id)
def __eq__(self, other):
if isinstance(other, Status):
return self.id == other.id
return NotImplemented
def __ne__(self, other):
result = self == other
if result is NotImplemented:
return result
return not result
class User(Model):
@classmethod
def parse(cls, api, json):
user = cls(api)
setattr(user, '_json', json)
for k, v in json.items():
if k == 'created_at':
setattr(user, k, parse_datetime(v))
elif k == 'status':
setattr(user, k, Status.parse(api, v))
elif k == 'following':
# twitter sets this to null if it is false
if v is True:
setattr(user, k, True)
else:
setattr(user, k, False)
else:
setattr(user, k, v)
return user
@classmethod
def parse_list(cls, api, json_list):
if isinstance(json_list, list):
item_list = json_list
else:
item_list = json_list['users']
results = ResultSet()
for obj in item_list:
results.append(cls.parse(api, obj))
return results
def timeline(self, **kargs):
return self._api.user_timeline(user_id=self.id, **kargs)
def friends(self, **kargs):
return self._api.friends(user_id=self.id, **kargs)
def followers(self, **kargs):
return self._api.followers(user_id=self.id, **kargs)
def follow(self):
self._api.create_friendship(user_id=self.id)
self.following = True
def unfollow(self):
self._api.destroy_friendship(user_id=self.id)
self.following = False
def lists_memberships(self, *args, **kargs):
return self._api.lists_memberships(user=self.screen_name,
*args,
**kargs)
def lists_subscriptions(self, *args, **kargs):
return self._api.lists_subscriptions(user=self.screen_name,
*args,
**kargs)
def lists(self, *args, **kargs):
return self._api.lists_all(user=self.screen_name,
*args,
**kargs)
def followers_ids(self, *args, **kargs):
return self._api.followers_ids(user_id=self.id,
*args,
**kargs)
class DirectMessage(Model):
@classmethod
def parse(cls, api, json):
dm = cls(api)
for k, v in json.items():
if k == 'sender' or k == 'recipient':
setattr(dm, k, User.parse(api, v))
elif k == 'created_at':
setattr(dm, k, parse_datetime(v))
else:
setattr(dm, k, v)
return dm
def destroy(self):
return self._api.destroy_direct_message(self.id)
class Friendship(Model):
@classmethod
def parse(cls, api, json):
relationship = json['relationship']
# parse source
source = cls(api)
for k, v in relationship['source'].items():
setattr(source, k, v)
# parse target
target = cls(api)
for k, v in relationship['target'].items():
setattr(target, k, v)
return source, target
class Category(Model):
@classmethod
def parse(cls, api, json):
category = cls(api)
for k, v in json.items():
setattr(category, k, v)
return category
class SavedSearch(Model):
@classmethod
def parse(cls, api, json):
ss = cls(api)
for k, v in json.items():
if k == 'created_at':
setattr(ss, k, parse_datetime(v))
else:
setattr(ss, k, v)
return ss
def destroy(self):
return self._api.destroy_saved_search(self.id)
class SearchResults(ResultSet):
@classmethod
def parse(cls, api, json):
metadata = json['search_metadata']
results = SearchResults()
results.refresh_url = metadata.get('refresh_url')
results.completed_in = metadata.get('completed_in')
results.query = metadata.get('query')
results.count = metadata.get('count')
results.next_results = metadata.get('next_results')
status_model = getattr(api.parser.model_factory, 'status') if api else Status
for status in json['statuses']:
results.append(status_model.parse(api, status))
return results
class List(Model):
@classmethod
def parse(cls, api, json):
lst = List(api)
for k, v in json.items():
if k == 'user':
setattr(lst, k, User.parse(api, v))
elif k == 'created_at':
setattr(lst, k, parse_datetime(v))
else:
setattr(lst, k, v)
return lst
@classmethod
def parse_list(cls, api, json_list, result_set=None):
results = ResultSet()
if isinstance(json_list, dict):
json_list = json_list['lists']
for obj in json_list:
results.append(cls.parse(api, obj))
return results
def update(self, **kargs):
return self._api.update_list(self.slug, **kargs)
def destroy(self):
return self._api.destroy_list(self.slug)
def timeline(self, **kargs):
return self._api.list_timeline(self.user.screen_name,
self.slug,
**kargs)
def add_member(self, id):
return self._api.add_list_member(self.slug, id)
def remove_member(self, id):
return self._api.remove_list_member(self.slug, id)
def members(self, **kargs):
return self._api.list_members(self.user.screen_name,
self.slug,
**kargs)
def is_member(self, id):
return self._api.is_list_member(self.user.screen_name,
self.slug,
id)
def subscribe(self):
return self._api.subscribe_list(self.user.screen_name, self.slug)
def unsubscribe(self):
return self._api.unsubscribe_list(self.user.screen_name, self.slug)
def subscribers(self, **kargs):
return self._api.list_subscribers(self.user.screen_name,
self.slug,
**kargs)
def is_subscribed(self, id):
return self._api.is_subscribed_list(self.user.screen_name,
self.slug,
id)
class Relation(Model):
@classmethod
def parse(cls, api, json):
result = cls(api)
for k, v in json.items():
if k == 'value' and json['kind'] in ['Tweet', 'LookedupStatus']:
setattr(result, k, Status.parse(api, v))
elif k == 'results':
setattr(result, k, Relation.parse_list(api, v))
else:
setattr(result, k, v)
return result
class Relationship(Model):
@classmethod
def parse(cls, api, json):
result = cls(api)
for k, v in json.items():
if k == 'connections':
setattr(result, 'is_following', 'following' in v)
setattr(result, 'is_followed_by', 'followed_by' in v)
else:
setattr(result, k, v)
return result
class JSONModel(Model):
@classmethod
def parse(cls, api, json):
return json
class IDModel(Model):
@classmethod
def parse(cls, api, json):
if isinstance(json, list):
return json
else:
return json['ids']
class BoundingBox(Model):
@classmethod
def parse(cls, api, json):
result = cls(api)
if json is not None:
for k, v in json.items():
setattr(result, k, v)
return result
def origin(self):
"""
Return longitude, latitude of southwest (bottom, left) corner of
bounding box, as a tuple.
This assumes that bounding box is always a rectangle, which
appears to be the case at present.
"""
return tuple(self.coordinates[0][0])
def corner(self):
"""
Return longitude, latitude of northeast (top, right) corner of
bounding box, as a tuple.
This assumes that bounding box is always a rectangle, which
appears to be the case at present.
"""
return tuple(self.coordinates[0][2])
class Place(Model):
@classmethod
def parse(cls, api, json):
place = cls(api)
for k, v in json.items():
if k == 'bounding_box':
# bounding_box value may be null (None.)
# Example: "United States" (id=96683cc9126741d1)
if v is not None:
t = BoundingBox.parse(api, v)
else:
t = v
setattr(place, k, t)
elif k == 'contained_within':
# contained_within is a list of Places.
setattr(place, k, Place.parse_list(api, v))
else:
setattr(place, k, v)
return place
@classmethod
def parse_list(cls, api, json_list):
if isinstance(json_list, list):
item_list = json_list
else:
item_list = json_list['result']['places']
results = ResultSet()
for obj in item_list:
results.append(cls.parse(api, obj))
return results
class Media(Model):
@classmethod
def parse(cls, api, json):
media = cls(api)
for k, v in json.items():
setattr(media, k, v)
return media
class ModelFactory(object):
"""
Used by parsers for creating instances
of models. You may subclass this factory
to add your own extended models.
"""
status = Status
user = User
direct_message = DirectMessage
friendship = Friendship
saved_search = SavedSearch
search_results = SearchResults
category = Category
list = List
relation = Relation
relationship = Relationship
media = Media
json = JSONModel
ids = IDModel
place = Place
bounding_box = BoundingBox

View File

@ -1,109 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import print_function
from .models import ModelFactory
from .utils import import_simplejson
from .error import TweepError
class Parser(object):
def parse(self, method, payload):
"""
Parse the response payload and return the result.
Returns a tuple that contains the result data and the cursors
(or None if not present).
"""
raise NotImplementedError
def parse_error(self, payload):
"""
Parse the error message and api error code from payload.
Return them as an (error_msg, error_code) tuple. If unable to parse the
message, throw an exception and default error message will be used.
"""
raise NotImplementedError
class RawParser(Parser):
def __init__(self):
pass
def parse(self, method, payload):
return payload
def parse_error(self, payload):
return payload
class JSONParser(Parser):
payload_format = 'json'
def __init__(self):
self.json_lib = import_simplejson()
def parse(self, method, payload):
try:
json = self.json_lib.loads(payload)
except Exception as e:
raise TweepError('Failed to parse JSON payload: %s' % e)
needs_cursors = 'cursor' in method.session.params
if needs_cursors and isinstance(json, dict) \
and 'previous_cursor' in json \
and 'next_cursor' in json:
cursors = json['previous_cursor'], json['next_cursor']
return json, cursors
else:
return json
def parse_error(self, payload):
error_object = self.json_lib.loads(payload)
if 'error' in error_object:
reason = error_object['error']
api_code = error_object.get('code')
else:
reason = error_object['errors']
api_code = [error.get('code') for error in
reason if error.get('code')]
api_code = api_code[0] if len(api_code) == 1 else api_code
return reason, api_code
class ModelParser(JSONParser):
def __init__(self, model_factory=None):
JSONParser.__init__(self)
self.model_factory = model_factory or ModelFactory
def parse(self, method, payload):
try:
if method.payload_type is None:
return
model = getattr(self.model_factory, method.payload_type)
except AttributeError:
raise TweepError('No model for this payload type: '
'%s' % method.payload_type)
json = JSONParser.parse(self, method, payload)
if isinstance(json, tuple):
json, cursors = json
else:
cursors = None
if method.payload_list:
result = model.parse_list(method.api, json)
else:
result = model.parse(method.api, json)
if cursors:
return result, cursors
else:
return result

View File

@ -1,476 +0,0 @@
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
from __future__ import absolute_import, print_function
import logging
import re
import requests
import sys
from requests.exceptions import Timeout
from threading import Thread
from time import sleep
import six
import ssl
from .models import Status
from .api import API
from .error import TweepError
from .utils import import_simplejson
json = import_simplejson()
STREAM_VERSION = '1.1'
class StreamListener(object):
def __init__(self, api=None):
self.api = api or API()
def on_connect(self):
"""Called once connected to streaming server.
This will be invoked once a successful response
is received from the server. Allows the listener
to perform some work prior to entering the read loop.
"""
pass
def on_data(self, raw_data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
data = json.loads(raw_data)
if 'in_reply_to_status_id' in data:
status = Status.parse(self.api, data)
if self.on_status(status) is False:
return False
elif 'delete' in data:
delete = data['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'event' in data:
status = Status.parse(self.api, data)
if self.on_event(status) is False:
return False
elif 'direct_message' in data:
status = Status.parse(self.api, data)
if self.on_direct_message(status) is False:
return False
elif 'friends' in data:
if self.on_friends(data['friends']) is False:
return False
elif 'limit' in data:
if self.on_limit(data['limit']['track']) is False:
return False
elif 'disconnect' in data:
if self.on_disconnect(data['disconnect']) is False:
return False
elif 'warning' in data:
if self.on_warning(data['warning']) is False:
return False
else:
logging.error("Unknown message type: " + str(raw_data))
def keep_alive(self):
"""Called when a keep-alive arrived"""
return
def on_status(self, status):
"""Called when a new status arrives"""
return
def on_exception(self, exception):
"""Called when an unhandled exception occurs."""
return
def on_delete(self, status_id, user_id):
"""Called when a delete notice arrives for a status"""
return
def on_event(self, status):
"""Called when a new event arrives"""
return
def on_direct_message(self, status):
"""Called when a new direct message arrives"""
return
def on_friends(self, friends):
"""Called when a friends list arrives.
friends is a list that contains user_id
"""
return
def on_limit(self, track):
"""Called when a limitation notice arrives"""
return
def on_error(self, status_code):
"""Called when a non-200 status code is returned"""
return False
def on_timeout(self):
"""Called when stream connection times out"""
return
def on_disconnect(self, notice):
"""Called when twitter sends a disconnect notice
Disconnect codes are listed here:
https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
"""
return
def on_warning(self, notice):
"""Called when a disconnection warning message arrives"""
return
class ReadBuffer(object):
"""Buffer data from the response in a smarter way than httplib/requests can.
Tweets are roughly in the 2-12kb range, averaging around 3kb.
Requests/urllib3/httplib/socket all use socket.read, which blocks
until enough data is returned. On some systems (eg google appengine), socket
reads are quite slow. To combat this latency we can read big chunks,
but the blocking part means we won't get results until enough tweets
have arrived. That may not be a big deal for high throughput systems.
For low throughput systems we don't want to sacrafice latency, so we
use small chunks so it can read the length and the tweet in 2 read calls.
"""
def __init__(self, stream, chunk_size, encoding='utf-8'):
self._stream = stream
self._buffer = six.b('')
self._chunk_size = chunk_size
self._encoding = encoding
def read_len(self, length):
while not self._stream.closed:
if len(self._buffer) >= length:
return self._pop(length)
read_len = max(self._chunk_size, length - len(self._buffer))
self._buffer += self._stream.read(read_len)
return six.b('')
def read_line(self, sep=six.b('\n')):
"""Read the data stream until a given separator is found (default \n)
:param sep: Separator to read until. Must by of the bytes type (str in python 2,
bytes in python 3)
:return: The str of the data read until sep
"""
start = 0
while not self._stream.closed:
loc = self._buffer.find(sep, start)
if loc >= 0:
return self._pop(loc + len(sep))
else:
start = len(self._buffer)
self._buffer += self._stream.read(self._chunk_size)
return six.b('')
def _pop(self, length):
r = self._buffer[:length]
self._buffer = self._buffer[length:]
return r.decode(self._encoding)
class Stream(object):
host = 'stream.twitter.com'
def __init__(self, auth, listener, **options):
self.auth = auth
self.listener = listener
self.running = False
self.timeout = options.get("timeout", 300.0)
self.retry_count = options.get("retry_count")
# values according to
# https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
self.retry_time_start = options.get("retry_time", 5.0)
self.retry_420_start = options.get("retry_420", 60.0)
self.retry_time_cap = options.get("retry_time_cap", 320.0)
self.snooze_time_step = options.get("snooze_time", 0.25)
self.snooze_time_cap = options.get("snooze_time_cap", 16)
# The default socket.read size. Default to less than half the size of
# a tweet so that it reads tweets with the minimal latency of 2 reads
# per tweet. Values higher than ~1kb will increase latency by waiting
# for more data to arrive but may also increase throughput by doing
# fewer socket read calls.
self.chunk_size = options.get("chunk_size", 512)
self.verify = options.get("verify", True)
self.api = API()
self.headers = options.get("headers") or {}
self.new_session()
self.body = None
self.retry_time = self.retry_time_start
self.snooze_time = self.snooze_time_step
# Example: proxies = {'http': 'http://localhost:1080', 'https': 'http://localhost:1080'}
self.proxies = options.get("proxies")
def new_session(self):
self.session = requests.Session()
self.session.headers = self.headers
self.session.params = None
def _run(self):
# Authenticate
url = "https://%s%s" % (self.host, self.url)
# Connect and process the stream
error_counter = 0
resp = None
exc_info = None
while self.running:
if self.retry_count is not None:
if error_counter > self.retry_count:
# quit if error count greater than retry count
break
try:
auth = self.auth.apply_auth()
resp = self.session.request('POST',
url,
data=self.body,
timeout=self.timeout,
stream=True,
auth=auth,
verify=self.verify,
proxies = self.proxies)
if resp.status_code != 200:
if self.listener.on_error(resp.status_code) is False:
break
error_counter += 1
if resp.status_code == 420:
self.retry_time = max(self.retry_420_start,
self.retry_time)
sleep(self.retry_time)
self.retry_time = min(self.retry_time * 2,
self.retry_time_cap)
else:
error_counter = 0
self.retry_time = self.retry_time_start
self.snooze_time = self.snooze_time_step
self.listener.on_connect()
self._read_loop(resp)
except (Timeout, ssl.SSLError) as exc:
# This is still necessary, as a SSLError can actually be
# thrown when using Requests
# If it's not time out treat it like any other exception
if isinstance(exc, ssl.SSLError):
if not (exc.args and 'timed out' in str(exc.args[0])):
exc_info = sys.exc_info()
break
if self.listener.on_timeout() is False:
break
if self.running is False:
break
sleep(self.snooze_time)
self.snooze_time = min(self.snooze_time + self.snooze_time_step,
self.snooze_time_cap)
except Exception as exc:
exc_info = sys.exc_info()
# any other exception is fatal, so kill loop
break
# cleanup
self.running = False
if resp:
resp.close()
self.new_session()
if exc_info:
# call a handler first so that the exception can be logged.
self.listener.on_exception(exc_info[1])
six.reraise(*exc_info)
def _data(self, data):
if self.listener.on_data(data) is False:
self.running = False
def _read_loop(self, resp):
charset = resp.headers.get('content-type', default='')
enc_search = re.search(r'charset=(?P<enc>\S*)', charset)
if enc_search is not None:
encoding = enc_search.group('enc')
else:
encoding = 'utf-8'
buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)
while self.running and not resp.raw.closed:
length = 0
while not resp.raw.closed:
line = buf.read_line()
stripped_line = line.strip() if line else line # line is sometimes None so we need to check here
if not stripped_line:
self.listener.keep_alive() # keep-alive new lines are expected
elif stripped_line.isdigit():
length = int(stripped_line)
break
else:
raise TweepError('Expecting length, unexpected value found')
next_status_obj = buf.read_len(length)
if self.running and next_status_obj:
self._data(next_status_obj)
# # Note: keep-alive newlines might be inserted before each length value.
# # read until we get a digit...
# c = b'\n'
# for c in resp.iter_content(decode_unicode=True):
# if c == b'\n':
# continue
# break
#
# delimited_string = c
#
# # read rest of delimiter length..
# d = b''
# for d in resp.iter_content(decode_unicode=True):
# if d != b'\n':
# delimited_string += d
# continue
# break
#
# # read the next twitter status object
# if delimited_string.decode('utf-8').strip().isdigit():
# status_id = int(delimited_string)
# next_status_obj = resp.raw.read(status_id)
# if self.running:
# self._data(next_status_obj.decode('utf-8'))
if resp.raw.closed:
self.on_closed(resp)
def _start(self, is_async):
self.running = True
if is_async:
self._thread = Thread(target=self._run)
self._thread.start()
else:
self._run()
def on_closed(self, resp):
""" Called when the response has been closed by Twitter """
pass
def userstream(self,
stall_warnings=False,
_with=None,
replies=None,
track=None,
locations=None,
is_async=False,
encoding='utf8'):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/user.json' % STREAM_VERSION
self.host = 'userstream.twitter.com'
if stall_warnings:
self.session.params['stall_warnings'] = stall_warnings
if _with:
self.session.params['with'] = _with
if replies:
self.session.params['replies'] = replies
if locations and len(locations) > 0:
if len(locations) % 4 != 0:
raise TweepError("Wrong number of locations points, "
"it has to be a multiple of 4")
self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
if track:
self.session.params['track'] = u','.join(track).encode(encoding)
self._start(is_async)
def firehose(self, count=None, is_async=False):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self._start(is_async)
def retweet(self, is_async=False):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
self._start(is_async)
def sample(self, is_async=False, languages=None, stall_warnings=False):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/sample.json' % STREAM_VERSION
if languages:
self.session.params['language'] = ','.join(map(str, languages))
if stall_warnings:
self.session.params['stall_warnings'] = 'true'
self._start(is_async)
def filter(self, follow=None, track=None, is_async=False, locations=None,
stall_warnings=False, languages=None, encoding='utf8', filter_level=None):
self.body = {}
self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/filter.json' % STREAM_VERSION
if follow:
self.body['follow'] = u','.join(follow).encode(encoding)
if track:
self.body['track'] = u','.join(track).encode(encoding)
if locations and len(locations) > 0:
if len(locations) % 4 != 0:
raise TweepError("Wrong number of locations points, "
"it has to be a multiple of 4")
self.body['locations'] = u','.join(['%.4f' % l for l in locations])
if stall_warnings:
self.body['stall_warnings'] = stall_warnings
if languages:
self.body['language'] = u','.join(map(str, languages))
if filter_level:
self.body['filter_level'] = filter_level.encode(encoding)
self.session.params = {'delimited': 'length'}
self.host = 'stream.twitter.com'
self._start(is_async)
def sitestream(self, follow, stall_warnings=False,
with_='user', replies=False, is_async=False):
self.body = {}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/site.json' % STREAM_VERSION
self.body['follow'] = u','.join(map(six.text_type, follow))
self.body['delimited'] = 'length'
if stall_warnings:
self.body['stall_warnings'] = stall_warnings
if with_:
self.body['with'] = with_
if replies:
self.body['replies'] = replies
self._start(is_async)
def disconnect(self):
if self.running is False:
return
self.running = False

View File

@ -1,50 +0,0 @@
# Tweepy
# Copyright 2010 Joshua Roesslein
# See LICENSE for details.
from __future__ import print_function
from datetime import datetime
import six
from email.utils import parsedate
def parse_datetime(string):
return datetime(*(parsedate(string)[:6]))
def parse_html_value(html):
return html[html.find('>')+1:html.rfind('<')]
def parse_a_href(atag):
start = atag.find('"') + 1
end = atag.find('"', start)
return atag[start:end]
def convert_to_utf8_str(arg):
# written by Michael Norton (http://docondev.blogspot.com/)
if isinstance(arg, six.text_type):
arg = arg.encode('utf-8')
elif not isinstance(arg, bytes):
arg = six.text_type(arg).encode('utf-8')
return arg
def import_simplejson():
try:
import simplejson as json
except ImportError:
import json
return json
def list_to_csv(item_list):
if item_list:
return ','.join([str(i) for i in item_list])

View File

@ -37,7 +37,6 @@ from . import NotifyEmail as NotifyEmailBase
# Required until re-factored into base code
from .NotifyPushjet import pushjet
from .NotifyGrowl import gntp
from .NotifyTwitter import tweepy
# NotifyBase object is passed in as a module not class
from . import NotifyBase
@ -66,9 +65,6 @@ __all__ = [
# pushjet (used for NotifyPushjet Testing)
'pushjet',
# tweepy (used for NotifyTwitter Testing)
'tweepy',
]
# we mirror our base purely for the ability to reset everything; this

View File

@ -7,7 +7,7 @@ license_file = LICENSE
[flake8]
# We exclude packages we don't maintain
exclude = .eggs,.tox,gntp,tweepy,pushjet
exclude = .eggs,.tox,gntp,pushjet
ignore = E722,W503,W504
statistics = true
builtins = _

View File

@ -2169,6 +2169,152 @@ TEST_URLS = (
'test_requests_exceptions': True,
}),
##################################
# NotifyTwitter
##################################
('twitter://', {
'instance': None,
}),
('twitter://:@/', {
'instance': TypeError,
}),
('twitter://consumer_key', {
# Missing Keys
'instance': TypeError,
}),
('twitter://consumer_key/consumer_secret/', {
# Missing Keys
'instance': TypeError,
}),
('twitter://consumer_key/consumer_secret/access_token/', {
# Missing Access Secret
'instance': TypeError,
}),
('twitter://consumer_key/consumer_secret/access_token/access_secret', {
# No user mean's we message ourselves
'instance': plugins.NotifyTwitter,
# Expected notify() response False (because we won't be able
# to detect our user)
'notify_response': False,
}),
('twitter://consumer_key/consumer_secret/access_token/access_secret'
'?cache=no', {
# No user mean's we message ourselves
'instance': plugins.NotifyTwitter,
# However we'll be okay if we return a proper response
'requests_response_text': {
'id': 12345,
'screen_name': 'test'
},
}),
('twitter://consumer_key/consumer_secret/access_token/access_secret', {
# No user mean's we message ourselves
'instance': plugins.NotifyTwitter,
# However we'll be okay if we return a proper response
'requests_response_text': {
'id': 12345,
'screen_name': 'test'
},
}),
# A duplicate of the entry above, this will cause cache to be referenced
('twitter://consumer_key/consumer_secret/access_token/access_secret', {
# No user mean's we message ourselves
'instance': plugins.NotifyTwitter,
# However we'll be okay if we return a proper response
'requests_response_text': {
'id': 12345,
'screen_name': 'test'
},
}),
# handle cases where the screen_name is missing from the response causing
# an exception during parsing
('twitter://consumer_key/consumer_secret2/access_token/access_secret', {
# No user mean's we message ourselves
'instance': plugins.NotifyTwitter,
# However we'll be okay if we return a proper response
'requests_response_text': {
'id': 12345,
},
# due to a mangled response_text we'll fail
'notify_response': False,
}),
('twitter://user@consumer_key/csecret2/access_token/access_secret/-/%/', {
# One Invalid User
'instance': plugins.NotifyTwitter,
# Expected notify() response False (because we won't be able
# to detect our user)
'notify_response': False,
}),
('twitter://user@consumer_key/csecret/access_token/access_secret'
'?cache=No', {
# No Cache
'instance': plugins.NotifyTwitter,
'requests_response_text': [{
'id': 12345,
'screen_name': 'user'
}],
}),
('twitter://user@consumer_key/csecret/access_token/access_secret', {
# We're good!
'instance': plugins.NotifyTwitter,
'requests_response_text': [{
'id': 12345,
'screen_name': 'user'
}],
}),
# A duplicate of the entry above, this will cause cache to be referenced
# for this reason, we don't even need to return a valid response
('twitter://user@consumer_key/csecret/access_token/access_secret', {
# We're identifying the same user we already sent to
'instance': plugins.NotifyTwitter,
}),
('twitter://ckey/csecret/access_token/access_secret?mode=tweet', {
# A Public Tweet
'instance': plugins.NotifyTwitter,
}),
('tweet://consumer_key/consumer_secret/access_token/access_secret', {
# tweet:// is to be depricated; but we will support for purposes of
# generating a warning to the user; the above matches an above
# twitter:// reference so that it can use what was cached
'instance': plugins.NotifyTwitter,
}),
('twitter://user@ckey/csecret/access_token/access_secret?mode=invalid', {
# An invalid mode
'instance': TypeError,
}),
('twitter://usera@consumer_key/consumer_secret/access_token/'
'access_secret/user/?to=userb', {
# We're good!
'instance': plugins.NotifyTwitter,
'requests_response_text': [{
'id': 12345,
'screen_name': 'usera'
}, {
'id': 12346,
'screen_name': 'userb'
}, {
# A garbage entry we can test exception handling on
'id': 123,
}],
}),
('twitter://ckey/csecret/access_token/access_secret', {
'instance': plugins.NotifyTwitter,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('twitter://ckey/csecret/access_token/access_secret', {
'instance': plugins.NotifyTwitter,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
('twitter://ckey/csecret/access_token/access_secret?mode=tweet', {
'instance': plugins.NotifyTwitter,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
##################################
# NotifyNexmo
##################################

View File

@ -25,181 +25,17 @@
import six
import mock
from random import choice
from string import ascii_uppercase as str_alpha
from string import digits as str_num
import requests
from json import dumps
from datetime import datetime
from apprise import plugins
from apprise import NotifyType
from apprise import Apprise
from apprise import OverflowMode
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_URLS = (
##################################
# NotifyTwitter
##################################
('tweet://', {
'instance': None,
}),
('tweet://:@/', {
'instance': None,
}),
('tweet://consumer_key', {
# Missing Keys
'instance': TypeError,
}),
('tweet://consumer_key/consumer_key/', {
# Missing Keys
'instance': TypeError,
}),
('tweet://consumer_key/consumer_key/access_token/', {
# Missing Access Secret
'instance': TypeError,
}),
('tweet://consumer_key/consumer_key/access_token/access_secret', {
# Missing User
'instance': TypeError,
}),
('tweet://user@consumer_key/consumer_key/access_token/access_secret', {
# We're good!
'instance': plugins.NotifyTwitter,
}),
('tweet://usera@consumer_key/consumer_key/access_token/'
'access_secret/?to=userb', {
# We're good!
'instance': plugins.NotifyTwitter,
}),
)
@mock.patch('apprise.plugins.tweepy.API')
@mock.patch('apprise.plugins.tweepy.OAuthHandler')
def test_plugin(mock_oauth, mock_api):
"""
API: NotifyTwitter Plugin() (pt1)
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Define how many characters exist per line
row = 80
# Some variables we use to control the data we work with
body_len = 1024
title_len = 1024
# Create a large body and title with random data
body = ''.join(choice(str_alpha + str_num + ' ') for _ in range(body_len))
body = '\r\n'.join([body[i: i + row] for i in range(0, len(body), row)])
# Create our title using random data
title = ''.join(choice(str_alpha + str_num) for _ in range(title_len))
# iterate over our dictionary and test it out
for (url, meta) in TEST_URLS:
# Our expected instance
instance = meta.get('instance', None)
# Our expected server objects
self = meta.get('self', None)
# Our expected Query response (True, False, or exception type)
response = meta.get('response', True)
# Allow notification type override, otherwise default to INFO
notify_type = meta.get('notify_type', NotifyType.INFO)
# Allow us to force the server response code to be something other then
# the defaults
response = meta.get(
'response', True if response else False)
try:
obj = Apprise.instantiate(url, suppress_exceptions=False)
if obj is None:
if instance is not None:
# We're done (assuming this is what we were expecting)
print("{} didn't instantiate itself "
"(we expected it to)".format(url))
assert False
continue
if instance is None:
# Expected None but didn't get it
print('%s instantiated %s (but expected None)' % (
url, str(obj)))
assert False
assert isinstance(obj, instance) is True
if isinstance(obj, plugins.NotifyBase):
# We loaded okay; now lets make sure we can reverse this url
assert isinstance(obj.url(), six.string_types) is True
# Instantiate the exact same object again using the URL from
# the one that was already created properly
obj_cmp = Apprise.instantiate(obj.url())
# Our object should be the same instance as what we had
# originally expected above.
if not isinstance(obj_cmp, plugins.NotifyBase):
# Assert messages are hard to trace back with the way
# these tests work. Just printing before throwing our
# assertion failure makes things easier to debug later on
print('TEST FAIL: {} regenerated as {}'.format(
url, obj.url()))
assert False
if self:
# Iterate over our expected entries inside of our object
for key, val in self.items():
# Test that our object has the desired key
assert hasattr(key, obj) is True
assert getattr(key, obj) == val
obj.request_rate_per_sec = 0
# check that we're as expected
assert obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO) == response
# check that this doesn't change using different overflow
# methods
assert obj.notify(
body=body, title=title,
notify_type=notify_type,
overflow=OverflowMode.UPSTREAM) == response
assert obj.notify(
body=body, title=title,
notify_type=notify_type,
overflow=OverflowMode.TRUNCATE) == response
assert obj.notify(
body=body, title=title,
notify_type=notify_type,
overflow=OverflowMode.SPLIT) == response
except AssertionError:
# Don't mess with these entries
raise
except Exception as e:
# Handle our exception
assert(instance is not None)
assert(isinstance(e, instance))
@mock.patch('apprise.plugins.tweepy.API.send_direct_message')
@mock.patch('apprise.plugins.tweepy.OAuthHandler.set_access_token')
def test_twitter_plugin_init(set_access_token, send_direct_message):
def test_twitter_plugin_init():
"""
API: NotifyTwitter Plugin() (pt2)
@ -240,15 +76,16 @@ def test_twitter_plugin_init(set_access_token, send_direct_message):
try:
plugins.NotifyTwitter(
ckey='value', csecret='value', akey='value', asecret='value')
assert False
except TypeError:
# user not set
assert True
except TypeError:
# user not set; but this is okay
# We should not reach here
assert False
try:
obj = plugins.NotifyTwitter(
plugins.NotifyTwitter(
ckey='value', csecret='value', akey='value', asecret='value',
user='l2g')
user='l2gnux')
# We should initialize properly
assert True
@ -256,19 +93,171 @@ def test_twitter_plugin_init(set_access_token, send_direct_message):
# We should not reach here
assert False
set_access_token.side_effect = TypeError('Invalid')
assert obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO) is False
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_notify_twitter_plugin_general(mock_post, mock_get):
"""
API: NotifyTwitter() General Tests
# Make it so we can pass authentication, but fail on message
# delivery
set_access_token.side_effect = None
set_access_token.return_value = True
send_direct_message.side_effect = plugins.tweepy.error.TweepError(
0, 'tweepy.error.TweepyError() not handled'),
"""
ckey = 'ckey'
csecret = 'csecret'
akey = 'akey'
asecret = 'asecret'
screen_name = 'apprise'
assert obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO) is False
response_obj = [{
'screen_name': screen_name,
'id': 9876,
}]
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Epoch time:
epoch = datetime.utcfromtimestamp(0)
request = mock.Mock()
request.content = dumps(response_obj)
request.status_code = requests.codes.ok
request.headers = {
'x-rate-limit-reset': (datetime.utcnow() - epoch).total_seconds(),
'x-rate-limit-remaining': 1,
}
# Prepare Mock
mock_get.return_value = request
mock_post.return_value = request
# Variation Initializations
obj = plugins.NotifyTwitter(
ckey=ckey,
csecret=csecret,
akey=akey,
asecret=asecret,
targets=screen_name)
assert isinstance(obj, plugins.NotifyTwitter) is True
assert isinstance(obj.url(), six.string_types) is True
# apprise room was found
assert obj.send(body="test") is True
# Change our status code and try again
request.status_code = 403
assert obj.send(body="test") is False
assert obj.ratelimit_remaining == 1
# Return the status
request.status_code = requests.codes.ok
# Force a reset
request.headers['x-rate-limit-remaining'] = 0
# behind the scenes, it should cause us to update our rate limit
assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 0
# This should cause us to block
request.headers['x-rate-limit-remaining'] = 10
assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 10
# Handle cases where we simply couldn't get this field
del request.headers['x-rate-limit-remaining']
assert obj.send(body="test") is True
# It remains set to the last value
assert obj.ratelimit_remaining == 10
# Reset our variable back to 1
request.headers['x-rate-limit-remaining'] = 1
# Handle cases where our epoch time is wrong
del request.headers['x-rate-limit-reset']
assert obj.send(body="test") is True
# Return our object, but place it in the future forcing us to block
request.headers['x-rate-limit-reset'] = \
(datetime.utcnow() - epoch).total_seconds() + 1
request.headers['x-rate-limit-remaining'] = 0
obj.ratelimit_remaining = 0
assert obj.send(body="test") is True
# Return our object, but place it in the future forcing us to block
request.headers['x-rate-limit-reset'] = \
(datetime.utcnow() - epoch).total_seconds() - 1
request.headers['x-rate-limit-remaining'] = 0
obj.ratelimit_remaining = 0
assert obj.send(body="test") is True
# Return our limits to always work
request.headers['x-rate-limit-reset'] = \
(datetime.utcnow() - epoch).total_seconds()
request.headers['x-rate-limit-remaining'] = 1
obj.ratelimit_remaining = 1
# Alter pending targets
obj.targets.append('usera')
request.content = dumps(response_obj)
response_obj = [{
'screen_name': 'usera',
'id': 1234,
}]
assert obj.send(body="test") is True
# Flush our cache forcing it's re-creating
del plugins.NotifyTwitter._user_cache
assert obj.send(body="test") is True
# Cause content response to be None
request.content = None
assert obj.send(body="test") is True
# Invalid JSON
request.content = '{'
assert obj.send(body="test") is True
# Return it to a parseable string
request.content = '{}'
results = plugins.NotifyTwitter.parse_url(
'twitter://{}/{}/{}/{}?to={}'.format(
ckey, csecret, akey, asecret, screen_name))
assert isinstance(results, dict) is True
assert screen_name in results['targets']
# cause a json parsing issue now
response_obj = None
assert obj.send(body="test") is True
response_obj = '{'
assert obj.send(body="test") is True
# Set ourselves up to handle whoami calls
# Flush out our cache
del plugins.NotifyTwitter._user_cache
response_obj = {
'screen_name': screen_name,
'id': 9876,
}
request.content = dumps(response_obj)
obj = plugins.NotifyTwitter(
ckey=ckey,
csecret=csecret,
akey=akey,
asecret=asecret)
assert obj.send(body="test") is True
# Alter the key forcing us to look up a new value of ourselves again
del plugins.NotifyTwitter._user_cache
del plugins.NotifyTwitter._whoami_cache
obj.ckey = 'different.then.it.was'
assert obj.send(body="test") is True
del plugins.NotifyTwitter._whoami_cache
obj.ckey = 'different.again'
assert obj.send(body="test") is True