Created
August 21, 2015 09:58
-
-
Save dhesse/e3aabc9946d52304a2b8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pymongo import MongoClient | |
from collections import defaultdict | |
from dateutil.parser import parse | |
from random import randint | |
from pyspark import SparkContext | |
from pyspark.streaming import StreamingContext | |
class HourCounter(object):
    '''Save hourly counts from Twitter messages to MongoDB.

    An instance is a callable intended for ``RDD.foreach``: every call
    buckets one tweet into the hour of its ``created_at`` timestamp, and the
    accumulated counts are added to the ``twitter_stats.counts`` collection
    (upserting one document per hour) when the instance is finalized.

    WARNING: This class is for illustration purposes only and will hammer
    the MongoDB server with requests if you delete many instances at once.
    This will almost certainly not be a good idea in production.'''

    def __init__(self):
        # Maps an hour-truncated datetime to the number of tweets seen in it.
        self.counts = defaultdict(int)

    def __del__(self):
        '''Flush the accumulated hourly counts to MongoDB.

        Does nothing, and opens no connection, if nothing was counted.'''
        # An instance that was never called (e.g. the driver-side object
        # handed to RDD.foreach) has nothing to write; don't connect at all.
        if not self.counts:
            return
        # this assumes we have MongoDB on localhost
        client = MongoClient()
        try:
            collection = client.twitter_stats.counts
            for hour, count in self.counts.items():
                # update_one replaces Collection.update, which was deprecated
                # in PyMongo 3.0 and removed in PyMongo 4.0.
                collection.update_one({'time': hour},
                                      {'$inc': {'count': count}},
                                      upsert=True)
        finally:
            # Release the connection pool instead of leaking one per instance.
            client.close()

    def __call__(self, tweet):
        '''Count one tweet in the hour of its ``created_at`` timestamp.

        :param tweet: mapping with a ``'created_at'`` date string that
            ``dateutil.parser.parse`` can read.'''
        # make time stamp per-hour
        hour = parse(tweet['created_at']).replace(minute=0,
                                                  second=0,
                                                  microsecond=0)
        self.counts[hour] += 1
def mkDate():
    '''Make a random date string.

    The string imitates Twitter's ``created_at`` layout on a fixed day
    (Fri Aug 21 2015, offset +0000). Only the time of day is random.
    Hour, minute and second are zero-padded to two digits so the value
    matches the ``%a %b %d %H:%M:%S %z %Y`` layout, e.g.
    ``"Fri Aug 21 05:03:07 +0000 2015"``.'''
    hour = randint(0, 23)
    minute = randint(0, 59)
    second = randint(0, 59)
    return "Fri Aug 21 {0:02d}:{1:02d}:{2:02d} +0000 2015".format(
        hour, minute, second)
if __name__ == '__main__':
    # create Spark and Spark Streaming contexts
    sc = SparkContext('local', 'TweetCount')
    batchSeconds = 1
    ssc = StreamingContext(sc, batchSeconds)
    # create a fake stream of tweets containing only the 'created_at' field
    nRDD, nTweetsPerRDD = 50, 50
    dstream = ssc.queueStream(
        [sc.parallelize([{'created_at': mkDate()}
                         for _ in range(nTweetsPerRDD)])
         for _ in range(nRDD)])
    # do the counts: each batch gets a fresh HourCounter, which writes its
    # counts to MongoDB when it is finalized
    dstream.foreachRDD(lambda RDD: RDD.foreach(HourCounter()))
    ssc.start()
    # queueStream (default oneAtATime=True) consumes one queued RDD per
    # batch, so the fake stream is exhausted after about nRDD batches.
    # Wait that long plus some slack instead of blocking forever, then stop
    # gracefully so batches already in flight still complete.
    ssc.awaitTerminationOrTimeout(nRDD * batchSeconds + 10)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment