Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import json
from time import sleep
from bs4 import BeautifulSoup
from kafka import KafkaConsumer, KafkaProducer
def _to_bytes(data):
    """Return *data* as bytes, UTF-8 encoding it when it is a str."""
    if isinstance(data, (bytes, bytearray)):
        return bytes(data)
    return bytes(data, encoding='utf-8')


def publish_message(producer_instance, topic_name, key, value, timeout=10):
    """Send one key/value record to a Kafka topic and confirm delivery.

    Args:
        producer_instance: A connected KafkaProducer. May be None when
            connecting failed; this is reported and nothing is sent.
        topic_name: Destination topic name.
        key: Record key, as str (UTF-8 encoded here) or bytes.
        value: Record value, as str (UTF-8 encoded here) or bytes.
        timeout: Seconds to wait for the delivery result of the record.

    Returns:
        True if the record was delivered, False otherwise. Errors are
        printed to stdout instead of being raised, as before.
    """
    if producer_instance is None:
        print('Exception in publishing message')
        print('No producer instance available')
        return False
    try:
        future = producer_instance.send(topic_name,
                                        key=_to_bytes(key),
                                        value=_to_bytes(value))
        producer_instance.flush()
        # flush() does not raise when a record fails; the failure is only
        # reported through the future returned by send(), so check it.
        future.get(timeout=timeout)
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))
        return False
    print('Message published successfully.')
    return True
def connect_kafka_producer(bootstrap_servers=None, api_version=(0, 10)):
    """Create a KafkaProducer, returning None if construction fails.

    Args:
        bootstrap_servers: Broker addresses; defaults to ['localhost:9092'].
        api_version: Broker API version passed to KafkaProducer;
            defaults to (0, 10).

    Returns:
        The KafkaProducer instance, or None after printing the error when
        its construction raises an Exception.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    # The original returned from a `finally` clause, which silently discarded
    # any in-flight exception, including KeyboardInterrupt/SystemExit.
    # Only ordinary Exceptions are turned into a None result here.
    try:
        return KafkaProducer(bootstrap_servers=bootstrap_servers, api_version=api_version)
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
        return None
def _first_text(soup, selector):
    """Return the stripped text of the first element matching *selector*, or None."""
    matches = soup.select(selector)
    if not matches:
        return None
    return matches[0].text.strip()


def parse(markup, features='lxml'):
    """Extract recipe fields from a recipe page's HTML and return them as JSON.

    Args:
        markup: The page markup (str or bytes) handed to BeautifulSoup.
        features: BeautifulSoup parser name; defaults to 'lxml', the parser
            previously hard-coded.

    Returns:
        A JSON string with keys 'title', 'submitter', 'description',
        'calories' and 'ingredients'.
        - Missing text fields default to '-', and missing calories to 0.
        - 'ingredients' is a list of {'step': text} dicts. Blank entries and
          the "Add all ingredients to list" control are skipped.
        - If anything raises while parsing, the error is printed and '{}'
          is returned.
    """
    rec = {}
    try:
        soup = BeautifulSoup(markup, features)

        title = _first_text(soup, '.recipe-summary__h1')
        submit_by = _first_text(soup, '.submitter__name')
        description = _first_text(soup, '.submitter__description')
        calories_text = _first_text(soup, '.calorie-count')

        ingredients = []
        for ingredient in soup.select('.recipe-ingred_txt'):
            ingredient_text = ingredient.text.strip()
            if ingredient_text and 'Add all ingredients to list' not in ingredient_text:
                ingredients.append({'step': ingredient_text})

        # NOTE(review): calories is a str such as '250' when found, but the
        # int 0 when missing. Confirm what downstream consumers expect.
        calories = calories_text.replace('cals', '').strip() if calories_text is not None else 0

        rec = {
            'title': title if title is not None else '-',
            'submitter': submit_by if submit_by is not None else '-',
            'description': description.replace('"', '') if description is not None else '-',
            'calories': calories,
            'ingredients': ingredients,
        }
    except Exception as ex:
        print('Exception while parsing')
        print(str(ex))
    # This return is deliberately outside any `finally`, so that
    # BaseExceptions (e.g. KeyboardInterrupt) are not silently swallowed.
    return json.dumps(rec)
def main():
    """Consume raw recipe pages, parse them, and publish the parsed JSON.

    Steps:
    1. Read every message from the 'raw_recipes' topic. Iteration ends once
       no message arrives for consumer_timeout_ms = 1000 ms.
    2. Parse each message value with parse().
    3. Publish each result to 'parsed_recipes' with the key 'parsed'.
    """
    print('Running Consumer..')
    parsed_records = []
    topic_name = 'raw_recipes'
    parsed_topic_name = 'parsed_recipes'
    # NOTE(review): no group_id is configured, so kafka-python commits no
    # offsets. Together with auto_offset_reset='earliest', every run re-reads
    # the whole topic. To track progress, pass group_id=... (with
    # enable_auto_commit, or call consumer.commit() after processing).
    consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'], api_version=(0, 10),
                             consumer_timeout_ms=1000)
    try:
        for msg in consumer:
            parsed_records.append(parse(msg.value))
    finally:
        # Close the consumer even if consuming or parsing raises.
        consumer.close()

    # NOTE(review): the purpose of this fixed pause is not evident from the
    # code. Kept as-is; confirm whether it is still needed.
    sleep(5)

    if not parsed_records:
        return

    print('Publishing records..')
    producer = connect_kafka_producer()
    if producer is None:
        # The connection error was already printed. Skip publishing rather
        # than failing once per record.
        print('No Kafka producer available; skipping publish.')
        return
    try:
        for rec in parsed_records:
            publish_message(producer, parsed_topic_name, 'parsed', rec)
    finally:
        producer.close()


if __name__ == '__main__':
    main()
@Testudinate

This comment has been minimized.

Copy link

commented Jun 5, 2019

Hello,
Could you please advise how to commit offsets in your example?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.