Skip to content

Instantly share code, notes, and snippets.

Last active July 15, 2018 05:41
Show Gist options
  • Save bdeshi/213433700a9a2485d259ab902a222d18 to your computer and use it in GitHub Desktop.
Save bdeshi/213433700a9a2485d259ab902a222d18 to your computer and use it in GitHub Desktop.
Twitter API test helper [WIP]
"""basic twitter api"""
from time import time
from math import floor, ceil
from os import urandom
from base64 import standard_b64encode as base64encode
import hashlib
import hmac
from urllib.parse import quote
import httplib2
def urlquote(str):
    """Percent-encode *str*, leaving no reserved character unescaped.

    Passing an empty ``safe`` set makes ``/`` (which ``quote`` keeps by
    default) get escaped as well.

    The parameter keeps its original name ``str`` (which shadows the builtin
    inside this function) so that existing keyword callers keep working.
    """
    return quote(str, safe='')
class Twi(object):
    """Minimal Twitter REST API client that signs requests with OAuth 1.0a.

    Only the HMAC-SHA1 signature method is implemented.
    """

    # Base URL of the REST API; the API version and the endpoint path are
    # appended to it by request(). Override on a subclass or instance to
    # target another host.
    api = 'https://api.twitter.com'

    def __init__(self, ckey, cskey, akey=None, askey=None,
                 oauth_ver='1.0', oauth_method='HMAC-SHA1', api_ver='1.1'):
        """Store the credentials and protocol settings.

        Args:
            ckey: consumer (application) key.
            cskey: consumer secret.
            akey: access token, or None when no user token is available.
            askey: access token secret, or None.
            oauth_ver: value sent as ``oauth_version``.
            oauth_method: value sent as ``oauth_signature_method``
                (upper-cased); only ``HMAC-SHA1`` can actually be signed.
            api_ver: API version path segment used by request().
        """
        self.ckey = ckey
        self.cskey = cskey
        self.akey = akey
        self.askey = askey
        self.oauth_method = oauth_method.upper()
        self.oauth_ver = str(oauth_ver)
        self.api_ver = str(api_ver)

    @staticmethod
    def _encode(value):
        """Percent-encode *value* with no reserved character left unescaped."""
        return quote(str(value), safe='')

    @staticmethod
    def _normalize(params):
        """Return a copy of *params* as text, without ``None`` values.

        Booleans become ``'true'``/``'false'``; everything else goes through
        ``str()``. Normalizing once guarantees that the signed values and the
        transmitted values are identical.
        """
        clean = {}
        for key, value in params.items():
            if value is None:
                continue
            if isinstance(value, bool):
                value = 'true' if value else 'false'
            clean[str(key)] = str(value)
        return clean

    def _sign(self, endpoint_url, method, params):
        """Return the base64 HMAC-SHA1 signature for one request.

        *params* must already contain every request and ``oauth_*`` parameter
        (except ``oauth_signature``) as text.
        """
        # Encode first, then sort: the ordering is defined on the encoded
        # form, by key and then by value.
        pairs = sorted(
            (self._encode(key), self._encode(value))
            for key, value in params.items()
        )
        param_str = '&'.join('%s=%s' % pair for pair in pairs)
        base_str = '&'.join([
            method.upper(),
            self._encode(endpoint_url),
            self._encode(param_str),
        ])
        # Without a token secret the key still ends with the '&' separator.
        signing_key = '&'.join([
            self._encode(self.cskey),
            self._encode(self.askey or ''),
        ])
        digest = hmac.new(
            signing_key.encode('utf-8'),
            base_str.encode('utf-8'),
            hashlib.sha1,
        ).digest()
        return base64encode(digest).decode('ascii')

    def auth(self, endpoint_url, method, params, nonce=None, timestamp=None):
        """Generate the OAuth ``Authorization`` header for a request.

        Args:
            endpoint_url: request URL without query string; any query
                parameters must be supplied through *params* instead.
            method: HTTP method name (case-insensitive).
            params: mapping of request parameters to sign; ``None`` values
                are ignored. May be None or empty.
            nonce: optional fixed nonce; a random one is generated when
                omitted. Mainly useful for reproducible tests.
            timestamp: optional fixed UNIX timestamp; the current time is
                used when omitted.

        Returns:
            A dict with the single key ``'Authorization'``.

        Raises:
            ValueError: if the configured signature method is not HMAC-SHA1.
        """
        if self.oauth_method != 'HMAC-SHA1':
            # Signing with SHA-1 while advertising another method can only
            # produce a signature the server rejects; fail early instead.
            raise ValueError(
                'unsupported oauth signature method: %s' % self.oauth_method)
        if nonce is None:
            nonce = urandom(32).hex()
        if timestamp is None:
            timestamp = int(time())

        # These values are signed and also sent in the header itself.
        oauth_params = self._normalize({
            'oauth_consumer_key': self.ckey,
            'oauth_nonce': nonce,
            'oauth_signature_method': self.oauth_method,
            'oauth_timestamp': timestamp,
            'oauth_token': self.akey,  # dropped when no access token is set
            'oauth_version': self.oauth_ver,
        })
        to_sign = self._normalize(params or {})
        to_sign.update(oauth_params)  # oauth_* values win on a name clash
        oauth_params['oauth_signature'] = self._sign(
            endpoint_url, method, to_sign)

        auth_bundle = 'OAuth ' + ','.join(
            '%s="%s"' % (self._encode(key), self._encode(value))
            for key, value in sorted(oauth_params.items())
        )
        return {'Authorization': auth_bundle}

    def get_oauth(self):
        """Retrieve the OAuth token and secret for the consumer key/secret.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError()

    def request(self, endpoint, method, **params):
        """Send an authorized request to *endpoint* and return the body.

        Args:
            endpoint: path below the API version, e.g. ``'statuses/update'``;
                ``.json`` is appended when missing.
            method: HTTP method name (case-insensitive).
            **params: request parameters; ``None`` values are skipped.

        Returns:
            The response content as returned by ``httplib2.Http.request``.
        """
        if not endpoint.endswith('.json'):
            endpoint = endpoint + '.json'
        endpoint_url = '/'.join([self.api, self.api_ver, endpoint])
        method = method.upper()
        params = self._normalize(params)
        headers = self.auth(endpoint_url, method, params)

        encoded = '&'.join(
            '%s=%s' % (self._encode(key), self._encode(value))
            for key, value in params.items()
        )
        url, body = endpoint_url, None
        if method in ('POST', 'PUT', 'PATCH'):
            # The HTTP client needs an already-encoded body, not a dict.
            body = encoded
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
        elif encoded:
            # The signature covers the bare URL; the parameters travel in
            # the query string.
            url = endpoint_url + '?' + encoded

        http = httplib2.Http()
        _response, content = http.request(
            url, method, body=body, headers=headers)
        return content
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment