@merlin-quix
merlin-quix / ingestion_viakafka_using_quix-qdrant.ipynb
Created January 31, 2024 15:40
Ingestion_viaKafka_Using_Quix-Qdrant.ipynb
(Notebook preview unavailable: the file could not be rendered, so its contents are not shown here.)
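Since the notebook itself cannot be displayed, here is a minimal sketch of the kind of Kafka-to-Qdrant ingestion the title describes. It is not the notebook's code: it assumes plain kafka-python instead of the Quix Streams client, an "ingestion" topic whose messages already carry a precomputed "vector" and a "text" field, a 384-dimensional embedding, and a local Qdrant instance on the default port.

import json
from kafka import KafkaConsumer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Hypothetical sketch only; not the notebook's actual code.
qdrant = QdrantClient(host="localhost", port=6333)
qdrant.recreate_collection(
    collection_name="documents",                                       # assumed collection name
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),   # assumed vector size
)

consumer = KafkaConsumer("ingestion",                                  # assumed topic name
                         bootstrap_servers=["localhost:9092"],
                         auto_offset_reset="earliest",
                         value_deserializer=lambda v: json.loads(v.decode("utf-8")))

# Upsert each incoming document into Qdrant as a single point
for i, message in enumerate(consumer):
    doc = message.value
    qdrant.upsert(
        collection_name="documents",
        points=[PointStruct(id=i, vector=doc["vector"], payload={"text": doc["text"]})],
    )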
@merlin-quix
merlin-quix / streaming-csv-with-quix-streams.py
Created March 3, 2023 12:29
streaming-csv-with-quix-streams
import quixstreams as qx
import pandas as pd
import time
from urllib import request
# Download a sample data.csv. If you want to use your own data file,
# comment out the download below and place your data.csv
# in the same directory as this file.
print("Downloading CSV")
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/data.csv")
with open("data.csv", "wb") as out_file:
    out_file.write(f.read())
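The preview cuts off after the download. As a rough illustration of what the rest of such a script might do, here is a sketch that streams the CSV rows to a topic with plain kafka-python rather than the Quix Streams client used in the gist; the topic name and the per-row pacing are assumptions.

import json
import time
import pandas as pd
from kafka import KafkaProducer

# Hypothetical continuation; not the gist's Quix Streams code.
df = pd.read_csv("data.csv")

producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                         value_serializer=lambda x: json.dumps(x, default=str).encode("utf-8"))

for _, row in df.iterrows():
    producer.send("csv-data", value=row.to_dict())  # "csv-data" is a placeholder topic name
    time.sleep(0.5)                                 # pace the stream to simulate live data

producer.flush()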
from kafka import KafkaConsumer
import json
from datetime import datetime

# Consume all the messages from the topic but do not mark them as 'read'
# (enable_auto_commit=False) so that we can re-read them as often as we like.
consumer = KafkaConsumer('nginx-log-producer-compressionv2',
                         group_id='test-consumer-group',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')))

for message in consumer:
    # The value_deserializer has already decoded the JSON payload into a dict
    mjson = message.value
    # Print the message key and a selection of data points from the JSON
    print(f"KEY: {message.key} | "
          f"VALUE: {datetime.utcfromtimestamp(mjson['time']/1e3)}, "
          f"{mjson['host']}, {mjson['address']}")
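Because nothing is ever committed, the topic can be replayed at will: a fresh consumer with auto_offset_reset='earliest' starts again from offset 0, or the consumer created above can be rewound in place. A small sketch of the second option (not part of the gist), reusing the consumer defined above:

consumer.poll(timeout_ms=1000)   # join the group so partitions get assigned
consumer.seek_to_beginning()     # rewind every assigned partition to offset 0
for message in consumer:
    print(message.key, message.value)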
import json
from kafka import KafkaProducer

file = 'access.log'

# A baseline producer that sends messages uncompressed
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                         key_serializer=lambda y: str(y).encode("utf-8"))

# A producer that compresses each batch with gzip
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode("utf-8"),
                                     compression_type="gzip")

# Variant of the gzip producer that lingers up to 1 s and allows batches of up
# to 10 MB, so that compression operates on larger batches
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode("utf-8"),
                                     compression_type="gzip",
                                     linger_ms=1000,
                                     batch_size=10485760)

# Add a switch to let us easily switch between compression methods
compmethod = 'topic'

# Initialize a simple counter for the message key
message_key = 0

# Iterate through the log file line-by-line
with open(file, "r") as file_handle:
    for line in file_handle:
        ...  # send each line to Kafka (see the sketch below)
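The send step inside that loop is not shown in the previews. A hedged sketch of what it presumably looks like: the payload shape and the uncompressed topic name are assumptions, while the compressed topic name is taken from the consumer snippet above.

# Hypothetical loop body; not the original gist code.
with open(file, "r") as file_handle:
    for line in file_handle:
        payload = {"log_line": line.rstrip("\n")}   # assumed message format
        if compmethod == 'producer':
            # Producer-side gzip compression
            producer_compression.send('nginx-log-producer-compressionv2', key=message_key, value=payload)
        else:
            # Rely on topic-level compression configured on the broker
            producer.send('nginx-log-topic-compression', key=message_key, value=payload)  # placeholder topic
        message_key += 1

producer.flush()
producer_compression.flush()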
import json
import datetime as dt
import pandas as pd
from kafka import KafkaConsumer

#1 — Initialize a Consumer for reading the email messages from the emails topic
consumer = KafkaConsumer(group_id="python-consumer",
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # Always read from the beginning
                         enable_auto_commit=False,      # Keep Kafka messages as 'unread'
                         value_deserializer=lambda x: json.loads(x))
consumer.subscribe("emails")
print(f'Initialized Kafka consumer at {dt.datetime.utcnow()}')

# Loop over the incoming messages
for message in consumer:
    # Get the message body
    m = message.value
    # Turn the message back into a DataFrame
    df = pd.DataFrame(m, index=[0])
    # Extract the email text from the DataFrame
    dff = pd.Series(df['MessageBody'])
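For context, the producer side that this consumer implies would publish one JSON message per email with a 'MessageBody' field. A sketch under that assumption (the input file name, key choice, and any remaining columns are made up):

import json
import pandas as pd
from kafka import KafkaProducer

# Hypothetical producer; not from the original gists.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                         key_serializer=lambda y: str(y).encode("utf-8"))

emails = pd.read_csv("emails.csv")   # assumed input file with a "MessageBody" column
for i, row in emails.iterrows():
    producer.send("emails", key=i, value=row.to_dict())

producer.flush()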