Using the Kafka library confluent-kafka-python

Installing

pip install confluent-kafka
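
As a quick sanity check of the install, the package exposes both its own version and the version of the bundled librdkafka C library:

import confluent_kafka

# print the Python client version and the underlying librdkafka version
# (each call returns a tuple of version string and version integer)
print(confluent_kafka.version())
print(confluent_kafka.libversion())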

Initialization

It's important to declare the producer and consumer objects at module (global) scope so they are created once and reused; building a new client for every message is expensive.

These objects manage the connections to the Kafka brokers in background threads.

File: kafka_connectors.py

import socket
from confluent_kafka import Producer, Consumer

kafka_config = {
    'producer': {
        'bootstrap.servers': 'host1:9092,host2:9092',
        'client.id': socket.gethostname()
    },
    'consumer': {
        'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': 'foo',
        'auto.offset.reset': 'smallest'
    }
}


producer = Producer(kafka_config['producer'])
consumer = Consumer(kafka_config['consumer'])
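
Before producing or consuming, it can help to verify that the brokers in kafka_config are actually reachable. A minimal sketch using the client's metadata request (list_topics() raises KafkaException if no broker responds within the timeout):

from confluent_kafka import KafkaException
from kafka_connectors import producer

try:
    # request cluster metadata; fails fast if no broker is reachable
    metadata = producer.list_topics(timeout=5)
    print(f'Connected, topics available: {list(metadata.topics)}')
except KafkaException as exc:
    print(f'Could not reach Kafka brokers: {exc}')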

Producing in asynchronous mode

from kafka_connectors import producer

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

# sending 100 events in async mode: produce() only enqueues the message,
# acked() is invoked later when the broker acknowledges it
for i in range(100):
    producer.produce(topic="topic", key="key", value=f"value-{i}", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)
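
Because produce() only enqueues messages in a local buffer, a producer that exits right after the loop can silently drop anything not yet delivered. Before shutting down, flush() blocks until the queue is drained (or the timeout expires) and returns how many messages are still undelivered:

# drain the local queue before exiting; wait at most 10 seconds
remaining = producer.flush(10)
if remaining > 0:
    print(f'{remaining} messages were not delivered')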

Producing in synchronous mode (slower)

from kafka_connectors import producer

# sending 100 events in sync mode: flush() after each produce() blocks
# until that message is delivered (acked() is the callback from the previous example)
for i in range(100):
    producer.produce(topic="topic", key="key", value=f"value-{i}", callback=acked)
    producer.flush()
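
produce() can also raise BufferError when the local queue is full (messages are enqueued faster than the client can deliver them). A minimal sketch of handling it: serve delivery callbacks with poll() to free space in the queue, then retry the send:

try:
    producer.produce(topic="topic", key="key", value="value", callback=acked)
except BufferError:
    # local queue is full: give the background thread time to deliver messages
    producer.poll(1)
    producer.produce(topic="topic", key="key", value="value", callback=acked)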

Consuming in synchronous mode (preserving message order)

from confluent_kafka import KafkaException
from kafka_connectors import consumer

running = True

max_messages_by_poll = 5


def do_something(msg):
    # raise if the broker reported an error for this message
    if msg.error():
        raise KafkaException(msg.error())

    # display received message
    topic = msg.topic()
    msg_key = msg.key().decode('utf-8')
    msg_value = msg.value().decode('utf-8')
    print(f'Received message: {msg_key}: {msg_value} from topic: {topic}')


def consume_loop_sync(consumer, topics):
    try:
        consumer.subscribe(topics)
        global running

        while running:
            # fetch up to max_messages_by_poll messages from the subscribed topics;
            # consume() returns a list, possibly empty if the timeout expires
            received_messages = consumer.consume(num_messages=max_messages_by_poll, timeout=1.0)

            # for each message received, do something
            for msg in received_messages:
                do_something(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()


def shutdown():
    print('shutting down consumer...')
    global running
    running = False


if __name__ == '__main__':
    try:
        consume_loop_sync(consumer=consumer, topics=["topic"])
    except KeyboardInterrupt:
        shutdown()
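
By default the consumer commits offsets automatically in the background, so a crash between an auto-commit and do_something() can lose or re-process messages. For at-least-once processing, a common variation (a sketch, assuming 'enable.auto.commit': False is added to kafka_config['consumer']) is to commit only after each message has been handled:

def consume_loop_manual_commit(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            # poll() returns a single message, or None if the timeout expires
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue

            do_something(msg)

            # commit this message's offset only after it was processed successfully
            consumer.commit(message=msg, asynchronous=False)
    finally:
        consumer.close()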

Consuming in asynchronous mode (processing each batch in parallel threads)

from threading import Thread
from confluent_kafka import KafkaException
from kafka_connectors import consumer

running = True

max_threads_in_parallel = 5


def do_something(msg, batch_number):
    # raise if the broker reported an error for this message
    if msg.error():
        raise KafkaException(msg.error())

    # display received message
    topic = msg.topic()
    msg_key = msg.key().decode('utf-8')
    msg_value = msg.value().decode('utf-8')
    print(f'[Batch: {batch_number}] Received message: {msg_key}: {msg_value} from topic: {topic}')


def consume_loop_async(consumer, topics):
    try:
        consumer.subscribe(topics)
        global running

        batch_number = 1

        while running:
            # fetch up to max_threads_in_parallel messages from the subscribed topics;
            # consume() returns a list, possibly empty if the timeout expires
            received_messages = consumer.consume(num_messages=max_threads_in_parallel, timeout=1.0)

            open_threads = []

            # for each message received
            for msg in received_messages:
                # create thread
                thread = Thread(target=do_something, args=(msg, batch_number,))

                # start thread in parallel
                thread.start()

                # append to open threads
                open_threads.append(thread)

            # wait for all threads in batch to finish
            for thread in open_threads:
                thread.join()

            # finish batch execution
            print("All theads completed")
            batch_number += 1
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()


def shutdown():
    print('shutting down consumer...')
    global running
    running = False


if __name__ == '__main__':
    try:
        consume_loop_async(consumer=consumer, topics=["topic"])
    except KeyboardInterrupt:
        shutdown()
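
Creating and joining fresh Thread objects on every batch works, but a ThreadPoolExecutor reuses a fixed set of workers and re-raises any exception from do_something() in the main loop. A sketch of the same batch logic on top of concurrent.futures (consume_loop_pool is an illustrative name, not part of the gist):

from concurrent.futures import ThreadPoolExecutor

def consume_loop_pool(consumer, topics):
    try:
        consumer.subscribe(topics)
        batch_number = 1

        with ThreadPoolExecutor(max_workers=max_threads_in_parallel) as pool:
            while running:
                received_messages = consumer.consume(num_messages=max_threads_in_parallel, timeout=1.0)

                # submit the whole batch to the pool
                futures = [pool.submit(do_something, msg, batch_number) for msg in received_messages]

                # wait for the batch; result() re-raises any exception from do_something()
                for future in futures:
                    future.result()

                batch_number += 1
    finally:
        consumer.close()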
