# streaming-csv-with-quix-streams
#
# GitHub gist merlin-quix/e42594da530432cbd050bd83db97c857, created March 3, 2023 12:29.
# Note: GitHub flagged this file as possibly containing bidirectional Unicode text,
# which may be interpreted or compiled differently than it appears. To review it,
# open the file in an editor that reveals hidden Unicode characters.
import shutil
import time
from urllib import request

import pandas as pd
import quixstreams as qx
# Sample data used by the tutorial. To stream your own file instead, place it
# next to this script and call main(download=False) (or pass csv_path=...).
DATA_URL = "https://quixtutorials.blob.core.windows.net/tutorials/event-detection/data.csv"
CSV_PATH = "data.csv"

# Assumes an instance of Kafka is running on the local machine.
KAFKA_BROKER = "127.0.0.1:9092"
TOPIC_NAME = "your-csv-topic"

# CSV columns to publish as tags. NOTE(review): the "TAG__" prefix added by
# tag_columns() is how quixstreams is expected to recognise tag columns in a
# published DataFrame — confirm against the quixstreams timeseries docs.
TAG_COLUMNS = ("device_id", "rider", "streamId", "team", "version")

# Target spacing between rows so the CSV is not sent all at once. Set to 0 to
# send without pausing (rows are still published one at a time).
SECONDS_TO_WAIT = 0.5


def download_csv(url: str = DATA_URL, path: str = CSV_PATH) -> None:
    """Download *url* and write its body to *path*.

    The HTTP response is closed when done, and its body is copied to disk in
    chunks rather than read into memory in one go.
    """
    with request.urlopen(url) as response, open(path, "wb") as data_file:
        shutil.copyfileobj(response, data_file)


def tag_columns(df: pd.DataFrame, columns=TAG_COLUMNS) -> pd.DataFrame:
    """Return a copy of *df* with each name in *columns* renamed to ``TAG__<name>``.

    Names that are not columns of *df* are skipped (``DataFrame.rename``'s
    default ``errors="ignore"``). *df* itself is not modified.
    """
    # One rename with a full mapping instead of one rename (and copy) per column.
    return df.rename(columns={col: "TAG__" + col for col in columns})


def publish_rows(
    output_stream,
    df: pd.DataFrame,
    seconds_to_wait: float = SECONDS_TO_WAIT,
) -> None:
    """Publish *df* to *output_stream* one row (as a 1-row DataFrame) at a time.

    After each row, sleep for whatever remains of *seconds_to_wait* once the
    time spent publishing and printing that row has been subtracted.
    """
    total = len(df)
    for i in range(total):
        # monotonic() is unaffected by system clock updates, so the measured
        # elapsed time cannot go negative or jump.
        start = time.monotonic()
        # [[i]] (a list) keeps a one-row DataFrame rather than a Series.
        output_stream.timeseries.publish(df.iloc[[i]])
        # 1-based so the final progress line reads N/N.
        print(f"Sending {i + 1}/{total}")
        elapsed = time.monotonic() - start
        time.sleep(max(0.0, seconds_to_wait - elapsed))


def main(
    download: bool = True,
    csv_path: str = CSV_PATH,
    seconds_to_wait: float = SECONDS_TO_WAIT,
) -> None:
    """Optionally download the sample CSV, then stream it row by row to Kafka.

    Args:
        download: fetch DATA_URL into *csv_path* first; pass False to use an
            existing local file.
        csv_path: path of the CSV file to stream.
        seconds_to_wait: target delay between published rows.
    """
    if download:
        print("Downloading CSV")
        download_csv(DATA_URL, csv_path)

    # Load the data before touching Kafka so a missing/bad CSV fails fast
    # without opening a stream that would then be left empty.
    df = tag_columns(pd.read_csv(csv_path))

    client = qx.KafkaStreamingClient(KAFKA_BROKER)
    print("Opening topic")
    topic_producer = client.get_topic_producer(TOPIC_NAME)
    output_stream = topic_producer.create_stream()

    print("Writing data")
    try:
        publish_rows(output_stream, df, seconds_to_wait)
    finally:
        # Close the stream even if publishing fails part-way through.
        output_stream.close()
    print("Finished")


if __name__ == "__main__":
    main()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.