Skip to content

Instantly share code, notes, and snippets.

@vcabral19
Last active July 4, 2021 19:56
Show Gist options
  • Save vcabral19/2d7f2733190d4512faf2b4325c306a23 to your computer and use it in GitHub Desktop.
Save vcabral19/2d7f2733190d4512faf2b4325c306a23 to your computer and use it in GitHub Desktop.
using dataclass with faker (to generate random values) I've played with producing messages to kafka in a sync fashion
from dataclasses import dataclass, field
import json
import random
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from faker import Faker
faker = Faker()
BROKER_URL = "PLAINTEXT://localhost:9092"
TOPIC_NAME = "org.bigcompany.exercise3.purchases"
class EnhancedJSONEncoder(json.JSONEncoder):
# this is unecessary if you are using __dict__
def default(self, o):
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)
@dataclass
class Purchase:
username: str = field(default_factory=faker.user_name)
currency: str = field(default_factory=faker.currency_code)
amount: int = field(default_factory=lambda: random.randint(100, 200000))
def serialize(self):
"""Serializes the object in JSON string format"""
# TODO: Serializer the Purchase object
# See: https://docs.python.org/3/library/json.html#json.dumps
return json.dumps(self.__dict__)#, cls=EnhancedJSONEncoder)
def produce_sync(topic_name):
"""Produces data synchronously into the Kafka Topic"""
p = Producer({"bootstrap.servers": BROKER_URL})
# TODO: Write a synchronous production loop.
# See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Producer.flush
while True:
# TODO: Instantiate a `Purchase` on every iteration. Make sure to serialize it before
# sending it to Kafka!
iter_purchase = Purchase()
p.produce(topic_name, iter_purchase.serialize())
p.flush()
def main():
"""Checks for topic and creates the topic if it does not exist"""
create_topic(TOPIC_NAME)
try:
produce_sync(TOPIC_NAME)
except KeyboardInterrupt as e:
print("shutting down")
def create_topic(client):
"""Creates the topic with the given topic name"""
client = AdminClient({"bootstrap.servers": BROKER_URL})
futures = client.create_topics(
[NewTopic(topic=TOPIC_NAME, num_partitions=5, replication_factor=1)]
)
for _, future in futures.items():
try:
future.result()
except Exception as e:
pass
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment