Skip to content

Instantly share code, notes, and snippets.

@armon
Created May 20, 2011 07:17
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save armon/982485 to your computer and use it in GitHub Desktop.
Facebook interface to get posts and perform hierarchical user clustering
from collections import defaultdict
from math import sqrt
import httplib2
import json
import re
import threadpool
import threading
from PIL import Image,ImageDraw
# Facebook Graph API OAuth token; fill in a valid user access token before use.
ACCESS_TOKEN = "..."
# Endpoint listing the authenticated user's friends.
FRIEND_URL = "https://graph.facebook.com/me/friends?access_token=%s" % ACCESS_TOKEN
# Endpoint for the authenticated user's news feed.
NEWS_URL = "https://graph.facebook.com/me/home?access_token=%s" % ACCESS_TOKEN
# Template for an arbitrary user's wall feed; format with {"id": ..., "access": ...}.
WALL_URL_FORM = "https://graph.facebook.com/%(id)s/feed?access_token=%(access)s"
def get_friends():
    """Fetch the authenticated user's friend list from the Graph API.

    Returns a mapping of friend name -> Facebook id.
    """
    conn = httplib2.Http(timeout=10)
    response, body = conn.request(FRIEND_URL, "GET")
    entries = json.loads(body)["data"]
    return {entry["name"]: entry["id"] for entry in entries}
def _create_get_posts(all_data, friend_name, friend_id, num_pages):
"Creates a function we can send to a threadpool to get the posts for one person"
def inner(tpool):
# Cache an HTTP handle, one per thread
try:
http = threading.local().http
except:
http = httplib2.Http(timeout=10)
threading.local().http = http
current_url = WALL_URL_FORM % {"id":friend_id, "access":ACCESS_TOKEN}
for page in xrange(num_pages):
resp, raw = http.request(current_url, "GET")
raw = json.loads(raw)
data = raw["data"]
for post in data:
all_data.append(post)
print "Added %d posts from %s" % (len(data), friend_name)
# Go to the next page
try:
current_url = raw["paging"]["next"]
except:
break
return inner
def get_friend_wall_posts(friends, num_pages=1):
    """Download wall posts for every friend using a thread pool.

    friends   -- mapping of friend name -> Facebook id
    num_pages -- number of feed pages to fetch per friend

    Returns a flat list of raw post dicts.
    """
    posts = []
    # Fan the downloads out across worker threads.
    pool = threadpool.ThreadPool(threads=32, poll_intv=1)
    for name, fb_id in friends.items():
        pool.add(_create_get_posts(posts, name, fb_id, num_pages))
    # Block until every queued download has finished.
    pool.shutdown()
    return posts
def filter_friends_only(friends, filtered):
    """Keep only posts authored by a known friend.

    friends  -- mapping of friend *name* -> Facebook id (from get_friends)
    filtered -- (id, name, message) tuples (from filter_data)

    Note: membership is tested against the dict keys, i.e. the friend
    *names* -- post[1] is the author's name, not an id.
    """
    return [post for post in filtered if post[1] in friends]
def get_new_posts(num_pages):
    """
    Fetch up to *num_pages* pages of the authenticated user's news feed.

    Follows the Graph API "paging.next" links and stops early when no
    further page is offered.  Returns a flat list of raw post dicts.
    """
    all_data = []
    # 10s timeout, consistent with the module's other HTTP calls.
    http = httplib2.Http(timeout=10)
    current_url = NEWS_URL
    for page in range(num_pages):
        resp, raw = http.request(current_url, "GET")
        data = json.loads(raw)
        all_data += data["data"]
        # Go to the next page; a missing "paging"/"next" means the last page.
        try:
            current_url = data["paging"]["next"]
        except KeyError:
            break
    return all_data
def filter_data(data):
    """Reduce raw posts to (id, name, message) triples for status updates.

    Non-status entries are skipped.  Entries missing an expected field
    (e.g. no "message") are reported and skipped rather than aborting the
    whole batch.  Messages are lower-cased for later word counting.
    """
    filtered = []
    for entry in data:
        try:
            if entry["type"] != "status":
                continue
            # Avoid shadowing the builtin `id`.
            post_id = entry["id"]
            name = entry["from"]["name"]
            message = entry["message"].lower()
            filtered.append((post_id, name, message))
        except (KeyError, TypeError, AttributeError):
            # Malformed entry: report it and keep going.
            print("Failed to parse: %s" % (entry,))
    return filtered
def get_words(filtered):
    """Tokenize status messages and build three word-count indexes.

    filtered -- (message_id, person_name, message) triples from filter_data

    Returns:
      all_words     -- word -> set of message ids containing the word
      person_count  -- person name -> {word: occurrences}
      message_count -- message id -> {word: occurrences}

    Splitting on runs of non-letters yields empty strings when a message
    starts or ends with punctuation; those are skipped so that "" is
    never counted as a word (the original counted it).
    """
    pattern = re.compile("[^a-zA-Z]+")
    all_words = defaultdict(set)
    person_count = {}
    message_count = {}
    for mesg_id, name, mesg in filtered:
        by_person = person_count.setdefault(name, defaultdict(int))
        by_message = message_count.setdefault(mesg_id, defaultdict(int))
        for word in pattern.split(mesg):
            if not word:
                continue  # empty token from leading/trailing punctuation
            all_words[word].add(mesg_id)
            by_person[word] += 1
            by_message[word] += 1
    return all_words, person_count, message_count
def filter_words(total_mesg, all_words, person_data, message_data, max_occur=0.02, min_occur=0.0007):
    """Keep only words whose occurrence proportion lies in (min_occur, max_occur).

    total_mesg   -- total number of messages (denominator for the proportion)
    all_words    -- word -> set of message ids (from get_words)
    person_data  -- person -> {word: count}
    message_data -- message id -> {word: count}
    max_occur    -- exclusive upper bound on the occurrence proportion
    min_occur    -- exclusive lower bound on the occurrence proportion

    Returns the list of "good" words plus person and message dictionaries
    restricted to those words.  (The original also built and printed a
    sorted proportion list -- debug leftover, removed.)
    """
    good_words = [
        word for word, messages in all_words.items()
        if min_occur < len(messages) / (1.0 * total_mesg) < max_occur
    ]
    persons = {p: {w: person_data[p][w] for w in good_words} for p in person_data}
    messages = {m: {w: message_data[m][w] for w in good_words} for m in message_data}
    return good_words, persons, messages
def vectorize_dict(key_ordered, mapping):
    """Project *mapping*'s values into a list, iterating keys in a fixed order.

    key_ordered -- iterable fixing the key (and hence vector-component) order
    mapping     -- dict whose values are vectorized

    The second parameter was renamed from ``dict`` -- it shadowed the
    builtin; all in-file callers pass it positionally.
    """
    return [mapping[key] for key in key_ordered]
def doit(raw_posts=None, filtered=None):
    """End-to-end driver: filter posts, count words, cluster and draw people.

    Supply either ``raw_posts`` (output of get_new_posts or
    get_friend_wall_posts) or an already-filtered (id, name, message)
    list.  Writes person_root.jpg and returns the root of the person
    cluster tree.
    """
    if filtered is None:
        filtered = filter_data(raw_posts)
    all_words, person_data, mesg_data = get_words(filtered)
    print("Total words: %d, Total People: %d, Total Messages: %d" % (len(all_words), len(person_data), len(mesg_data)))
    all_words, person_data, mesg_data = filter_words(len(filtered), all_words, person_data, mesg_data)
    print("Filtered words: %d, Filtered People: %d, Filtered Messages: %d" % (len(all_words), len(person_data), len(mesg_data)))
    print("Making person clusters")
    person_clusters = [bicluster(vectorize_dict(all_words, person_data[p]), id=p) for p in person_data]
    print("Clustering people")
    person_root = hcluster(person_clusters)
    print("Drawing people")
    labels = dict((p, p.encode("utf-8")) for p in person_data)
    drawdendrogram(person_root, labels, "person_root.jpg")
    # Disabled: message-level clustering.  References stale names
    # (filtered_words / fmesg) and would need updating before re-enabling:
    #   mesg_clusters = [bicluster(vectorize_dict(filtered_words, fmesg[m]), id=m) for m in fmesg.keys()]
    #   mesg_root = hcluster(mesg_clusters)
    #   id_map = dict([(id, person.encode("utf-8") + " " + mesg[:30].encode("utf-8")) for (id, person, mesg) in filtered])
    #   drawdendrogram(mesg_root, id_map, "mesg_root.jpg")
    return person_root
###### Borrowed from Programming Collective Intelligence
def pearson(v1, v2):
    """Return 1 minus the Pearson correlation of two equal-length vectors.

    0.0 means perfectly correlated, 2.0 perfectly anti-correlated -- a
    distance-like score for clustering.  Returns 0 when either vector has
    zero variance (den == 0) or the vectors are empty.

    Uses float arithmetic throughout: the original divided with `/` on
    ints, which truncates under Python 2 and skewed the score for
    integer count vectors.
    """
    n = len(v1)
    if n == 0:
        return 0
    # Simple and squared sums, plus the sum of products.
    sum1 = float(sum(v1))
    sum2 = float(sum(v2))
    sum1_sq = float(sum(x * x for x in v1))
    sum2_sq = float(sum(x * x for x in v2))
    p_sum = float(sum(a * b for a, b in zip(v1, v2)))
    # Pearson numerator/denominator.
    num = p_sum - (sum1 * sum2 / n)
    den = sqrt((sum1_sq - sum1 * sum1 / n) * (sum2_sq - sum2 * sum2 / n))
    if den == 0:
        return 0
    return 1.0 - num / den
class bicluster:
    """Node in a hierarchical-clustering tree.

    Leaves carry an original item's feature vector and its external id;
    internal (merged) nodes are created by hcluster with negative ids and
    record the distance between their two children.
    """

    def __init__(self, vec, left=None, right=None, distance=0.0, id=None):
        self.vec = vec            # feature vector (leaf) or averaged vector (merge)
        self.id = id              # external id for leaves; negative int for merges
        self.distance = distance  # distance between children; 0.0 for leaves
        self.left = left          # child clusters; None on leaves
        self.right = right
# Slightly modified to take the cluster instead of creating it
def hcluster(clust, distance=pearson):
    """Agglomerative (bottom-up) hierarchical clustering.

    clust    -- list of bicluster leaves; consumed destructively
    distance -- two-vector distance function (default: pearson)

    Repeatedly merges the closest pair into a new bicluster whose vector
    is the element-wise average of its children, until a single root
    remains; returns that root.  Merged clusters get negative ids so the
    distance cache keys never collide with the original ids.
    (The per-merge debug print was removed.)
    """
    distances = {}  # (id_a, id_b) -> cached pairwise distance
    currentclustid = -1
    while len(clust) > 1:
        lowestpair = (0, 1)
        closest = distance(clust[0].vec, clust[1].vec)
        # Scan every pair for the smallest distance, caching computations.
        for i in range(len(clust)):
            for j in range(i + 1, len(clust)):
                key = (clust[i].id, clust[j].id)
                if key not in distances:
                    distances[key] = distance(clust[i].vec, clust[j].vec)
                d = distances[key]
                if d < closest:
                    closest = d
                    lowestpair = (i, j)
        # New vector is the element-wise average of the merged pair.
        left, right = clust[lowestpair[0]], clust[lowestpair[1]]
        mergevec = [(left.vec[k] + right.vec[k]) / 2.0
                    for k in range(len(clust[0].vec))]
        newcluster = bicluster(mergevec, left=left, right=right,
                               distance=closest, id=currentclustid)
        # Cluster ids that weren't in the original set are negative.
        currentclustid -= 1
        # Delete the higher index first so the lower one stays valid.
        del clust[lowestpair[1]]
        del clust[lowestpair[0]]
        clust.append(newcluster)
    return clust[0]
def drawdendrogram(clust, labels, jpeg='clusters.jpg'):
    """Render the cluster tree as a JPEG dendrogram.

    clust  -- root bicluster
    labels -- id -> label text for the leaves
    jpeg   -- output filename
    """
    # 20 pixels of vertical space per leaf; fixed overall width.
    height = getheight(clust) * 20
    width = 1200
    depth = getdepth(clust)
    # Width is fixed, so scale distances to fit (150px reserved for labels).
    scaling = float(width - 150) / depth
    # White canvas with a red stub line leading into the root.
    img = Image.new('RGB', (width, height), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    draw.line((0, height / 2, 10, height / 2), fill=(255, 0, 0))
    # Recursively draw the tree starting at the root node.
    drawnode(draw, clust, 10, (height / 2), scaling, labels)
    img.save(jpeg, 'JPEG')
def drawnode(draw, clust, x, y, scaling, labels):
    """Recursively render one cluster node at (x, y).

    draw    -- ImageDraw handle created by drawdendrogram
    clust   -- bicluster node; a negative id marks an internal (merged) node
    x, y    -- pixel position of this node's junction point
    scaling -- pixels per unit of cluster distance
    labels  -- id -> label text for leaf nodes
    """
    if clust.id<0:
        # Pixel height of each subtree (20px per leaf, matching drawdendrogram).
        h1=getheight(clust.left)*20
        h2=getheight(clust.right)*20
        # Vertical extent of this node's bracket.
        top=y-(h1+h2)/2
        bottom=y+(h1+h2)/2
        # Horizontal line length is proportional to the merge distance.
        ll=clust.distance*scaling
        # Vertical line from this cluster to children
        draw.line((x,top+h1/2,x,bottom-h2/2),fill=(255,0,0))
        # Horizontal line to left item
        draw.line((x,top+h1/2,x+ll,top+h1/2),fill=(255,0,0))
        if clust.left.distance != 0.0:
            # Annotate the left child's merge distance (leaves have distance 0).
            draw.text((x+2,top+h1/2+5),"%.2f" % clust.left.distance,(0,0,0))
        # Horizontal line to right item
        draw.line((x,bottom-h2/2,x+ll,bottom-h2/2),fill=(255,0,0))
        if clust.right.distance != 0.0:
            draw.text((x+2,bottom-h2/2+5),"%.2f" % clust.right.distance,(0,0,0))
        # Call the function to draw the left and right nodes
        drawnode(draw,clust.left,x+ll,top+h1/2,scaling,labels)
        drawnode(draw,clust.right,x+ll,bottom-h2/2,scaling,labels)
    else:
        # If this is an endpoint, draw the item label
        draw.text((x+5,y-7),labels[clust.id],(0,0,0))
def getheight(clust):
    """Number of leaves under *clust*; a leaf itself counts as 1."""
    # A node with no children is an endpoint.
    if clust.left is None and clust.right is None:
        return 1
    # Otherwise, the height is the sum of the two branch heights.
    return getheight(clust.left) + getheight(clust.right)
def getdepth(clust):
    """Cumulative distance from *clust* down its deepest branch; a leaf is 0."""
    # Endpoints contribute no distance.
    if clust.left is None and clust.right is None:
        return 0
    # A branch's depth is the greater child depth plus its own distance.
    return max(getdepth(clust.left), getdepth(clust.right)) + clust.distance
"""
This module implements a simple thread pool.
"""
from Queue import Queue, Empty
from threading import Thread
import logging
# Get a logger
logger = logging.getLogger(__name__)
class ThreadPool(object):
    """Fixed-size pool of daemon worker threads consuming a task queue.

    Tasks are callables that take a reference to this pool (see add()).
    Fixes over the original: modern `except ... as` syntax (the legacy
    comma form is a syntax error on Python 3), `t.daemon = True` instead
    of the deprecated setDaemon(), and the redundant `print e` next to
    logger.exception() removed.
    """

    def __init__(self, threads=8, poll_intv=5):
        """Start *threads* daemon workers, each polling the queue every *poll_intv* seconds."""
        # Task queue shared by all workers.
        self.queue = Queue()
        self.poll_intv = poll_intv
        # Set by shutdown(); workers exit once the queue has drained.
        self._shutdown = False
        self.workers = []
        for _ in range(threads):
            t = Thread(target=self._worker)
            t.daemon = True  # don't keep the process alive on interpreter exit
            self.workers.append(t)
            t.start()

    def shutdown(self):
        "Signals shutdown, then blocks until all queued tasks finish and workers exit."
        self._shutdown = True
        self.queue.join()
        for t in self.workers:
            t.join()

    def add(self, func):
        """
        Adds a function to be executed.
        func should take a reference to this threadpool object.
        """
        self.queue.put(func)

    def _worker(self):
        "Worker main loop: pull and run tasks until shutdown and the queue is empty."
        while True:
            try:
                task = self.queue.get(timeout=self.poll_intv)
            except Empty:
                # Queue idle: exit if shutting down, otherwise keep polling.
                if self._shutdown:
                    break
                continue
            try:
                task(self)
            except Exception:
                # A failing task must not kill the worker thread.
                logger.exception("Caught an exception invoking a task!")
            finally:
                self.queue.task_done()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment