PyFlink with Kafka

Apache Flink (PyFlink) with a Kafka source and sink

Reproduced from https://thecodinginterface.com/blog/kafka-source-sink-with-apache-flink-table-api/

PyFlink is compatible with Python >= 3.5 and < 3.9
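
A quick way to confirm that the active interpreter falls in that range before creating the virtual environment:

python3 --version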

Process:

  1. Produce events and send to Kafka topic
  2. Set up streaming environment via PyFlink DataStream API
  3. Read from Kafka source via PyFlink Table API
  4. Process data
  5. Write to Kafka sink via PyFlink Table API

Setup Venv

python3 -m venv venv
source venv/bin/activate
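
With the virtual environment active, install the pinned Python dependencies listed later in this gist (a minimal sketch, assuming they are saved as requirements.txt):

pip install --upgrade pip
pip install -r requirements.txt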

Setup Docker Containers

docker-compose up -d

Download the Flink SQL Kafka connector JAR (the command below uses HTTPie)

http --download https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.0/flink-sql-connector-kafka_2.11-1.13.0.jar
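
If HTTPie is not available, the same JAR can be fetched with curl instead (an equivalent alternative, not part of the original walkthrough):

curl -O https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.0/flink-sql-connector-kafka_2.11-1.13.0.jar

Either way, keep the JAR alongside sales_processor.py, which loads it via the pipeline.jars setting.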

Create Kafka topics in container

docker exec -it broker kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --topic sales-usd

docker exec -it broker kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --topic sales-euros
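
Optionally list the topics to confirm both were created (kafka-topics is available inside the broker container):

docker exec -it broker kafka-topics --list \
    --bootstrap-server localhost:9092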

Generate events

python sales_producer.py
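
Before starting the Flink job, it can help to tail the source topic in another terminal and confirm events are arriving (an optional check that mirrors the sink verification below):

docker exec -it broker kafka-console-consumer --from-beginning \
    --bootstrap-server localhost:9092 \
    --topic sales-usd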

Process events

python sales_processor.py

Verify Kafka sink update

docker exec -it broker kafka-console-consumer --from-beginning \
    --bootstrap-server localhost:9092 \
    --topic sales-euros

Clean up Docker containers

docker-compose down -v

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

requirements.txt

apache-flink>=1.13.0,<1.14
confluent-kafka>=1.7.0,<1.8

sales_processor.py

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def main():
    # Create streaming environment
    env = StreamExecutionEnvironment.get_execution_environment()

    settings = EnvironmentSettings.new_instance()\
        .in_streaming_mode()\
        .use_blink_planner()\
        .build()

    # create table environment
    tbl_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                            environment_settings=settings)

    # add kafka connector dependency
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.13.0.jar')

    tbl_env.get_config()\
        .get_configuration()\
        .set_string("pipeline.jars", "file://{}".format(kafka_jar))

    #######################################################################
    # Create Kafka Source Table with DDL
    #######################################################################
    src_ddl = """
        CREATE TABLE sales_usd (
            seller_id VARCHAR,
            amount_usd DOUBLE,
            sale_ts BIGINT,
            proctime AS PROCTIME()
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sales-usd',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'sales-usd',
            'format' = 'json'
        )
    """
    tbl_env.execute_sql(src_ddl)

    # create and initiate loading of source Table
    tbl = tbl_env.from_path('sales_usd')

    print('\nSource Schema')
    tbl.print_schema()

    #####################################################################
    # Define Tumbling Window Aggregate Calculation (Seller Sales Per Minute)
    #####################################################################
    sql = """
        SELECT
            seller_id,
            TUMBLE_END(proctime, INTERVAL '60' SECONDS) AS window_end,
            SUM(amount_usd) * 0.85 AS window_sales
        FROM sales_usd
        GROUP BY
            TUMBLE(proctime, INTERVAL '60' SECONDS),
            seller_id
    """
    revenue_tbl = tbl_env.sql_query(sql)

    print('\nProcess Sink Schema')
    revenue_tbl.print_schema()

    ###############################################################
    # Create Kafka Sink Table
    ###############################################################
    sink_ddl = """
        CREATE TABLE sales_euros (
            seller_id VARCHAR,
            window_end TIMESTAMP(3),
            window_sales DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'sales-euros',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """
    tbl_env.execute_sql(sink_ddl)

    # write time windowed aggregations to sink table
    revenue_tbl.execute_insert('sales_euros').wait()

    tbl_env.execute('windowed-sales-euros')


if __name__ == '__main__':
    main()

sales_producer.py

import argparse
import atexit
import json
import logging
import random
import time
import sys

from confluent_kafka import Producer

logging.basicConfig(
    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
    handlers=[
        logging.FileHandler("sales_producer.log"),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger()

SELLERS = ['LNK', 'OMA', 'KC', 'DEN']


class ProducerCallback:
    def __init__(self, record, log_success=False):
        self.record = record
        self.log_success = log_success

    def __call__(self, err, msg):
        if err:
            logger.error('Error producing record {}'.format(self.record))
        elif self.log_success:
            logger.info('Produced {} to topic {} partition {} offset {}'.format(
                self.record,
                msg.topic(),
                msg.partition(),
                msg.offset()
            ))


def main(args):
    logger.info('Starting sales producer')
    conf = {
        'bootstrap.servers': args.bootstrap_server,
        'linger.ms': 200,
        'client.id': 'sales-1',
        'partitioner': 'murmur2_random'
    }
    producer = Producer(conf)
    atexit.register(lambda p: p.flush(), producer)

    i = 1
    while True:
        is_tenth = i % 10 == 0
        sales = {
            'seller_id': random.choice(SELLERS),
            'amount_usd': random.randrange(100, 1000),
            'sale_ts': int(time.time() * 1000)
        }
        producer.produce(topic=args.topic,
                         value=json.dumps(sales),
                         on_delivery=ProducerCallback(sales, log_success=is_tenth))

        if is_tenth:
            producer.poll(1)
            time.sleep(5)
            i = 0  # no need to let i grow unnecessarily large

        i += 1


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap-server', default='localhost:9092')
    parser.add_argument('--topic', default='sales-usd')
    args = parser.parse_args()

    main(args)