Skip to content

Instantly share code, notes, and snippets.

@upsilon
Forked from mizchi/tw_event_viewer.py
Created August 25, 2010 22:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save upsilon/550448 to your computer and use it in GitHub Desktop.
Save upsilon/550448 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
#-*- encoding:utf-8 -*-
#====== options =============
# OAuth credentials -- every user has to obtain their own and fill them in.
ckey = ""            # consumer key, sent as "oauth_consumer_key"
csecret = ""         # consumer secret, first half of the signing key
atoken =""           # access token, sent as "oauth_token"
atoken_secret = ""   # access token secret, second half of the signing key
#============================
import simplejson
import sqlite3,re
import random
import urllib, urllib2
import hmac, hashlib
import cgi
import time
# URL of the Twitter user stream that get_chirp_stream() signs and opens.
chirp_url = 'https://userstream.twitter.com/2/user.json'
def make_signature(params, url, method, csecret, secret=""):
    """Return the base64-encoded HMAC-SHA1 OAuth 1.0 signature of a request.

    Arguments:
        params  -- dict mapping parameter names to values; every entry is
                   included in the signature base string.
        url     -- request URL the parameters are sent to.
        method  -- HTTP method name, e.g. "GET".
        csecret -- consumer secret.
        secret  -- token secret; defaults to "" for requests made before a
                   token has been issued.

    Returns the signature as a native string without a trailing newline.
    """
    import base64
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote  # Python 3

    def escape(value):
        # OAuth percent-encoding: only unreserved characters stay literal.
        # "~" must be named explicitly because Python 2's quote() escapes it.
        return quote(str(value), safe="~")

    # Names and values are encoded *before* sorting and joining, so that
    # reserved characters inside a value end up double-encoded in the base
    # string, as the specification requires.
    pairs = sorted((escape(k), escape(v)) for k, v in params.items())
    normalized = "&".join("%s=%s" % pair for pair in pairs)
    base_string = "&".join((method.upper(), escape(url), escape(normalized)))

    # Signing key: encoded consumer secret, "&", encoded token secret.
    key = "%s&%s" % (escape(csecret), escape(secret))

    # hmac needs byte strings; after escaping everything is plain ASCII.
    if not isinstance(key, bytes):
        key = key.encode("ascii")
    if not isinstance(base_string, bytes):
        base_string = base_string.encode("ascii")

    digest = hmac.new(key, base_string, hashlib.sha1).digest()
    sig = base64.b64encode(digest)
    if not isinstance(sig, str):
        # Python 3 returns bytes from b64encode; callers expect text.
        sig = sig.decode("ascii")
    return sig
def init_params():
    """Return a new dict holding the OAuth parameters shared by all requests.

    The dict carries the consumer key (module-level ``ckey``), the signature
    method, the current Unix time in whole seconds, a random 64-bit nonce and
    the protocol version.  All values are strings.
    """
    params = {}
    params["oauth_consumer_key"] = ckey
    params["oauth_signature_method"] = "HMAC-SHA1"
    now = int(time.time())
    params["oauth_timestamp"] = str(now)
    nonce = random.getrandbits(64)
    params["oauth_nonce"] = str(nonce)
    params["oauth_version"] = "1.0"
    return params
def oauth_header(params):
    """Format *params* as the value of an HTTP ``Authorization: OAuth`` header.

    Arguments:
        params -- dict mapping OAuth parameter names to their values.

    Returns a string of the form ``OAuth name="value", name2="value2"``.
    Names and values are percent-encoded, and the fields are emitted in
    sorted order so the result does not depend on dict ordering.
    """
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote  # Python 3

    def escape(value):
        # safe="~" rather than the default "/": a base64 signature may
        # contain "/" which has to be escaped, while "~" must stay literal.
        return quote(str(value), safe="~")

    fields = [
        '%s="%s"' % (escape(name), escape(value))
        for name, value in sorted(params.items())
    ]
    return "OAuth %s" % ", ".join(fields)
def get_chirp_stream():
    """Open ``chirp_url`` with an OAuth-signed GET request.

    Builds the common OAuth parameters, adds the access token, signs them
    with the consumer and token secrets, and returns the response object
    produced by ``urllib2.urlopen``.
    """
    oauth = init_params()
    oauth["oauth_token"] = atoken
    # The signature covers every parameter except itself, so it is computed
    # before being stored in the dict.
    oauth["oauth_signature"] = make_signature(
        oauth, chirp_url, "GET", csecret, atoken_secret)
    request = urllib2.Request(
        chirp_url, headers={"Authorization": oauth_header(oauth)})
    return urllib2.urlopen(request)
def colorize(text, color):
    """Wrap *text* in ANSI escape sequences that set its foreground colour.

    Arguments:
        text  -- string to colour.
        color -- integer colour index, inserted after the "3" of the SGR
                 code (so 1 produces ``ESC[31m``).

    Returns the text preceded by the colour sequence and followed by
    ``ESC[39m``, which restores the terminal's default foreground colour
    instead of forcing black.
    """
    return "\x1b[3%dm%s\x1b[39m" % (color, text)
def _format_message(message):
    """Return the display line for one decoded stream message, or "" to skip it."""
    if "event" in message:
        line = colorize(
            message["source"]["screen_name"] + " " + message["event"] + " "
            + message["target"]["screen_name"], 1)
        if message["event"] in ("favorite", "retweet"):
            # These events carry the affected tweet; show its text as well.
            line += "\t" + message["target_object"]["text"]
        return line
    if "delete" in message:
        status = message["delete"]["status"]
        return (colorize("delete:", 1)
                + str(status["user_id"]) + " " + str(status["id"]))
    if "text" in message:
        return (colorize("tweet:", 1)
                + message["user"]["screen_name"] + " " + message["text"])
    return ""


def main():
    """Read the user stream line by line and print events, deletions and tweets.

    Lines that are not valid JSON, or that decode to something other than a
    JSON object, are ignored.  The loop ends when the server closes the
    stream, and the connection is closed on the way out.
    """
    stream = get_chirp_stream()
    try:
        # The first two lines are discarded unread -- presumably the initial
        # preamble of the stream; confirm against the API before changing.
        stream.readline()
        stream.readline()
        while True:
            recv = stream.readline()
            if not recv:
                # readline() yields an empty string only at end of stream;
                # without this check the loop would spin forever.
                break
            try:
                message = simplejson.loads(recv)
            except simplejson.decoder.JSONDecodeError:
                continue
            if not isinstance(message, dict):
                continue
            text = _format_message(message)
            if text:
                print(text)
    finally:
        stream.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment