# Python program to stream data from Kafka using Faust.
import faust
# Definition of the message exchanged over the topic.
class Greeting(faust.Record):
    """Message model with the sender's and the recipient's name."""

    # Name of the party the greeting comes from.
    from_name: str
    # Name of the party the greeting is addressed to.
    to_name: str
# Create the Faust application, named 'hello-app'. The Kafka broker is
# given through the `broker` URL argument.
# NOTE(review): the URL carries no host or port; presumably the transport's
# default is used -- confirm for your environment.
app = faust.App(
    'hello-app',
    broker='kafka://',
)
# Declare the 'hello-topic' topic; Greeting is the template for its values.
topic = app.topic(
    'hello-topic',
    value_type=Greeting,
)
# Faust agent that asynchronously reads Greeting messages from `topic`.
# The decorator is what turns this coroutine into an agent; without it
# `hello.send(...)` used by the sender below would not exist.
@app.agent(topic)
async def hello(greetings):
    """Print one line for every greeting received on the stream.

    Args:
        greetings: Async-iterable stream of values read from the topic;
            each item exposes ``from_name`` and ``to_name``.
    """
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')
# Producer side: sends a message to Kafka at a fixed time interval.
# The interval here is 0.1 seconds; adjust it to test the speed of
# producing and consuming.
@app.timer(interval=0.1)
async def example_sender(app):
    """Send a single Greeting to the ``hello`` agent.

    Args:
        app: The application instance handed to the timer callback
            (unused here; kept for the callback signature).
    """
    await hello.send(
        value=Greeting(from_name='Amal', to_name='you'),
    )
if __name__ == '__main__':
    # Hand control to Faust's command-line entry point when run as a script.
    app.main()