diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 50a44ad0..32fab3ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -257,7 +257,7 @@ public abstract class AbstractHoodieLogRecordScanner {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
} catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
+ LOG.error("Got exception when reading log file with reader schema " + readerSchema.toString()); | |
throw new HoodieException("Exception when reading log file ", e);
} finally {
try {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 31fc352a..0a20580a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -35,6 +36,8 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -54,6 +57,8 @@ import javax.annotation.Nonnull;
*/
public class HoodieAvroDataBlock extends HoodieDataBlock {
+ private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
+
private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
@@ -70,6 +75,8 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer);
+ LOG.warn("HoodieAvroDataBlock " + logFile.getPath().toString() + ", schema passed in "
+ + (readerSchema != null ? readerSchema.toString() : "null"));
}
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
@@ -140,28 +147,35 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
schema = writerSchema;
}
- GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, schema);
- // 2. Get the total records
- int totalRecords = 0;
- if (logBlockVersion.hasRecordCount()) {
- totalRecords = dis.readInt();
- }
- List<IndexedRecord> records = new ArrayList<>(totalRecords);
-
- // 3. Read the content
- for (int i = 0; i < totalRecords; i++) {
- int recordLength = dis.readInt();
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(),
- recordLength, decoderCache.get());
- decoderCache.set(decoder);
- IndexedRecord record = reader.read(null, decoder);
- records.add(record);
- dis.skipBytes(recordLength);
+ try {
+ GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, schema);
+ // 2. Get the total records
+ int totalRecords = 0;
+ if (logBlockVersion.hasRecordCount()) {
+ totalRecords = dis.readInt();
+ }
+ List<IndexedRecord> records = new ArrayList<>(totalRecords);
+
+ // 3. Read the content
+ for (int i = 0; i < totalRecords; i++) {
+ int recordLength = dis.readInt();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(),
+ recordLength, decoderCache.get());
+ decoderCache.set(decoder);
+ IndexedRecord record = reader.read(null, decoder);
+ records.add(record);
+ dis.skipBytes(recordLength);
+ }
+ dis.close();
+ this.records = records;
+ // Free up content to be GC'd, deflate
+ deflate();
+ } catch (Exception e) {
+ LOG.warn("Deser recs. Exc thrown ... "); | |
+ LOG.warn("writer schema (from log block header) " + writerSchema.toString()); | |
+ LOG.warn("reader schema " + schema.toString()); | |
+ throw e; | |
} | |
- dis.close(); | |
- this.records = records; | |
- // Free up content to be GC'd, deflate | |
- deflate(); | |
} | |
//---------------------------------------------------------------------------------------- | |
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java | |
index f378f441..030c81e9 100644 | |
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java | |
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java | |
@@ -88,7 +88,7 @@ public abstract class AbstractRealtimeRecordReader { | |
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields()); | |
} else { | |
writerSchema = schemaFromLogFile; | |
- LOG.info("Writer Schema From Log => " + writerSchema.toString(true)); | |
+ LOG.warn("Writer Schema From Log => " + writerSchema.toString(true)); | |
} | |
// Add partitioning fields to writer schema for resulting row to contain null values for these fields | |
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); | |
@@ -96,6 +96,7 @@ public abstract class AbstractRealtimeRecordReader { | |
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) | |
: new ArrayList<>(); | |
writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields); | |
+ LOG.warn("Writer Schema From Log. new writer schema " + writerSchema.toString()); | |
List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), | |
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields); | |
@@ -103,8 +104,9 @@ public abstract class AbstractRealtimeRecordReader { | |
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap); | |
// TODO(vc): In the future, the reader schema should be updated based on log files & be able | |
// to null out fields not present before | |
- | |
+ LOG.warn("Projectd fields " + Arrays.toString(projectionFields.toArray()) + ", hive schema " + hiveSchema.toString()); | |
readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
+ LOG.warn("Reader schema projected from writer schema " + readerSchema.toString());
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaLogPaths(), split.getPath(), projectionFields));
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 45b14528..a6d6896e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -113,6 +113,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+ println("Table schema in avro " + tableAvroSchema.toString())
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
tableState.requiredStructSchema
@@ -306,6 +307,7 @@ private object HoodieMergeOnReadRDD {
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
+ println("Log schema in HoodieMergeOnReadRDD : " + logSchema.toString)
HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.tablePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 94d07b95..dc7d6835 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -144,6 +144,7 @@ private[hudi] object HoodieSparkSqlWriter {
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
+ println("Schema for incoming batch " + schema.toString(true))
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))