TestAppendWithPartitioning: a test of identity transform partitioning in Iceberg version 1.0.0
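The standalone program below creates five tables under a local HadoopCatalog warehouse, each with a different partition spec (unpartitioned, identity on hotel_name, month on ts, identity plus month, and day on ts), appends one record to each through a hand-written Parquet file, and scans every table back. The gist does not list its dependencies; to compile it you would presumably need the Iceberg 1.0.0 artifacts iceberg-core, iceberg-data and iceberg-parquet, plus hadoop-common and an SLF4J binding on the classpath.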
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
public class TestAppendWithPartitioning {
  static final Logger logger = LoggerFactory.getLogger(TestAppendWithPartitioning.class);

  static {
    System.setProperty("log4j2.configurationFile", "dlh-log4j2.xml");
    // TestLog4JV1.test();
    // TestLog4J2.test();
  }

  private static final String lakehouse = "/tmp/iceberg-test-2";
  private Catalog catalog;
  private Schema schema;

  public static void main(String[] args) {
    // Execute "rm -rf /tmp/iceberg-test-2/bookings" before running the test
    TestAppendWithPartitioning tableAppender = new TestAppendWithPartitioning();
    tableAppender.setupCatalogAndSchema();
    System.out.println("\n\n");

    // Table 001: no partitioning, as a baseline
    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
    Table table1 = tableAppender.createAndAppend(unpartitioned, "001");

    // Table 002: identity transform on hotel_name
    PartitionSpec partition1 = PartitionSpec.builderFor(tableAppender.schema)
        .identity("hotel_name")
        .build();
    Table table2 = tableAppender.createAndAppend(partition1, "002");

    // Table 003: month transform on the ts timestamp
    PartitionSpec partition2 = PartitionSpec.builderFor(tableAppender.schema)
        .month("ts")
        .build();
    Table table3 = tableAppender.createAndAppend(partition2, "003");

    // Table 004: identity on hotel_name combined with month on ts
    PartitionSpec partition3 = PartitionSpec.builderFor(tableAppender.schema)
        .identity("hotel_name")
        .month("ts")
        .build();
    Table table4 = tableAppender.createAndAppend(partition3, "004");

    // Table 005: day transform on ts
    PartitionSpec partition4 = PartitionSpec.builderFor(tableAppender.schema)
        .day("ts")
        .build();
    Table table5 = tableAppender.createAndAppend(partition4, "005");

    tableAppender.listIcebergTableContent(table1);
    tableAppender.listIcebergTableContent(table2);
    tableAppender.listIcebergTableContent(table3);
    tableAppender.listIcebergTableContent(table4);
    tableAppender.listIcebergTableContent(table5);
  }

  private void setupCatalogAndSchema() {
    this.schema = new Schema(
        required(1, "ts", Types.TimestampType.withoutZone()),
        required(2, "hotel_id", Types.LongType.get()),
        optional(3, "hotel_name", Types.StringType.get()),
        required(4, "arrival_date", Types.DateType.get()),
        required(5, "value", Types.DoubleType.get()));
    Configuration conf = new Configuration();
    conf.set(CatalogProperties.WAREHOUSE_LOCATION, lakehouse);
    String warehousePath = "file://" + lakehouse;
    this.catalog = new HadoopCatalog(conf, warehousePath);
  }

  private Table createAndAppend(
      PartitionSpec partitionSpec, String sequentialFormattedNumber) {
    Table table;
    String dataDir = "bookings/rome_hotels_" + sequentialFormattedNumber;
    // "bookings/rome_hotels_001" becomes the identifier "bookings.rome_hotels_001"
    TableIdentifier tableIdentifier =
        TableIdentifier.parse(dataDir.replace('/', '.'));
    try {
      table = this.catalog.createTable(tableIdentifier, this.schema, partitionSpec);
      logger.info("Iceberg table '" + table.name() +
          "' created on '" + table.location() + "'");
    } catch (Exception e) {
      // Most likely the table is left over from a previous run; reuse it
      logger.error("Error while trying to create the Iceberg table: " + e.getLocalizedMessage());
      table = this.catalog.loadTable(tableIdentifier);
    }
    File parquetFile = new File(lakehouse + "/" + dataDir + "/arq_" +
        sequentialFormattedNumber + ".parquet");
    this.appendDataToIcebergTable(this.schema, table, parquetFile);
    return table;
  }

  private void appendDataToIcebergTable(
      Schema schema, Table table, File parquetFile) {
    logger.info("Appending records");
    List<GenericRecord> records = Lists.newArrayList();
    // Generate a single record for the partitioning test
    GenericRecord genericRecord = GenericRecord.create(schema);
    LocalDateTime localDateTime = LocalDateTime.now(ZoneId.of("UTC"));
    genericRecord.setField("ts", localDateTime);
    genericRecord.setField("hotel_id", 1000L);
    genericRecord.setField("hotel_name", "hotel_name-" + 1000);
    genericRecord.setField("arrival_date", LocalDate.of(2023, 1, 1));
    genericRecord.setField("value", 4.13);
    records.add(genericRecord);

    // Write the records to a plain Parquet file using Iceberg's generic writer
    FileAppender<GenericRecord> fileAppender = null;
    try {
      fileAppender = Parquet.write(Files.localOutput(parquetFile))
          .schema(table.schema())
          .createWriterFunc(GenericParquetWriter::buildWriter)
          .build();
      fileAppender.addAll(records);
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      if (fileAppender != null) {
        try {
          fileAppender.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }

    final SortOrder sortOrder = SortOrder.builderFor(schema)
        // .withOrderId(1)
        .asc("ts", NullOrder.NULLS_FIRST)
        .asc("hotel_name", NullOrder.NULLS_FIRST)
        .build();
    DataFile dataFile = DataFiles.builder(table.spec())
        .withInputFile(localInput(parquetFile))
        .withMetrics(fileAppender.metrics())
        .withFormat(FileFormat.PARQUET)
        .withSortOrder(sortOrder)
        .build();
    // Each transaction commit produces a new table snapshot
    Transaction t = table.newTransaction();
    t.newAppend().appendFile(dataFile).commit();
    // commit all changes to the table
    t.commitTransaction();
  }

  private void listIcebergTableContent(Table table) {
    IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(table);
    // Keep only the trailing sequence number of the table name (e.g. "001")
    String suffix = table.name().split("_")[2] + ":\t";
    try (CloseableIterable<Record> result = scanBuilder.build()) {
      for (Record record : result) {
        System.out.println(suffix + "\t" + record);
      }
      System.out.println(line());
    } catch (IOException e) {
      e.printStackTrace();
      throw new RuntimeException("Error while trying to read records from the Iceberg table");
    }
  }

  private static String line() {
    return "-".repeat(86);
  }
}
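
As a follow-up, here is a minimal sketch (not part of the original gist) of how one of the identity-partitioned tables, for example table2, could be scanned with an equality filter on the partition column so that Iceberg can prune data files whose hotel_name partition value does not match. It assumes one extra import, org.apache.iceberg.expressions.Expressions, and would live as another method of the class above.

  // Sketch only: scan an identity-partitioned table with a partition filter.
  // The filter value "hotel_name-1000" matches the single record written above.
  private void scanWithPartitionFilter(Table table) {
    try (CloseableIterable<Record> rows = IcebergGenerics.read(table)
        .where(Expressions.equal("hotel_name", "hotel_name-1000"))
        .build()) {
      for (Record record : rows) {
        System.out.println(record);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }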