from pymongo import MongoClient
from collections import defaultdict
from dateutil.parser import parse
from random import randint
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
class HourCounter(object):
    '''Saves hourly counts from Twitter messages to MongoDB.

    WARNING: This class is for illustration purposes only and will hammer
    the MongoDB server with requests if you delete many instances at once.
    This will almost certainly not be a good idea in production.'''

    def __init__(self):
        self.counts = defaultdict(int)

    def __del__(self):
        # this assumes we have MongoDB on localhost
        collection = MongoClient().twitter_stats.counts
        for time, count in self.counts.items():
            # update_one replaces the update() call deprecated in pymongo 3;
            # upsert=True creates the per-hour document if it doesn't exist yet
            collection.update_one({'time': time},
                                  {'$inc': {'count': count}},
                                  upsert=True)

    def __call__(self, tweet):
        # truncate the time stamp to the full hour
        time = parse(tweet['created_at']).replace(minute=0,
                                                  second=0,
                                                  microsecond=0)
        self.counts[time] += 1
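# Reading the results back is not part of the original job, but a small
# helper makes the upsert pattern above easy to verify (a minimal sketch;
# printHourlyCounts is a hypothetical name, assuming the same localhost
# MongoDB as in HourCounter.__del__):
def printHourlyCounts():
    '''Print the accumulated per-hour counts, oldest first.'''
    collection = MongoClient().twitter_stats.counts
    for doc in collection.find().sort('time'):
        print('{0}: {1}'.format(doc['time'], doc['count']))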
def mkDate():
    '''Make a random date string in Twitter's created_at format.'''
    # zero-pad the fields so the string matches Twitter's
    # "%a %b %d %H:%M:%S %z %Y" layout exactly
    dateString = "Fri Aug 21 {0:02d}:{1:02d}:{2:02d} +0000 2015"
    h, m, s = randint(0, 23), randint(0, 59), randint(0, 59)
    return dateString.format(h, m, s)
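# A quick sanity check (a sketch, not part of the original gist): dateutil's
# parse() turns the generated string back into a timezone-aware datetime,
# which HourCounter then truncates to the hour, e.g.
#
#   >>> parse(mkDate()).replace(minute=0, second=0, microsecond=0)
#   datetime.datetime(2015, 8, 21, 13, 0, tzinfo=tzutc())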
if __name__ == '__main__':
    # create Spark and Spark Streaming contexts with a one-second batch interval
    sc = SparkContext('local', 'TweetCount')
    ssc = StreamingContext(sc, 1)
    # create a fake stream of tweets containing only the 'created_at' field
    nRDD, nTweetsPerRDD = 50, 50
    dstream = ssc.queueStream(
        [sc.parallelize([{'created_at': mkDate()}
                         for i in range(nTweetsPerRDD)])
         for i in range(nRDD)])
    # do the counts: each worker applies its own copy of HourCounter to the
    # tweets it sees, and the counts are flushed to MongoDB by __del__ when
    # that copy is garbage collected after the batch
    dstream.foreachRDD(lambda RDD: RDD.foreach(HourCounter()))
    ssc.start()
    ssc.awaitTermination()
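# To try this out one would typically submit the script to a local Spark
# installation and watch the twitter_stats.counts collection fill up
# (a sketch; the file name is hypothetical and paths depend on the setup):
#
#   $ spark-submit hourly_counts.py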