Skip to content

Instantly share code, notes, and snippets.

@Silvertongue26
Last active May 8, 2022 18:53
Show Gist options
  • Save Silvertongue26/54f0d60151047323b48d44398336d498 to your computer and use it in GitHub Desktop.
Save Silvertongue26/54f0d60151047323b48d44398336d498 to your computer and use it in GitHub Desktop.
Part of zookeeper_kafka_pyspark_polarity-analysis
# Import libraries
from kafka import KafkaConsumer
import json
topic_name = 'TW_ANALYSIS'
# Creata Kafka consumer, same default configuration frome the producer
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
api_version=(2, 0, 2),
# Deserialize the string from the producer since it comes in hex
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
# Message loader from Json
for message in consumer:
tweets = json.loads(json.dumps(message.value))
print(tweets)
@Silvertongue26
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment