@mrocklin
Created May 20, 2014 15:58
{
"metadata": {
"name": "",
"signature": "sha256:bffcd177214e7e623651a94c4b5af4d223f48ff82690c43ecb58a0c690661dae"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Ingesting Bitcoin transactions with `blaze.data`"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
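"from blaze.data import *\n",
"# Column names and types of the transaction edge list, in datashape notation\n",
"schema = '{transaction: int, sender: int, recipient: int, date: datetime, value: real}'\n",
"# Wrap the CSV file in a Blaze data descriptor\n",
"dd = CSV('user_edges.txt', schema=schema)\n",
"# Peek at the first two rows as Python tuples\n",
"tuple(dd.py[:2])"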
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 1,
"text": [
"((1, 2, 2, datetime.datetime(2013, 4, 10, 14, 22, 50), 24.375),\n",
" (1, 2, 782477, datetime.datetime(2013, 4, 10, 14, 22, 50), 0.7709))"
]
}
],
"prompt_number": 1
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Specifying computation with `blaze.expr`"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
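"from blaze.expr.table import *\n",
"# An abstract table with our schema; no data is attached to it\n",
"t = TableSymbol(schema)\n",
"# Number of transactions sent by each sender, ten largest first\n",
"outdegree = (By(t, t['sender'], t['recipient'].count())\n",
"             .sort('recipient', ascending=False)\n",
"             .head(10))"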
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Executing queries with `blaze.compute`"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from blaze.compute.python import compute"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"list(compute(outdegree, dd.py[:]))"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 4,
"text": [
"[(25, 8196276),\n",
" (1374, 1344435),\n",
" (11, 532534),\n",
" (645014, 315872),\n",
" (29, 234864),\n",
" (3958, 142709),\n",
" (35087, 127782),\n",
" (193558, 127151),\n",
" (74, 122034),\n",
" (870051, 120088)]"
]
}
],
"prompt_number": 4
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example: *Who are the big spenders?*"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Total value sent by each sender, ten largest first\n",
"big_spenders = (By(t, t['sender'], t['value'].sum())\n",
"                .sort('value', ascending=False)\n",
"                .head(10))"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"list(compute(big_spenders, dd.py[:]))"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 6,
"text": [
"[(11, 52461821.94165766),\n",
" (1374, 23394277.034151807),\n",
" (25, 13178095.975724494),\n",
" (29, 5330179.983046564),\n",
" (12564, 3669712.399824968),\n",
" (782688, 2929023.064647781),\n",
" (74, 2122710.961163437),\n",
" (91638, 2094827.8251607446),\n",
" (27, 2058124.131470339),\n",
" (20, 1182868.148780274)]"
]
}
],
"prompt_number": 6
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Let's do that again, now in SQL\n",
"\n",
"This requires two steps:\n",
"\n",
"1. Migrating our data from a CSV file into a SQLite database\n",
"2. Executing our query inside SQLite\n",
"\n",
"Blaze helps with both the migration and the computation."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Migration\n",
"\n",
"First, let's migrate our data from the CSV file into SQLite."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
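"from blaze.data.sql import SQL\n",
"# A descriptor for the user_edges table in the SQLite file btc.db\n",
"sql = SQL('sqlite:///btc.db', 'user_edges', schema=schema)\n",
"# Copy every row of the CSV into the table; this appends, so run it only once\n",
"sql.extend(dd)"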
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 7
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execute\n",
"\n",
"That was easy. Now let's form a query and execute it against the SQL backend."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
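"from blaze.compute.sql import dispatch, TableExpr\n",
"from blaze.data.sql import SQL\n",
"\n",
"# Let compute accept a SQL descriptor directly: translate the expression\n",
"# into a SQLAlchemy query against its table, then stream the result rows\n",
"@dispatch(TableExpr, SQL)\n",
"def compute(t, s):\n",
"    query = compute(t, s.table)\n",
"    with s.engine.connect() as conn:\n",
"        for item in conn.execute(query):\n",
"            yield item"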
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 8
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Translate Blaze expression into a SQLAlchemy query\n",
"from blaze.compute.sql import compute\n",
"query = compute(big_spenders, sql.table)\n",
"print(query)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"SELECT user_edges.sender, sum(user_edges.value) AS value \n",
"FROM user_edges GROUP BY user_edges.sender ORDER BY value DESC\n",
" LIMIT :param_1\n"
]
}
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Run the query inside SQLite and print each result row\n",
"with sql.engine.connect() as conn:\n",
"    for row in conn.execute(query):\n",
"        print(row)"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Affecting the compute system\n",
"\n",
"Blaze acts as a thin layer on top of other mature compute systems. Maximizing performance often requires diving deep into your backend. We endeavor to keep your backend just barely under the surface, always within reach as your performance demands increase.\n",
"\n",
"Here the SQLAlchemy engine is directly accessible as `sql.engine`, so we can add an index on the `sender` column to speed up queries that group by sender."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"with sql.engine.connect() as conn:\n",
"    # Index the column that outdegree and big_spenders group by\n",
"    conn.execute('CREATE INDEX from_index ON user_edges (sender);')"
],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
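The `By(...).sort(...).head(10)` expressions in the notebook above only describe a computation. The Python backend then evaluates them over tuples such as those returned by `dd.py[:]`. Below is a minimal stdlib sketch of what that evaluation amounts to, assuming rows laid out as `(transaction, sender, recipient, date, value)` per the notebook's schema. The functions `outdegree` and `big_spenders` here are hypothetical plain-Python stand-ins, not Blaze's implementation. Counting rows per sender matches `t['recipient'].count()` only when no recipient is missing.

```python
from collections import Counter, defaultdict
from datetime import datetime
import heapq

# Column positions, assuming the notebook's schema:
# '{transaction: int, sender: int, recipient: int, date: datetime, value: real}'
SENDER, VALUE = 1, 4


def outdegree(rows, n=10):
    """Hypothetical stand-in: rows per sender, largest first (like By(...count()).sort(...).head(n))."""
    counts = Counter(row[SENDER] for row in rows)
    return counts.most_common(n)


def big_spenders(rows, n=10):
    """Hypothetical stand-in: total value sent per sender, largest first (like By(...sum()).sort(...).head(n))."""
    totals = defaultdict(float)
    for row in rows:  # single streaming pass; one running total per sender
        totals[row[SENDER]] += row[VALUE]
    return heapq.nlargest(n, totals.items(), key=lambda item: item[1])


# The two rows shown by tuple(dd.py[:2]) earlier in the notebook
rows = [
    (1, 2, 2, datetime(2013, 4, 10, 14, 22, 50), 24.375),
    (1, 2, 782477, datetime(2013, 4, 10, 14, 22, 50), 0.7709),
]
print(outdegree(rows))  # [(2, 2)]
print(big_spenders(rows))
```

Both functions make one pass over the data and keep a single running value per sender. Memory therefore grows with the number of distinct senders rather than with the number of transactions.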
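The SQL half of the notebook has three steps: migrating with `sql.extend`, running the query Blaze generated, and indexing `sender`. Python's built-in `sqlite3` module can approximate all three without Blaze or SQLAlchemy. The sketch below is a rough analogue under several assumptions:

- It uses an in-memory database instead of `btc.db`.
- It stores dates as ISO-8601 text.
- It quotes the `transaction` column because TRANSACTION is an SQL keyword.
- The helper names `migrate` and `big_spenders_sql` and the index name `sender_index` are hypothetical.

The query is hand-written to match the shape of the SQL printed in the notebook; Blaze did not produce it.

```python
import sqlite3
from datetime import datetime


def migrate(conn, rows):
    """Hypothetical helper: create user_edges and copy rows into it (the role sql.extend(dd) plays)."""
    conn.execute(
        'CREATE TABLE user_edges ("transaction" INTEGER, sender INTEGER, '
        'recipient INTEGER, date TEXT, value REAL)'
    )
    conn.executemany(
        "INSERT INTO user_edges VALUES (?, ?, ?, ?, ?)",
        # SQLite has no datetime storage class, so store ISO-8601 text
        [(tx, s, r, d.isoformat(" "), v) for tx, s, r, d, v in rows],
    )
    conn.commit()


def big_spenders_sql(conn, n=10):
    """Hypothetical helper: hand-written analogue of the SQL Blaze printed for big_spenders."""
    query = (
        "SELECT sender, sum(value) AS total FROM user_edges "
        "GROUP BY sender ORDER BY total DESC LIMIT ?"
    )
    return conn.execute(query, (n,)).fetchall()


conn = sqlite3.connect(":memory:")  # the notebook uses the file btc.db instead
migrate(conn, [
    (1, 2, 2, datetime(2013, 4, 10, 14, 22, 50), 24.375),
    (1, 2, 782477, datetime(2013, 4, 10, 14, 22, 50), 0.7709),
])
# Index the grouping column, as in the notebook's final cell
conn.execute("CREATE INDEX sender_index ON user_edges (sender)")
for row in big_spenders_sql(conn):
    print(row)
```

The `LIMIT` is passed as a bound parameter, much as SQLAlchemy emitted `LIMIT :param_1` in the notebook. The index does not change the result; it only gives SQLite the option of reading rows in `sender` order when grouping.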