Created
December 17, 2016 15:43
-
-
Save mrocklin/89bccf2f4f37611b40c18967bb182066 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
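"Write NYC Taxi data to Parquet\n", | |
"==============================\n", | |
"\n", | |
"Load the 2015 NYC taxi trip CSVs from S3 with dask, index them by pickup time, write them to Parquet on S3, and time reading Parquet data back." | |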
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<Client: scheduler=\"127.0.0.1:8786\" processes=56 cores=56>" | |
] | |
}, | |
"execution_count": 1, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Connect to the dask.distributed scheduler listening on localhost\n", | |
"from dask.distributed import Client, progress\n", | |
"scheduler_address = '127.0.0.1:8786'\n", | |
"c = Client(scheduler_address)\n", | |
"c" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import dask.dataframe as dd" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Read CSV" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import dask.dataframe as dd\n", | |
"\n", | |
"# Compact dtypes to reduce memory: small integers as uint8, money/distance\n", | |
"# as float32, and the vendor/payment/flag code columns as categoricals.\n", | |
"# Each column appears exactly once ('payment_type' is categorical).\n", | |
"dtype = {'VendorID': 'category', 'passenger_count': 'uint8',\n", | |
"         'RateCodeID': 'uint8', 'payment_type': 'category',\n", | |
"         'store_and_fwd_flag': 'category',\n", | |
"         'trip_distance': 'float32', 'fare_amount': 'float32',\n", | |
"         'extra': 'float32', 'mta_tax': 'float32',\n", | |
"         'tip_amount': 'float32', 'tolls_amount': 'float32',\n", | |
"         'improvement_surcharge': 'float32', 'total_amount': 'float32'}\n", | |
"\n", | |
"# Anonymous access to the S3 bucket. To get larger (and fewer) partitions,\n", | |
"# pass e.g. blocksize=400*2**20 to read_csv.\n", | |
"df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv', dtype=dtype,\n", | |
"                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],\n", | |
"                 storage_options={'anon': True})\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"# Load the CSVs into cluster memory now so later cells reuse them instead of re-reading S3\n", | |
"df = c.persist(df)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Index by datetime column" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# set_index sorts the whole dataset by pickup time (an expensive shuffle);\n", | |
"# persist the sorted result on the cluster so it is computed only once.\n", | |
"df = c.persist(df.set_index('tpep_pickup_datetime'))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"# lags before starting, perhaps to fetch the first block.\n", | |
"# .compute() is required: a dask reduction is lazy, so timing it alone\n", | |
"# would only measure graph construction.\n", | |
"%time df.passenger_count.sum().compute()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"df.dtypes" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"df.head()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Write to Parquet" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"from dask.dataframe.io.parquet import to_parquet" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import s3fs\n", | |
"fs = s3fs.S3FileSystem()\n", | |
"\n", | |
"# Set to True to delete a previous Parquet output before rewriting it.\n", | |
"# Off by default because it recursively removes data from S3.\n", | |
"OVERWRITE = False\n", | |
"if OVERWRITE and fs.exists('dask-data/nyc-taxi/2015/parquet'):\n", | |
"    fs.rm('dask-data/nyc-taxi/2015/parquet', recursive=True)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"# Write the indexed frame as part.N.parquet files under this S3 prefix.\n", | |
"# NOTE(review): has_nulls=False assumes no column has missing values — confirm.\n", | |
"to_parquet('s3://dask-data/nyc-taxi/2015/parquet', df, has_nulls=False, object_encoding='utf8')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# Drop the last reference to the persisted frame so the cluster can release its memory\n", | |
"del df" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Read from Parquet" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import s3fs\n", | |
"fs = s3fs.S3FileSystem()\n", | |
"# On-disk size in GiB of the plain and gzip-compressed Parquet copies, and their ratio.\n", | |
"# NOTE(review): 'parquet.gz' is not written by this notebook; it must already exist on S3.\n", | |
"plain = fs.du('dask-data/nyc-taxi/2015/parquet', deep=True, total=True) / 2**30\n", | |
"gzip = fs.du('dask-data/nyc-taxi/2015/parquet.gz', deep=True, total=True) / 2**30\n", | |
"plain, gzip, plain/gzip" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import fastparquet\n", | |
"# Open the written dataset's metadata directly with fastparquet, reading through s3fs\n", | |
"pf = fastparquet.ParquetFile('dask-data/nyc-taxi/2015/parquet', open_with=fs.open)\n", | |
"pf.dtypes" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"pf.info" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"# raw download speed: cell magics must be the first line, so this comment comes after %%time.\n", | |
"# Fetch one output file and print its size in MiB.\n", | |
"with fs.open('dask-data/nyc-taxi/2015/parquet/part.1.parquet', 'rb') as f:\n", | |
"    print(len(f.read()) / 2**20)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"from dask.dataframe.io.parquet import read_parquet\n", | |
"# NOTE(review): this reads the gzip copy ('parquet.gz'), not the 'parquet' output written above.\n", | |
"df3 = read_parquet('s3://dask-data/nyc-taxi/2015/parquet.gz', index='tpep_pickup_datetime',\n", | |
"                   categories=['VendorID', 'payment_type', 'store_and_fwd_flag'])" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"# head() only reads from the first partition\n", | |
"df3.head()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"# row count across all partitions\n", | |
"len(df3)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"# row count via a single column, presumably to compare against len(df3) above\n", | |
"len(df3.passenger_count)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%time\n", | |
"# full reduction over one column\n", | |
"df3.passenger_count.sum().compute()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"%%prun -D out.stats\n", | |
"import dask\n", | |
"# Profile on the single-threaded synchronous scheduler so tasks run in this\n", | |
"# process, where %%prun can see them. dask.config.set replaces the removed\n", | |
"# dask.set_options / dask.async API (`async` is a keyword on Python 3.7+).\n", | |
"with dask.config.set(scheduler='synchronous'):\n", | |
"    df3.passenger_count.sum().compute()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"# Restart all worker processes, discarding any data they hold\n", | |
"c.restart()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.5.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment