Skip to content

Instantly share code, notes, and snippets.

View augustodn's full-sized avatar

Augusto de Nevrezé augustodn

View GitHub Profile
def configure_kafka_sink(server: str, topic_name: str) -> KafkaSink:
return (
KafkaSink.builder()
.set_bootstrap_servers(server)
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic(topic_name)
.set_value_serialization_schema(SimpleStringSchema())
.build()
@augustodn
augustodn / postgres_sink.py
Created June 24, 2024 11:15
postgre sink config
def configure_postgre_sink(sql_dml: str, type_info: Types) -> JdbcSink:
"""Makes postgres sink initialization. Config params are set in this function."""
return JdbcSink.sink(
sql_dml,
type_info,
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.with_url(f"jdbc:postgresql://{POSTGRES_HOST}/flinkdb")
.with_driver_name("org.postgresql.Driver")
.with_user_name("flinkuser")
.with_password("flinkpassword")
@augustodn
augustodn / docker-compose.yaml
Created June 24, 2024 11:08
Flink in session mode, docker-compose.yaml
flink-jobmanager:
build:
context: ./pyflink
dockerfile: Dockerfile
ports:
- "8081:8081"
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
@augustodn
augustodn / kafka_ports_container.yml
Created June 18, 2024 20:29
kafka_ports_container
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092