Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alex88/6f231f7c94a48c0632ab0d09aaa2d723 to your computer and use it in GitHub Desktop.
Save alex88/6f231f7c94a48c0632ab0d09aaa2d723 to your computer and use it in GitHub Desktop.
Kafka Connect Postgres JSONB Dialect (Kafka 2.2.0)
package com.project.kafka.connect.jdbc.dialect
import com.project.kafka.connect.utils.JsonUtils
import io.confluent.connect.jdbc.dialect.DatabaseDialect
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider
import io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect
import io.confluent.connect.jdbc.sink.metadata.SinkRecordField
import java.sql.PreparedStatement
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.connect.data.Schema
import org.postgresql.util.PGobject
class PostgreSqlWithJsonDatabaseDialect(
config: AbstractConfig
) : PostgreSqlDatabaseDialect(config) {
companion object {
private const val JSON_FIELD_TYPE = "jsonb"
private val JSON_SCHEMA_TYPES = setOf(
Schema.Type.ARRAY,
Schema.Type.MAP,
Schema.Type.STRUCT
)
}
override fun getSqlType(field: SinkRecordField): String {
return if (JSON_SCHEMA_TYPES.contains(field.schemaType()))
JSON_FIELD_TYPE else
super.getSqlType(field)
}
override fun maybeBindLogical(
statement: PreparedStatement,
index: Int,
schema: Schema,
value: Any?
): Boolean {
if (JSON_SCHEMA_TYPES.contains(schema.type())) {
val jsonObject = PGobject()
jsonObject.type = JSON_FIELD_TYPE
jsonObject.value = JsonUtils.convertToJson(schema, value).toString()
statement.setObject(index, jsonObject)
return true
}
return super.maybeBindLogical(statement, index, schema, value)
}
class Provider : DatabaseDialectProvider.SubprotocolBasedProvider(
PostgreSqlWithJsonDatabaseDialect::class.java.simpleName,
listOf("postgresql")
) {
override fun create(config: AbstractConfig): DatabaseDialect {
return PostgreSqlWithJsonDatabaseDialect(config)
}
/**
* Score would be greater than 0 if this url did in some way match "postgresql"
* We simply increment that number by 1 to score higher than the standard
* [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.Provider].
*/
override fun score(urlInfo: JdbcUrlInfo?): Int {
return super.score(urlInfo)
.let { if (it > 0) it + 1 else it }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment