@nsivabalan
Created March 25, 2021 04:11
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 50a44ad0..32fab3ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -257,7 +257,7 @@ public abstract class AbstractHoodieLogRecordScanner {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
} catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
+ LOG.error("Got exception when reading log file with reader schema " + readerSchema.toString());
throw new HoodieException("Exception when reading log file ", e);
} finally {
try {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 31fc352a..0a20580a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -35,6 +36,8 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -54,6 +57,8 @@ import javax.annotation.Nonnull;
*/
public class HoodieAvroDataBlock extends HoodieDataBlock {
+ private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
+
private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
@@ -70,6 +75,8 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer);
+ LOG.warn("HoodieAvroDataBlock " + logFile.getPath().toString() + ", schema passed in "
+ + (readerSchema != null ? readerSchema.toString() : "null"));
}
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
@@ -140,28 +147,35 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
schema = writerSchema;
}
- GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, schema);
- // 2. Get the total records
- int totalRecords = 0;
- if (logBlockVersion.hasRecordCount()) {
- totalRecords = dis.readInt();
- }
- List<IndexedRecord> records = new ArrayList<>(totalRecords);
-
- // 3. Read the content
- for (int i = 0; i < totalRecords; i++) {
- int recordLength = dis.readInt();
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(),
- recordLength, decoderCache.get());
- decoderCache.set(decoder);
- IndexedRecord record = reader.read(null, decoder);
- records.add(record);
- dis.skipBytes(recordLength);
+ try {
+ GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, schema);
+ // 2. Get the total records
+ int totalRecords = 0;
+ if (logBlockVersion.hasRecordCount()) {
+ totalRecords = dis.readInt();
+ }
+ List<IndexedRecord> records = new ArrayList<>(totalRecords);
+
+ // 3. Read the content
+ for (int i = 0; i < totalRecords; i++) {
+ int recordLength = dis.readInt();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(),
+ recordLength, decoderCache.get());
+ decoderCache.set(decoder);
+ IndexedRecord record = reader.read(null, decoder);
+ records.add(record);
+ dis.skipBytes(recordLength);
+ }
+ dis.close();
+ this.records = records;
+ // Free up content to be GC'd, deflate
+ deflate();
+ } catch (Exception e) {
+ LOG.warn("Deser recs. Exc thrown ... ");
+ LOG.warn("writer schema (from log block header) " + writerSchema.toString());
+ LOG.warn("reader schema " + schema.toString());
+ throw e;
}
- dis.close();
- this.records = records;
- // Free up content to be GC'd, deflate
- deflate();
}
//----------------------------------------------------------------------------------------
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index f378f441..030c81e9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -88,7 +88,7 @@ public abstract class AbstractRealtimeRecordReader {
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
- LOG.info("Writer Schema From Log => " + writerSchema.toString(true));
+ LOG.warn("Writer Schema From Log => " + writerSchema.toString(true));
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
@@ -96,6 +96,7 @@ public abstract class AbstractRealtimeRecordReader {
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
+ LOG.warn("Writer Schema From Log. new writer schema " + writerSchema.toString());
List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
@@ -103,8 +104,9 @@ public abstract class AbstractRealtimeRecordReader {
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before
-
+ LOG.warn("Projectd fields " + Arrays.toString(projectionFields.toArray()) + ", hive schema " + hiveSchema.toString());
readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
+ LOG.warn("Reader schema projected from writer schema " + readerSchema.toString());
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaLogPaths(), split.getPath(), projectionFields));
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 45b14528..a6d6896e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -113,6 +113,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+ println("Table schema in avro " + tableAvroSchema.toString())
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
tableState.requiredStructSchema
@@ -306,6 +307,7 @@ private object HoodieMergeOnReadRDD {
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
+ println("Log schema in HoodieMergeOnReadRDD : " + logSchema.toString)
HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.tablePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 94d07b95..dc7d6835 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -144,6 +144,7 @@ private[hudi] object HoodieSparkSqlWriter {
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
+ println("Schema for incoming batch " + schema.toString(true))
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
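
For reference, a minimal log4j.properties sketch that keeps the WARN-level schema logging added above from being filtered out. The logger names follow the packages of the classes touched by this patch; the exact properties file location depends on the Spark/Hive deployment, so treat this as an assumption rather than part of the patch. It does not affect the println statements, which go straight to stdout.

# Assumed log4j.properties snippet -- adjust appenders/root level to your setup.
# Surfaces the schema-debugging WARN lines added in this patch.
log4j.logger.org.apache.hudi.common.table.log=WARN
log4j.logger.org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader=WARN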