{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Write NYCTaxi to Parquet\n",
"================="
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Client: scheduler=\"127.0.0.1:8786\" processes=56 cores=56>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client, progress\n",
"c = Client('127.0.0.1:8786')\n",
"c"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import dask.dataframe as dd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read CSV"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import dask.dataframe as dd\n",
"dtype = {'VendorID': 'category', 'passenger_count': 'uint8',\n",
" 'RateCodeID': 'uint8',\n",
" 'store_and_fwd_flag': 'category', 'payment_type': 'category',\n",
" 'trip_distance': 'float32', 'fare_amount': 'float32',\n",
" 'extra': 'float32', 'mta_tax': 'float32',\n",
" 'tip_amount': 'float32', 'tolls_amount': 'float32',\n",
" 'improvement_surcharge': 'float32', 'total_amount': 'float32'}\n",
"\n",
"df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv', dtype=dtype,\n",
" parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],\n",
" storage_options={'anon': True}) #, blocksize=400*2**20)\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df = c.persist(df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Index by datetime column"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"df = df.set_index('tpep_pickup_datetime')\n",
"df = c.persist(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# There is a lag before work starts, perhaps while the first block is fetched.\n",
"%time df.passenger_count.sum().compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df.dtypes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write to Parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from dask.dataframe.io.parquet import to_parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import s3fs\n",
"fs = s3fs.S3FileSystem()\n",
"\n",
"#if fs.exists('dask-data/nyc-taxi/2015/parquet'):\n",
"# fs.rm('dask-data/nyc-taxi/2015/parquet', recursive=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"to_parquet('s3://dask-data/nyc-taxi/2015/parquet', df, has_nulls=False, object_encoding='utf8')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"del df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read from Parquet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import s3fs\n",
"fs = s3fs.S3FileSystem()\n",
"plain = fs.du('dask-data/nyc-taxi/2015/parquet', deep=True, total=True) / 2**30\n",
"gzip = fs.du('dask-data/nyc-taxi/2015/parquet.gz', deep=True, total=True) / 2**30\n",
"plain, gzip, plain/gzip"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import fastparquet\n",
"pf = fastparquet.ParquetFile('dask-data/nyc-taxi/2015/parquet', open_with=fs.open)\n",
"pf.dtypes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"pf.info"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# raw download speed\n",
"%%time\n",
"with fs.open('dask-data/nyc-taxi/2015/parquet/part.1.parquet', 'rb') as f:\n",
" print(len(f.read()) / 2**20)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from dask.dataframe.io.parquet import read_parquet\n",
"df3 = read_parquet('s3://dask-data/nyc-taxi/2015/parquet.gz', index='tpep_pickup_datetime',\n",
" categories=['VendorID', 'payment_type', 'store_and_fwd_flag'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"df3.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"len(df3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"len(df3.passenger_count)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"df3.passenger_count.sum().compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%prun -D out.stats\n",
"import dask\n",
"with dask.set_options(get=dask.async.get_sync):\n",
" df3.passenger_count.sum().compute()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"c.restart()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}