@merlin-quix
merlin-quix / ingestion_viakafka_using_quix-qdrant.ipynb
Created January 31, 2024 15:40
Ingestion_viaKafka_Using_Quix-Qdrant.ipynb
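The notebook preview itself fails to render, but the title points at a Kafka-to-Qdrant ingestion pipeline. Below is a minimal sketch of that idea using kafka-python, qdrant-client, and sentence-transformers; the 'docs' topic, the 'knowledge-base' collection, and the embedding model are assumptions for illustration, not the notebook's actual code.

from kafka import KafkaConsumer
import json
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, VectorParams, Distance
from sentence_transformers import SentenceTransformer

# Hypothetical setup: embed incoming documents and store them in Qdrant
encoder = SentenceTransformer('all-MiniLM-L6-v2')
qdrant = QdrantClient(":memory:")
qdrant.recreate_collection(
    collection_name="knowledge-base",
    vectors_config=VectorParams(size=encoder.get_sentence_embedding_dimension(),
                                distance=Distance.COSINE),
)

consumer = KafkaConsumer('docs',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')))

for i, message in enumerate(consumer):
    doc = message.value
    # Embed the document text and upsert it with the raw doc as payload
    qdrant.upsert(
        collection_name="knowledge-base",
        points=[PointStruct(id=i,
                            vector=encoder.encode(doc["text"]).tolist(),
                            payload=doc)],
    )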
@merlin-quix
merlin-quix / streaming-csv-with-quix-streams.py
Created March 3, 2023 12:29
streaming-csv-with-quix-streams
import quixstreams as qx
import pandas as pd
import time
from urllib import request

# Download a sample data.csv. If you want to use your own data file,
# just comment out the download code below and place your data.csv
# file in the same directory as this file.
print("Downloading CSV")
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/data.csv")
with open("data.csv", "wb") as data_file:
    data_file.write(f.read())
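The preview stops after the download step. Below is a hedged sketch of how the rows might then be streamed, assuming the quixstreams v0.5 API that the import quixstreams as qx line implies; the broker address and the 'csv-data' topic name are illustrative assumptions, not part of the gist.

client = qx.KafkaStreamingClient('127.0.0.1:9092')
topic_producer = client.get_topic_producer('csv-data')
stream = topic_producer.create_stream()

# The tutorial's data.csv carries its own timestamp column, which the
# timeseries publish call expects
df = pd.read_csv("data.csv")
for i in range(len(df)):
    # Publish one row at a time, pausing to imitate a live feed
    stream.timeseries.publish(df.iloc[[i]])
    time.sleep(0.25)

stream.close()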
from kafka import KafkaConsumer
import json
from datetime import datetime

# Consume all the messages from the topic but do not mark them as 'read'
# (enable_auto_commit=False) so that we can re-read them as often as we like.
consumer = KafkaConsumer('nginx-log-producer-compressionv2',
                         group_id='test-consumer-group',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')),
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

for message in consumer:
    # The deserializer above has already decoded each value into a dict
    mjson = message.value
    # Print the message key and a selection of data points from the JSON
    print(f"KEY: {message.key} | "
          f"VALUE: {datetime.utcfromtimestamp(mjson['time']/1e3)}, "
          f"{mjson['host']}, {mjson['address']}")
# Add a switch to let us easily switch between compression methods
compmethod = 'topic'
# Initialize a simple counter for the message key
message_key = 0
# Iterate through the log file line-by-line and send each entry to Kafka.
# parse_line is a hypothetical helper that turns a raw nginx log line into
# a dict with the 'time', 'host' and 'address' fields the consumer above
# expects; file and producer_compression are defined in the gzip snippet
# further down the page.
with open(file, "r") as file_handle:
    for line in file_handle:
        producer_compression.send('nginx-log-producer-compressionv2',
                                  value=parse_line(line), key=message_key)
        message_key += 1
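The compmethod switch above implies that compression can also be applied at the topic level instead of in the producer. A possible sketch of that branch using kafka-python's admin client follows; the partition count, replication factor, and topic settings are assumptions for illustration.

from kafka.admin import KafkaAdminClient, NewTopic

if compmethod == 'topic':
    admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
    # Broker-side compression: the topic stores batches gzip-compressed
    # regardless of what the producers send
    admin.create_topics([NewTopic(name='nginx-log-producer-compressionv2',
                                  num_partitions=1,
                                  replication_factor=1,
                                  topic_configs={'compression.type': 'gzip'})])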
@merlin-quix
merlin-quix / timeseries_kafka_create_consumer_2.py
Last active January 23, 2023 07:21
Step 2 in the procedure "Creating a Kafka Consumer" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
for message in consumer:
    mframe = pd.DataFrame(message.value)
    # Multiply the quantity by the price and store in a new "revenue" column
    mframe['revenue'] = mframe['Quantity'] * mframe['Price']
    # Aggregate the StockCodes in the individual batch by revenue
    summary = mframe.groupby('StockCode')['revenue'].sum()
    print(summary)
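Each batch prints only its own summary. If you wanted a cumulative figure across batches, one possible extension (an assumption, not part of the gist) is to accumulate the per-batch Series:

running_total = pd.Series(dtype=float)

for message in consumer:
    mframe = pd.DataFrame(message.value)
    mframe['revenue'] = mframe['Quantity'] * mframe['Price']
    summary = mframe.groupby('StockCode')['revenue'].sum()
    # Fold this batch's totals into the running totals, treating
    # previously unseen stock codes as zero
    running_total = running_total.add(summary, fill_value=0)
    print(running_total.sort_values(ascending=False).head())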
@merlin-quix
merlin-quix / timeseries_kafka_create_consumer_1.py
Last active January 23, 2023 07:20
Step 1 in the procedure "Creating a Kafka Consumer" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
from kafka import KafkaConsumer
import json
import pandas as pd

# Consume all the messages from the topic but do not mark them as 'read'
# (enable_auto_commit=False) so that we can re-read them as often as we like.
consumer = KafkaConsumer('transactions',
                         group_id='test-consumer-group',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)
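Because auto-commit is disabled, the same messages can be replayed at will. A small sketch of rewinding to the start with kafka-python (an addition for illustration, not part of the gist):

consumer.poll(timeout_ms=1000)   # trigger partition assignment first
consumer.seek_to_beginning()     # rewind every assigned partition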
@merlin-quix
merlin-quix / timeseries_kafka_create_producer_1.py
Last active January 23, 2023 07:20
Step 1 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer
# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
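One optional addition (not in the gist): kafka-python can confirm the bootstrap connection before any data is sent, so a misconfigured broker address fails fast.

if not producer.bootstrap_connected():
    raise RuntimeError('Failed to reach the Kafka broker at localhost:9092')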
@merlin-quix
merlin-quix / timeseries_kafka_create_producer_2.py
Last active January 23, 2023 07:17
Step 2 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"
for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):
    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])
    # Set the counter as the message key
    key = str(counter).encode()
    # Serialize the chunk as JSON and send it to the 'transactions' topic
    data = json.dumps(chunk.to_dict(), default=str).encode('utf-8')
    producer.send(topic="transactions", key=key, value=data)
    # Pause to simulate a live stream, then increment the key
    sleep(0.5)
    counter += 1
import json
from kafka import KafkaProducer

file = 'access.log'

producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode('utf-8'),
                                     compression_type='gzip')
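After messages have been sent with this producer, kafka-python's client metrics give a rough view of how well the payload compressed. The 'compression-rate-avg' metric name below is what recent kafka-python releases report; treat this as a sketch, since metric names can vary between versions.

producer_compression.flush()
stats = producer_compression.metrics()
# Average ratio of compressed to uncompressed batch size (lower is better)
print(stats['producer-metrics'].get('compression-rate-avg'))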