Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import quixstreams as qx | |
import pandas as pd | |
import time | |
from urllib import request | |
# download a sample data.csv. If you want to use your own data file | |
# just comment out line 9 and 10 and place your data.csv | |
# file in the same directory as this file. | |
print("Downloading CSV") | |
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/data.csv") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Print the key and a few decoded fields of every message read from the topic.
for message in consumer:
    mv = message.value
    # Load the decoded value as JSON
    # NOTE(review): json.loads() only accepts str/bytes/bytearray. If the
    # consumer is configured with a JSON value_deserializer, confirm that the
    # payload really is double-encoded; otherwise this call raises TypeError.
    mjson = json.loads(mv)
    # Print the message key and a selection of data points from the JSON.
    # 'time' is divided by 1e3 before conversion, i.e. it is treated as
    # epoch milliseconds.
    # Each piece of the message is its own f-string literal: a single-quoted
    # string literal must not run across physical lines.
    print(f"KEY: {message.key} | "
          f"VALUE: {datetime.utcfromtimestamp(mjson['time']/1e3)},"
          f"{mjson['host']},{mjson['address']} ")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KafkaConsumer | |
import json | |
from datetime import datetime | |
# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False) | |
# so that we can re-read them as often as we like. | |
consumer = KafkaConsumer('nginx-log-producer-compressionv2', | |
group_id='test-consumer-group', | |
bootstrap_servers=['localhost:9092'], | |
value_deserializer=lambda v: json.loads(v.decode('utf-8')), |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Add a switch to let us easily switch between methods | |
compmethod = 'topic' | |
# Initialize a simple counter for the message key | |
message_key = 0 | |
# Iterate through file line-by-line | |
with open(file, "r") as file_handle: | |
for line in file_handle: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# For every message read from the consumer, print the revenue per stock code.
for message in consumer:
    # NOTE(review): assumes message.value has already been deserialized into
    # something pandas can turn into a frame (e.g. a list of row dicts)
    # with 'Quantity', 'Price' and 'StockCode' fields - confirm against the
    # producer.
    mframe = pd.DataFrame(message.value)
    # Multiply the quantity by the price and store in a new "revenue" column
    mframe['revenue'] = mframe['Quantity'] * mframe['Price']
    # Aggregate the StockCodes in the individual batch by revenue
    summary = mframe.groupby('StockCode')['revenue'].sum()
    print(summary)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KafkaConsumer | |
import json | |
import pandas as pd | |
# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False) | |
# so that we can re-read them as often as we like. | |
consumer = KafkaConsumer('transactions', | |
group_id='test-consumer-group', | |
bootstrap_servers=['localhost:9092'], | |
value_deserializer=lambda m: json.loads(m.decode('utf-8')), |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import packages | |
import pandas as pd | |
import json | |
import datetime as dt | |
from time import sleep | |
from kafka import KafkaProducer | |
# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# Log a timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
# returns a naive value that is easy to misread as local time.
print(f'Initialized Kafka producer at {dt.datetime.now(dt.timezone.utc)}')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Set a basic message counter and define the file path | |
counter = 0 | |
file = "online_retail_II.csv" | |
for chunk in pd.read_csv(file,encoding='unicode_escape',chunksize=10): | |
# For each chunk, convert the invoice date into the correct time format | |
chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"]) | |
# Set the counter as the message key |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from kafka import KafkaProducer | |
# Name of the input file used by this script.
file = 'access.log'

# Kafka producer configured with gzip compression.
producer_compression = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Values are JSON-encoded to UTF-8 bytes. default=str means any object
    # json cannot serialize natively is silently converted with str().
    value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
    # Keys are stringified and then UTF-8 encoded.
    key_serializer=lambda y: str(y).encode("utf-8"),
    compression_type="gzip",
)
NewerOlder