@mrocklin
Created August 11, 2016 18:55
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\"\n",
" align=\"right\"\n",
" width=\"30%\"\n",
" alt=\"Dask logo\">\n",
"\n",
"DataFrames on a Cluster\n",
"=======================\n",
"\n",
"<img src=\"http://www.numfocus.org/uploads/6/0/6/9/60696727/6893890_orig.png\"\n",
" align=\"left\"\n",
" width=\"30%\"\n",
" alt=\"Pandas logo\">\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read a single DataFrame from S3 with Pandas"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"from s3fs import S3FileSystem\n",
"\n",
"# anon=True requests unauthenticated access to the bucket\n",
"s3 = S3FileSystem(anon=True)\n",
"s3.ls('dask-data/nyc-taxi/2015/')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Read only the first five rows of one month to inspect the columns\n",
"with s3.open('dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv') as f:\n",
"    df = pd.read_csv(f, nrows=5, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parallelize Pandas with Dask.dataframe\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
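"# `Executor` was renamed `Client` in later releases of distributed\n",
"from dask.distributed import Client, progress\n",
"\n",
"# Connect to a scheduler that is already running at this address\n",
"e = Client('127.0.0.1:8786')\n",
"e"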
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"\n",
"df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv',\n",
"                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],\n",
"                 storage_options={'anon': True})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Start loading the data into cluster memory and watch the progress\n",
"df = e.persist(df)\n",
"progress(df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"Dask DataFrames\n",
"---------------\n",
"\n",
"* Coordinate many Pandas DataFrames across a cluster\n",
"* Faithfully implement a subset of the Pandas API\n",
"* Use Pandas under the hood (for speed and maturity)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"df.dtypes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%time len(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%time df.passenger_count.sum().compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Compute average trip distance grouped by passenger count\n",
"df.groupby(df.passenger_count).trip_distance.mean().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tip Fraction, grouped by day-of-week and hour-of-day"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
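"# Keep only trips with a positive tip and a positive fare\n",
"df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)]\n",
"# Express the tip as a fraction of the fare\n",
"df2 = df2.assign(tip_fraction=df2.tip_amount / df2.fare_amount)"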
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Mean tip fraction grouped by pickup day of week and by pickup hour of day\n",
"dayofweek = df2.groupby(df2.tpep_pickup_datetime.dt.dayofweek).tip_fraction.mean()\n",
"hour = df2.groupby(df2.tpep_pickup_datetime.dt.hour).tip_fraction.mean()\n",
"\n",
"dayofweek, hour = e.persist([dayofweek, hour])\n",
"progress(dayofweek, hour)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Plot results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from bokeh.plotting import figure, output_notebook, show\n",
"output_notebook()\n",
"\n",
"fig = figure(title='Tip Fraction',\n",
"             x_axis_label='Hour of day',\n",
"             y_axis_label='Tip Fraction',\n",
"             height=300)\n",
"fig.line(x=hour.index.compute(), y=hour.compute(), line_width=3)\n",
"fig.y_range.start = 0\n",
"\n",
"show(fig)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 0
}