# Writing to an Iceberg table with a sort order after coalesce

EXPLAIN output for the same `coalesce(4)` write against two Iceberg tables that carry a sort order: first unpartitioned, then partitioned.
```
== Parsed Logical Plan ==
OverwriteByExpression RelationV2[id#61L] prodhive.jzhuge.wso_244_so_1633558130, true, Map(matchByName -> true), true
+- Repartition 4, false
   +- Sort [id#3L ASC NULLS FIRST], true
      +- Project [id#0L AS id#3L]
         +- Range (0, 100000, step=1, splits=Some(6))

== Analyzed Logical Plan ==
OverwriteByExpression RelationV2[id#61L] prodhive.jzhuge.wso_244_so_1633558130, true, Map(matchByName -> true), true
+- Repartition 4, false
   +- Sort [id#3L ASC NULLS FIRST], true
      +- Project [id#0L AS id#3L]
         +- Range (0, 100000, step=1, splits=Some(6))

== Optimized Logical Plan ==
OverwriteByExpression RelationV2[id#61L] prodhive.jzhuge.wso_244_so_1633558130, true, Map(matchByName -> true), true, IcebergWrite schema=struct<id: bigint not null>, format=parquet
+- Sort [id#0L ASC NULLS FIRST], false
   +- RepartitionByExpression [id#0L ASC NULLS FIRST], 500
      +- Repartition 4, false
         +- Range (0, 100000, step=1, splits=Some(6))

== Physical Plan ==
OverwriteByExpression prodhive.jzhuge.wso_244_so_1633558130, [AlwaysTrue()], org.apache.spark.sql.util.CaseInsensitiveStringMap@7bdc4108, true, IcebergOverwriteByFilter(table=prodhive.jzhuge.wso_244_so_1633558130, format=PARQUET, filter=true)
+- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
   +- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 500)
      +- Coalesce 4
         +- *(1) Range (0, 100000, step=1, splits=6)
```
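The first plan shows why `coalesce(4)` does not stick: to satisfy the table's sort order, the writer injects a `Sort` and an `Exchange rangepartitioning(id, 500)` above the user's `Coalesce 4`, and 500 is simply `spark.sql.shuffle.partitions` (confirmed in the first notebook cell below). Each of the 500 sort tasks then writes its own file. A minimal mitigation sketch, assuming the injected exchange really sizes itself from that setting (`df` and the table name are taken from the notebook below):

```scala
// Sketch, not the gist's code: cap the shuffle width so the injected
// range exchange, and therefore the output file count, stays small.
// Assumes the exchange reads spark.sql.shuffle.partitions, as
// `Exchange rangepartitioning(..., 500)` above suggests.
spark.conf.set("spark.sql.shuffle.partitions", "4")
df.coalesce(4)
  .write
  .mode("overwrite")
  .insertInto(iceberg_sort_order_table)  // expect ~4 sorted files
```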
```
== Parsed Logical Plan ==
OverwritePartitionsDynamic RelationV2[id#77L, part#78L] prodhive.jzhuge.wso_244_so_part_1633558130, Map(matchByName -> true), true
+- Repartition 4, false
   +- Project [id#3L, (id#3L % cast(2 as bigint)) AS part#65L]
      +- Sort [id#3L ASC NULLS FIRST], true
         +- Project [id#0L AS id#3L]
            +- Range (0, 100000, step=1, splits=Some(6))

== Analyzed Logical Plan ==
OverwritePartitionsDynamic RelationV2[id#77L, part#78L] prodhive.jzhuge.wso_244_so_part_1633558130, Map(matchByName -> true), true
+- Repartition 4, false
   +- Project [id#3L, (id#3L % cast(2 as bigint)) AS part#65L]
      +- Sort [id#3L ASC NULLS FIRST], true
         +- Project [id#0L AS id#3L]
            +- Range (0, 100000, step=1, splits=Some(6))

== Optimized Logical Plan ==
OverwritePartitionsDynamic RelationV2[id#77L, part#78L] prodhive.jzhuge.wso_244_so_part_1633558130, Map(matchByName -> true), true, IcebergWrite schema=struct<id: bigint not null,part: bigint>, format=parquet
+- Sort [part#65L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false
   +- RepartitionByExpression [part#65L ASC NULLS FIRST, id#0L ASC NULLS FIRST], 500
      +- Repartition 4, false
         +- Project [id#0L, (id#0L % 2) AS part#65L]
            +- Range (0, 100000, step=1, splits=Some(6))

== Physical Plan ==
OverwritePartitionsDynamic prodhive.jzhuge.wso_244_so_part_1633558130, org.apache.spark.sql.util.CaseInsensitiveStringMap@7bdc4108, IcebergDynamicOverwrite(table=prodhive.jzhuge.wso_244_so_part_1633558130, format=PARQUET)
+- *(2) Sort [part#65L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0
   +- Exchange rangepartitioning(part#65L ASC NULLS FIRST, id#0L ASC NULLS FIRST, 500)
      +- Coalesce 4
         +- *(1) Project [id#0L, (id#0L % 2) AS part#65L]
            +- *(1) Range (0, 100000, step=1, splits=6)
```
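The partitioned plan is the same story with a composite key: `RepartitionByExpression [part, id], 500` plus a `Sort` land above the `Coalesce 4`, so each shuffle task receives one contiguous `(part, id)` range. An alternative sketch is to hand the writer data that already satisfies the table's local sort order; whether this build then elides the injected `Sort`/`Exchange` is an assumption to verify with `explain`:

```scala
import org.apache.spark.sql.functions.col

// Sketch: pre-cluster and pre-sort so each task's rows already match
// the table's (part, id) sort order. Verify with explain() whether
// your Spark/Iceberg build skips the extra Sort/Exchange in this case.
df.withColumn("part", col("id") % 2)
  .coalesce(4)
  .sortWithinPartitions("part", "id")
  .write
  .mode("overwrite")
  .insertInto(iceberg_sort_order_part_table)
```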
The notebook that produced the plans and the file counts:

```scala
spark.conf.get("spark.sql.shuffle.partitions")
```
```
500
```
| Table Type | Partitioned | Has Sort Order | Output File Count |
|------------|-------------|----------------|-------------------|
| Hive       | No          |                | 4                 |
| Hive       | Yes         |                | 4 + 4             |
| Iceberg    | No          | No             | 4                 |
| Iceberg    | Yes         | No             | 4 + 4             |
| Iceberg    | No          | Yes            | 500               |
| Iceberg    | Yes         | Yes            | 250 + 251         |
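Reading the counts: 4 is one file per coalesced task, 4 + 4 is one file per task per partition value, and 500 is one file per shuffle task once the sort order injects its range exchange. The 250 + 251 case falls out of range partitioning on `(part, id)`: roughly half of the 500 tasks receive only `part=0` rows, half only `part=1`, and the single task straddling the boundary writes a file for each, hence 501 files. A sketch to sanity-check that intuition, assuming `repartitionByRange` approximates the injected exchange:

```scala
import org.apache.spark.sql.functions._

// Sketch: mimic the injected range shuffle, then count distinct
// partition values per task; each (task, partition-value) pair
// becomes one output file.
val predicted = df.withColumn("part", col("id") % 2)
  .repartitionByRange(500, col("part"), col("id"))
  .withColumn("task", spark_partition_id())
  .groupBy("task")
  .agg(countDistinct("part").as("filesPerTask"))
  .agg(sum("filesPerTask"))
predicted.show()  // ~501 on this data
```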
```scala
val epoch = java.time.Instant.now().getEpochSecond()
val db = sys.env.getOrElse("BD_USER", "jzhuge")
val df = spark.range(100000).toDF("id").sort("id")
```
```
epoch = 1633558130
db = jzhuge
df = [id: bigint]
```
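One setup detail worth noting: `sort("id")` introduces its own range exchange (500 tasks at these settings), and `coalesce(4)` then narrows those partitions to 4 tasks without a further shuffle, so the global order is preserved across the 4 tasks. A quick check:

```scala
// Sketch: coalesce narrows partitions without shuffling, so the
// globally sorted output of sort("id") ends up in 4 ordered tasks.
df.coalesce(4).rdd.getNumPartitions  // 4
```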
## Hive Table
```scala
val hive_table = s"${db}.wso_244_hive_${epoch}"
spark.sql(s"DROP TABLE IF EXISTS ${hive_table}")
spark.sql(s"CREATE TABLE ${hive_table}(id bigint) USING hive_parquet")
df.coalesce(4)
  .write
  .byName
  .mode("overwrite")
  .insertInto(hive_table)
```
```
hive_table = jzhuge.wso_244_hive_1633558130
```
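Note that `.byName` is not part of stock Spark 2.4's `DataFrameWriter`; it appears to be a fork-specific extension for name-based column matching on `insertInto` (stock Spark matches columns positionally). A portable equivalent, assuming the DataFrame's column order already matches the table's:

```scala
// Sketch: stock-Spark equivalent of the write above; insertInto
// matches columns by position rather than by name.
df.coalesce(4)
  .write
  .mode("overwrite")
  .insertInto(hive_table)
```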
## Hive Table - Partitioned
```scala
val hive_part_table = s"${db}.wso_244_hive_part_${epoch}"
spark.sql(s"DROP TABLE IF EXISTS ${hive_part_table}")
spark.sql(s"CREATE TABLE ${hive_part_table}(id bigint, part bigint) USING hive_parquet PARTITIONED BY (part)")
df.withColumn("part", col("id") % 2)
  .coalesce(4)
  .write
  .byName
  .mode("overwrite")
  .insertInto(hive_part_table)
```
```
hive_part_table = jzhuge.wso_244_hive_part_1633558130
```
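The 4 + 4 outcome comes from each of the 4 coalesced tasks holding rows for both `part` values (`id % 2` interleaves them within every id range), so every task opens one file per partition. Clustering by the partition column first would cap that at one writer per partition; a sketch, assuming overwrite semantics are otherwise unchanged:

```scala
import org.apache.spark.sql.functions.col

// Sketch: hash-cluster rows by part so each partition's rows land in
// a single task, giving one file per partition instead of one per
// (task, partition) pair.
df.withColumn("part", col("id") % 2)
  .repartition(col("part"))
  .write
  .mode("overwrite")
  .insertInto(hive_part_table)
```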
## Iceberg Table - No Sort Order
```scala
val iceberg_no_sort_order_table = s"${db}.wso_244_nso_${epoch}"
spark.sql(s"DROP TABLE IF EXISTS ${iceberg_no_sort_order_table}")
spark.sql(s"CREATE TABLE ${iceberg_no_sort_order_table}(id bigint)")
df.coalesce(4)
  .write
  .byName
  .mode("overwrite")
  .insertInto(iceberg_no_sort_order_table)
```
```
iceberg_no_sort_order_table = jzhuge.wso_244_nso_1633558130
```
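Without a sort order, the Iceberg write behaves exactly like the Hive one: no extra exchange, 4 files. One way to confirm the count from metadata, assuming this build exposes Iceberg's `files` metadata table through the Spark 2.4 reader:

```scala
// Sketch: Iceberg's files metadata table has one row per data file.
// The ".files" suffix path is an assumption about this build.
spark.read.format("iceberg")
  .load(s"${iceberg_no_sort_order_table}.files")
  .count()  // expect 4
```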
## Iceberg Table - No Sort Order - Partitioned
```scala
val iceberg_no_sort_order_part_table = s"${db}.wso_244_nso_part_${epoch}"
spark.sql(s"DROP TABLE IF EXISTS ${iceberg_no_sort_order_part_table}")
spark.sql(s"CREATE TABLE ${iceberg_no_sort_order_part_table}(id bigint, part bigint) PARTITIONED BY (part)")
df.withColumn("part", col("id") % 2)
  .coalesce(4)
  .write
  .byName
  .mode("overwrite")
  .insertInto(iceberg_no_sort_order_part_table)
```
```
iceberg_no_sort_order_part_table = jzhuge.wso_244_nso_part_1633558130
```
## Iceberg Table - Sort Order
```scala
val iceberg_sort_order_table = s"${db}.wso_244_so_${epoch}"
spark.sql(s"DROP TABLE IF EXISTS ${iceberg_sort_order_table}")
spark.sql(s"CREATE TABLE ${iceberg_sort_order_table}(id bigint)")
spark.sql(s"ALTER TABLE ${iceberg_sort_order_table} WRITE LOCALLY ORDERED BY (id)")
df.coalesce(4)
  .write
  .byName
  .mode("overwrite")
  .insertInto(iceberg_sort_order_table)
```
```
iceberg_sort_order_table = jzhuge.wso_244_so_1633558130
```
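`WRITE LOCALLY ORDERED BY` matches upstream Iceberg's SQL extension for setting a sort order without requesting a write distribution, which is why `SHOW TBLPROPERTIES` at the end reports `write.distribution-mode none`. With mode `none` one might expect only a task-local sort that keeps the 4 coalesced files; the plans at the top show this build injecting a range exchange anyway, which is presumably the behavior the gist documents. For contrast, the upstream variant that explicitly pairs the sort order with a range distribution (assuming this fork matches upstream syntax):

```scala
// Sketch: upstream Iceberg DDL that also sets
// write.distribution-mode = range for writes.
spark.sql(s"ALTER TABLE ${iceberg_sort_order_table} WRITE ORDERED BY (id)")
```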
## Iceberg Table - Sort Order - Partitioned
```scala
val iceberg_sort_order_part_table = s"${db}.wso_244_so_part_${epoch}"
spark.sql(s"DROP TABLE IF EXISTS ${iceberg_sort_order_part_table}")
spark.sql(s"CREATE TABLE ${iceberg_sort_order_part_table}(id bigint, part bigint) PARTITIONED BY (part)")
spark.sql(s"ALTER TABLE ${iceberg_sort_order_part_table} WRITE LOCALLY ORDERED BY (part, id)")
df.withColumn("part", col("id") % 2)
  .coalesce(4)
  .write
  .byName
  .mode("overwrite")
  .insertInto(iceberg_sort_order_part_table)
```
```
iceberg_sort_order_part_table = jzhuge.wso_244_so_part_1633558130
```
*Kernel: Spark 2.4.4-unstable, Scala 2.11.*
The table properties confirm the sort orders took effect and that `write.distribution-mode` is `none`:

```
spark-sql> SHOW TBLPROPERTIES jzhuge.wso_244_so_1633558130;
sort-order               id ASC NULLS FIRST
format                   iceberg/parquet
current-snapshot-id      1949824784145854272
provider                 iceberg
write.distribution-mode  none

spark-sql> SHOW TBLPROPERTIES jzhuge.wso_244_so_part_1633558130;
current-snapshot-id      6079782482913875055
format                   iceberg/parquet
sort-order               part ASC NULLS FIRST, id ASC NULLS FIRST
provider                 iceberg
write.distribution-mode  none
```