Last active
July 4, 2021 19:56
-
-
Save vcabral19/2d7f2733190d4512faf2b4325c306a23 to your computer and use it in GitHub Desktop.
Using dataclasses with Faker (to generate random values), I've played with producing messages to Kafka in a synchronous fashion.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass, field | |
import json | |
import random | |
from confluent_kafka import Consumer, Producer | |
from confluent_kafka.admin import AdminClient, NewTopic | |
from faker import Faker | |
# Shared Faker instance; its bound methods serve as default factories for Purchase.
faker = Faker()

# Bootstrap server address handed to the Kafka producer and admin client.
BROKER_URL = "PLAINTEXT://localhost:9092"

# Topic that the generated purchase events are written to.
TOPIC_NAME = "org.bigcompany.exercise3.purchases"
class EnhancedJSONEncoder(json.JSONEncoder):
    """JSON encoder that also accepts dataclass instances.

    This is unnecessary when the caller serializes ``obj.__dict__`` directly;
    it is only useful when dataclass objects are passed to ``json.dumps``
    via ``cls=EnhancedJSONEncoder``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Dataclass instances are converted to plain dicts. Anything else is
        deferred to ``json.JSONEncoder.default``, which raises ``TypeError``.
        """
        # Imported here because the module only imports ``dataclass`` and
        # ``field`` by name, so ``dataclasses`` is not bound at module scope.
        import dataclasses

        # is_dataclass() is also true for dataclass *types*, which asdict()
        # rejects, so restrict the conversion to instances.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        return super().default(o)
@dataclass
class Purchase:
    """A purchase event whose fields default to randomly generated values."""

    # Each default factory runs once per instance, so every Purchase() built
    # without arguments gets fresh values.
    username: str = field(default_factory=faker.user_name)
    currency: str = field(default_factory=faker.currency_code)
    amount: int = field(default_factory=lambda: random.randint(100, 200000))

    def serialize(self):
        """Return this purchase encoded as a JSON string."""
        payload = vars(self)
        return json.dumps(payload)
def produce_sync(topic_name):
    """Produce randomly generated purchases to ``topic_name`` synchronously.

    Runs until interrupted: each iteration builds a new ``Purchase``,
    serializes it to JSON, hands it to the producer and flushes before
    moving on to the next message.
    """
    producer = Producer({"bootstrap.servers": BROKER_URL})
    while True:
        payload = Purchase().serialize()
        producer.produce(topic_name, payload)
        # Flushing after every message is what makes this loop synchronous.
        producer.flush()
def main():
    """Request creation of the topic, then produce to it until Ctrl-C.

    ``KeyboardInterrupt`` raised while producing is treated as a normal
    shutdown request rather than an error.
    """
    create_topic(TOPIC_NAME)
    try:
        produce_sync(TOPIC_NAME)
    except KeyboardInterrupt:
        print("shutting down")
def create_topic(client):
    """Create the Kafka topic with the given topic name.

    Args:
        client: Name of the topic to create. The parameter keeps its
            original (misleading) name so existing callers are unaffected.

    Creation is best effort: a failure is reported on stdout and does not
    raise, so the caller can carry on (for example when the topic already
    exists).
    """
    # Use the argument instead of overwriting it and falling back to the
    # module-level TOPIC_NAME.
    topic_name = client
    admin = AdminClient({"bootstrap.servers": BROKER_URL})
    futures = admin.create_topics(
        [NewTopic(topic=topic_name, num_partitions=5, replication_factor=1)]
    )
    for topic, future in futures.items():
        try:
            future.result()
        except Exception as e:
            # Keep the best-effort behaviour, but surface the reason rather
            # than silently discarding it.
            print(f"topic {topic} was not created: {e}")
# Run the exercise only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment