@mrocklin
Created July 2, 2021 00:03
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test with a single file"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--2021-07-01 18:43:22-- https://data.gharchive.org/2015-01-01.json.gz\n",
"Resolving data.gharchive.org (data.gharchive.org)... 2606:4700:3037::6815:2eaf, 2606:4700:3035::ac43:a8ce, 172.67.168.206, ...\n",
"Connecting to data.gharchive.org (data.gharchive.org)|2606:4700:3037::6815:2eaf|:443... connected.\n",
"HTTP request sent, awaiting response... 404 Not Found\n",
"2021-07-01 18:43:23 ERROR 404: Not Found.\n",
"\n"
]
}
],
"source": [
"!wget https://data.gharchive.org/2015-01-01-15.json.gz"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"({'id': '2489651045',\n",
" 'type': 'CreateEvent',\n",
" 'actor': {'id': 665991,\n",
" 'login': 'petroav',\n",
" 'gravatar_id': '',\n",
" 'url': 'https://api.github.com/users/petroav',\n",
" 'avatar_url': 'https://avatars.githubusercontent.com/u/665991?'},\n",
" 'repo': {'id': 28688495,\n",
" 'name': 'petroav/6.828',\n",
" 'url': 'https://api.github.com/repos/petroav/6.828'},\n",
" 'payload': {'ref': 'master',\n",
" 'ref_type': 'branch',\n",
" 'master_branch': 'master',\n",
" 'description': \"Solution to homework and assignments from MIT's 6.828 (Operating Systems Engineering). Done in my spare time.\",\n",
" 'pusher_type': 'user'},\n",
" 'public': True,\n",
" 'created_at': '2015-01-01T15:00:00Z'},)"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.bag as db\n",
"import ujson\n",
"\n",
"records = db.read_text(\"2015-01-01-15.json.gz\").map(ujson.loads)\n",
"records.take(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## What kinds of events are there?\n",
"\n",
"We can't convert the entire dataset to a single Parquet file because several different schemas overlap here. We can filter out one subset, though, and work with that.\n",
"\n",
"PushEvents seem popular, and also include interesting information. Let's start there."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('CreateEvent', 1471),\n",
" ('PushEvent', 5815),\n",
" ('WatchEvent', 1230),\n",
" ('ReleaseEvent', 60),\n",
" ('PullRequestEvent', 474),\n",
" ('IssuesEvent', 545),\n",
" ('ForkEvent', 355),\n",
" ('GollumEvent', 61),\n",
" ('IssueCommentEvent', 844),\n",
" ('DeleteEvent', 260),\n",
" ('PullRequestReviewCommentEvent', 136),\n",
" ('CommitCommentEvent', 73),\n",
" ('MemberEvent', 25),\n",
" ('PublicEvent', 2)]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"records.pluck(\"type\").frequencies().compute()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"({'id': '2489651051',\n",
" 'type': 'PushEvent',\n",
" 'actor': {'id': 3854017,\n",
" 'login': 'rspt',\n",
" 'gravatar_id': '',\n",
" 'url': 'https://api.github.com/users/rspt',\n",
" 'avatar_url': 'https://avatars.githubusercontent.com/u/3854017?'},\n",
" 'repo': {'id': 28671719,\n",
" 'name': 'rspt/rspt-theme',\n",
" 'url': 'https://api.github.com/repos/rspt/rspt-theme'},\n",
" 'payload': {'push_id': 536863970,\n",
" 'size': 1,\n",
" 'distinct_size': 1,\n",
" 'ref': 'refs/heads/master',\n",
" 'head': '6b089eb4a43f728f0a594388092f480f2ecacfcd',\n",
" 'before': '437c03652caa0bc4a7554b18d5c0a394c2f3d326',\n",
" 'commits': [{'sha': '6b089eb4a43f728f0a594388092f480f2ecacfcd',\n",
" 'author': {'email': '5c682c2d1ec4073e277f9ba9f4bdf07e5794dabe@rspt.ch',\n",
" 'name': 'rspt'},\n",
" 'message': 'Fix main header height on mobile',\n",
" 'distinct': True,\n",
" 'url': 'https://api.github.com/repos/rspt/rspt-theme/commits/6b089eb4a43f728f0a594388092f480f2ecacfcd'}]},\n",
" 'public': True,\n",
" 'created_at': '2015-01-01T15:00:01Z'},\n",
" {'id': '2489651053',\n",
" 'type': 'PushEvent',\n",
" 'actor': {'id': 6339799,\n",
" 'login': 'izuzero',\n",
" 'gravatar_id': '',\n",
" 'url': 'https://api.github.com/users/izuzero',\n",
" 'avatar_url': 'https://avatars.githubusercontent.com/u/6339799?'},\n",
" 'repo': {'id': 28270952,\n",
" 'name': 'izuzero/xe-module-ajaxboard',\n",
" 'url': 'https://api.github.com/repos/izuzero/xe-module-ajaxboard'},\n",
" 'payload': {'push_id': 536863972,\n",
" 'size': 1,\n",
" 'distinct_size': 1,\n",
" 'ref': 'refs/heads/develop',\n",
" 'head': 'ec819b9df4fe612bb35bf562f96810bf991f9975',\n",
" 'before': '590433109f221a96cf19ea7a7d9a43ca333e3b3e',\n",
" 'commits': [{'sha': 'ec819b9df4fe612bb35bf562f96810bf991f9975',\n",
" 'author': {'email': 'df05f55543db3c62cf64f7438018ec37f3605d3c@gmail.com',\n",
" 'name': 'Eunsoo Lee'},\n",
" 'message': '#20 게시글 및 댓글 삭제 시 새로고침이 되는 문제 해결\\n\\n원래 의도는 새로고침이 되지 않고 확인창만으로 해결되어야 함.\\n기본 게시판 대응 플러그인에서 발생한 이슈.',\n",
" 'distinct': True,\n",
" 'url': 'https://api.github.com/repos/izuzero/xe-module-ajaxboard/commits/ec819b9df4fe612bb35bf562f96810bf991f9975'}]},\n",
" 'public': True,\n",
" 'created_at': '2015-01-01T15:00:01Z'})"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"records.filter(lambda record: record[\"type\"] == \"PushEvent\").take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Flatten data for dataframe friendly shape\n",
"\n",
"Now let's flatten this data so that Pandas operations make sense for it. Also, let's keep only the attributes that we care about most."
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[{'user': 'rspt',\n",
" 'repo': 'rspt/rspt-theme',\n",
" 'created_at': '2015-01-01T15:00:01Z',\n",
" 'message': 'Fix main header height on mobile',\n",
" 'author': 'rspt'}]"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def process(record):\n",
"    try:\n",
"        for commit in record[\"payload\"][\"commits\"]:\n",
"            yield {\n",
"                \"user\": record[\"actor\"][\"login\"],\n",
"                \"repo\": record[\"repo\"][\"name\"],\n",
"                \"created_at\": record[\"created_at\"],\n",
"                \"message\": commit[\"message\"],\n",
"                \"author\": commit[\"author\"][\"name\"],\n",
"            }\n",
"    except KeyError:\n",
"        pass\n",
" \n",
"[record] = records.filter(lambda record: record[\"type\"] == \"PushEvent\").take(1)\n",
"list(process(record))"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"({'user': 'rspt',\n",
" 'repo': 'rspt/rspt-theme',\n",
" 'created_at': '2015-01-01T15:00:01Z',\n",
" 'message': 'Fix main header height on mobile',\n",
" 'author': 'rspt'},\n",
" {'user': 'izuzero',\n",
" 'repo': 'izuzero/xe-module-ajaxboard',\n",
" 'created_at': '2015-01-01T15:00:01Z',\n",
" 'message': '#20 게시글 및 댓글 삭제 시 새로고침이 되는 문제 해결\\n\\n원래 의도는 새로고침이 되지 않고 확인창만으로 해결되어야 함.\\n기본 게시판 대응 플러그인에서 발생한 이슈.',\n",
" 'author': 'Eunsoo Lee'},\n",
" {'user': 'winterbe',\n",
" 'repo': 'winterbe/streamjs',\n",
" 'created_at': '2015-01-01T15:00:03Z',\n",
" 'message': 'Add comparator support for min, max operations',\n",
" 'author': 'Benjamin Winterberg'},\n",
" {'user': 'hermanwahyudi',\n",
" 'repo': 'hermanwahyudi/selenium',\n",
" 'created_at': '2015-01-01T15:00:03Z',\n",
" 'message': 'Update README.md',\n",
" 'author': 'Herman'},\n",
" {'user': 'jdilt',\n",
" 'repo': 'jdilt/jdilt.github.io',\n",
" 'created_at': '2015-01-01T15:00:03Z',\n",
" 'message': 'refine index page and about page',\n",
" 'author': 'jdilt'},\n",
" {'user': 'sundaymtn',\n",
" 'repo': 'sundaymtn/waterline',\n",
" 'created_at': '2015-01-01T15:00:04Z',\n",
" 'message': 'Thu Jan 1 10:00:02 EST 2015',\n",
" 'author': 'Seth Carter'},\n",
" {'user': 'zhouzhi2015',\n",
" 'repo': 'zhouzhi2015/temp',\n",
" 'created_at': '2015-01-01T15:00:04Z',\n",
" 'message': '测测',\n",
" 'author': '1184795629@qq.com'},\n",
" {'user': 'caleb-eades',\n",
" 'repo': 'caleb-eades/MinecraftServers',\n",
" 'created_at': '2015-01-01T15:00:05Z',\n",
" 'message': 'Auto Snapshot Server State',\n",
" 'author': 'caleb-eades'},\n",
" {'user': 'hcremers',\n",
" 'repo': 'ktgw0316/LightZone-l10n-nl',\n",
" 'created_at': '2015-01-01T15:00:05Z',\n",
" 'message': 'Translated by hcremers',\n",
" 'author': 'hans'},\n",
" {'user': 'vatsaaa',\n",
" 'repo': 'vatsaaa/mycodesnips',\n",
" 'created_at': '2015-01-01T15:00:08Z',\n",
" 'message': 'Update StrPalindrome.java',\n",
" 'author': 'Gudakesh Ankur Vatsa'})"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"flattened = records.filter(lambda record: record[\"type\"] == \"PushEvent\").map(process).flatten()\n",
"flattened.take(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Convert to a Dask Dataframe"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>user</th>\n",
" <th>repo</th>\n",
" <th>created_at</th>\n",
" <th>message</th>\n",
" <th>author</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>rspt</td>\n",
" <td>rspt/rspt-theme</td>\n",
" <td>2015-01-01T15:00:01Z</td>\n",
" <td>Fix main header height on mobile</td>\n",
" <td>rspt</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>izuzero</td>\n",
" <td>izuzero/xe-module-ajaxboard</td>\n",
" <td>2015-01-01T15:00:01Z</td>\n",
" <td>#20 게시글 및 댓글 삭제 시 새로고침이 되는 문제 해결\\n\\n원래 의도는 새로고...</td>\n",
" <td>Eunsoo Lee</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>winterbe</td>\n",
" <td>winterbe/streamjs</td>\n",
" <td>2015-01-01T15:00:03Z</td>\n",
" <td>Add comparator support for min, max operations</td>\n",
" <td>Benjamin Winterberg</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>hermanwahyudi</td>\n",
" <td>hermanwahyudi/selenium</td>\n",
" <td>2015-01-01T15:00:03Z</td>\n",
" <td>Update README.md</td>\n",
" <td>Herman</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>jdilt</td>\n",
" <td>jdilt/jdilt.github.io</td>\n",
" <td>2015-01-01T15:00:03Z</td>\n",
" <td>refine index page and about page</td>\n",
" <td>jdilt</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" user repo created_at \\\n",
"0 rspt rspt/rspt-theme 2015-01-01T15:00:01Z \n",
"1 izuzero izuzero/xe-module-ajaxboard 2015-01-01T15:00:01Z \n",
"2 winterbe winterbe/streamjs 2015-01-01T15:00:03Z \n",
"3 hermanwahyudi hermanwahyudi/selenium 2015-01-01T15:00:03Z \n",
"4 jdilt jdilt/jdilt.github.io 2015-01-01T15:00:03Z \n",
"\n",
" message author \n",
"0 Fix main header height on mobile rspt \n",
"1 #20 게시글 및 댓글 삭제 시 새로고침이 되는 문제 해결\\n\\n원래 의도는 새로고... Eunsoo Lee \n",
"2 Add comparator support for min, max operations Benjamin Winterberg \n",
"3 Update README.md Herman \n",
"4 refine index page and about page jdilt "
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = flattened.to_dataframe()\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write to local parquet"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"df.to_parquet(\"test.parq\", compression=\"snappy\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Now it's easy to read more efficiently with dask dataframe"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>user</th>\n",
" <th>repo</th>\n",
" <th>created_at</th>\n",
" <th>message</th>\n",
" <th>author</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>rspt</td>\n",
" <td>rspt/rspt-theme</td>\n",
" <td>2015-01-01T15:00:01Z</td>\n",
" <td>Fix main header height on mobile</td>\n",
" <td>rspt</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>izuzero</td>\n",
" <td>izuzero/xe-module-ajaxboard</td>\n",
" <td>2015-01-01T15:00:01Z</td>\n",
" <td>#20 게시글 및 댓글 삭제 시 새로고침이 되는 문제 해결\\n\\n원래 의도는 새로고...</td>\n",
" <td>Eunsoo Lee</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>winterbe</td>\n",
" <td>winterbe/streamjs</td>\n",
" <td>2015-01-01T15:00:03Z</td>\n",
" <td>Add comparator support for min, max operations</td>\n",
" <td>Benjamin Winterberg</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>hermanwahyudi</td>\n",
" <td>hermanwahyudi/selenium</td>\n",
" <td>2015-01-01T15:00:03Z</td>\n",
" <td>Update README.md</td>\n",
" <td>Herman</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>jdilt</td>\n",
" <td>jdilt/jdilt.github.io</td>\n",
" <td>2015-01-01T15:00:03Z</td>\n",
" <td>refine index page and about page</td>\n",
" <td>jdilt</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" user repo created_at \\\n",
"0 rspt rspt/rspt-theme 2015-01-01T15:00:01Z \n",
"1 izuzero izuzero/xe-module-ajaxboard 2015-01-01T15:00:01Z \n",
"2 winterbe winterbe/streamjs 2015-01-01T15:00:03Z \n",
"3 hermanwahyudi hermanwahyudi/selenium 2015-01-01T15:00:03Z \n",
"4 jdilt jdilt/jdilt.github.io 2015-01-01T15:00:03Z \n",
"\n",
" message author \n",
"0 Fix main header height on mobile rspt \n",
"1 #20 게시글 및 댓글 삭제 시 새로고침이 되는 문제 해결\\n\\n원래 의도는 새로고... Eunsoo Lee \n",
"2 Add comparator support for min, max operations Benjamin Winterberg \n",
"3 Update README.md Herman \n",
"4 refine index page and about page jdilt "
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.dataframe as dd\n",
"\n",
"df = dd.read_parquet(\"test.parq\")\n",
"df.head()"
]
},
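{
"cell_type": "markdown",
"metadata": {},
"source": [
"Part of what makes this more efficient is that Parquet is a columnar format, so we can load only the columns we need. Here is a minimal sketch, assuming the `test.parq` dataset written above. It uses the `columns=` keyword of `dd.read_parquet`; picking the `user` column and counting rows per user is only an illustration, not part of the original workflow."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"\n",
"# Read only the \"user\" column; the other columns are not loaded\n",
"users = dd.read_parquet(\"test.parq\", columns=[\"user\"])\n",
"\n",
"# Each row is one commit, so this counts commits per pushing user\n",
"# and returns the five largest counts as a Pandas Series\n",
"users[\"user\"].value_counts().nlargest(5).compute()"
]
},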
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scale\n",
"\n",
"Now that we have figured out our flow locally, let's build a collection around the data for a full year, process it, and save it to cloud object storage."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Get a list of dates and filenames\n",
"\n",
"Earlier, we downloaded a file from the internet to our local computer and then wrapped a Dask Bag around it.\n",
"\n",
"Now we're going to need to create a Dask Bag around a list of files on the internet. We can pass a list of web addresses to Dask Bag as follows."
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"({'id': '2489651045',\n",
" 'type': 'CreateEvent',\n",
" 'actor': {'id': 665991,\n",
" 'login': 'petroav',\n",
" 'gravatar_id': '',\n",
" 'url': 'https://api.github.com/users/petroav',\n",
" 'avatar_url': 'https://avatars.githubusercontent.com/u/665991?'},\n",
" 'repo': {'id': 28688495,\n",
" 'name': 'petroav/6.828',\n",
" 'url': 'https://api.github.com/repos/petroav/6.828'},\n",
" 'payload': {'ref': 'master',\n",
" 'ref_type': 'branch',\n",
" 'master_branch': 'master',\n",
" 'description': \"Solution to homework and assignments from MIT's 6.828 (Operating Systems Engineering). Done in my spare time.\",\n",
" 'pusher_type': 'user'},\n",
" 'public': True,\n",
" 'created_at': '2015-01-01T15:00:00Z'},)"
]
},
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.bag as db\n",
"import ujson\n",
"\n",
"records = db.read_text([\"https://data.gharchive.org/2015-01-01-15.json.gz\"]).map(ujson.loads)\n",
"records.take(1)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we need to create a list of filenames covering every date in the last year. Fortunately, tools like Pandas let us do this easily."
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"# TODO, get a list of dates of the last year, turn this list into a list of filenames like what is above with https://data.gharchive"
]
},
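{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is one way we might fill in the TODO above. This is a sketch under a few assumptions, not a tested recipe. The start and end timestamps are placeholders standing in for \"the last year\". We also assume the archive publishes one file per hour, named `YYYY-MM-DD-H.json.gz` with the hour written without zero padding, which is consistent with the `2015-01-01-15.json.gz` file we read earlier. The resulting `filenames` list can be passed straight to `db.read_text`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Placeholder range (an assumption): one timestamp per hour for 365 days\n",
"hours = pd.date_range(\"2020-07-01 00:00\", \"2021-06-30 23:00\", freq=\"h\")\n",
"\n",
"# Assumed naming scheme: date, then the hour without zero padding\n",
"filenames = [\n",
"    f\"https://data.gharchive.org/{ts:%Y-%m-%d}-{ts.hour}.json.gz\"\n",
"    for ts in hours\n",
"]\n",
"\n",
"# Number of hourly files, plus the first and last names as a sanity check\n",
"len(filenames), filenames[0], filenames[-1]"
]
},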
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Dask Cluster on the Cloud\n",
"\n",
"Now we make a Dask cluster close to where the data lives. We also need to make a software environment with the correct libraries."
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [],
"source": [
"# TODO\n",
"import coiled\n",
"\n",
"coiled.create_software_environment(\n",
" name=\"github-parquet\",\n",
" conda=[\"dask\", \"pyarrow\", \"s3fs\"],\n",
")\n",
"cluster = coiled.Cluster(software=\"github-parquet\", ...)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Transform and store data in cloud object store"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"(db\n",
" .read_text(filenames)\n",
" .map(ujson.loads)\n",
" .filter(lambda record: record[\"type\"] == \"PushEvent\")\n",
" .map(process)\n",
" .flatten()\n",
" .to_dataframe()\n",
" .to_parquet(\"...\", compression=\"snappy\")\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}