@merlin-quix
Last active January 23, 2023 07:17
Step 2 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Imports used by this step; `producer` is assumed to be the Kafka producer created in step 1
import datetime as dt
import json
from time import sleep

import pandas as pd

# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"

# Read the CSV in chunks of 10 rows and send each chunk as one message
for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):
    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])
    # Set the counter as the message key
    key = str(counter).encode()
    # Convert the data frame chunk into a dictionary
    chunkd = chunk.to_dict()
    # Encode the dictionary into a JSON byte array (default=str handles timestamps)
    data = json.dumps(chunkd, default=str).encode('utf-8')
    # Send the data to Kafka
    producer.send(topic="transactions", key=key, value=data)
    # Sleep to simulate a real-world interval
    sleep(0.5)
    # Increment the message counter for the message key
    counter = counter + 1
    print(f'Sent record to topic at time {dt.datetime.utcnow()}')
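
To make the key and value encoding in the loop above easier to inspect without a running broker, here is a minimal standard-library sketch. It assumes a plain dict shaped like the output of `chunk.to_dict()` (`{column: {row_index: value}}`). The helper names `encode_message` and `decode_message` and the sample values are hypothetical and are not part of the original article.

```python
import datetime as dt
import json


def encode_message(counter, chunk_dict):
    """Build the (key, value) byte pair the same way the loop does."""
    key = str(counter).encode()
    # default=str turns non-JSON types such as datetimes into strings
    value = json.dumps(chunk_dict, default=str).encode("utf-8")
    return key, value


def decode_message(key, value):
    """Hypothetical consumer-side helper that reverses the encoding."""
    return int(key.decode()), json.loads(value.decode("utf-8"))


# Illustrative stand-in for chunk.to_dict(): {column: {row_index: value}}
sample = {
    "Invoice": {0: "489434", 1: "489434"},
    "InvoiceDate": {
        0: dt.datetime(2009, 12, 1, 7, 45),
        1: dt.datetime(2009, 12, 1, 7, 45),
    },
}

key, value = encode_message(0, sample)
counter, payload = decode_message(key, value)
print(key, counter, payload["InvoiceDate"])
```

After the round trip, the integer row indexes come back as string keys and the timestamps come back as plain strings, because JSON has no integer-key or datetime type. A consumer that needs real timestamps would have to parse them again.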