Skip to content

Instantly share code, notes, and snippets.

@adam704a
Last active June 5, 2018 12:34
Show Gist options
  • Save adam704a/9167bf3f5600471f4a73be81809c8137 to your computer and use it in GitHub Desktop.
Save adam704a/9167bf3f5600471f4a73be81809c8137 to your computer and use it in GitHub Desktop.
The Great Tweet Migration
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Migrate Tweets from Mongo to a date partitioned format in S3\n",
"This is the process for moving 61 million tweets from a mongo database to Amazon's S3"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Connect to S3\n",
"# Where are the AWS credentials, you might ask? These are configured using the AWS CLI.\n",
"import boto3\n",
"\n",
"s3 = boto3.resource('s3')\n",
"bucket = s3.Bucket('org.rti.mj.tweets')\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Connect to MongoDB\n",
"from pymongo import MongoClient\n",
"import datetime\n",
"\n",
"\n",
"# User exists in the mj_analysis database, so you have to authenticate there\n",
"c = MongoClient(\"mongodb://datascientist:youthoughtiwasgoingtoleavethepasswordinheredidntyou@mongo.ictedge.org/mj_sample\")\n",
"db = c.mj_sample\n",
"collection = db.mj_sample\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Handy little function for iterating over dates. You have to <3 the yield here.\n",
"from datetime import timedelta, date\n",
"\n",
"\n",
"def daterange(start_time, end_time):\n",
" for n in range(int ((end_time - start_time).days*24)):\n",
" yield start_time + timedelta(hours=n)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Set date range\n",
"start_time = datetime.datetime(2014, 11, 15, 0, 0)\n",
"end_time = datetime.datetime(2016, 4, 21, 0, 0)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from bson import json_util\n",
"import json\n",
"\n",
"for n_time in daterange(start_time, end_time):\n",
" \n",
" # for each interval write a new file\n",
" with open('data.json', 'w') as outfile:\n",
" \n",
" print(\"querying \"+ str(n_time) +\" \"+ str(n_time + datetime.timedelta(hours=1)) ) \n",
" _end = n_time + datetime.timedelta(hours=1)\n",
" \n",
" for post in collection.find({\"created_at\" : { \"$lte\" : _end, \"$gte\": n_time} }):\n",
" \n",
" json.dump(post, outfile, default=json_util.default)\n",
" outfile.write('\\n')\n",
" \n",
" # Save file to S3 /year/month/date/part-0000n.json (where n = hour)\n",
" data = open('data.json', 'rb')\n",
" bucket.put_object(Key=str(n_time.year)+'/'+str(n_time.month)+'/'+str(n_time.day)+'/part-0000'+str(n_time.hour)+'.json', Body=data)\n",
" \n",
" outfile.close() "
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.4.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment