@bvaradar
Last active October 13, 2020 23:18
0.6 Patch for Debezium Support
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DebeziumAvroPayload.java
new file mode 100644
index 00000000..cae8f13a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DebeziumAvroPayload.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.avro.generic.GenericRecord;
+
+public class DebeziumAvroPayload extends OverwriteWithLatestAvroPayload {
+
+ // Field is prefixed with an underscore by the transformer to indicate a metadata field
+ public static final String OP_FIELD = "_op";
+ public static final String DELETE_OP = "d";
+
+ public DebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public DebeziumAvroPayload(Option<GenericRecord> record) {
+ this(record.isPresent() ? record.get() : null, 0); // natural order
+ }
+
+ @Override
+ protected boolean isDeleteRecord(GenericRecord genericRecord) {
+ return (genericRecord.get(OP_FIELD) != null && genericRecord.get(OP_FIELD).toString().equalsIgnoreCase(
+ DELETE_OP));
+ }
+}
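
A minimal, self-contained sketch (not part of the patch) of how this payload handles a Debezium delete event, assuming the flattened record shape produced by the DebeziumTransformer further down; the "id" column and the timestamp are placeholders. Because OverwriteWithLatestAvroPayload#getInsertValue returns Option.empty() when isDeleteRecord(...) is true, a record carrying _op = "d" is dropped during merge:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.DebeziumAvroPayload;

public class DebeziumPayloadSketch {
  public static void main(String[] args) throws Exception {
    // Simplified flattened schema: the two metadata fields plus one placeholder data column.
    Schema schema = SchemaBuilder.record("formatted_debezium_payload").fields()
        .optionalLong("_ts_ms")
        .optionalString("_op")
        .optionalString("id")
        .endRecord();

    GenericRecord deleteEvent = new GenericData.Record(schema);
    deleteEvent.put("_ts_ms", 1602630000000L);
    deleteEvent.put("_op", "d"); // Debezium delete marker
    deleteEvent.put("id", "42");

    DebeziumAvroPayload payload = new DebeziumAvroPayload(deleteEvent, 1602630000000L);

    // Prints "false": delete events yield an empty insert value, which is how the row gets dropped.
    System.out.println(payload.getInsertValue(schema).isPresent());
  }
}
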
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index d8dffdf1..ce3ef24f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -79,7 +79,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
* @param genericRecord instance of {@link GenericRecord} of interest.
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/
- private boolean isDeleteRecord(GenericRecord genericRecord) {
+ protected boolean isDeleteRecord(GenericRecord genericRecord) {
Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DebeziumSchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DebeziumSchemaRegistryProvider.java
new file mode 100644
index 00000000..41fe9296
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DebeziumSchemaRegistryProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider {
+
+ public DebeziumSchemaRegistryProvider(TypedProperties props,
+ JavaSparkContext jssc) {
+ super(props, jssc);
+ }
+
+ /**
+ * The Debezium target schema is a nested structure with many metadata fields. This
+ * flattens the structure and keeps only the metadata fields that are needed.
+ * @return the flattened target schema
+ */
+ @Override
+ public Schema getTargetSchema() {
+ Schema registrySchema = super.getTargetSchema();
+
+ Field dataField = registrySchema.getField("after");
+ Field tsField = registrySchema.getField("ts_ms");
+ Field opField = registrySchema.getField("op");
+
+ // Initialize with metadata columns
+ FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
+ .record("formatted_debezium_payload")
+ .fields()
+ .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
+ .name("_" + opField.name()).type(opField.schema()).withDefault(null);
+
+ // Add data columns to schema
+ dataField.schema()
+ .getTypes()
+ // "after" field is a union with data schema and null schema, so we need to extract only the data schema portion
+ .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
+ .getFields()
+ .forEach(field -> {
+ payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
+ });
+
+ Schema schema = payloadFieldAssembler.endRecord();
+ Schema newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+ AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+ RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE);
+ LOG.info("Transformed Avro Schema is :" + schema.toString(true));
+ LOG.info("Transformed Avro Schema after converting :" + newSchema.toString(true));
+ return newSchema;
+ }
+}
\ No newline at end of file
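
For illustration only (also not part of the patch), here is the same flattening applied to a hand-built, simplified envelope for a hypothetical inventory.orders table; real Debezium envelopes also carry before, source and transaction fields, which this provider deliberately drops:

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;

public class DebeziumFlatteningSketch {
  public static void main(String[] args) {
    // Hand-built stand-ins for the schemas a registry would return for inventory.orders.
    Schema value = SchemaBuilder.record("Value").namespace("inventory.orders").fields()
        .optionalLong("id")
        .optionalString("status")
        .endRecord();
    Schema envelope = SchemaBuilder.record("Envelope").namespace("inventory.orders").fields()
        .name("after")
            .type(Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), value)))
            .withDefault(null)
        .optionalString("op")
        .optionalLong("ts_ms")
        .endRecord();

    // Same steps as DebeziumSchemaRegistryProvider#getTargetSchema above.
    Schema.Field dataField = envelope.getField("after");
    Schema dataSchema = dataField.schema().getTypes()
        .get(dataField.schema().getIndexNamed(envelope.getNamespace() + ".Value"));

    FieldAssembler<Schema> assembler = SchemaBuilder.builder()
        .record("formatted_debezium_payload").fields()
        .name("_ts_ms").type(envelope.getField("ts_ms").schema()).withDefault(null)
        .name("_op").type(envelope.getField("op").schema()).withDefault(null);
    dataSchema.getFields()
        .forEach(field -> assembler.name(field.name()).type(field.schema()).withDefault(null));

    // Prints a flat record with fields: _ts_ms, _op, id, status
    System.out.println(assembler.endRecord().toString(true));
  }
}
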
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 636668a5..43b71496 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -25,6 +25,8 @@ import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
@@ -38,6 +40,8 @@ import java.util.Collections;
*/
public class SchemaRegistryProvider extends SchemaProvider {
+ protected static final Logger LOG = LogManager.getLogger(SchemaRegistryProvider.class);
+
/**
* Configs supported.
*/
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/DebeziumTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/DebeziumTransformer.java
new file mode 100644
index 00000000..9286739c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/DebeziumTransformer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class DebeziumTransformer implements Transformer {
+
+ protected static final Logger LOG = LogManager.getLogger(DebeziumTransformer.class);
+
+ @Override
+ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+ TypedProperties properties) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("rowDataset schema is ");
+ rowDataset.printSchema();
+ }
+
+ Dataset<Row> insertedOrUpdatedData = rowDataset
+ .select("ts_ms", "op", "after.*")
+ .withColumnRenamed("ts_ms", "_ts_ms")
+ .withColumnRenamed("op", "_op")
+ .filter(rowDataset.col("op").notEqual("d"));
+
+ Dataset<Row> deletedData = rowDataset
+ .select("ts_ms", "op", "before.*")
+ .withColumnRenamed("ts_ms", "_ts_ms")
+ .withColumnRenamed("op", "_op")
+ .filter(rowDataset.col("op").equalTo("d"));
+
+ final Dataset<Row> transformedData = insertedOrUpdatedData.union(deletedData);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TransformedData schema is ");
+ transformedData.printSchema();
+ }
+ return transformedData;
+ }
+}
\ No newline at end of file
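
Not part of the patch, but for context: one way these three classes might be wired into HoodieDeltaStreamer. The class names come from the patch; the bundle jar, source class, paths, table name, and properties file below are placeholders.

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.6.0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field _ts_ms \
  --target-base-path /path/to/hudi/orders \
  --target-table orders \
  --payload-class org.apache.hudi.common.model.DebeziumAvroPayload \
  --schemaprovider-class org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider \
  --transformer-class org.apache.hudi.utilities.transform.DebeziumTransformer \
  --props /path/to/debezium-source.properties

Using _ts_ms as the ordering field lets the latest Debezium event win during precombine. The properties file would, at a minimum, point hoodie.deltastreamer.schemaprovider.registry.url (and optionally hoodie.deltastreamer.schemaprovider.registry.targetUrl) at the Debezium topic's value schema in the schema registry, alongside the usual Kafka source settings.
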
@ashishmgofficial

Getting the following error: corrupt patch at line 252

@bvaradar
Author

Updated
