Skip to content

Instantly share code, notes, and snippets.

@andybryant
Created August 3, 2018 11:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andybryant/8b6b532963a723bb1cfb5ddf8c9dd53f to your computer and use it in GitHub Desktop.
Save andybryant/8b6b532963a723bb1cfb5ddf8c9dd53f to your computer and use it in GitHub Desktop.
Batching Kafka Connect JDBC Postgres Dialect
package your.company.here;
import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialectProvider;
import io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect;
import org.apache.kafka.common.config.AbstractConfig;
import java.sql.Connection;
import java.sql.SQLException;
public class BatchingPostgreSqlDatabaseDialect extends PostgreSqlDatabaseDialect {
/**
* The provider for {@link BatchingPostgreSqlDatabaseDialect}.
*/
public static class Provider extends DatabaseDialectProvider.SubprotocolBasedProvider {
public Provider() {
super(BatchingPostgreSqlDatabaseDialect.class.getSimpleName(), "postgresql");
}
@Override
public int score(final JdbcUrlInfo urlInfo) {
return super.score(urlInfo) + 100; // force this version to be used
}
@Override
public DatabaseDialect create(final AbstractConfig config) {
return new BatchingPostgreSqlDatabaseDialect(config);
}
}
/**
* Create a new dialect instance with the given connector configuration.
*
* @param config the connector configuration; may not be null
*/
public BatchingPostgreSqlDatabaseDialect(final AbstractConfig config) {
super(config);
System.out.println("Created dialect with config " + config);
}
@Override
public Connection getConnection() throws SQLException {
final Connection connection = super.getConnection();
// disable autocommit so that row limits will be honored when fetching data
// see https://github.com/confluentinc/kafka-connect-jdbc/issues/34
connection.setAutoCommit(false);
return connection;
}
}
your.company.here.BatchingPostgreSqlDatabaseDialect$Provider
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment