@HeartSaVioR · Last active October 13, 2020 19:25
Experimenting with Iceberg column types in Spark
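Scratch notes from spark-shell (the warehouse path suggests Spark 3.0.1; the Iceberg runtime is assumed to be on the classpath). The experiment: create an Iceberg table covering most primitive and nested column types through a HadoopCatalog, then append DataFrames whose Scala field types progressively diverge from the table schema, to see which Spark-to-Iceberg type coercions are accepted on write.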
import org.apache.iceberg.{PartitionSpec, Schema, Table, TableProperties}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
// expr and toDF are in scope by default in spark-shell; a standalone app would also need:
// import org.apache.spark.sql.functions.expr
// import spark.implicits._
val hadoopCatalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1")
// a nested struct type, set up for experimentation (not referenced by the table schema below)
val structSchema = Types.StructType.of(
  Types.NestedField.optional(161, "a", Types.StringType.get()),
  Types.NestedField.optional(162, "b", Types.IntegerType.get()))
// the table schema; field IDs are unique but not contiguous (7, 9, 11, and 17 are unused)
val icebergSchema = new Schema(
  Types.NestedField.optional(1, "col_b", Types.BooleanType.get()),
  Types.NestedField.optional(2, "col_i", Types.IntegerType.get()),
  Types.NestedField.optional(3, "col_l", Types.LongType.get()),
  Types.NestedField.optional(4, "col_f", Types.FloatType.get()),
  Types.NestedField.optional(5, "col_d", Types.DoubleType.get()),
  Types.NestedField.optional(6, "col_da", Types.DateType.get()),
  Types.NestedField.optional(8, "col_ts_tz", Types.TimestampType.withZone()),
  Types.NestedField.optional(10, "col_s", Types.StringType.get()),
  Types.NestedField.optional(12, "col_fi", Types.FixedType.ofLength(10)),
  Types.NestedField.optional(13, "col_bi", Types.BinaryType.get()),
  Types.NestedField.optional(14, "col_de_1", Types.DecimalType.of(9, 0)),
  Types.NestedField.optional(15, "col_de_2", Types.DecimalType.of(11, 2)),
  Types.NestedField.optional(16, "col_de_3", Types.DecimalType.of(38, 10)),
  Types.NestedField.optional(18, "col_li", Types.ListType.ofOptional(181, Types.StringType.get())),
  Types.NestedField.optional(19, "col_ma", Types.MapType.ofOptional(191, 192, Types.IntegerType.get(), Types.StringType.get()))
)
val icebergSpec = PartitionSpec.builderFor(icebergSchema).build()
hadoopCatalog.createTable(TableIdentifier.of("default", "table_convert_read_all_types_6"), icebergSchema, icebergSpec)
// read the (still empty) table back to confirm it resolves via the Iceberg source
spark.read.format("iceberg").load("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
// StructData mirrors structSchema above (likewise unused by the writes below)
case class StructData(a: String, b: Int)
// Data is the baseline row type matching the table schema; the remaining columns
// are filled in via withColumn before each write:
//   col_da:    java.sql.Date      <= current_date()
//   col_ts_tz: java.sql.Timestamp <= now()
//   col_de_1 <= CAST(col_de AS DECIMAL(9, 0))
//   col_de_2 <= CAST(col_de AS DECIMAL(11, 2))
//   col_de_3 <= CAST(col_de AS DECIMAL(38, 10))
case class Data(col_b: Boolean, col_i: Int, col_l: Long, col_f: Float, col_d: Double, col_s: String, col_fi: Array[Byte], col_bi: Array[Byte], col_de: Double, col_li: Seq[String], col_ma: Map[Int, String])
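// For illustration (not in the original gist): with col_de = 1.23, Spark's CAST
// should yield, under its default HALF_UP decimal rounding:
//   CAST(1.23 AS DECIMAL(9, 0))   -> 1
//   CAST(1.23 AS DECIMAL(11, 2))  -> 1.23
//   CAST(1.23 AS DECIMAL(38, 10)) -> 1.2300000000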
val df = Seq(Data(true, 1, 1L, 1.0f, 1.2d, "hello1", Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array[Byte](1, 2, 3, 4), 1.23d, Seq("a", "b", "c", "d"), Map(1 -> "a", 2 -> "b", 3 -> "c"))).toDF
val dfWithColumns = df
  .withColumn("col_da", expr("current_date()"))
  .withColumn("col_ts_tz", expr("now()"))
  .withColumn("col_de_1", expr("CAST(col_de AS DECIMAL(9, 0))"))
  .withColumn("col_de_2", expr("CAST(col_de AS DECIMAL(11, 2))"))
  .withColumn("col_de_3", expr("CAST(col_de AS DECIMAL(38, 10))"))
  .drop("col_de")
dfWithColumns.write.format("iceberg").mode("append").save("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
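// Optional sanity check (a sketch, not part of the original gist): read the table
// back after the first append and inspect the Spark-side schema and the stored row.
val checkDF = spark.read.format("iceberg").load("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
checkDF.printSchema()
checkDF.show(truncate = false)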
// Data2: col_i as Short, col_l as Int, col_d as Float (will the write widen them to Iceberg's int/long/double?)
case class Data2(col_b: Boolean, col_i: Short, col_l: Int, col_f: Float, col_d: Float, col_s: String, col_fi: Array[Byte], col_bi: Array[Byte], col_de: Double, col_li: Seq[String], col_ma: Map[Int, String])
val df2 = Seq(Data2(true, 1.toShort, 1, 1.0f, 1.2f, "hello1", Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array[Byte](1, 2, 3, 4), 1.23d, Seq("a", "b", "c", "d"), Map(1 -> "a", 2 -> "b", 3 -> "c"))).toDF
val df2WithColumns = df2
  .withColumn("col_da", expr("current_date()"))
  .withColumn("col_ts_tz", expr("now()"))
  .withColumn("col_de_1", expr("CAST(col_de AS DECIMAL(9, 0))"))
  .withColumn("col_de_2", expr("CAST(col_de AS DECIMAL(11, 2))"))
  .withColumn("col_de_3", expr("CAST(col_de AS DECIMAL(38, 10))"))
  .drop("col_de")
df2WithColumns.write.format("iceberg").mode("append").save("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
// Data3: additionally col_f and col_d as Int (int into Iceberg's float/double?)
case class Data3(col_b: Boolean, col_i: Short, col_l: Int, col_f: Int, col_d: Int, col_s: String, col_fi: Array[Byte], col_bi: Array[Byte], col_de: Double, col_li: Seq[String], col_ma: Map[Int, String])
val df3 = Seq(Data3(true, 1.toShort, 1, 1, 2, "hello1", Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array[Byte](1, 2, 3, 4), 1.23d, Seq("a", "b", "c", "d"), Map(1 -> "a", 2 -> "b", 3 -> "c"))).toDF
val df3WithColumns = df3
  .withColumn("col_da", expr("current_date()"))
  .withColumn("col_ts_tz", expr("now()"))
  .withColumn("col_de_1", expr("CAST(col_de AS DECIMAL(9, 0))"))
  .withColumn("col_de_2", expr("CAST(col_de AS DECIMAL(11, 2))"))
  .withColumn("col_de_3", expr("CAST(col_de AS DECIMAL(38, 10))"))
  .drop("col_de")
df3WithColumns.write.format("iceberg").mode("append").save("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
// Data4: col_f as Long (long into Iceberg's float?)
case class Data4(col_b: Boolean, col_i: Short, col_l: Int, col_f: Long, col_d: Int, col_s: String, col_fi: Array[Byte], col_bi: Array[Byte], col_de: Double, col_li: Seq[String], col_ma: Map[Int, String])
val df4 = Seq(Data4(true, 1.toShort, 1, 1, 2, "hello1", Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array[Byte](1, 2, 3, 4), 1.23d, Seq("a", "b", "c", "d"), Map(1 -> "a", 2 -> "b", 3 -> "c"))).toDF
val df4WithColumns = df4
  .withColumn("col_da", expr("current_date()"))
  .withColumn("col_ts_tz", expr("now()"))
  .withColumn("col_de_1", expr("CAST(col_de AS DECIMAL(9, 0))"))
  .withColumn("col_de_2", expr("CAST(col_de AS DECIMAL(11, 2))"))
  .withColumn("col_de_3", expr("CAST(col_de AS DECIMAL(38, 10))"))
  .drop("col_de")
df4WithColumns.write.format("iceberg").mode("append").save("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
// Data5: same field types as Data4; note the col_fi value below is only 7 bytes against fixed(10)
case class Data5(col_b: Boolean, col_i: Short, col_l: Int, col_f: Long, col_d: Int, col_s: String, col_fi: Array[Byte], col_bi: Array[Byte], col_de: Double, col_li: Seq[String], col_ma: Map[Int, String])
val df5 = Seq(Data5(true, 1.toShort, 1, 1, 2, "hello1", Array[Byte](1, 2, 3, 5, 6, 8, 10), Array[Byte](1, 2, 3, 4), 1.23d, Seq("a", "b", "c", "d"), Map(1 -> "a", 2 -> "b", 3 -> "c"))).toDF
val df5WithColumns = df5
  .withColumn("col_da", expr("current_date()"))
  .withColumn("col_ts_tz", expr("now()"))
  .withColumn("col_de_1", expr("CAST(col_de AS DECIMAL(9, 0))"))
  .withColumn("col_de_2", expr("CAST(col_de AS DECIMAL(11, 2))"))
  .withColumn("col_de_3", expr("CAST(col_de AS DECIMAL(38, 10))"))
  .drop("col_de")
df5WithColumns.write.format("iceberg").mode("append").save("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
// Data6: carries col_de_1 (Long) and col_de_2 (Double) directly instead of deriving them from col_de
case class Data6(col_b: Boolean, col_i: Short, col_l: Int, col_f: Long, col_d: Int, col_s: String, col_fi: Array[Byte], col_bi: Array[Byte], col_de: Double, col_li: Seq[String], col_ma: Map[Int, String], col_de_1: Long, col_de_2: Double)
val df6 = Seq(Data6(true, 1.toShort, 1, 1, 2, "hello1", Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array[Byte](1, 2, 3, 4), 1.23d, Seq("a", "b", "c", "d"), Map(1 -> "a", 2 -> "b", 3 -> "c"), 10, 1.23d)).toDF
val df6WithColumns = df6
  .withColumn("col_da", expr("current_date()"))
  .withColumn("col_ts_tz", expr("now()"))
  .withColumn("col_de_3", expr("CAST(col_de AS DECIMAL(38, 10))"))
  .drop("col_de")
df6WithColumns.write.format("iceberg").mode("append").save("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
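// Final read-back (a sketch, not part of the original gist): compare what each
// append actually stored, and count snapshots via the Iceberg Table API; every
// append that succeeded should have produced one snapshot.
val finalDF = spark.read.format("iceberg").load("/Users/jlim/WorkArea/Playground/iceberg-trial/warehouse-spark3.0.1/default/table_convert_read_all_types_6")
finalDF.show(truncate = false)
val table = hadoopCatalog.loadTable(TableIdentifier.of("default", "table_convert_read_all_types_6"))
println(s"snapshots: ${table.history().size()}")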