Skip to content

Instantly share code, notes, and snippets.

View Kavit900's full-sized avatar

Kavit Kavit900

View GitHub Profile
@Kavit900
Kavit900 / WeatherDeserializationSchema.java
Created November 25, 2023 09:34
Weather Deserialization File
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
public class WeatherDeserializationSchema extends AbstractDeserializationSchema<Weather> {
private static final long serialVersionUUID = 1L;
private transient ObjectMapper objectMapper;
@Kavit900
Kavit900 / Weather.java
Created November 25, 2023 09:33
Weather instance class file
import java.util.Objects;
public class Weather {
/*
{
"city": "New York",
"temperature": "10.34"
}
*/
@Kavit900
Kavit900 / Main.java
Created November 25, 2023 09:32
Main processing file
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.TopicPartition;
@Kavit900
Kavit900 / pom.xml
Created November 25, 2023 09:31
Dependency file for Flink consumer
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.kavit</groupId>
<artifactId>flink-kafka2postgres</artifactId>
<version>1.0-SNAPSHOT</version>
@Kavit900
Kavit900 / Dockerfile-kafka-python-producer
Created November 25, 2023 09:25
Dockerfile for Kafka Python Producer
From python:3.8-slim
COPY requirements.txt .
RUN set -ex; \
pip install --no-cache-dir -r requirements.txt
# Copy resources
WORKDIR /
COPY wait-for-it.sh wait-for-it.sh
@Kavit900
Kavit900 / requirements.txt
Created November 25, 2023 09:24
Dependency for Kafka Python Producer script
kafka-python==2.0.2
schedule==1.1.0
aiokafka==0.7.2
Faker==15.1.3
@Kavit900
Kavit900 / wait-for-it.sh
Created November 25, 2023 09:23
Wait for Kafka and Zookeeper to come up
#!/usr/bin/env bash
# Use this script to test if a given TCP host/port are available
cmdname=$(basename $0)
echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }
usage()
{
cat << USAGE >&2
@Kavit900
Kavit900 / python-producer.py
Created November 25, 2023 09:15
Kafka Python Producer
import datetime
import time
import random
import schedule
from json import dumps
from faker import Faker
from kafka import KafkaProducer
@Kavit900
Kavit900 / Dockerfile-postgres
Created November 25, 2023 08:15
Dockerfile for postgres
FROM postgres:latest
COPY create_table.sql /docker-entrypoint-initdb.d/
@Kavit900
Kavit900 / create_table.sql
Created November 25, 2023 08:13
Create a Weather Table
CREATE TABLE weather (
id SERIAL PRIMARY KEY,
city VARCHAR (255) NOT NULL,
average_temperature DOUBLE PRECISION
);