Skip to content

Instantly share code, notes, and snippets.

@mfinkle
Created April 5, 2016 23:03
Show Gist options
  • Save mfinkle/c08e8727516ad32f3d77934b4e0b3b73 to your computer and use it in GitHub Desktop.
Save mfinkle/c08e8727516ad32f3d77934b4e0b3b73 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"raw\n",
"[{'date': '20160101', 'bookmarks': 0, 'reading': 0, 'intent': 1, 'clientid': 'aaa'}, {'date': '20160101', 'bookmarks': 1, 'reading': 0, 'intent': 1, 'clientid': 'aaa'}, {'date': '20160101', 'bookmarks': 2, 'reading': 1, 'intent': 0, 'clientid': 'aaa'}, {'date': '20160102', 'bookmarks': 0, 'reading': 0, 'intent': 1, 'clientid': 'aaa'}, {'date': '20160102', 'bookmarks': 0, 'reading': 1, 'intent': 1, 'clientid': 'aaa'}, {'date': '20160102', 'bookmarks': 0, 'reading': 0, 'intent': 1, 'clientid': 'aaa'}]\n",
"mapped\n",
"[('aaa|20160101', {'bookmarks': 0, 'reading': 0, 'intent': 1}), ('aaa|20160101', {'bookmarks': 1, 'reading': 0, 'intent': 1}), ('aaa|20160101', {'bookmarks': 2, 'reading': 1, 'intent': 0}), ('aaa|20160102', {'bookmarks': 0, 'reading': 0, 'intent': 1}), ('aaa|20160102', {'bookmarks': 0, 'reading': 1, 'intent': 1}), ('aaa|20160102', {'bookmarks': 0, 'reading': 0, 'intent': 1})]\n",
"agged\n",
"[('aaa|20160102', {'bookmarks': 0, 'reading': 1, 'intent': 3}), ('aaa|20160101', {'bookmarks': 3, 'reading': 1, 'intent': 2})]\n"
]
}
],
"source": [
"def keyify(ping):\n",
"    client = ping.pop(\"clientid\", None)\n",
"    date = ping.pop(\"date\", None)\n",
"    return (client + \"|\" + date, ping)\n",
"\n",
"def aggregate(agg1, agg2):\n",
"    for metric, values in agg2.iteritems():\n",
"        if metric not in agg1:\n",
"            agg1[metric] = values\n",
"            continue\n",
"        else:\n",
"            agg1[metric] += values\n",
"\n",
"    return agg1\n",
"\n",
"rawdata = [\n",
"    { 'clientid': 'aaa', 'date': '20160101', 'bookmarks': 0, 'intent': 1, 'reading': 0 },\n",
"    { 'clientid': 'aaa', 'date': '20160101', 'bookmarks': 1, 'intent': 1, 'reading': 0 },\n",
"    { 'clientid': 'aaa', 'date': '20160101', 'bookmarks': 2, 'intent': 0, 'reading': 1 },\n",
"    { 'clientid': 'aaa', 'date': '20160102', 'bookmarks': 0, 'intent': 1, 'reading': 0 },\n",
"    { 'clientid': 'aaa', 'date': '20160102', 'bookmarks': 0, 'intent': 1, 'reading': 1 },\n",
"    { 'clientid': 'aaa', 'date': '20160102', 'bookmarks': 0, 'intent': 1, 'reading': 0 }\n",
"]\n",
"\n",
"print \"raw\"\n",
"print rawdata\n",
"\n",
"dataset = sc.parallelize(rawdata)\n",
"\n",
"mapped = dataset.map(keyify)\n",
"print \"mapped\"\n",
"print mapped.take(6)\n",
"\n",
"agged = mapped.reduceByKey(aggregate)\n",
"print \"agged\"\n",
"print agged.take(6)\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"raw\n",
"[('aaa,20160101,intent', 1), ('aaa,20160101,bookmark', 1), ('aaa,20160101,intent', 1), ('aaa,20160101,reading', 1), ('aaa,20160101,bookmark', 2), ('aaa,20160102,intent', 1), ('aaa,20160102,reading', 1), ('aaa,20160102,intent', 0), ('aaa,20160102,reading', 1), ('aaa,20160102,intent', 1), ('aaa,20160102,bookmark', 1)]\n",
"agged\n",
"[('aaa,20160101,bookmark', 3), ('aaa,20160101,intent', 2), ('aaa,20160102,reading', 2), ('aaa,20160102,intent', 2), ('aaa,20160101,reading', 1), ('aaa,20160102,bookmark', 1)]\n"
]
}
],
"source": [
"otherraw = [\n",
"    ('aaa,20160101,intent', 1),\n",
"    ('aaa,20160101,bookmark', 1),\n",
"    ('aaa,20160101,intent', 1),\n",
"    ('aaa,20160101,reading', 1),\n",
"    ('aaa,20160101,bookmark', 2),\n",
"    ('aaa,20160102,intent', 1),\n",
"    ('aaa,20160102,reading', 1),\n",
"    ('aaa,20160102,intent', 0),\n",
"    ('aaa,20160102,reading', 1),\n",
"    ('aaa,20160102,intent', 1),\n",
"    ('aaa,20160102,bookmark', 1)\n",
"]\n",
"\n",
"print \"raw\"\n",
"print otherraw\n",
"\n",
"otherraw = sc.parallelize(otherraw)\n",
"\n",
"otheragged = otherraw.reduceByKey(lambda x, y: x + y)\n",
"print \"agged\"\n",
"print otheragged.take(10)\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
@gregglind
Copy link

sc.parallelize solves all ills!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment