Skip to content

Instantly share code, notes, and snippets.

@jrzaurin
Last active March 3, 2019 15:37
Show Gist options
  • Save jrzaurin/ffdcdd6600b5d36548e87d8053c5474b to your computer and use it in GitHub Desktop.
Save jrzaurin/ffdcdd6600b5d36548e87d8053c5474b to your computer and use it in GitHub Desktop.
# Load the test file and pre-serialise every row to a JSON string; the
# resulting list is what the producer below reads its payloads from.
df_test = pd.read_csv(PATH/'adult.test')
df_test['json'] = df_test.apply(lambda row: row.to_json(), axis=1)
messages = df_test['json'].tolist()
def start_producing():
    """Publish up to 200 entries of ``messages`` to the 'app_messages' topic.

    Each entry of the module-level ``messages`` list (a JSON string) is
    decoded and wrapped in a dict with a freshly generated UUID4
    ``request_id`` and the decoded row under ``data``. The dict is sent as
    UTF-8 encoded JSON, flushed immediately, and reported on stdout, after
    which the function sleeps for 2 seconds before the next entry.

    The producer is always closed on exit, including when an exception or
    KeyboardInterrupt interrupts the loop.
    """
    producer = KafkaProducer(bootstrap_servers=KAFKA_HOST)
    try:
        # Slice rather than index over range(200): if fewer than 200 rows
        # were loaded the loop simply ends instead of raising IndexError.
        for raw_row in messages[:200]:
            message_id = str(uuid.uuid4())
            message = {'request_id': message_id, 'data': json.loads(raw_row)}

            producer.send('app_messages', json.dumps(message).encode('utf-8'))
            # Flush per message so the "Sent" line below is printed only
            # after the buffered record has actually been pushed out.
            producer.flush()

            print("\033[1;31;40m -- PRODUCER: Sent message with id {}".format(message_id))
            sleep(2)
    finally:
        # Release the broker connection and the producer's background thread.
        producer.close()
def start_consuming():
    """Print every prediction that arrives on the 'app_messages' topic.

    Iterates over the consumer, decoding each record's value as JSON.
    Records without a ``prediction`` key are skipped; for the rest, the
    prediction and its ``request_id`` are printed to stdout.
    """
    consumer = KafkaConsumer('app_messages', bootstrap_servers=KAFKA_HOST)

    for record in consumer:
        payload = json.loads(record.value)
        # Only records that carry a prediction are reported; skip the rest.
        if 'prediction' not in payload:
            continue
        request_id = payload['request_id']
        print("\033[1;32;40m ** CONSUMER: Received prediction {} for request id {}".format(payload['prediction'], request_id))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment