Last active
March 14, 2020 04:37
-
-
Save amalgjose/85b9f5e40d422f8a59e2fa73039c0c9d to your computer and use it in GitHub Desktop.
Python program to stream data from Kafka using Faust.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import faust | |
# The definition of message | |
class Greeting(faust.Record): | |
from_name: str | |
to_name: str | |
# Here we initialize the application. The Kafka broker details are specified in the broker details. | |
app = faust.App('hello-app', broker='kafka://192.168.0.20') | |
# Here we define the topic and define the template of the message | |
topic = app.topic('hello-topic', value_type=Greeting) | |
# This is the faust agent that reads the data from the kafka topic asynchronously. | |
@app.agent(topic) | |
async def hello(greetings): | |
async for greeting in greetings: | |
print(f'Hello from {greeting.from_name} to {greeting.to_name}') | |
# This function acts as the producer and send messages to Kafka at the mentioned time interval | |
# Here the time interval is 0.1 seconds. You can adjust this and test the speed of produce & consume. | |
@app.timer(interval=0.1) | |
async def example_sender(app): | |
await hello.send( | |
value=Greeting(from_name='Amal', to_name='you'), | |
) | |
if __name__ == '__main__': | |
app.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment