Create a gist now

Instantly share code, notes, and snippets.

@xiongjia /+READEM.md
Last active Sep 14, 2017

What would you like to do?
Fanfou API test #devnotes

Fanfou API 測試

用“飯否“的都是能識得漢語,於是就用漢語寫 README 了。

Files

總共有 2 個文件:

  1. fanfou.py - 裏面有 3 個 Objects: Logger, FanfouOAuth, Fanfou。具體參考 Objects 一節。
  2. fanfou.sample.cfg - 這是一個 sample。可以根據這個 sample 生成具體的 fanfou.cfg。 配置內容是 consumer key&secert,以及 auth_cache 文件名。

Objects

總共分了 3 個 Objects:

  1. Logger
    用於輸出 Log。默認把 Log Level 設置爲了 "debug" ,輸出爲 stdout 。
    如需調整: 在 fanfou.py 裡找, 下面的片段並修改。 (TODO: 以後會從 cfg 文件或 process argv 中讀取 )
    LOGGER = Log({ "Level": "debug", "console": True })
    記錄一條 Log 可以使用 (Log.error, Log.warn, Log.info; Log.debug )
  2. FanfouOAuth
    用於 OAuth 認證。主要 Interfaces:
    • get_new_acc_token() 得到一個新得 access token。 這個函數會自動用系統默認瀏覽器打開認證頁面,并要求用戶輸入授權後得 PIN Code。
    • get_cached_acc_token() 從本地存儲得 auth_cach 文件中讀出 access token
    • update_auth_cache() 存儲/更新本地 cache 得 access token
  3. Fanfou
    這個 Object 的目的是對 Fanfou 的 API 進行封裝,初步的加工,分析查詢結果。 (目前支持 3 個 API)

main() function

在 main 函數裡有一分簡單的調用。會讀取 HOME 目錄下的 .fanfou.cfg 根據對應的配置 取得 oauth access token 并顯示 3 條 time line messages。接著顯示 3 條 mentions。 (另一 Status post 測試默認被 disable ,以免誤發送更新消息)

fanfou.cfg
#!/usr/bin/env python
import os, sys, logging, time, ConfigParser, json
import hmac, binascii, hashlib, random
import urllib, urllib2, urlparse, webbrowser
class Log(object):
_LEVELS = {
"error": logging.ERROR,
"debug": logging.DEBUG,
"info": logging.INFO
}
def __init__(self, opts):
self._logger = logging.getLogger()
self.set_opts(opts)
def set_opts(self, opts):
if opts.get("console", False):
# enable console logger
out_hdlr = logging.StreamHandler(sys.stdout)
self._logger.addHandler(out_hdlr)
# update log level
self._logger.setLevel(self._get_log_level(opts.get("Level", "info")))
def get_logger(self):
return self._logger
def _get_log_level(self, log_level):
return self._LEVELS.get(log_level, logging.INFO)
# init logger
LOGGER = Log({ "Level": "debug", "console": True })
LOG = LOGGER.get_logger()
def resolve_usr_filename(filename):
full_filename = filename
if os.path.isabs(full_filename) == False:
full_filename = os.path.join(os.path.expanduser("~"),
full_filename)
return os.path.normpath(full_filename)
def chk_keys(keys, src_list):
for key in keys:
if key not in src_list:
return False
return True
def load_fanfou_oauth_config(conf_filename):
full_filename = resolve_usr_filename(conf_filename)
LOG.debug("Load oauth config from %s", full_filename)
try:
cfg = ConfigParser.RawConfigParser()
cfg.read(full_filename)
return {
"consumer_key": cfg.get("fanfou", "consumer_key"),
"consumer_secret": cfg.get("fanfou", "consumer_secret"),
"auth_cache": cfg.get("fanfou", "auth_cache"),
}
except Exception, err:
LOG.warn("Cannot load oauth config from %s; err %s",
full_filename, err)
raise err
def mbstrlen(src):
try:
return len(src.decode("utf8", errors = "replace"))
except Exception, err:
LOG.error("String convert issue %s", err)
return len(src)
class FanfouOAuth(object):
def __init__(self, cfg):
# Fanfou authorize URLs
self.urls = {
"unauth_req_token": "http://fanfou.com/oauth/request_token",
"authorize": "http://fanfou.com/oauth/authorize",
"acc_token": "http://fanfou.com/oauth/access_token",
}
# OAuth config
self.oauth_config = {
"consumer_key": cfg.get("consumer_key", ""),
"consumer_secret": cfg.get("consumer_secret", ""),
}
# auth cach filename
self.auth_cache = cfg.get("auth_cache")
@staticmethod
def _escape(src_str):
return urllib.quote(src_str.encode('utf-8'), safe='~')
@staticmethod
def generate_timestamp():
return FanfouOAuth._escape(str(int(time.time())))
@staticmethod
def generate_nonce(length = 8):
random_data = range(length)
for i in range(length):
random_data[i] = str(random.randint(0, 9))
return FanfouOAuth._escape("".join(random_data))
@staticmethod
def get_full_cache_filename(filename):
return resolve_usr_filename(filename)
@classmethod
def open_url(cls, url):
LOG.debug("open %s", url)
webbrowser.open_new_tab(url)
@classmethod
def get_input(cls, prompt):
return raw_input(prompt)
@staticmethod
def get_sig_key(auth_keys):
if len(auth_keys) == 1:
key = ("%s&" % "".join(auth_keys))
else:
key = "&".join(auth_keys)
return key
@staticmethod
def oauth_sig_hash(base_str, auth_keys):
key = FanfouOAuth.get_sig_key(auth_keys)
sig_hash = hmac.new(key, base_str, hashlib.sha1)
return binascii.b2a_base64(sig_hash.digest())[:-1]
@staticmethod
def get_oauth_sig_item(base_str, auth_keys):
sig_hash = FanfouOAuth.oauth_sig_hash(base_str, auth_keys)
return ("oauth_signature", sig_hash)
@staticmethod
def get_normalized_urlstr(data):
return urllib.urlencode(data).replace("+", "%20").replace("%7E", "~")
@staticmethod
def send_req(req_opts):
if req_opts["method"] == "GET":
LOG.debug("send GET req: %s", req_opts["uri"])
req = urllib2.Request(req_opts["uri"])
elif req_opts["method"] == "POST":
LOG.debug("send POST req: %s", req_opts["uri"])
req = urllib2.Request(req_opts["uri"],
data = req_opts["data"],
headers = req_opts["header"])
else:
LOG.error("Invalid method %s", req_opts["method"])
raise ValueError("Invalid HTTP method")
try:
rep = urllib2.urlopen(req)
data = rep.read()
except urllib2.HTTPError, http_err:
LOG.error("Cannot access target server. HTTP code %s",
http_err.code)
raise http_err
except Exception, err:
LOG.error("HTTP Request error %s", err)
raise err
else:
LOG.debug("HTTP repLen=%d", len(data))
return data
def get_pin_code(self):
return self.get_input("Enter the PIN code: ")
def mk_oauth_query_data(self, opts):
# sort the query data and create the query string
query_data = opts["req_data"].items()
query_data.sort()
query_str = self.get_normalized_urlstr(query_data)
# base_str = "HTTP Method (GET/POST)" + "&" +
# "url_encode(base_url)" + "&" +
# sorted(querysting.items()).join('&');
base_str = "%s&%s&%s" % (opts["method"],
self._escape(opts["base_url"]), self._escape(query_str))
LOG.debug("base str: %s", base_str)
# oauth_signature = signature(base_string)
# signature-method = SHA1;
# signature-key = "consumer_secret&access_secret"
sig_item = self.get_oauth_sig_item(base_str, opts["auth_keys"])
query_data.append(sig_item)
return query_data
def mk_oauth_hdr(self, query_data):
oauth_params = ((key, val) for key, val in query_data
if key.startswith('oauth_'))
stringy_params = ((key, self._escape(str(val)))
for key, val in oauth_params)
header_params = ("%s=\"%s\"" % (key, val)
for key, val in stringy_params)
params_hdr = ','.join(header_params)
auth_hdr = "OAuth "
if params_hdr:
auth_hdr = "%s %s" % (auth_hdr, params_hdr)
LOG.debug("Hdr: %s", auth_hdr)
return { "Authorization": auth_hdr }
def mk_oauth(self, opts):
if opts["method"] not in ("GET", "POST"):
LOG.error("Invalid HTTP method: %s", opts["method"])
raise ValueError("Invalid HTTP method")
# Fanfou only supports "HMAC-SHA1" signature and oauth version 1.0
req_data = opts.get("req_data")
req_data["oauth_signature_method"] = "HMAC-SHA1"
req_data["oauth_version"] = "1.0"
# auto update oauth_timestamp & oauth_nonce
if req_data.has_key("oauth_timestamp") != True:
req_data["oauth_timestamp"] = self.generate_timestamp()
if req_data.has_key("oauth_nonce") != True:
req_data["oauth_nonce"] = self.generate_nonce()
# make query data
query_data = self.mk_oauth_query_data(opts)
if opts["method"] == "GET":
# make GET uri
return {
"method": "GET",
"uri": "%s?%s" % (opts["base_url"],
self.get_normalized_urlstr(query_data)),
}
else:
# make POST header&body
hdr = { "Content-Type": "application/x-www-form-urlencoded" }
hdr.update(self.mk_oauth_hdr(query_data))
return {
"method": "POST",
"uri": opts["base_url"],
"data": self.get_normalized_urlstr(query_data),
"header": hdr,
}
def get_cached_acc_token(self):
cache_filename = self.get_full_cache_filename(self.auth_cache)
try:
LOG.debug("load auth cache from %s", cache_filename)
cache = ConfigParser.RawConfigParser()
cache.read(cache_filename)
except Exception, err:
LOG.warn("cannot get token from %s", cache_filename)
raise err
if (
cache.has_option("acc_token", "oauth_token") != True or
cache.has_option("acc_token", "oauth_token_secret") != True
):
LOG.warn("cannot get cached token")
raise ValueError("Invalid OAuth token")
result = {
"oauth_token": cache.get("acc_token", "oauth_token"),
"oauth_token_secret": cache.get("acc_token", "oauth_token_secret")
}
LOG.debug("get cached acc token: %s", result)
return result
def update_auth_cache(self, acc_token):
cache = ConfigParser.ConfigParser()
cache.add_section("acc_token")
cache.set("acc_token", "oauth_token",
acc_token.get("oauth_token"))
cache.set("acc_token", "oauth_token_secret",
acc_token.get("oauth_token_secret"))
# save the toke to local cache
cache_filename = self.get_full_cache_filename(self.auth_cache)
LOG.debug("save auth cache to %s", cache_filename)
with open(cache_filename, "w") as cache_file:
cache.write(cache_file)
def send_oauth_req(self, opts):
oauth_req = self.mk_oauth(opts)
return self.send_req(oauth_req)
def get_unauth_request_token(self):
# get unauth request_token
# 1. send the signed GET request to
# "http://fanfou.com/oauth/request_token"
# 2. parse the response and get the unauth token
# For example:
# rep =
# "oauth_token=12345&oauth_token_secret=7890"
data = {
"oauth_consumer_key": self.oauth_config["consumer_key"],
}
try:
rep_data = self.send_oauth_req({
"method": "GET",
"base_url": self.urls["unauth_req_token"],
"req_data": data,
"auth_keys": [self.oauth_config["consumer_secret"]],
})
except Exception, err:
LOG.error("cannot get oauth req token: err %s", err)
raise err
token = urlparse.parse_qs(rep_data)
require_keys = ("oauth_token", "oauth_token_secret")
if chk_keys(require_keys, token.keys()) != True:
LOG.error("Invalid OAuth Token, repKeys = %s", token.keys())
raise ValueError("Invalid OAuth token")
result = {
"oauth_token": "".join(token["oauth_token"]),
"oauth_token_secret": "".join(token["oauth_token_secret"])
}
LOG.debug("oauth_token: %s", result)
return result
def get_auth_request_token(self, unauth_token):
oauth_token = unauth_token.get("oauth_token", "")
oauth_token_secret = unauth_token.get("oauth_token_secret", "")
LOG.debug("token %s (%s)", oauth_token, oauth_token_secret)
auth_url = ("%s?oauth_token=%s&oauth_callback=oob" % (
self.urls["authorize"], oauth_token))
LOG.debug(auth_url)
# Open the authorize page and waitting the "PIN" code
self.open_url(auth_url)
pin = self.get_pin_code()
LOG.debug("got pin code: %s", pin)
return {
"oauth_token": oauth_token,
"oauth_verifier": pin,
}
def get_acc_token(self, autho_token):
# send request to self.urls["acc_token"]
data = {
"oauth_consumer_key": self.oauth_config["consumer_key"],
"oauth_token": autho_token.get("oauth_token", ""),
"oauth_verifier": autho_token.get("oauth_verifier", ""),
}
try:
rep_data = self.send_oauth_req({
"method": "GET",
"base_url": self.urls["acc_token"],
"req_data": data,
"auth_keys": [self.oauth_config["consumer_secret"]],
})
except Exception, err:
LOG.error("cannot get oauth acc token: err %s", err)
raise err
# parse&verify the response token
token = urlparse.parse_qs(rep_data)
require_keys = ("oauth_token", "oauth_token_secret")
if chk_keys(require_keys, token.keys()) != True:
LOG.error("Invalid OAuth Token, repKeys = %s", token.keys())
raise ValueError("Invalid OAuth token")
result = {
"oauth_token": "".join(token["oauth_token"]),
"oauth_token_secret": "".join(token["oauth_token_secret"])
}
LOG.debug("acc_token: %s", result)
return result
def get_new_acc_token(self):
unauth_token = self.get_unauth_request_token()
LOG.debug("get unauthon token %s", unauth_token)
autho_token = self.get_auth_request_token(unauth_token)
LOG.debug("get autho token: %s", autho_token)
acc_token = self.get_acc_token(autho_token)
LOG.debug("get acc token: %s", acc_token)
return acc_token
class Fanfou(object):
def __init__(self, fanfou_oauth):
self.oauth = fanfou_oauth
self.acc_token = {}
self.acc_token_loaded = False
# api URLs
self.urls = {
"home_timeline":
"http://api.fanfou.com/statuses/home_timeline.json",
"update":
"http://api.fanfou.com/statuses/update.json",
"mentions":
"http://api.fanfou.com/statuses/mentions.json"
}
def load_token(self, auto_renew = False):
try:
self.acc_token = self.oauth.get_cached_acc_token()
self.acc_token_loaded = True
LOG.debug("loaded cached acc token")
except Exception, err:
LOG.debug("cannot load cached acc token")
if auto_renew != True:
LOG.debug("cannot get cached acc token")
raise err
# create a new acc token
LOG.debug("requesting a new acc token")
self.acc_token = self.oauth.get_new_acc_token()
LOG.debug("updating acc token cache, %s", self.acc_token)
self.oauth.update_auth_cache(self.acc_token)
self.acc_token_loaded = True
def mk_api_req(self, opts):
if self.acc_token_loaded != True:
LOG.error("Invalid Access Token")
raise ValueError("Invalid Access Token")
consumer_secret = self.oauth.oauth_config["consumer_secret"]
consumer_key = self.oauth.oauth_config["consumer_key"]
acc_token_secret = self.acc_token["oauth_token_secret"]
opts["req_data"].update({
"oauth_consumer_key": consumer_key,
"oauth_token": self.acc_token["oauth_token"],
})
opts["auth_keys"] = [consumer_secret, acc_token_secret]
return self.oauth.mk_oauth(opts)
@staticmethod
def _parse_json_messages(data):
ret_val = []
LOG.debug("parse messages, len=%d", len(data))
for item in data:
ret_item = Fanfou._parse_json_message(item)
if not ret_item:
continue
ret_val.append(ret_item)
return ret_val
@staticmethod
def _parse_json_message(data):
msg_keys = ("id", "text", "created_at", "user")
usr_keys = ("id", "name")
if chk_keys(msg_keys, data.keys()) != True:
return None
usr = data["user"]
if chk_keys(usr_keys, usr.keys()) != True:
return None
ret_item = {
"id": data["id"],
"text": data["text"],
"created_at": data["created_at"],
"user_id": usr["id"],
"user_name": usr["name"],
}
if data.has_key("photo"):
ret_item.update(Fanfou.parse_photo(data["photo"]))
return ret_item
@staticmethod
def parse_photo(photo_data):
if not photo_data:
return {}
elif photo_data["largeurl"]:
return { "photo_url": photo_data["largeurl"] }
elif photo_data["imageurl"]:
return { "photo_url": photo_data["imageurl"] }
elif photo_data["thumburl"]:
return { "photo_url": photo_data["thumburl"] }
else:
return {}
def parse_rep_messages(self, data):
LOG.debug("parse rep, dataLen = %d", len(data))
try:
results = json.loads(data)
except Exception, err:
LOG.error("Invalid JSON object")
raise err
return self._parse_json_messages(results)
def parse_rep_message(self, data):
LOG.debug("parse rep, dataLen = %d", len(data))
try:
results = json.loads(data)
except Exception, err:
LOG.error("Invalid JSON object")
raise err
return self._parse_json_message(results)
def send_api_req(self, opts):
api_req = self.mk_api_req(opts)
return self.oauth.send_req(api_req)
def statuses_update(self, status):
# check status length
status_len = mbstrlen(status)
if status_len > 140:
raise Exception("Invalid status; Too many characters. " +
"It was not sent.")
elif status_len < 0:
raise Exception("Invalid status; It's empty. It was not sent.")
# send status
data = { "status": status }
try:
rep_data = self.send_api_req({
"method": "POST",
"base_url": self.urls["update"],
"req_data": data,
})
except Exception, err:
LOG.error("cannot update status, err %s", err)
raise err
# parse post message result
return self.parse_rep_message(rep_data)
def get_home_timeline(self, opts):
data = {
"count": opts.get("count", 10),
}
try:
rep_data = self.send_api_req({
"method": "GET",
"base_url": self.urls["home_timeline"],
"req_data": data,
})
except Exception, err:
LOG.error("cannot access home timeline, err %s", err)
raise err
# parse response
return self.parse_rep_messages(rep_data)
def get_statuses_mentions(self, opts):
data = {
"count": opts.get("count", 10),
}
try:
rep_data = self.send_api_req({
"method": "GET",
"base_url": self.urls["mentions"],
"req_data": data,
})
except Exception, err:
LOG.error("cannot access status mentions, err %s", err)
raise err
# parse response
return self.parse_rep_messages(rep_data)
def main():
LOG.info("start fanfou api test")
# load config
LOG.debug("load config fanfou.cfg")
cfg = load_fanfou_oauth_config(".fanfou.cfg")
fanfou_oauth = FanfouOAuth(cfg)
fanfou = Fanfou(fanfou_oauth)
fanfou.load_token()
# --> Read Timeline
LOG.debug(">> HOME Timeline")
tm_lines = fanfou.get_home_timeline({ "count": 3 })
LOG.debug("Timeline: len=%d", len(tm_lines))
for tm_ln in tm_lines:
LOG.debug("usr: %s (%s) - msg: %s",
tm_ln["user_name"], tm_ln["created_at"], tm_ln["text"])
LOG.debug("---------------------")
# --> Read mentions
LOG.debug(">> Mentions")
mentions = fanfou.get_statuses_mentions({ "count": 3 })
LOG.debug("Mentions: len=%d", len(mentions))
for mention in mentions:
LOG.debug("usr: %s (%s) - msg: %s",
mention["user_name"], mention["created_at"], mention["text"])
LOG.debug("---------------------")
# ---> Status post
# ret_item = fanfou.statuses_update("Fanfou API Test")
# LOG.debug("post %s", ret_item)
if __name__ == "__main__":
main()
[fanfou]
consumer_key=<your consumer key>
consumer_secret=<your consumer secret>
auth_cache=.fanfou_auth_cache
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment