Skip to content

Instantly share code, notes, and snippets.

@merlin-quix
Created March 3, 2023 12:29
Show Gist options
  • Save merlin-quix/e42594da530432cbd050bd83db97c857 to your computer and use it in GitHub Desktop.
Save merlin-quix/e42594da530432cbd050bd83db97c857 to your computer and use it in GitHub Desktop.
streaming-csv-with-quix-streams
import quixstreams as qx
import pandas as pd
import time
from urllib import request
# Download a sample data.csv. If you want to use your own data file instead,
# comment out the download step below (the print and the `with` statement
# through the write) and place your data.csv in the same directory as this file.
print("Downloading CSV")
SAMPLE_CSV_URL = "https://quixtutorials.blob.core.windows.net/tutorials/event-detection/data.csv"
# Using the HTTP response as a context manager guarantees it is closed once the
# body has been written to disk (the original left it open).
with request.urlopen(SAMPLE_CSV_URL) as response, open('data.csv', "wb") as data_file:
    data_file.write(response.read())
# Connection settings: edit these to point at your own Kafka broker / topic.
# Assumes you have an instance of Kafka running on your local machine.
KAFKA_BROKER_ADDRESS = '127.0.0.1:9092'
CSV_TOPIC_NAME = 'your-csv-topic'

# Connect to Kafka and open a single output stream on the CSV topic;
# the CSV rows are published into `output_stream`.
client = qx.KafkaStreamingClient(KAFKA_BROKER_ADDRESS)
print("Opening topic")
topic_producer = client.get_topic_producer(CSV_TOPIC_NAME)
output_stream = topic_producer.create_stream()
# Prepare data: prefix the identifier columns with "TAG__".
# NOTE(review): presumably so Quix Streams publishes them as tags rather than
# as parameter values — confirm against the Quix Streams pandas docs.
TAG_COLUMNS = ['device_id', 'rider', 'streamId', 'team', 'version']
df = pd.read_csv('data.csv')
# One rename with a mapping instead of one rename (and one DataFrame copy) per
# column. As before, columns missing from the CSV are silently ignored.
df = df.rename(columns={col: "TAG__" + col for col in TAG_COLUMNS})
print("Writing data")
# Minimum time spent per row, in seconds. This keeps the script from burning
# through the CSV immediately; set to 0 to avoid sleeping (rows are still sent
# one by one).
seconds_to_wait = 0.5
total_rows = len(df)
for i in range(total_rows):
    # time.monotonic() cannot jump like the wall clock (time.time()), so the
    # measured elapsed time, and hence the sleep below, stays correct even if
    # the system clock is adjusted mid-run.
    start_loop = time.monotonic()
    # Double brackets keep a one-row DataFrame rather than a Series.
    df_i = df.iloc[[i]]
    output_stream.timeseries.publish(df_i)
    print(f"Sending {i}/{total_rows}")
    elapsed = time.monotonic() - start_loop
    # Sleep only for whatever is left of the per-row interval.
    time.sleep(max(0.0, seconds_to_wait - elapsed))
# Close the stream so any pending data is flushed before the script exits.
output_stream.close()
print("Finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment