{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = (\n",
" SparkSession.builder\n",
" .appName('hashing-repro')\n",
" .getOrCreate()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as psf"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import uuid"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"generate_uuid = psf.udf(lambda row_id: str(uuid.uuid4()))"
]
},
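{
"cell_type": "markdown",
"metadata": {},
"source": [
"Side note: the UDF returns a fresh random value on every evaluation. A minimal sketch, assuming Spark 2.3 or later (where `asNondeterministic()` exists), that tells the optimizer not to assume repeated evaluations agree:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch, assuming Spark >= 2.3: mark the UDF as non-deterministic so\n",
"# Spark does not treat its output as stable across re-evaluations.\n",
"generate_uuid = psf.udf(lambda row_id: str(uuid.uuid4())).asNondeterministic()"
]
},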
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create a bunch of uuids"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Row(id=0, uuid='ff701c08-63ef-48ff-9ccf-4faeb10add4b')"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"test_df = spark.range(10000).withColumn('uuid', generate_uuid('id'))\n",
"test_df.head()"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"test_df = test_df.repartition(100, 'uuid')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Write to parquet then read back\n",
"\n",
"This seems to be important. In the real world this would be the boundary between batch jobs or something like that..."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"test_df.write.parquet('/tmp/test.parquet', mode='overwrite')"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"test_df = spark.read.parquet('/tmp/test.parquet')"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----+\n",
"|SPARK_PARTITION_ID()|count|\n",
"+--------------------+-----+\n",
"| 1| 1404|\n",
"| 6| 1171|\n",
"| 3| 1301|\n",
"| 5| 1232|\n",
"| 4| 1278|\n",
"| 7| 756|\n",
"| 2| 1342|\n",
"| 0| 1516|\n",
"+--------------------+-----+\n",
"\n"
]
}
],
"source": [
"test_df.groupby(psf.spark_partition_id()).count().show()"
]
},
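{
"cell_type": "markdown",
"metadata": {},
"source": [
"The data comes back in 8 partitions. A hedged reading: the parquet scan groups files into read partitions roughly by size, so it does not recover the 100-way hash partitioning, and Spark no longer knows the data was partitioned by `uuid` at all. A quick check of the partition count:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read partitions come from file splits, not from the writer's hash\n",
"# partitioning, hence 8 here rather than the 100 we wrote with.\n",
"test_df.rdd.getNumPartitions()"
]
},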
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Grab the first 100 rows\n",
"\n",
"This the right way to do this but there shouldn't be any harm to it right?"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['27feca96-7e5a-4072-b3ec-b17501d0f9d6',\n",
" 'cf639bb9-3b4b-4440-918e-86c6d9d60e1f',\n",
" '5001f034-442f-4988-a57b-b77be8108f16']"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"first_uuids = [row['uuid'] for row in test_df.collect()[:100]]\n",
"\n",
"first_uuids[:3]"
]
},
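{
"cell_type": "markdown",
"metadata": {},
"source": [
"A sanity check (a sketch, not executed in the original run): `collect()` returns rows in partition order, so these first 100 rows should all come from read partition 0:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Expect a single partition id (0): collect() concatenates partitions in order.\n",
"pids = [row['pid'] for row in test_df.select(psf.spark_partition_id().alias('pid')).collect()[:100]]\n",
"set(pids)"
]
},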
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"test_filtered_df = test_df.filter(test_df.uuid.isin(first_uuids)).repartition(10, 'uuid')"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----+\n",
"|SPARK_PARTITION_ID()|count|\n",
"+--------------------+-----+\n",
"| 1| 100|\n",
"+--------------------+-----+\n",
"\n"
]
}
],
"source": [
"test_filtered_df.groupby(psf.spark_partition_id()).count().show()"
]
},
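{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Why everything lands in one partition\n",
"\n",
"A hedged explanation, assuming `repartition(n, col)` places rows by `pmod(hash(col), n)` using Spark's Murmur3 `hash` (this matches HashPartitioning in Spark 2.x): the 100 rows above are the head of read partition 0, which is roughly the first file written by `repartition(100, 'uuid')`, so they should all share one value of `pmod(hash(uuid), 100)`. Because 10 divides 100, they then also share `pmod(hash(uuid), 10)` and collapse into a single partition. A check:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# If the hypothesis holds, both bucket columns show exactly one distinct value.\n",
"test_df.filter(test_df.uuid.isin(first_uuids)).select(\n",
"    psf.expr('pmod(hash(uuid), 100)').alias('bucket_100'),\n",
"    psf.expr('pmod(hash(uuid), 10)').alias('bucket_10')\n",
").distinct().show()"
]
},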
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Other partition sizes will also produce bad results"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-----+\n",
"|SPARK_PARTITION_ID()|count|\n",
"+--------------------+-----+\n",
"| 1| 100|\n",
"+--------------------+-----+\n",
"\n"
]
}
],
"source": [
"test_filtered_df = test_df.filter(test_df.uuid.isin(first_uuids)).repartition(5, 'uuid').groupby(psf.spark_partition_id()).count().show()"
]
},
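{
"cell_type": "markdown",
"metadata": {},
"source": [
"By this reasoning, any partition count that divides 100 (like 5 and 10) should collapse the filtered rows into one partition, while a count that does not divide 100 should spread them again. A sketch with 7 (an assumption to verify, not executed here):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 7 does not divide 100, so rows sharing pmod(hash(uuid), 100) should\n",
"# scatter across pmod(hash(uuid), 7) buckets.\n",
"test_df.filter(test_df.uuid.isin(first_uuids)).repartition(7, 'uuid').groupby(psf.spark_partition_id()).count().show()"
]
}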
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}