Created
May 20, 2011 07:17
-
-
Save armon/982485 to your computer and use it in GitHub Desktop.
Facebook interface to get posts and perform hierarchical user clustering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from math import sqrt | |
import httplib2 | |
import json | |
import re | |
import threadpool | |
import threading | |
from PIL import Image,ImageDraw | |
# Graph API configuration. ACCESS_TOKEN is a placeholder -- paste a valid
# OAuth access token here before running.
ACCESS_TOKEN = "..."
# Friend-list and news-feed URLs are fully formed at import time.
FRIEND_URL = "https://graph.facebook.com/me/friends?access_token=%s" % ACCESS_TOKEN
NEWS_URL = "https://graph.facebook.com/me/home?access_token=%s" % ACCESS_TOKEN
# Wall-feed URL template; %(id)s is filled in per friend by _create_get_posts.
WALL_URL_FORM = "https://graph.facebook.com/%(id)s/feed?access_token=%(access)s"
def get_friends():
    """Fetch the authenticated user's friend list from the Graph API.

    Returns a dict mapping friend name -> Facebook id.
    """
    conn = httplib2.Http(timeout=10)
    resp, body = conn.request(FRIEND_URL, "GET")
    entries = json.loads(body)["data"]
    return dict((entry["name"], entry["id"]) for entry in entries)
# One shared thread-local namespace: each worker thread that runs the tasks
# below gets (and keeps) its own cached Http handle through it.
_http_cache = threading.local()

def _create_get_posts(all_data, friend_name, friend_id, num_pages):
    """Create a task for the threadpool that downloads one friend's wall.

    The returned callable appends up to num_pages pages of posts for
    friend_id onto the shared all_data list.
    """
    def inner(tpool):
        # Reuse one HTTP handle per worker thread. The original code called
        # threading.local() inside the task, which returns a *new* namespace
        # object every call, so the cached handle was never found and a fresh
        # Http was built for every friend; a module-level local() fixes that.
        http = getattr(_http_cache, "http", None)
        if http is None:
            http = httplib2.Http(timeout=10)
            _http_cache.http = http
        current_url = WALL_URL_FORM % {"id": friend_id, "access": ACCESS_TOKEN}
        for page in xrange(num_pages):
            resp, raw = http.request(current_url, "GET")
            raw = json.loads(raw)
            data = raw["data"]
            for post in data:
                all_data.append(post)
            print("Added %d posts from %s" % (len(data), friend_name))
            # The last page carries no "paging"/"next" entry; only a missing
            # key should end the loop (the old bare except hid real errors).
            try:
                current_url = raw["paging"]["next"]
            except KeyError:
                break
    return inner
def get_friend_wall_posts(friends, num_pages=1):
    """Download wall posts for every friend, num_pages pages each.

    friends is a name -> id mapping; downloads run on a 32-thread pool and
    the combined post list is returned once every worker has finished.
    """
    posts = []
    pool = threadpool.ThreadPool(threads=32, poll_intv=1)
    for name, fid in friends.items():
        pool.add(_create_get_posts(posts, name, fid, num_pages))
    # Blocks until the queue drains and all workers exit.
    pool.shutdown()
    return posts
def filter_friends_only(friends, filtered):
    """Keep only the posts whose author name appears in the friends mapping.

    filtered holds (id, name, message) tuples; friends maps name -> id, so
    the membership test is against its keys (the names).
    """
    keep = []
    for post in filtered:
        if post[1] in friends:
            keep.append(post)
    return keep
def get_new_posts(num_pages):
    """
    Gets up to num_pages pages worth of news-feed posts for the
    authenticated user and returns them as a single list.
    """
    all_data = []
    # Use the same 10 second timeout as the rest of this module so a stalled
    # request cannot hang the download indefinitely.
    http = httplib2.Http(timeout=10)
    current_url = NEWS_URL
    for page in xrange(num_pages):
        resp, raw = http.request(current_url, "GET")
        data = json.loads(raw)
        all_data += data["data"]
        # The last page has no "paging"/"next" entry; only that missing key
        # should stop paging (the old bare except also swallowed real errors).
        try:
            current_url = data["paging"]["next"]
        except KeyError:
            break
    return all_data
def filter_data(data):
    """Reduce raw Graph API entries to (id, name, message) status tuples.

    Non-status entries are skipped; messages are lower-cased. Entries that
    lack an expected key are reported and skipped (best-effort, as before).
    """
    filtered = []
    for entry in data:
        try:
            if entry["type"] != "status":
                continue
            post_id = entry["id"]  # renamed: "id" shadowed the builtin
            name = entry["from"]["name"]
            message = entry["message"].lower()
            filtered.append((post_id, name, message))
        except (KeyError, TypeError):
            # Narrowed from a bare except: only malformed entries (missing
            # keys / wrong shapes) are skipped; real bugs now propagate.
            print("Failed to parse: %s" % (entry,))
    return filtered
def get_words(filtered):
    """
    Processes status updates to return 3 dictionaries.
    The first dictionary returns a mapping of words to a set
    of message id's that contain the word.
    The second dictionary returns a mapping of people to a
    dictionary of words -> occurances
    The third dictionary returns a mapping of messages to a
    dictionary of words -> occurances
    """
    pattern = re.compile("[^a-zA-Z]+")
    all_words = defaultdict(set)
    person_count = {}
    message_count = {}
    for mesg_id, name, mesg in filtered:
        person_count.setdefault(name, defaultdict(int))
        message_count.setdefault(mesg_id, defaultdict(int))
        for word in pattern.split(mesg):
            # re.split yields "" at the edges when the message begins or ends
            # with a non-letter; don't count the empty string as a word.
            if not word:
                continue
            all_words[word].add(mesg_id)
            person_count[name][word] += 1
            message_count[mesg_id][word] += 1
    return all_words, person_count, message_count
def filter_words(total_mesg, all_words, person_data, message_data, max_occur=0.02, min_occur=0.0007):
    """
    Filters all the dictionaries to only contain words whose occurrence
    proportion (messages containing the word / total messages) lies
    strictly between min_occur and max_occur.
    Returns the list of "good" words plus trimmed person and message dicts.
    """
    good_words = []
    props = []
    for word, messages in all_words.items():
        proportion = len(messages) / (1.0 * total_mesg)
        props.append((proportion, word))
        if min_occur < proportion < max_occur:
            good_words.append(word)
    # Debug dump of every (proportion, word) pair, sorted by proportion.
    props.sort()
    print(props)
    persons = {}
    for person in person_data:
        persons[person] = dict((w, person_data[person][w]) for w in good_words)
    messages = {}
    for mesg in message_data:
        messages[mesg] = dict((w, message_data[mesg][w]) for w in good_words)
    return good_words, persons, messages
def vectorize_dict(key_ordered, mapping):
    """Return mapping's values as a list, in the order given by key_ordered.

    Every key in key_ordered must be present in mapping. (The second
    parameter was renamed from `dict`, which shadowed the builtin; callers
    in this file pass it positionally.)
    """
    return [mapping[key] for key in key_ordered]
def doit(raw_posts=None, filtered=None):
    """Build and draw the person dendrogram from wall posts.

    Pass either raw_posts (from get_new_posts / get_friend_wall_posts) or an
    already-filtered list of (id, name, message) tuples. Writes
    person_root.jpg and returns the root bicluster of the person tree.
    """
    if filtered is None:
        filtered = filter_data(raw_posts)
    all_words, person_data, mesg_data = get_words(filtered)
    print("Total words: %d, Total People: %d, Total Messages: %d" % (len(all_words), len(person_data), len(mesg_data)))
    all_words, person_data, mesg_data = filter_words(len(filtered), all_words, person_data, mesg_data)
    print("Filtered words: %d, Filtered People: %d, Filtered Messages: %d" % (len(all_words), len(person_data), len(mesg_data)))
    print("Making person clusters")
    # One leaf cluster per person, vectorized over the filtered word list.
    person_clusters = [bicluster(vectorize_dict(all_words, person_data[p]), id=p) for p in person_data.keys()]
    print("Clustering people")
    person_root = hcluster(person_clusters)
    print("Drawing people")
    drawdendrogram(person_root, dict([(p, p.encode("utf-8")) for p in person_data.keys()]), "person_root.jpg")
    # Removed: a dead commented-out message-clustering block that referenced
    # undefined names (filtered_words, fmesg) and could never run.
    return person_root
###### Borrowed from Programming Collective Intelligence | |
def pearson(v1, v2):
    """Pearson distance between two equal-length numeric vectors.

    Returns 1 - r: 0.0 for perfectly correlated vectors, 2.0 for perfectly
    anti-correlated ones. Returns 0 when either vector has zero variance
    (the denominator would be zero).
    """
    n = len(v1)
    sum1 = sum(v1)
    sum2 = sum(v2)
    sum1Sq = sum(x * x for x in v1)
    sum2Sq = sum(x * x for x in v2)
    pSum = sum(a * b for a, b in zip(v1, v2))
    # float(n) guards against Python 2 integer division: this file feeds in
    # integer word-count vectors, so sum1*sum2/len(v1) silently truncated
    # and skewed every distance.
    num = pSum - (sum1 * sum2 / float(n))
    den = sqrt((sum1Sq - sum1 ** 2 / float(n)) * (sum2Sq - sum2 ** 2 / float(n)))
    if den == 0:
        return 0
    return 1.0 - num / den
class bicluster:
    """One node of the hierarchical clustering tree.

    Leaves carry a data vector and an id; merged nodes additionally point at
    their two children and record the merge distance.
    """
    def __init__(self, vec, left=None, right=None, distance=0.0, id=None):
        self.vec = vec
        self.id = id
        self.left = left
        self.right = right
        self.distance = distance
# Slightly modified to take the cluster instead of creating it
def hcluster(clust, distance=pearson):
    """Agglomeratively merge the given leaf biclusters into a single tree.

    Repeatedly merges the closest pair (per the distance function) until one
    cluster remains. Mutates `clust` in place and returns the root.
    """
    dist_cache = {}
    # Merged clusters get negative ids; leaves keep their original ids.
    next_id = -1
    while len(clust) > 1:
        best_pair = (0, 1)
        best_dist = distance(clust[0].vec, clust[1].vec)
        # Scan every pair, reusing cached distances keyed by cluster ids.
        for i in xrange(len(clust)):
            for j in xrange(i + 1, len(clust)):
                key = (clust[i].id, clust[j].id)
                if key in dist_cache:
                    d = dist_cache[key]
                else:
                    d = dist_cache[key] = distance(clust[i].vec, clust[j].vec)
                if d < best_dist:
                    best_dist = d
                    best_pair = (i, j)
        a, b = best_pair
        # The merged vector is the element-wise mean of the winning pair.
        mergevec = [(clust[a].vec[k] + clust[b].vec[k]) / 2.0
                    for k in xrange(len(clust[0].vec))]
        merged = bicluster(mergevec, left=clust[a], right=clust[b],
                           distance=best_dist, id=next_id)
        print("Merged cluster")
        next_id -= 1
        # Delete the higher index first so the lower index stays valid.
        del clust[b]
        del clust[a]
        clust.append(merged)
    return clust[0]
def drawdendrogram(clust, labels, jpeg='clusters.jpg'):
    """Render the cluster tree as a dendrogram and save it as a JPEG file."""
    # 20 pixels of height per leaf row; overall width is fixed.
    height = getheight(clust) * 20
    width = 1200
    depth = getdepth(clust)
    # Width is fixed, so scale branch lengths to fit (150px label margin).
    scaling = float(width - 150) / depth
    # White canvas with red connector lines.
    img = Image.new('RGB', (width, height), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    # Short stub from the left edge into the root node.
    draw.line((0, height / 2, 10, height / 2), fill=(255, 0, 0))
    drawnode(draw, clust, 10, (height / 2), scaling, labels)
    img.save(jpeg, 'JPEG')
def drawnode(draw,clust,x,y,scaling,labels):
    """Recursively draw one cluster node at (x, y).

    Internal nodes (negative id) get connector lines plus both children;
    leaves get their text label from `labels`, keyed by cluster id.
    """
    if clust.id<0:
        # Pixel heights of each subtree (20px per leaf, matching drawdendrogram)
        h1=getheight(clust.left)*20
        h2=getheight(clust.right)*20
        # Vertical extent of this subtree, centered on y
        top=y-(h1+h2)/2
        bottom=y+(h1+h2)/2
        # Line length
        ll=clust.distance*scaling
        # Vertical line from this cluster to children
        draw.line((x,top+h1/2,x,bottom-h2/2),fill=(255,0,0))
        # Horizontal line to left item
        draw.line((x,top+h1/2,x+ll,top+h1/2),fill=(255,0,0))
        # Annotate the left branch with its merge distance (skip leaves at 0.0)
        if clust.left.distance != 0.0:
            draw.text((x+2,top+h1/2+5),"%.2f" % clust.left.distance,(0,0,0))
        # Horizontal line to right item
        draw.line((x,bottom-h2/2,x+ll,bottom-h2/2),fill=(255,0,0))
        if clust.right.distance != 0.0:
            draw.text((x+2,bottom-h2/2+5),"%.2f" % clust.right.distance,(0,0,0))
        # Call the function to draw the left and right nodes
        drawnode(draw,clust.left,x+ll,top+h1/2,scaling,labels)
        drawnode(draw,clust.right,x+ll,bottom-h2/2,scaling,labels)
    else:
        # If this is an endpoint, draw the item label
        draw.text((x+5,y-7),labels[clust.id],(0,0,0))
def getheight(clust):
    """Number of leaves under clust; each leaf occupies one output row."""
    # A leaf contributes height 1; a branch is the sum of its two subtrees.
    if clust.left is None and clust.right is None:
        return 1
    return getheight(clust.left) + getheight(clust.right)
def getdepth(clust):
    """Largest summed merge distance from clust down to any leaf.

    A leaf has depth 0; a branch adds its own distance to the deeper of its
    two subtrees.
    """
    if clust.left is None and clust.right is None:
        return 0
    return clust.distance + max(getdepth(clust.left), getdepth(clust.right))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module implements a simple thread pool. | |
""" | |
from Queue import Queue, Empty | |
from threading import Thread | |
import logging | |
# Get a logger
# Module-level logger named after this module; handler/level configuration
# is left to the embedding application.
logger = logging.getLogger(__name__)
class ThreadPool (object):
    """A fixed-size pool of daemon worker threads fed from a shared queue.

    Tasks are callables taking a reference to the pool; workers pull them
    from the queue until shutdown() is requested and the queue drains.
    """
    def __init__(self, threads=8, poll_intv=5):
        """Start `threads` daemon workers.

        poll_intv is how long (seconds) an idle worker waits on the queue
        before re-checking whether shutdown was requested.
        """
        # Create our task queue
        self.queue = Queue()
        self.poll_intv = poll_intv
        # Mark as not finished
        self._shutdown = False
        # Start some threads
        self.workers = []
        for i in range(threads):
            t = Thread(target=self._worker)
            # daemon attribute instead of the deprecated setDaemon()
            t.daemon = True
            self.workers.append(t)
            t.start()
    def shutdown(self):
        "Waits for all queued tasks to finish, then for the workers to exit"
        self._shutdown = True
        self.queue.join()
        for t in self.workers:
            t.join()
    def add(self, func):
        """
        Adds a function to be executed.
        func should take a reference to this threadpool object.
        """
        self.queue.put(func)
    def _worker(self):
        "Worker main method: pull and run tasks until shutdown is requested"
        while True:
            try:
                task = self.queue.get(timeout=self.poll_intv)
            except Empty:
                # Idle: exit only once shutdown has been requested.
                if self._shutdown:
                    break
                continue
            try:
                task(self)
            except Exception:
                # Replaces Python-2-only "except Exception, e"; the redundant
                # "print e" is dropped -- logger.exception already records the
                # message and full traceback. A failing task never kills the
                # worker.
                logger.exception("Caught an exception invoking a task!")
            finally:
                # Always mark the task done so shutdown's queue.join() returns.
                self.queue.task_done()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment