Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save mike11339/b6b141dfdf75cb8f98f8a466c69fe37d to your computer and use it in GitHub Desktop.
Save mike11339/b6b141dfdf75cb8f98f8a466c69fe37d to your computer and use it in GitHub Desktop.
from confluent_kafka import Producer
from python_kafka import Timer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
msg = ('kafkatest' * 20).encode()[:100]
size = 1000000
def delivery_report(err, decoded_message, original_message):
if err is not None:
print(err)
def confluent_producer_async():
for _ in range(size):
producer.produce(
"topic1",
msg,
callback=lambda err, decoded_message, original_message=msg: delivery_report( # noqa
err, decoded_message, original_message
),
)
producer.flush()
def confluent_producer_sync():
for _ in range(100000):
producer.produce(
"topic1",
msg,
callback=lambda err, decoded_message, original_message=msg: delivery_report( # noqa
err, decoded_message, original_message
),
)
producer.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment