Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import quixstreams as qx
import pandas as pd
import time
from urllib import request

# Download a sample data.csv. If you want to use your own data file instead,
# comment out the download lines below and place your data.csv file in the
# same directory as this file.
print("Downloading CSV")
# NOTE(review): the response `f` is not closed in the visible lines; confirm
# the code that reads it closes it (e.g. by wrapping the read in a `with`).
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/data.csv")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for message in consumer:
    mv = message.value
    # Load the decoded value as JSON.
    # NOTE(review): if `consumer` was built with a JSON value_deserializer,
    # message.value is already a dict and json.loads() raises TypeError;
    # confirm how this consumer is configured.
    mjson = json.loads(mv)
    # Print the message key and a selection of data points from the JSON.
    # 'time' is divided by 1e3 before conversion, i.e. it is treated as epoch
    # milliseconds. The pieces are joined by implicit string concatenation:
    # a raw line break inside a non-triple-quoted f-string is a SyntaxError.
    print(f"KEY: {message.key} | "
          f"VALUE: {datetime.utcfromtimestamp(mjson['time']/1e3)}, "
          f"{mjson['host']},{mjson['address']} ")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kafka import KafkaConsumer | |
import json | |
from datetime import datetime | |
# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False) | |
# so that we can re-read them as often as we like. | |
consumer = KafkaConsumer('nginx-log-producer-compressionv2', | |
group_id='test-consumer-group', | |
bootstrap_servers=['localhost:9092'], | |
value_deserializer=lambda v: json.loads(v.decode('utf-8')), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Producer that gzip-compresses batches. linger_ms=1000 lets it wait up to one
# second for more records before sending, and batch_size (in bytes, here
# 10 MiB = 10485760) lets many records share one compressed batch.
# Values are serialized to UTF-8 JSON; default=str stringifies anything json
# cannot encode natively (e.g. datetime). Keys are str()-ed then encoded.
# NOTE(review): 10 MiB is well above kafka-python's default max_request_size
# (1 MiB) and the broker's default message size limit; confirm those limits
# are raised to match.
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode("utf-8"),
                                     compression_type="gzip",
                                     linger_ms=1000,
                                     batch_size=10485760)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Add a switch to let us easily switch between methods | |
compmethod = 'topic' | |
# Initialize a simple counter for the message key | |
message_key = 0 | |
# Iterate through file line-by-line | |
with open(file, "r") as file_handle: | |
for line in file_handle: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Producer with no compression_type set (kafka-python's default: no
# compression). Values are serialized to UTF-8 JSON; default=str stringifies
# anything json cannot encode natively (e.g. datetime). Keys are str()-ed then
# encoded as UTF-8.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                         key_serializer=lambda y: str(y).encode("utf-8"))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
from kafka import KafkaProducer

# Log file to read (path relative to the current working directory).
file = 'access.log'

# Producer that gzip-compresses each batch before sending it to the broker.
# Values are serialized to UTF-8 JSON (default=str stringifies non-JSON types);
# keys are str()-ed then encoded as UTF-8.
producer_compression = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                     value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
                                     key_serializer=lambda y: str(y).encode("utf-8"),
                                     compression_type="gzip")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for message in consumer:
    # Get the message body. It is presumably already a dict produced by the
    # consumer's value_deserializer — TODO confirm against the consumer setup.
    m = message.value
    # Turn the message back into a one-row DataFrame (index=[0]). Uses the
    # `pd.` alias like the line below; bare `DataFrame` is a NameError unless
    # it was imported separately.
    df = pd.DataFrame(m, index=[0])
    # Extract the message body column from the DataFrame.
    dff = pd.Series(df['MessageBody'])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 1 — Initialize a consumer for reading the email messages from the "emails" topic.
consumer = KafkaConsumer(group_id="python-consumer",
                         bootstrap_servers=['localhost:9092'],
                         # Start from the earliest offset when the group has no
                         # committed offset; since nothing is committed (below),
                         # every run re-reads from the beginning.
                         auto_offset_reset='earliest',
                         # Never auto-commit offsets, so messages stay 'unread'
                         # for this consumer group.
                         enable_auto_commit=False,
                         # json.loads accepts the raw bytes value directly.
                         value_deserializer=lambda x: json.loads(x))
consumer.subscribe("emails")
# The original was missing the closing quote (unterminated string literal).
print(f'Initialized Kafka consumer at {dt.datetime.utcnow()}')
NewerOlder