Skip to content

Instantly share code, notes, and snippets.

@from1to9
Last active August 31, 2016 12:59
Show Gist options
  • Save from1to9/3023641 to your computer and use it in GitHub Desktop.
Save from1to9/3023641 to your computer and use it in GitHub Desktop.
Fanfou API for python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# code by from1to9
# from1to9@gmail.com
# update 31/08/2016
# To upload a photo, pass the path of a local image file, e.g. '/home/user/test.jpg'
import oauth2 as oauth
import re, time
import itertools, mimetools, mimetypes
import urllib, urllib2
import binascii, hashlib, hmac
class MultiPartForm(object):
    """Accumulate form fields and files and render a multipart/form-data body.

    Fields and files are stored in insertion order; ``str(form)`` renders
    every field first, then every file, separated by ``self.boundary``.
    """

    def __init__(self):
        self.form_fields = []   # list of (name, value) tuples
        self.files = []         # list of (fieldname, filename, mimetype, body)
        self.boundary = mimetools.choose_boundary()

    def get_content_type(self):
        """Return the Content-Type header value matching this form's boundary."""
        return 'multipart/form-data; boundary=%s' % self.boundary

    def add_field(self, name, value):
        """Queue a simple form field; ``value`` must already be a string."""
        self.form_fields.append((name, value))

    def add_file(self, fieldname, filename, fileHandle, mimetype=None):
        """Queue a file part.

        ``fileHandle`` may be either the raw file content or an object with a
        ``read()`` method, in which case it is read immediately.  When
        ``mimetype`` is None it is guessed from ``filename``, falling back to
        'application/octet-stream'.
        """
        # Callers in this file pass the already-read content, so only call
        # read() when the argument actually is file-like.
        body = fileHandle.read() if hasattr(fileHandle, 'read') else fileHandle
        if mimetype is None:
            mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        self.files.append((fieldname, filename, mimetype, body))

    def __str__(self):
        """Render the queued fields and files as the request body (CRLF separated)."""
        part_boundary = '--' + self.boundary
        lines = []
        for name, value in self.form_fields:
            lines.extend([
                part_boundary,
                'Content-Disposition: form-data; name="%s"' % name,
                '',
                value,
            ])
        for field_name, filename, content_type, body in self.files:
            lines.extend([
                part_boundary,
                # Every part of a multipart/form-data body uses the
                # "form-data" disposition type, files included.
                'Content-Disposition: form-data; name="%s"; filename="%s"'
                % (field_name, filename),
                'Content-Type: %s' % content_type,
                '',
                body,
            ])
        # Closing boundary, followed by an empty item so the body ends in CRLF.
        lines.append(part_boundary + '--')
        lines.append('')
        return '\r\n'.join(lines)
class Api(object):
    """Look up the URL and HTTP method of a named Fanfou API endpoint.

    ``api_type`` is one of the keys of the ``api`` table below.
    """

    api_base = "api.fanfou.com"
    # Response format suffix appended to every endpoint URL; the original
    # author's note lists json, xml and rss as options.
    extension = 'json'
    # Endpoint table: api_type -> relative URL (without extension) and HTTP method.
    api = {
        "public_timeline": {"url": "statuses/public_timeline", "method": "GET"},
        "friends_timeline": {"url": "statuses/friends_timeline", "method": "GET"},
        "replies": {"url": "statuses/replies", "method": "GET"},
        "mentions": {"url": "statuses/mentions", "method": "GET"},
        "show": {"url": "statuses/show", "method": "GET"},
        "user_timeline": {"url": "statuses/user_timeline", "method": "GET"},
        "update": {"url": "statuses/update", "method": "POST"},
        "destroy": {"url": "statuses/destroy", "method": "POST"},
        "statuses_friends": {"url": "statuses/friends", "method": "POST"},
        "statuses_followers": {"url": "statuses/followers", "method": "POST"},
        # photo
        "photo_upload": {"url": "photos/upload", "method": "POST"},
        # private msg
        "direct_messages_sent": {"url": "direct_messages/sent", "method": "GET"},
        "direct_messages_inbox": {"url": "direct_messages/inbox", "method": "GET"},
        "direct_messages_new": {"url": "direct_messages/new", "method": "POST"},
        "direct_messages_destroy": {"url": "direct_messages/destroy", "method": "POST"},
        # friends
        "friendships_create": {"url": "friendships/create", "method": "POST"},
        "friendships_destroy": {"url": "friendships/destroy", "method": "POST"},
        "friendships_exists": {"url": "friendships/exists", "method": "GET"},
        "friends_ids": {"url": "friends/ids", "method": "GET"},
        "followers_ids": {"url": "followers/ids", "method": "GET"},
        # user
        "users_show": {"url": "users/show", "method": "GET"},
        "users_followers": {"url": "users/followers", "method": "GET"},
        "users_friends": {"url": "users/friends", "method": "GET"},
        # notification
        "notifications_follow": {"url": "notifications/follow", "method": "POST"},
        "notifications_leave": {"url": "notifications/leave", "method": "POST"},
    }

    def __init__(self, api_type):
        self.api_type = api_type

    def url(self):
        """Return the full endpoint URL, or "" when ``api_type`` is unknown."""
        entry = self.api.get(self.api_type)
        if entry is None:
            return ""
        return "http://" + self.api_base + "/" + entry["url"] + "." + self.extension

    def method(self):
        """Return the endpoint's HTTP method.

        Raises KeyError when ``api_type`` is not in the table (unlike
        ``url()``, which returns an empty string).
        """
        return self.api[self.api_type]["method"]
class fanfou(object):
    """Small OAuth 1.0 client for the Fanfou REST API (Python 2).

    Regular endpoints go through the ``oauth2`` client; photo upload builds
    and signs a multipart request by hand and sends it with ``urllib2``.
    """

    def __init__(self, consumer_key, consumer_secret, oauth_token, oauth_token_secret):
        self.__token = oauth.Token(key=oauth_token, secret=oauth_token_secret)
        self.__consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
        self.__client = oauth.Client(self.__consumer, self.__token)

    def _execute(self, params=None, method="GET", url=""):
        """Send one request through the oauth2 client.

        Returns ``{"resp": ..., "content": ...}`` holding the two values
        returned by ``oauth.Client.request``, or None if the request raised.
        """
        if params is None:
            params = {}
        try:
            if method == "POST":
                req = oauth.Request(method=method, url=url, parameters=params,
                                    is_form_encoded=True)
                resp, content = self.__client.request(
                    uri=url, method="POST", body=req.to_postdata())
            else:
                req = oauth.Request(method=method, url=url, parameters=params)
                # Send the request with the method the endpoint declares
                # instead of always using POST.
                resp, content = self.__client.request(uri=req.to_url(), method=method)
        except Exception:
            # Best effort: callers receive None when the request fails.
            return None
        return {"resp": resp, "content": content}

    def get_sign(self, params="", url="", http_method="", consumer_secret="", oauth_token_secret=""):
        """Return the base64 HMAC-SHA1 signature for a request.

        ``params`` is a dict of the parameters to sign; they are sorted by
        key, url-encoded and joined with "&".  The signing key is
        ``consumer_secret + "&" + oauth_token_secret``.
        """
        params = params or {}
        q = lambda x: urllib.quote(x, safe="~")
        # NOTE(review): urlencode escapes with quote_plus rules, which differ
        # from OAuth percent-encoding for spaces and "~"; this does not matter
        # for the oauth_* values photo() signs.
        normalized_params = "&".join(
            urllib.urlencode({key: params[key]}) for key in sorted(params))
        base_string = "&".join((http_method, q(url), q(normalized_params)))
        key = "&".join([consumer_secret, oauth_token_secret])
        # hmac needs byte strings; encode text (unicode) inputs first.
        if not isinstance(key, bytes):
            key = key.encode("utf-8")
        if not isinstance(base_string, bytes):
            base_string = base_string.encode("utf-8")
        sig = hmac.new(key, base_string, hashlib.sha1)
        # b2a_base64 appends a newline; strip it.
        return binascii.b2a_base64(sig.digest())[:-1]

    def get_auth_header(self, params):
        """Return an ``OAuth k="v",...`` Authorization header value for ``params``."""
        pairs = ['%s="%s"' % (key, urllib.quote(params[key])) for key in params]
        return "OAuth " + ",".join(pairs)

    def call(self, apitype, params):
        """Look up ``apitype`` in the Api table and execute it with ``params``."""
        api = Api(apitype)
        return self._execute(params, api.method(), api.url())

    def update(self, message, source=""):
        """Call the "update" endpoint with ``status`` and ``source``."""
        params = {
            "status": message,
            "source": source,
        }
        return self.call("update", params)

    def reply_to(self, message, in_reply_to_status_id, in_reply_to_user_id, source=""):
        """Call "update" with the in_reply_to_status_id / in_reply_to_user_id fields set."""
        params = {
            "status": message,
            "source": source,
            "in_reply_to_status_id": in_reply_to_status_id,
            "in_reply_to_user_id": in_reply_to_user_id,
        }
        return self.call("update", params)

    def repost(self, message, repost_status_id, source=""):
        """Call "update" with the repost_status_id field set."""
        params = {
            "status": message,
            "source": source,
            "repost_status_id": repost_status_id,
        }
        return self.call("update", params)

    def photo(self, photo, status="", source=""):
        """Upload the local image file at path ``photo`` with text ``status``.

        Returns ``{"resp": <HTTP status code>, "content": <response body>}``,
        or ``{"resp": "", "content": ""}`` if the HTTP request fails.  Errors
        opening or reading the file are not caught.

        NOTE(review): ``source`` is accepted but not sent with the request.
        """
        upload_url = Api("photo_upload").url()
        form = MultiPartForm()
        # Only the oauth_* parameters are signed; the multipart body is not.
        oauth_params = {
            "oauth_version": "1.0",
            "oauth_timestamp": str(oauth.generate_timestamp()),
            "oauth_nonce": oauth.generate_nonce(),
            "oauth_consumer_key": self.__consumer.key,
            "oauth_token": self.__token.key,
            "oauth_signature_method": "HMAC-SHA1",
        }
        oauth_params["oauth_signature"] = self.get_sign(
            oauth_params, upload_url, "POST",
            self.__consumer.secret, self.__token.secret)
        header = {
            "content-type": "multipart/form-data; boundary=" + form.boundary,
            "Authorization": self.get_auth_header(oauth_params),
        }
        # To upload a photo fetched from the web, pass its binary data to
        # form.add_file instead of reading a local file here.
        form.add_field("status", status)
        with open(photo, "rb") as image:
            form.add_file("photo", photo, image.read())
        post = str(form)
        try:
            req = urllib2.Request(upload_url, post, header)
            response = urllib2.urlopen(req)
            return {"resp": response.getcode(), "content": response.read()}
        except Exception:
            return {"resp": "", "content": ""}

    def replies(self, since_id="", mode="lite", format=""):
        """Call the "replies" endpoint with since_id, mode and format."""
        params = {
            "since_id": since_id,
            "mode": mode,
            "format": format,
        }
        return self.call("replies", params)

    def mentions(self, since_id="", mode="lite", format=""):
        """Call the "mentions" endpoint with since_id, mode and format."""
        params = {
            "since_id": since_id,
            "mode": mode,
            "format": format,
        }
        return self.call("mentions", params)

    def direct_messages_inbox(self, since_id="", mode="lite"):
        """Call the "direct_messages_inbox" endpoint with since_id and mode."""
        params = {
            "since_id": since_id,
            "mode": mode,
        }
        return self.call("direct_messages_inbox", params)

    def direct_messages_new(self, user, text, in_reply_to_id="", format=""):
        """Call "direct_messages_new"; ``text`` is truncated to 140 characters.

        ``format`` is sent as the "format" parameter, as in replies() and
        mentions(), and defaults to an empty string.
        """
        params = {
            "user": user,
            "text": text[:140],
            "in_reply_to_id": in_reply_to_id,
            "mode": "lite",
            "format": format,
        }
        return self.call("direct_messages_new", params)

    def friends_ids(self, id="", page="1", count="60"):
        """Call the "friends_ids" endpoint with id, count and page."""
        params = {
            "id": id,
            "count": count,
            "page": page,
        }
        return self.call("friends_ids", params)

    def followers_ids(self, id="", page="1", count="60"):
        """Call the "followers_ids" endpoint with id, count and page."""
        params = {
            "id": id,
            "count": count,
            "page": page,
        }
        return self.call("followers_ids", params)

    def users_show(self, id=""):
        """Call the "users_show" endpoint with id and mode "lite"."""
        params = {
            "id": id,
            "mode": "lite",
        }
        return self.call("users_show", params)
if __name__ == "__main__":
    # Demo: post a text status, then upload a photo, printing each response body.
    consumer_key = 'XXXX'  # api key
    consumer_secret = 'XXXX'  # api secret
    oauth_token = 'XXX'
    oauth_token_secret = 'XXX'
    # Use a separate name for the instance so the fanfou class stays reachable.
    client = fanfou(consumer_key, consumer_secret, oauth_token, oauth_token_secret)
    msg = client.update("test pyfanfou")
    # update() yields None when the underlying request raised.
    if msg is not None:
        print(msg["content"].decode("utf-8"))
    else:
        print("update failed")
    msg = client.photo("test.jpg", "photo info")
    print(msg["content"].decode("utf-8"))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import oauth2 as oauth
import re, time
import urlparse
# Interactive OAuth handshake: fetch a request token, ask the user to
# authorize it in a browser, then exchange it for an access token.
consumer_key = 'xxxx'  # api key
consumer_secret = 'xxxx'  # api secret
access_token_url = 'http://fanfou.com/oauth/access_token'
authorize_url = 'http://fanfou.com/oauth/authorize'
request_token_url = 'http://fanfou.com/oauth/request_token'

consumer = oauth.Consumer(consumer_key, consumer_secret)
client = oauth.Client(consumer)

# Step 1: obtain a request token.
resp, content = client.request(request_token_url, "GET")
if resp['status'] != '200':
    raise Exception("Invalid response %s." % resp['status'])
request_token = dict(urlparse.parse_qsl(content))
print("Request Token:")
print(" - oauth_token = %s" % request_token['oauth_token'])
print(" - oauth_token_secret = %s" % request_token['oauth_token_secret'])
print("")

# Step 2: the user authorizes the token in a browser and types in the PIN.
print("Go to the following link in your browser:")
print("%s?oauth_token=%s" % (authorize_url, request_token['oauth_token']))
print("")
oauth_verifier = raw_input('What is the PIN? ')

# Step 3: exchange the authorized request token for an access token.
token = oauth.Token(request_token['oauth_token'],
                    request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(consumer, token)
resp, content = client.request(access_token_url, "POST")
# Fail with the same message as step 1 rather than a KeyError further down.
if resp['status'] != '200':
    raise Exception("Invalid response %s." % resp['status'])
access_token = dict(urlparse.parse_qsl(content))
print("Access Token:")
print(" - oauth_token = %s" % access_token['oauth_token'])
print(" - oauth_token_secret = %s" % access_token['oauth_token_secret'])
print("")
print("You may now access protected resources using the access tokens above.")
print("")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment