Skip to content

Instantly share code, notes, and snippets.

@yoshiori
Created November 4, 2010 07:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save yoshiori/662214 to your computer and use it in GitHub Desktop.
Save yoshiori/662214 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time, random, urllib, urllib2, cgi, hmac, hashlib, commands, simplejson
from pit import Pit
class MyTwitter(object):
request_token_url = 'http://twitter.com/oauth/request_token'
access_token_url = 'http://twitter.com/oauth/access_token'
_URL_BASE = 'http://api.twitter.com/'
def __init__(self):
twitter_config = Pit.get('twitter.com',
{'require' :
{'consumer_key':'Your twitter Consumer key',
'consumer_secret':'Your twitter consumer_secret'}})
self.consumer_key = twitter_config['consumer_key']
self.consumer_secret = twitter_config['consumer_secret']
self.oauth_token = twitter_config.get('oauth_token')
self.oauth_token_secret = twitter_config.get('oauth_token_secret')
if not self.oauth_token or not self.oauth_token_secret:
self.get_oauth_token()
def get_init_params(self):
params = {
"oauth_consumer_key": self.consumer_key,
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": str(int(time.time())),
"oauth_nonce": str(random.getrandbits(64)),
"oauth_version": "1.0"
}
if self.oauth_token:
params['oauth_token'] = self.oauth_token
return params
def encode(self, text):
return urllib.quote(str(text), '~')
def _create_param_str(self,params,sep='&'):
for k,v in params.items():
if isinstance(v, unicode):
params[k] = v.encode('utf8')
return sep.join(['%s=%s' % (self.encode(key),self.encode(params[key]))
for key in sorted(params)])
def _get_signature(self, method, url, init_params, user_params = None):
_params = {}
_params.update(init_params)
if user_params:
_params.update(user_params)
params_str = self._create_param_str(_params)
message = "&".join([method, self.encode(url), self.encode(params_str)])
key = "%s&%s" % (self.consumer_secret, self.oauth_token_secret if self.oauth_token_secret else '')
signature = hmac.new(key, message, hashlib.sha1)
return signature.digest().encode("base64").strip()
def get_oauth_token(self):
params = self.get_init_params()
params['oauth_signature'] = self._get_signature('GET', self.request_token_url, params)
opener = urllib2.build_opener()
_url = self.request_token_url + '?' + urllib.urlencode(params)
result = opener.open(_url).read()
data = cgi.parse_qs(result)
_url = 'http://twitter.com/oauth/authorize?oauth_token='+data['oauth_token'][0]
print 'open %s' % _url
commands.getoutput('%s %s' % ('xdg-open',_url))
self.oauth_token = data['oauth_token'][0]
self.oauth_token_secret = data['oauth_token_secret'][0]
params = self.get_init_params()
params['oauth_verifier'] = raw_input('PID > ')
params['oauth_signature'] = self._get_signature('GET', self.access_token_url,params)
opener = urllib2.build_opener()
opener.addheaders = [('Authorization','OAuth')]
_url = self.access_token_url + '?' + urllib.urlencode(params)
result = opener.open(_url).read()
data = cgi.parse_qs(result)
self.oauth_token = data['oauth_token'][0]
self.oauth_token_secret = data['oauth_token_secret'][0]
twitter_config = Pit.get('twitter.com')
twitter_config['oauth_token'] = self.oauth_token
twitter_config['oauth_token_secret'] = self.oauth_token_secret
Pit.set('twitter.com',{'data':twitter_config})
def get_auth_opener(self, methon, url, params=None):
base_params = self.get_init_params()
base_params['oauth_signature'] = self._get_signature(methon, url, base_params, params)
opener = urllib2.build_opener()
opener.addheaders = [('Authorization','OAuth %s'% self._create_param_str(base_params,','))]
return opener
def auth_get(self, url, params=None):
_url = url
if params:
_url += '?' + urllib.urlencode(params)
result = self.get_auth_opener('GET', url,params).open(_url)
return result
def auth_post(self, url, params={}):
result = self.get_auth_opener('POST', url,params).open(url, urllib.urlencode(params))
return result
def get_friend_timeline(self):
url = self._URL_BASE + 'statuses/friends_timeline.json'
r = self.auth_get(url)
data = simplejson.loads(r.read())
return data
def get_friends(self,cursor = -1):
params = {'cursor':cursor}
url = self._URL_BASE + 'statuses/friends.json'
r = self.auth_get(url,params)
data = simplejson.loads(r.read())
return data
def destroy_friend(self,id):
url = self._URL_BASE + ('friendships/destroy/%s.json' % id)
r = self.auth_post(url)
data = simplejson.loads(r.read())
return data
def create_friend(self,id):
url = self._URL_BASE + ('friendships/create/%s.json' % id)
r = self.auth_post(url,)
data = simplejson.loads(r.read())
return data
def show(self,screen_name):
params = {'screen_name':screen_name}
url = self._URL_BASE + 'users/show.json'
r = self.auth_get(url,params)
data = simplejson.loads(r.read())
return data
def get_list(self,screen_name ,cursor = -1):
params = {'cursor':cursor}
url = self._URL_BASE + '1/%s/lists.json' % screen_name
r = self.auth_get(url,params)
data = simplejson.loads(r.read())
return data
def get_list_menbers(self, user, list_name,cursor = -1):
url = self._URL_BASE + '1/%s/%s/members.json' % (user,list_name)
r = self.auth_get(url,{'cursor':cursor})
data = simplejson.loads(r.read())
return data
def get_list_menbers_all(self, user, list_name):
cursor = -1
users = []
while cursor:
data = self.get_list_menbers(user,list_name, cursor)
cursor = data['next_cursor']
users += data['users']
return users
def get_list_all(self,screen_name):
cursor = -1
lists = []
while cursor:
data = self.get_list(screen_name, cursor)
cursor = data['next_cursor']
lists += data['lists']
return lists
def get_friends_ids(self,screen_name):
param = {'screen_name':screen_name}
url = self._URL_BASE + 'friends/ids.json'
r = self.auth_get(url,param)
data = simplejson.loads(r.read())
return data
def update(self,status):
if isinstance(status, unicode):
status = status.encode('utf8')
param = {'status':status}
url = self._URL_BASE + 'statuses/update.json'
r = self.auth_post(url,param)
data = simplejson.loads(r.read())
return data
def get_replies(self):
url = self._URL_BASE + 'statuses/replies.json'
r = self.auth_get(url)
data = simplejson.loads(r.read())
return data
if __name__ == "__main__":
import readline
readline.parse_and_bind("tab: complete")
twitter = MyTwitter()
friends = set()
def complete(text, status):
results = [x for x in friends if x.startswith(text)] + [None]
return results[status]
readline.set_completer(complete)
while True:
input = raw_input('> ')
input = unicode(input,'utf-8')
if input:
twitter.update(input)
print 'update : ' + input
else:
for data in reversed(twitter.get_replies()):
print '%-12s : %s' % (data['user']['screen_name'] ,data['text'])
friends.add(data['user']['screen_name'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment