This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(review): scraped excerpt of a producer script. The trailing "| |" on each line is
# web-page table residue, the loop body lost its indentation, and the loop continues
# past the visible text, so this fragment does not run as shown.
# NOTE(review): `pd` is used here but only imported further down this file. Confirm the
# import precedes this code in the real script.
# Set a basic message counter and define the file path | |
counter = 0 | |
file = "online_retail_II.csv" | |
# Read the CSV lazily as an iterator of DataFrames holding at most 10 rows each (chunksize=10).
# NOTE(review): the 'unicode_escape' codec also interprets backslash sequences found in
# the data. If the file is simply Latin-1 encoded, encoding='latin-1' may be what is
# intended; confirm.
for chunk in pd.read_csv(file,encoding='unicode_escape',chunksize=10): | |
# For each chunk, parse the InvoiceDate column into pandas datetime values | |
chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"]) | |
# Set the counter as the message key |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import packages | |
import pandas as pd | |
import json | |
import datetime as dt | |
from time import sleep | |
from kafka import KafkaProducer | |
# Initialize the Kafka producer client against the local broker.
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
# Log a timezone-aware UTC timestamp; datetime.utcnow() returns a naive value and is
# deprecated since Python 3.12.
print(f"Initialized Kafka producer at {dt.datetime.now(dt.timezone.utc)}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KafkaConsumer | |
import json | |
import pandas as pd | |
# NOTE(review): scraped excerpt. The trailing "| |" is web-page table residue and the
# KafkaConsumer(...) call is cut off after value_deserializer, so this does not run as shown.
# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False) | |
# so that we can re-read them as often as we like. | |
# NOTE(review): enable_auto_commit=False is not among the visible arguments. Re-reading
# from the beginning also depends on auto_offset_reset (kafka-python's default is
# 'latest'). Confirm both are set in the truncated part of the call.
consumer = KafkaConsumer('transactions', | |
group_id='test-consumer-group', | |
bootstrap_servers=['localhost:9092'], | |
# Decode each message value from UTF-8 bytes, then parse it as JSON.
value_deserializer=lambda m: json.loads(m.decode('utf-8')), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Consume transaction batches and print the total revenue per StockCode for each batch.
# Each summary covers a single message only; totals are not accumulated across messages.
for message in consumer:
    # JSON-decoded payload; presumably a list of row dicts (or a column -> values
    # mapping) produced from a CSV chunk. Confirm against the producer.
    records = message.value
    # An empty batch yields a DataFrame without Quantity/Price columns, which would
    # raise KeyError below, so skip it.
    if not records:
        continue
    mframe = pd.DataFrame(records)
    # Revenue per line item = quantity * unit price.
    mframe["revenue"] = mframe["Quantity"] * mframe["Price"]
    # Aggregate revenue by StockCode within this batch.
    summary = mframe.groupby("StockCode")["revenue"].sum()
    print(summary)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# importing packages | |
import pandas as pd | |
import time, json | |
import datetime as dt | |
import requests | |
from kafka import KafkaProducer | |
# Load the email dataset. Presumably the producer loop reads its "Message" column;
# confirm against the CSV header.
df = pd.read_csv("spam.csv")
# Initialize the Kafka producer client (the initialization code is not part of this excerpt).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(review): scraped excerpt. "| |" is web-page table residue, the loop body lost its
# indentation, and the send call announced at the end is cut off.
# Build one message per row of df, using the row's index label (as a string) as MessageID.
for i in df.index: | |
# Create a message that includes a message ID and email text | |
data = {'MessageID': str(i), | |
# NOTE(review): df['Message'][i] is chained label indexing; df.at[i, 'Message'] is the idiomatic scalar lookup.
'MessageBody': df['Message'][i]} | |
# Print the message so you can see what is being sent | |
print(f'Sending message: {data}') | |
# Send the data with the KafkaProducer client |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pandas import DataFrame | |
import time, json | |
import datetime as dt | |
import pandas as pd | |
import tensorflow as tf | |
from tensorflow import keras | |
import numpy as np | |
from kafka import KafkaProducer | |
from kafka import KafkaConsumer | |
import tensorflow_text as text |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 1 - Initialize a consumer that reads the email messages from the "emails" topic.
# auto_offset_reset="earliest" only applies when the group has no committed offset.
# Auto-commit is disabled, so unless offsets are committed manually elsewhere, every
# restart begins again at the earliest available message.
consumer = KafkaConsumer(
    group_id="python-consumer",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",  # start from the oldest message when no offset is committed
    enable_auto_commit=False,  # never auto-commit offsets, so messages stay "unread"
    value_deserializer=lambda x: json.loads(x),  # json.loads accepts the raw bytes payload
)
consumer.subscribe(["emails"])
# Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
print(f"Initialized Kafka consumer at {dt.datetime.now(dt.timezone.utc)}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(review): scraped excerpt. "| |" is web-page table residue, the loop body lost its
# indentation, and the loop continues past the visible text.
for message in consumer: | |
# Get the message body (presumably a dict of scalars after JSON decoding, e.g.
# {'MessageID': ..., 'MessageBody': ...}; confirm against the producer) | |
m=message.value | |
# Turn the message back into a single-row DataFrame. index=[0] is required because
# pandas cannot build a DataFrame from a dict of only scalar values without an index.
df=DataFrame(m,index=[0]) | |
# Extract the Message from the DataFrame | |
# NOTE(review): df['MessageBody'] is already a Series, so the pd.Series(...) wrap is redundant.
dff = pd.Series(df['MessageBody']) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from kafka import KafkaProducer | |
# Path of the log file; presumably read by code after this excerpt.
file = "access.log"
# Producer that serializes values as UTF-8 JSON and keys as UTF-8 strings, and
# compresses produced data with gzip.
# default=str makes json.dumps stringify any value it cannot serialize natively
# (e.g. datetime objects) instead of raising TypeError.
producer_compression = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda x: json.dumps(x, default=str).encode("utf-8"),
    key_serializer=lambda y: str(y).encode("utf-8"),
    compression_type="gzip",
)
Older Newer