Skip to content

Instantly share code, notes, and snippets.

View limadelrey's full-sized avatar

Francisco Lima limadelrey

  • Porto, Portugal
View GitHub Profile
@limadelrey
limadelrey / bike-store-source-connector.properties
Created July 3, 2020 00:33
Kafka Connect: How to create a real time data pipeline using Change Data Capture (CDC)
# https://debezium.io/docs/connectors/postgresql/
# Unique name for the source connector.
name=bike-store-source-connector
# The name of the Java class for the connector.
connector.class=io.debezium.connector.postgresql.PostgresConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=1
# The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance.
slot.name=debezium
# Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored.
@limadelrey
limadelrey / docker-compose.yml
Created July 3, 2020 00:36
Kafka Connect: How to create a real time data pipeline using Change Data Capture (CDC)
services:
kafka:
container_name: bike-store-kafka
image: landoop/fast-data-dev:2.5
environment:
ADV_HOST: 127.0.0.1 # Change to 192.168.99.100 if using Docker Toolbox
RUNTESTS: 0 # Disable Running tests so the cluster starts faster
CONNECTORS: debezium-postgres # Allows only described connectors
SAMPLEDATA: 0 # Disable sample data topic creation
RUNNING_SAMPLEDATA: 0 # Disable sample data
@limadelrey
limadelrey / BookHandler.java
Last active January 20, 2021 23:33
BookHandler
public class BookHandler {
private static final String ID_PARAMETER = "id";
private static final String PAGE_PARAMETER = "page";
private static final String LIMIT_PARAMETER = "limit";
private final BookService bookService;
public BookHandler(BookService bookService) {
this.bookService = bookService;
public class BookRouter {
private final Vertx vertx;
private final BookHandler bookHandler;
private final BookValidationHandler bookValidationHandler;
public BookRouter(Vertx vertx,
BookHandler bookHandler,
BookValidationHandler bookValidationHandler) {
this.vertx = vertx;
public class BookService {
private static final Logger LOGGER = LoggerFactory.getLogger(BookService.class);
private final PgPool dbClient;
private final BookRepository bookRepository;
public BookService(PgPool dbClient,
BookRepository bookRepository) {
this.dbClient = dbClient;
public class BookRepository {
private static final Logger LOGGER = LoggerFactory.getLogger(BookRepository.class);
private static final String SQL_SELECT_ALL = "SELECT * FROM books LIMIT #{limit} OFFSET #{offset}";
private static final String SQL_SELECT_BY_ID = "SELECT * FROM books WHERE id = #{id}";
private static final String SQL_INSERT = "INSERT INTO books (author, country, image_link, language, link, pages, title, year) " +
"VALUES (#{author}, #{country}, #{image_link}, #{language}, #{link}, #{pages}, #{title}, #{year}) RETURNING id";
private static final String SQL_UPDATE = "UPDATE books SET author = #{author}, country = #{country}, image_link = #{image_link}, " +
"language = #{language}, link = #{link}, pages = #{pages}, title = #{title}, year = #{year} WHERE id = #{id}";
public class ApiVerticle extends AbstractVerticle {
private static final Logger LOGGER = LoggerFactory.getLogger(ApiVerticle.class);
@Override
public void start(Promise<Void> promise) {
final PgPool dbClient = DbUtils.buildDbClient(vertx);
final BookRepository bookRepository = new BookRepository();
final BookService bookService = new BookService(dbClient, bookRepository);
public class MigrationVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> promise) {
final Configuration config = DbUtils.buildMigrationsConfiguration();
final Flyway flyway = new Flyway(config);
flyway.migrate();
promise.complete();
public class MainVerticle extends AbstractVerticle {
private static final Logger LOGGER = LoggerFactory.getLogger(MainVerticle.class);
@Override
public void start() {
final long start = System.currentTimeMillis();
deployMigrationVerticle(vertx)
.flatMap(migrationVerticleId -> deployApiVerticle(vertx))
public class HealthCheckRouter {
/**
* Set health check routes
*
* @param vertx Vertx context
* @param router Router
* @param dbClient PostgreSQL pool
*/
public void setRouter(Vertx vertx,