Skip to content

Instantly share code, notes, and snippets.

@cnicodeme
Created March 11, 2020 15:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cnicodeme/c3311ac6ec3b80f74e9e17a250b4d6db to your computer and use it in GitHub Desktop.
Save cnicodeme/c3311ac6ec3b80f74e9e17a250b4d6db to your computer and use it in GitHub Desktop.
Custom implementation of the Twitter OAuth API.
# -*- coding:utf-8 -*-
from urllib.parse import parse_qs, quote
import oauth2, time, requests, json
"""
Usage:
# To setup the access:
# GET /twitter/authorize
```
twitter = Twitter('CONSUMER_KEY', 'CONSUMER_SECRET')
tokens = twitter.get_temporary_token()
session['twitter_oauth'] = tokens
redirect(twitter.get_authorize_url(tokens['token']))
```
# Response from Twitter
# GET /twitter/validate?oauth_token={twitter_oauth_token}&oauth_verifier={twitter_oauth_verifier}
```
twitter = Twitter('CONSUMER_KEY', 'CONSUMER_SECRET')
# We take Flask's "request.args" to get args from the URL
assert request.args.get('oauth_token') == session['twitter_oauth']['token']
oauth = twitter.verify(session['twitter_oauth'], request.args.get('oauth_verifier'))
# oauth will contain the real oauth_token and oauth_token_secret along with the user_id and screen_name
print(oauth['screen_name'])
# Save `oauth` details in the database
```
# GET /twitter/details
```
twitter = Twitter.init('CONSUMER_KEY', 'CONSUMER_SECRET', oauth['oauth_token'], oauth['oauth_token_secret'])
return jsonify(twitter.get_user(oauth['screen_name']))
```
"""
# OAuth 1.0a endpoints, listed in the order the three-legged flow uses them:
# request a temporary token, send the user to authorize, exchange for access.
_TWITTER_OAUTH_ROOT = 'https://api.twitter.com/oauth'
TWITTER_REQUEST_TOKEN_URL = _TWITTER_OAUTH_ROOT + '/request_token'
TWITTER_AUTHORIZE_URL = _TWITTER_OAUTH_ROOT + '/authorize'
TWITTER_ACCESS_TOKEN_URL = _TWITTER_OAUTH_ROOT + '/access_token'

# Prefix for REST API calls; callers append "<path>.json" to it.
TWITTER_BASE_ENDPOINT = 'https://api.twitter.com/1.1/'
class TwitterAuthError(AssertionError):
    """Raised when a Twitter OAuth endpoint returns an unusable response.

    Subclasses ``AssertionError`` because these checks used to be ``assert``
    statements: existing ``except AssertionError`` handlers keep working,
    while the checks themselves are no longer stripped under ``python -O``.
    """


class Twitter:
    """Minimal Twitter client: OAuth 1.0a handshake plus signed API calls.

    Typical flow: ``get_temporary_token()`` -> redirect the user to
    ``get_authorize_url()`` -> ``verify()`` with the verifier Twitter sends
    back. Once an access token is set (by ``verify()``,
    ``set_access_token()`` or ``Twitter.init()``), ``get()`` / ``post()``
    and the convenience wrappers can be used.
    """

    # Access token used to sign API calls. Defaults to None so that
    # _request() can report a missing token with a clear ValueError instead
    # of failing with AttributeError on an instance that never had one set.
    token = None

    def __init__(self, consumer_key, consumer_secret):
        """Store the application credentials and build the OAuth consumer.

        Raises:
            AttributeError: if either credential is None.
        """
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

        if self.consumer_key is None:
            raise AttributeError('Missing consumer key')

        if self.consumer_secret is None:
            raise AttributeError('Missing consumer secret')

        self.consumer = oauth2.Consumer(self.consumer_key, self.consumer_secret)

    @staticmethod
    def _check_status(response, step):
        """Raise TwitterAuthError unless the OAuth response status is '200'."""
        if response['status'] != '200':
            raise TwitterAuthError(
                'Twitter {0} request failed with HTTP status {1}'.format(step, response['status'])
            )

    @staticmethod
    def _field(fields, name):
        """Return the first value of ``name`` (a bytes key) decoded as UTF-8.

        ``fields`` is the result of ``parse_qs`` on the raw response body,
        which is expected to be bytes, hence the bytes keys and the decode.

        Raises:
            TwitterAuthError: if the field is absent from the response.
        """
        values = fields.get(name)
        if not values:
            raise TwitterAuthError(
                'Missing "{0}" in Twitter response'.format(name.decode('utf-8'))
            )
        return values[0].decode('utf-8')

    def get_temporary_token(self, redirect=None):
        """Request a temporary (request) token to start the OAuth flow.

        Args:
            redirect: optional callback URL, sent as ``oauth_callback``.

        Returns:
            dict with ``token`` and ``token_secret`` (both str).

        Raises:
            TwitterAuthError: on a non-200 status, an unconfirmed callback,
                or a response missing the expected fields.
        """
        client = oauth2.Client(self.consumer)

        url = TWITTER_REQUEST_TOKEN_URL
        if redirect:
            url += '?oauth_callback={0}'.format(quote(redirect))

        response, content = client.request(url, 'GET')
        self._check_status(response, 'request token')

        fields = parse_qs(content)
        if self._field(fields, b'oauth_callback_confirmed') != 'true':
            raise TwitterAuthError('Twitter did not confirm the OAuth callback')

        return {
            'token': self._field(fields, b'oauth_token'),
            'token_secret': self._field(fields, b'oauth_token_secret'),
        }

    def get_authorize_url(self, oauth_token):
        """Return the URL the user must visit to authorize ``oauth_token``."""
        return '{0}?oauth_token={1}'.format(TWITTER_AUTHORIZE_URL, oauth_token)

    def verify(self, oauth_tokens, oauth_verifier):
        """Exchange the temporary token and verifier for an access token.

        Args:
            oauth_tokens: dict as returned by ``get_temporary_token()``
                (keys ``token`` and ``token_secret``).
            oauth_verifier: the ``oauth_verifier`` value Twitter passed to
                the callback.

        Returns:
            dict with ``oauth_token``, ``oauth_token_secret``, ``user_id``
            and ``screen_name`` (without the ``@`` prefix). The access token
            is also installed on this instance.

        Raises:
            TwitterAuthError: on a non-200 status or a response missing the
                expected fields.
        """
        token = oauth2.Token(oauth_tokens['token'], oauth_tokens['token_secret'])
        token.set_verifier(oauth_verifier)
        client = oauth2.Client(self.consumer, token)

        response, content = client.request(TWITTER_ACCESS_TOKEN_URL, 'POST')
        self._check_status(response, 'access token')

        fields = parse_qs(content)
        result = {
            'oauth_token': self._field(fields, b'oauth_token'),
            'oauth_token_secret': self._field(fields, b'oauth_token_secret'),
            'user_id': self._field(fields, b'user_id'),
            'screen_name': self._field(fields, b'screen_name'),  # Without @ prefix
        }

        self.set_access_token(result['oauth_token'], result['oauth_token_secret'])
        return result

    def set_access_token(self, oauth_token, oauth_token_secret):
        """Install the access token used to sign subsequent API calls."""
        self.token = oauth2.Token(oauth_token, oauth_token_secret)

    def _request(self, method, path, params=None):
        """Send a signed request to ``<TWITTER_BASE_ENDPOINT><path>.json``.

        GET requests carry ``params`` and the OAuth signature in the query
        string; POST requests send ``params`` as a JSON body and the
        signature in the ``Authorization`` header.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: if no access token is set or ``method`` is not
                GET/POST.
            requests.HTTPError: if the API answers with an error status.
        """
        if self.token is None:
            raise ValueError('Missing token instance')

        if method not in ('GET', 'POST'):
            raise ValueError('Only GET/POST method accepted.')

        url = '{0}{1}.json'.format(TWITTER_BASE_ENDPOINT, path)
        oauth_params = {
            'oauth_version': "1.0",
            'oauth_nonce': oauth2.generate_nonce(),
            'oauth_timestamp': str(int(time.time())),
        }

        if method == 'GET':
            # Work on a copy so the caller's dict is not polluted with the
            # oauth_* entries added for signing.
            query = dict(params or {})
            query.update(oauth_params)

            request = oauth2.Request(method, url, query)
            request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)

            response = requests.get(request.to_url())
        else:
            request = oauth2.Request(method, url, oauth_params, json.dumps(params).encode('utf-8'))
            request.sign_request(oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)

            response = requests.post(url, json=params, headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
                'authorization': request.to_header()['Authorization']
            })

        response.raise_for_status()
        return response.json()

    def get(self, path, params=None):
        """Issue a signed GET to ``path``; see ``_request``."""
        return self._request('GET', path, params)

    def post(self, path, params=None):
        """Issue a signed POST to ``path`` with a JSON body; see ``_request``."""
        return self._request('POST', path, params)

    def get_user(self, screen_name, include_entities=False):
        """Fetch a user profile via ``users/show``.

        ``include_entities`` is sent as the lowercase ``'true'``/``'false'``
        string rather than letting the bool stringify to ``'True'``/``'False'``
        in the query string.
        """
        return self.get('users/show', {
            'screen_name': screen_name,
            'include_entities': 'true' if include_entities else 'false',
        })

    def send_message(self, recipient, message):
        """
        Sends a direct message to a specific user ID (not screen name)

        @rate-limit: 1000 per user; 15000 per app | Per 24 hour window
        """
        return self.post('direct_messages/events/new', {
            "event": {
                "type": "message_create",
                "message_create": {
                    "target": {"recipient_id": recipient},
                    "message_data": {"text": message}
                }
            }
        })

    def list_messages(self, cursor=None, count=50):
        """
        List direct message events.

        count: 50 is max.
        cursor: optional pagination cursor; omitted from the request when falsy.
        """
        params = {
            'count': str(count)
        }

        if cursor:
            params['cursor'] = str(cursor)

        return self.get('direct_messages/events/list', params)

    @classmethod
    def init(cls, consumer_key, consumer_secret, token, secret):
        """Build a client that already holds an access token.

        Uses ``cls`` so subclasses get an instance of their own type.
        """
        tw = cls(consumer_key, consumer_secret)
        tw.set_access_token(token, secret)
        return tw
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment