Skip to content

Instantly share code, notes, and snippets.

@longliveenduro
Created September 13, 2017 08:25
Show Gist options
  • Save longliveenduro/10aa3baeea15fd577b3bfac2f1494e2d to your computer and use it in GitHub Desktop.
Save longliveenduro/10aa3baeea15fd577b3bfac2f1494e2d to your computer and use it in GitHub Desktop.
Kafka Connect Custom Transformation ExtractFields
package net.gutefrage.connector.transforms
import java.util
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.errors.DataException
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements.{requireMap, requireStruct}
import org.apache.kafka.connect.transforms.util.SimpleConfig
import scala.collection.JavaConverters._
/**
 * Companion of the [[ExtractFields]] transformation: holds the configuration
 * keys, the configuration definition and the two concrete variants that
 * operate on either the record key or the record value.
 */
object ExtractFields {

  // Configuration keys read by the transformation's `configure`.
  private val FIELDS_CONFIG = "fields"
  private val STRUCT_NAME_CONFIG = "structName"

  // Label handed to the `requireMap` / `requireStruct` helpers.
  private val PURPOSE = "fields extraction"

  /** HTML-flavoured overview text naming the key and value variants by class name. */
  val OVERVIEW_DOC: String =
    "Extract the specified fields from a Struct when schema present, or a Map in the case of schemaless data." +
      s"<p/>Use the concrete transformation type designed for the record key (<code>${classOf[ExtractFields.Key[_ <: Nothing]].getName}</code>) " +
      s"or value (<code>${classOf[ExtractFields.Value[_ <: Nothing]].getName}</code>)."

  /** Both settings are declared without a default value and with high importance. */
  val CONFIG_DEF: ConfigDef =
    new ConfigDef()
      .define(
        FIELDS_CONFIG,
        ConfigDef.Type.LIST,
        ConfigDef.NO_DEFAULT_VALUE,
        ConfigDef.Importance.HIGH,
        "Field names to extract."
      )
      .define(
        STRUCT_NAME_CONFIG,
        ConfigDef.Type.STRING,
        ConfigDef.NO_DEFAULT_VALUE,
        ConfigDef.Importance.HIGH,
        "Name for the extracted struct"
      )

  /** Variant that reads from, and replaces, the record key; the value side is copied as-is. */
  class Key[R <: ConnectRecord[R]] extends ExtractFields[R] {

    override protected def operatingSchema(record: R): Schema = record.keySchema()

    override protected def operatingValue(record: R): AnyRef = record.key()

    override protected def newRecord(record: R, updatedSchema: Schema, updatedValue: AnyRef): R =
      record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        updatedSchema,
        updatedValue,
        record.valueSchema(),
        record.value(),
        record.timestamp()
      )
  }

  /** Variant that reads from, and replaces, the record value; the key side is copied as-is. */
  class Value[R <: ConnectRecord[R]] extends ExtractFields[R] {

    override protected def operatingSchema(record: R): Schema = record.valueSchema()

    override protected def operatingValue(record: R): AnyRef = record.value()

    override protected def newRecord(record: R, updatedSchema: Schema, updatedValue: AnyRef): R =
      record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        updatedSchema,
        updatedValue,
        record.timestamp()
      )
  }
}
/**
 * Transformation that replaces the record key or value with a flat structure
 * containing only the configured fields.
 *
 * Every configured field name is a dot-separated path (for example `a.b.c`);
 * the extracted field is named after the last path segment, so two paths that
 * end in the same segment collide.
 *
 *  - With a schema, the operating value must be a `Struct`; the result is a new
 *    `Struct` whose schema is named after the `structName` setting.
 *  - Without a schema, the operating value must be a `Map`; the result is a new
 *    map from the last path segment to the value found at that path.
 *  - A record whose operating value is `null` (e.g. a tombstone) is returned
 *    unchanged.
 *
 * Subclasses decide whether the key or the value side of the record is used.
 *
 * @tparam R the concrete record type handled by the transformation
 */
abstract class ExtractFields[R <: ConnectRecord[R]] extends Transformation[R] {

  // Both fields are assigned in configure(); they stay null until it has run.
  // Each configured field name is stored already split into its path segments.
  @transient
  private var fieldPaths: List[List[String]] = null

  @transient
  private var structName: String = null

  /**
   * Reads the `fields` and `structName` settings and pre-splits the field
   * names, so that no string splitting happens per record.
   *
   * @param props raw transformation properties
   * @throws ConfigException if a field name is empty or has an empty path segment
   */
  def configure(props: util.Map[String, _]): Unit = {
    val config = new SimpleConfig(ExtractFields.CONFIG_DEF, props)
    fieldPaths = config.getList(ExtractFields.FIELDS_CONFIG).asScala.toList.map { name =>
      val path = name.split('.').toList
      // An empty path would otherwise only surface later as an obscure failure in apply().
      if (path.isEmpty || path.exists(_.isEmpty))
        throw new ConfigException(
          ExtractFields.FIELDS_CONFIG,
          name,
          "Field paths must consist of non-empty, dot-separated names"
        )
      path
    }
    structName = config.getString(ExtractFields.STRUCT_NAME_CONFIG)
  }

  /**
   * Resolves the schema of the field addressed by a path, starting at the
   * schema of `value`. Only schemas are inspected, so a nested struct that is
   * `null` in the data does not prevent resolution.
   *
   * @param remainingFieldsInTree path segments still to descend; empty means `value` itself
   * @param value                 struct whose schema is the starting point
   * @return the schema found at the end of the path
   * @throws DataException if `value` is `null` or a path segment is not a field of its parent schema
   */
  def schemaFromNestedField(remainingFieldsInTree: List[String], value: Struct): Schema = {
    if (value == null)
      throw new DataException(
        s"Unable to resolve schema of field ${remainingFieldsInTree.mkString(".")} from a null struct"
      )
    schemaAtPath(remainingFieldsInTree, value.schema())
  }

  /**
   * Fetches the value addressed by a path by descending through nested structs.
   *
   * @param remainingFieldsInTree path segments still to descend; empty means `value` itself
   * @param value                 struct to read from at the current level
   * @return the value stored at the end of the path
   * @throws DataException if a struct along the path is `null`
   */
  def valueFromNestedField(remainingFieldsInTree: List[String], value: Struct): Any = {
    remainingFieldsInTree match {
      case Nil => value
      case fieldName :: rest =>
        // Checked at every level, not only the last one, so that a missing
        // parent struct is reported instead of raising a NullPointerException.
        if (value == null) throw new DataException(s"Unable to fetch field $fieldName from $value")
        else if (rest.isEmpty) value.get(fieldName)
        else valueFromNestedField(rest, value.getStruct(fieldName))
    }
  }

  /**
   * Applies the extraction to the key or value selected by the subclass.
   *
   * @param record the incoming record
   * @return `record` itself when its operating value is `null`, otherwise a new
   *         record carrying only the configured fields
   */
  def apply(record: R): R = {
    val schema = operatingSchema(record)
    val value = operatingValue(record)
    if (value == null) {
      // Nothing to extract from; pass tombstones and null keys through untouched.
      record
    }
    else if (schema == null) {
      val source = requireMap(value, ExtractFields.PURPOSE)
      // LinkedHashMap keeps the extracted entries in configuration order.
      val extracted = new util.LinkedHashMap[String, AnyRef]()
      fieldPaths.foreach(path => extracted.put(path.last, valueFromNestedMap(path, source)))
      newRecord(record, null, extracted)
    }
    else {
      val source = requireStruct(value, ExtractFields.PURPOSE)
      val newSchema = fieldPaths.foldLeft(SchemaBuilder.struct().name(structName)) {
        case (builder, path) => builder.field(path.last, schemaAtPath(path, schema))
      }.build()
      val newStruct = fieldPaths.foldLeft(new Struct(newSchema)) {
        case (struct, path) => struct.put(path.last, valueFromNestedField(path, source))
      }
      newRecord(record, newSchema, newStruct)
    }
  }

  /** Nothing to release. */
  def close(): Unit = {}

  /** @return the configuration definition shared by the key and value variants */
  def config: ConfigDef = ExtractFields.CONFIG_DEF

  /** Schema of the record side (key or value) this transformation works on. */
  protected def operatingSchema(record: R): Schema

  /** Data of the record side (key or value) this transformation works on. */
  protected def operatingValue(record: R): AnyRef

  /** Copies `record`, replacing the operated side with the given schema and value. */
  protected def newRecord(record: R, updatedSchema: Schema, updatedValue: AnyRef): R

  // Walks the schema tree along `path`, failing with a readable message when a
  // segment does not name a field of the schema reached so far.
  private def schemaAtPath(path: List[String], root: Schema): Schema =
    path.foldLeft(root) { (current, fieldName) =>
      Option(current.field(fieldName))
        .map(_.schema())
        .getOrElse(
          throw new DataException(
            s"Field '$fieldName' of path '${path.mkString(".")}' does not exist in schema ${current.name()}"
          )
        )
    }

  // Schemaless counterpart of valueFromNestedField: descends through nested
  // maps. A missing entry anywhere along the path yields null; an intermediate
  // entry that is not a map is rejected by requireMap.
  private def valueFromNestedMap(path: List[String], value: util.Map[String, AnyRef]): AnyRef =
    path match {
      case Nil => value
      case fieldName :: Nil => value.get(fieldName)
      case fieldName :: rest =>
        val nested = value.get(fieldName)
        if (nested == null) null
        else valueFromNestedMap(rest, requireMap(nested, ExtractFields.PURPOSE))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment