Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@mizchi
Created May 23, 2010 13:52
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mizchi/410946 to your computer and use it in GitHub Desktop.
Save mizchi/410946 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
#-*- encoding:utf-8 -*-
"""
require : simplejson
simplejson - Project Hosting on Google Code http://code.google.com/p/simplejson/
oauthのコンシューマIDとアクセストークンは自分で取得してください
$ python tw_stream.py <抽出ユーザー名>
"""
ckey = ""
csecret = ""
atoken =""
atoken_secret = ""
from sys import argv
import time, random
import urllib, urllib2
import hmac, hashlib
import cgi
import simplejson
import time
from time import sleep , time ,strftime,localtime
def make_signature(params, url, method, csecret, secret = ""):
# Generate Signature Base String
plist = []
for i in sorted(params):
plist.append("%s=%s" % (i, params[i]))
pstr = "&".join(plist)
msg = "%s&%s&%s" % (method, urllib.quote(url, ""),
urllib.quote(pstr, ""))
# Calculate Signature
h = hmac.new("%s&%s" % (csecret, secret), msg, hashlib.sha1)
sig = h.digest().encode("base64").strip()
return sig
def init_params():
p = {
"oauth_consumer_key": ckey,
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": str(int(time())),
"oauth_nonce": str(random.getrandbits(64)),
"oauth_version": "1.0"
}
return p
def oauth_header(params):
plist = []
for p in params:
plist.append('%s="%s"' % (p, urllib.quote(params[p])))
return "OAuth %s" % (", ".join(plist))
reqt_url = 'http://twitter.com/oauth/request_token'
auth_url = 'http://twitter.com/oauth/authorize'
acct_url = 'http://twitter.com/oauth/access_token'
chirp_url ='http://chirpstream.twitter.com/2b/user.json'
if not atoken and not atoken_secret:
# Request Parameters
params = init_params()
print "Get request token:",
# Generate Signature
sig = make_signature(params, reqt_url, "GET", csecret)
params["oauth_signature"] = sig
# Get Token
req = urllib2.Request("%s?%s" % (reqt_url, urllib.urlencode(params)))
resp = urllib2.urlopen(req)
print "\t[OK]"
# Parse Token Parameters
ret = cgi.parse_qs(resp.read())
token = ret["oauth_token"][0]
token_secret = ret["oauth_token_secret"][0]
# Get PIN
print "* Please access to this URL, and allow."
print "> %s?%s=%s" % (auth_url, "oauth_token", token)
print "\n* After that, will display 7 digit PIN, input here."
print "PIN ->",
pin = raw_input()
pin = int(pin)
print "Get access token:",
# Generate Access Token Request
params = init_params()
params["oauth_verifier"] = pin
params["oauth_token"] = token
sig = make_signature(params, acct_url, "GET", csecret, token_secret)
params["oauth_signature"] = sig
# Get Access Token
req = urllib2.Request("%s?%s" % (acct_url, urllib.urlencode(params)))
resp = urllib2.urlopen(req)
print "\t[OK]"
# Parse Access Token
fin = cgi.parse_qs(resp.read())
atoken = fin["oauth_token"][0]
atoken_secret = fin["oauth_token_secret"][0]
print "Access Token: %s" % atoken
print "Access Token Secret: %s" % atoken_secret
print "Your screen_name is '%s'." % fin["screen_name"][0]
def getChirpStream():
url ='http://chirpstream.twitter.com/2b/user.json'
params = init_params()
params["oauth_token"] = atoken
sig = make_signature(params, url, "GET", csecret, atoken_secret)
params["oauth_signature"] = sig
req = urllib2.Request(url)
req.add_header("Authorization", oauth_header(params))
return urllib2.urlopen(req)
def main(argv):
stream = getChirpStream()
stream.readline()
stream.readline()
extract = argv[1]
while 1:
try:
recv = stream.readline()
json = simplejson.loads(recv)
if json.has_key("event"):
text = json["source"]["screen_name"]+ " "+ json["event"] +"\t"+json["target"]["screen_name"]+":"
if json["event"] == "favorite":
text+= json["target_object"]["text"]
elif json["event"] == "retweet":
text+=" "+json["target_object"]["text"]
# elif json["event"] == "follow":
# print json["target"]["description"]
print text
if json["target"]["screen_name"] == extract:
print "==================== check it!"
try:pass
if __name__ == "__main__":
main(argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment