kafka_dozer

the docker-compose file (docker-compose.yml):
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      # Host clients connect via localhost:9092; other containers via broker:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - broker
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:29092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
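
With the two advertised listeners above, clients on the host reach the broker at localhost:9092, while other containers (like schema-registry) use broker:29092. A quick sanity check from the host is to pull cluster metadata with confluent-kafka's AdminClient; a minimal sketch, using the bootstrap address from the compose file:

from confluent_kafka.admin import AdminClient

# Connect via the listener published to the host on port 9092
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# list_topics() fetches cluster metadata; the timeout keeps it from
# hanging if the broker isn't up yet
metadata = admin.list_topics(timeout=10)
print("Brokers:", metadata.brokers)
print("Topics:", list(metadata.topics))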

the python code (producer.py)

#!/usr/bin/env python3
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    # produce() is asynchronous; delivery is confirmed later via the callback.
    for _ in range(1000):
        user_id = choice(user_ids)
        product = choice(products)
        # Arguments: topic, value (product), key (user_id)
        producer.produce(topic, product, user_id, callback=delivery_callback)

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()


the getting_started.ini file to store the config:

[default]
bootstrap.servers=localhost:9092
; schema.registry.url=localhost:8081
; queue.buffering.max.messages=20000

[consumer]
group.id=python_example_group_1

# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
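
The [consumer] section above is not read by producer.py; it's meant for a matching consumer. A minimal consumer sketch (the file name consumer.py is my assumption) that merges [default] with the [consumer] settings and reads from the purchases topic:

#!/usr/bin/env python3
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer

if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Merge [default] with the consumer-specific settings
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    consumer = Consumer(config)
    consumer.subscribe(['purchases'])

    try:
        while True:
            msg = consumer.poll(1.0)  # wait up to 1 second for an event
            if msg is None:
                continue
            if msg.error():
                print('ERROR: {}'.format(msg.error()))
            else:
                print('Consumed event from topic {}: key = {} value = {}'.format(
                    msg.topic(), msg.key().decode('utf-8'), msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()  # commit final offsets and leave the group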

Run the project:

1. docker-compose up
2. Create a topic (alternatively from Python, see the sketch after these steps):

docker compose exec broker \
        kafka-topics --create \
          --topic purchases \
          --bootstrap-server localhost:9092 \
          --replication-factor 1 \
          --partitions 1

3. chmod u+x producer.py
4. ./producer.py getting_started.ini
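
If you prefer creating the topic from Python instead of the CLI, confluent-kafka's AdminClient can do the same thing. A minimal sketch, assuming the compose stack above is already up:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Mirrors the kafka-topics command above: 1 partition, replication factor 1
futures = admin.create_topics(
    [NewTopic("purchases", num_partitions=1, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # raises on failure, e.g. if the topic already exists
        print("Created topic {}".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))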


Dozer config:

connections:
  - config: !Kafka
      broker: localhost:9092
      schema_registry_url:
      name: kafka_store

sources:
  - name: purchases
    table_name: testable
    connection: !Ref kafka_store
    columns:

endpoints:
  - name: movies_with_bookings
    path: /movies_with_bookings
    table_name: purchases
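
With Dozer running against this config, the movies_with_bookings endpoint should be reachable over Dozer's APIs. A rough smoke test, assuming the REST API listens on localhost:8080 (the port and exact route shape depend on your Dozer version and api settings, so treat both as assumptions):

import requests  # third-party package: pip install requests

# The /movies_with_bookings path comes from the endpoint config above;
# port 8080 is an assumed default - check your Dozer api config
resp = requests.get("http://localhost:8080/movies_with_bookings")
print(resp.status_code)
print(resp.text[:500])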
