# importing packages
import pandas as pd
import time, json
import datetime as dt
import requests
from kafka import KafkaProducer
df = pd.read_csv("spam.csv")
# Initialize Kafka Producer Client
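# NOTE: the producer initialization itself is not shown at this point in the gist;
# the lines below are a minimal sketch, assuming the same localhost broker and
# JSON/str serializers used by the producer defined further down this page.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                         key_serializer=lambda y: str(y).encode('utf-8'))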
from pandas import DataFrame
import time, json
import datetime as dt
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import numpy as np
from kafka import KafkaProducer
from kafka import KafkaConsumer
import tensorflow_text as text
#1 - Initialize a Consumer for reading the email messages from the "emails" topic
consumer = KafkaConsumer(group_id="python-consumer",
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',  # Always read from the beginning
                         enable_auto_commit=False,      # Keep Kafka messages as 'unread'
                         value_deserializer=lambda x: json.loads(x))
consumer.subscribe(["emails"])
print(f'Initialized Kafka consumer at {dt.datetime.utcnow()}')
for i in df.index:
    # Create a message that includes a message ID and email text
    data = {'MessageID': str(i),
            'MessageBody': df['Message'][i]}
    # Print the message so you can see what is being sent
    print(f'Sending message: {data}')
    # Send the data with the KafkaProducer client to the "emails" topic
    producer.send(topic="emails", key=data['MessageID'], value=data)
for message in consumer:
    # Get the message body
    m = message.value
    # Turn the message back into a DataFrame
    df = DataFrame(m, index=[0])
    # Extract the Message from the DataFrame
    dff = pd.Series(df['MessageBody'])
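    # NOTE: the classification step is not shown in the gist; the lines below are a
    # minimal sketch, assuming a saved Keras text model (the 'spam_model' path, the
    # single-probability output and the 0.5 threshold are all assumptions). In a real
    # script the model would be loaded once, before the consumer loop.
    model = keras.models.load_model('spam_model')
    prediction = model.predict(np.asarray(dff))
    label = 'spam' if prediction[0][0] > 0.5 else 'ham'
    print(f"Message {m['MessageID']} classified as {label}")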
# Standard producer that serializes values as JSON and keys as strings
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                         key_serializer=lambda y: str(y).encode("utf-8"))

# Producer that compresses messages with gzip and batches them
# (linger_ms waits up to 1 second per batch; batch_size is given in bytes, here 10 MB)
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode("utf-8"),
                                     compression_type="gzip",
                                     linger_ms=1000,
                                     batch_size=10485760)
import json
from kafka import KafkaProducer

# Path to the log file that will be streamed to Kafka
file = 'access.log'

# Producer that compresses each batch of log lines with gzip
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode("utf-8"),
                                     compression_type="gzip")
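# NOTE: the gist does not show how the log file is sent; the loop below is a minimal
# sketch, assuming an "access-logs" topic and using the line number as the message key.
with open(file) as f:
    for line_number, line in enumerate(f):
        producer_compression.send(topic="access-logs",
                                  key=line_number,
                                  value=line.strip())

# Make sure any batched messages are actually delivered before exiting
producer_compression.flush()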
merlin-quix / timeseries_kafka_create_producer_2.py
Last active January 23, 2023 07:17
Step 2 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"

for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):
    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])
    # Set the counter as the message key
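    # NOTE: the gist preview ends here; the lines below are a minimal sketch of how the
    # chunk could be keyed and sent with the plain producer from step 1 (the
    # "transactions" topic name and manual JSON serialization are assumptions).
    key = str(counter).encode('utf-8')
    value = json.dumps(chunk.to_dict(), default=str).encode('utf-8')
    producer.send(topic="transactions", key=key, value=value)
    # Increment the counter and pause briefly between chunks
    counter += 1
    sleep(0.5)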
merlin-quix / timeseries_kafka_create_producer_1.py
Last active January 23, 2023 07:20
Step 1 in the procedure "Creating a Kafka Producer to send data" from this article: https://www.quix.io/blog/send-timeseries-data-to-kafka-python/
# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer
# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
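# NOTE: a minimal sketch of sending a single test message with this producer; since no
# value_serializer was configured above, the payload is encoded manually, and the
# "transactions" topic name is an assumption.
test_message = {'status': 'producer up', 'timestamp': str(dt.datetime.utcnow())}
producer.send(topic="transactions",
              value=json.dumps(test_message).encode('utf-8'),
              key=b'0')
producer.flush()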