Skip to content

Instantly share code, notes, and snippets.

View merlin-quix's full-sized avatar

Merlin merlin-quix

View GitHub Profile
@merlin-quix
merlin-quix / timeseries_kafka_create_producer_2.py
Last active January 23, 2023 07:17
Step 2 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"

# Stream the CSV in chunks of 10 rows so each chunk can later be sent as a
# separate Kafka message rather than loading the whole file into memory.
# NOTE(review): the loop body below had lost its indentation in the original
# paste (IndentationError); restored here. The loop continues beyond this
# excerpt (the counter is used as the message key there).
for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):
    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])
    # Set the counter as the message key
@merlin-quix
merlin-quix / timeseries_kafka_create_producer_1.py
Last active January 23, 2023 07:20
Step 1 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer
# Initialize Kafka Producer Client
# Connects to a single local broker on the default port. No value_serializer
# is configured here, so any message values must already be bytes when sent.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
@merlin-quix
merlin-quix / timeseries_kafka_create_consumer_1.py
Last active January 23, 2023 07:20
Step 1 in the procedure "Creating a Kafka Consumer" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
from kafka import KafkaConsumer
import json
import pandas as pd
# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False)
# so that we can re-read them as often as we like.
# NOTE(review): this call is truncated in the visible excerpt — the closing
# parenthesis (and presumably the enable_auto_commit=False setting referenced
# in the comment above) continues beyond this snippet.
consumer = KafkaConsumer('transactions',
group_id='test-consumer-group',
bootstrap_servers=['localhost:9092'],
# Message values are JSON: decode the raw bytes to str, then parse.
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
@merlin-quix
merlin-quix / timeseries_kafka_create_consumer_2.py
Last active January 23, 2023 07:21
Step 2 in the procedure "Creating a Kafka Consumer" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Process each batch of transactions read from the Kafka topic and print a
# per-product revenue summary for the batch.
# NOTE(review): the loop body below had lost its indentation in the original
# paste (IndentationError); restored here. `consumer` is the KafkaConsumer
# created in the previous step of the tutorial.
for message in consumer:
    mframe = pd.DataFrame(message.value)
    # Multiply the quantity by the price and store in a new "revenue" column
    mframe['revenue'] = mframe['Quantity'] * mframe['Price']
    # Aggregate the StockCodes in the individual batch by revenue
    summary = mframe.groupby('StockCode')['revenue'].sum()
    print(summary)
# importing packages
import pandas as pd
import time, json
import datetime as dt
import requests
from kafka import KafkaProducer

# Load the spam dataset; each row holds one email message.
df = pd.read_csv("spam.csv")

# initializing Kafka Producer Client
# NOTE(review): the comment above has no matching KafkaProducer(...) call in
# this excerpt — the initialization line appears to have been lost in the
# paste; confirm against the original gist.
# NOTE(review): the loop body below had lost its indentation in the original
# paste (IndentationError); restored here. The loop continues beyond this
# excerpt (the actual send happens there).
for i in df.index:
    # Create a message that includes a message ID and email text
    data = {'MessageID': str(i),
            'MessageBody': df['Message'][i]}
    # Print the message so you can see what is being sent
    print(f'Sending message: {data}')
    # Send the data with the KafkaProducer client
from pandas import DataFrame
import time, json
import datetime as dt
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import numpy as np
from kafka import KafkaProducer
from kafka import KafkaConsumer
import tensorflow_text as text

# 1 — Initialize a Consumer for reading the email messages from the emails topic
consumer = KafkaConsumer(group_id="python-consumer",
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # Always read from beginning
                         enable_auto_commit=False,  # Keep Kafka messages as 'unread'
                         value_deserializer=lambda x: json.loads(x))
# NOTE(review): kafka-python's subscribe() normally takes a list of topics
# (e.g. ["emails"]) — verify a bare string is accepted by the installed version.
consumer.subscribe("emails")
# BUG FIX: the original line was missing the closing quote of the f-string,
# which made this snippet a SyntaxError.
print(f'Initialized Kafka consumer at {dt.datetime.utcnow()}')

# Read each email message and rebuild it into a DataFrame for the model.
# NOTE(review): the loop body had lost its indentation in the original paste;
# restored here. The loop continues beyond this excerpt.
for message in consumer:
    # Get the message body
    m = message.value
    # Turn the message back into a DataFrame
    df = DataFrame(m, index=[0])
    # Extract the Message from the DataFrame
    dff = pd.Series(df['MessageBody'])
import json
from kafka import KafkaProducer
# Path to the web-server log; presumably read and sent in the continuation
# of this snippet beyond the visible excerpt.
file = 'access.log'
# Producer whose batches are gzip-compressed before being sent to the broker.
# Values are serialized to JSON then UTF-8 encoded; default=str stringifies
# objects json can't handle natively (e.g. datetimes). Keys are stringified
# and UTF-8 encoded.
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:json.dumps(x, default=str).encode('utf-8'),
key_serializer=lambda y:str(y).encode("utf-8"),
compression_type="gzip")