Skip to content

Instantly share code, notes, and snippets.

@amalgjose
Last active March 14, 2020 04:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amalgjose/85b9f5e40d422f8a59e2fa73039c0c9d to your computer and use it in GitHub Desktop.
Save amalgjose/85b9f5e40d422f8a59e2fa73039c0c9d to your computer and use it in GitHub Desktop.
Python program to stream data from Kafka using Faust.
import faust
# The definition of message
class Greeting(faust.Record):
from_name: str
to_name: str
# Here we initialize the application. The Kafka broker details are specified in the broker details.
app = faust.App('hello-app', broker='kafka://192.168.0.20')
# Here we define the topic and define the template of the message
topic = app.topic('hello-topic', value_type=Greeting)
# This is the faust agent that reads the data from the kafka topic asynchronously.
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
# This function acts as the producer and send messages to Kafka at the mentioned time interval
# Here the time interval is 0.1 seconds. You can adjust this and test the speed of produce & consume.
@app.timer(interval=0.1)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Amal', to_name='you'),
)
if __name__ == '__main__':
app.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment