Last active October 10, 2020 06:12
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Convert CSV Files to Apache ORC and Apache Parquet\n",
"\n",
"Easily convert gigabytes of CSV on AWS S3 files using `blazingSQL` and `cudf`.\n", | |
"\n", | |
"#### Import Packages Start Clusters" | |
] | |
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BlazingContext ready\n"
]
}
],
"source": [
"from blazingsql import BlazingContext\n",
"bc = BlazingContext()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Register AWS S3 Storage Plugin w/ BlazingSQL"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(True,\n",
" '',\n",
" OrderedDict([('type', 's3'),\n",
" ('bucket_name', 'blazingsql-colab'),\n",
" ('access_key_id', ''),\n",
" ('secret_key', ''),\n",
" ('session_token', ''),\n",
" ('encryption_type', <S3EncryptionType.NONE: 1>),\n",
" ('kms_key_amazon_resource_name', ''),\n",
" ('endpoint_override', ''),\n",
" ('region', '')]))"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bc.s3('bsql_data', bucket_name='blazingsql-colab')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create the Table on CSV files stored on AWS S3" | |
] | |
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"bc.create_table('yellow_taxi', 's3://bsql_data/taxi_data/yellow_tripdata*')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### SQL Query\n", | |
"We need to cast the `timestamp` fields because the datatype inference of `cudf` and therfore `blazingsql` fails on the two datetime columns. We can also cast the other fields, but datatype inference ran on the other columns successfully." | |
] | |
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"query = '''\n",
"select\n",
"VendorID,\n",
"cast(tpep_pickup_datetime as timestamp) as tpep_pickup_datetime,\n",
"cast(tpep_dropoff_datetime as timestamp) as tpep_dropoff_datetime,\n",
"passenger_count,\n",
"trip_distance,\n",
"RatecodeID,\n",
"store_and_fwd_flag,\n",
"PULocationID,\n",
"DOLocationID,\n",
"payment_type,\n",
"fare_amount,\n",
"extra,\n",
"mta_tax,\n",
"tip_amount,\n",
"tolls_amount,\n",
"improvement_surcharge,\n",
"total_amount,\n",
"congestion_surcharge\n",
"from yellow_taxi\n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Execute Query and Write Files\n", | |
"That's it, we use `.repartition()` to choose a partition size we want before writing, and then compress the data using `snappy`." | |
] | |
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 16.3 s, sys: 21.3 s, total: 37.6 s\n",
"Wall time: 39.6 s\n"
]
}
],
"source": [
"%%time\n",
"# Query and Write to Apache ORC\n",
"bc.sql(query).to_orc('yellow_tripdata.orc', compression='snappy')"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 16.8 s, sys: 21.6 s, total: 38.4 s\n",
"Wall time: 32.7 s\n"
]
}
],
"source": [
"%%time\n",
"# Query and Write to Apache Parquet\n",
"bc.sql(query).to_parquet('yellow_tripdata.parquet', compression='snappy')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "RAPIDS Stable",
"language": "python",
"name": "rapids-stable"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
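For readers without a GPU stack available, the `cast(... as timestamp)` step in the notebook's query can be mimicked in plain Python to see what the conversion does. This is a minimal sketch, assuming the yellow-taxi timestamps use the `YYYY-MM-DD HH:MM:SS` layout (an assumption — verify against your files); the sample values below are illustrative, not taken from the dataset.

```python
from datetime import datetime

# Assumed timestamp layout of the yellow-taxi CSV columns
# (hypothetical; check your actual data before relying on it).
TAXI_TS_FORMAT = "%Y-%m-%d %H:%M:%S"

def parse_taxi_timestamp(raw: str) -> datetime:
    """Parse a raw CSV timestamp string, mirroring what the SQL
    `cast(... as timestamp)` does for one value."""
    return datetime.strptime(raw.strip(), TAXI_TS_FORMAT)

# Illustrative pickup/dropoff pair from a single trip row
pickup = parse_taxi_timestamp("2019-01-15 08:26:45")
dropoff = parse_taxi_timestamp("2019-01-15 08:41:02")
trip_seconds = (dropoff - pickup).total_seconds()
```

Once the columns are proper timestamps rather than strings, downstream queries can do datetime arithmetic like the `trip_seconds` computation above, which is why the cast is worth the extra SQL.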