Last active
August 28, 2018 08:51
-
-
Save Glutexo/7cc6442a42d892c9e9db746506befc26 to your computer and use it in GitHub Desktop.
Kiel consume bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a simple tornado application that produces to and consumes from a single | |
# Kafka topic. It works as follows: | |
# * inputer is fired whenever new data are entered to stdin. Assumes that this | |
# data is in JSON format. Decodes them and puts them on a local produce_queue. | |
# * producer connects to Kafka and watches for new items to appear in a local | |
# produce_queue. Produces those items to Kafka. | |
# * consumer connects to Kafka and reads messages from it. It does nothing with | |
# them, just retrieving them to demonstrate the bug. | |
# | |
# Usage: | |
# * Spin up a Kafka server on kafka:29029. | |
# * Create a test topic in Kafka. | |
# * Create a Python 3 virtualenv. | |
# * Install tornado and kiel. | |
# * Run this app with `python app.py`. | |
# * Enter some valid JSON message to the stdin, e.g. "abc" or {"a": "b"}. | |
# * See a json.decoder.JSONDecodeError being raised from | |
# /kiel/clients/consumer.py:deserialize_messages. | |
import asyncio | |
import collections | |
import tornado.web | |
from sys import stdin | |
from json import loads | |
from kiel import clients, exc | |
from tornado.ioloop import IOLoop | |
TOPIC = "test" | |
# Message Queue | |
MQ = ['kafka:29092'] | |
# message queues | |
mqp = clients.Producer(MQ) | |
mqc = clients.SingleConsumer(MQ) | |
# local queue for pushing items into kafka, this queue fills up if kafka goes down | |
produce_queue = collections.deque([], 999) | |
def inputer(f, events): | |
data = loads(f.readline()) | |
produce_queue.appendleft(data) | |
async def consumer(): | |
"""Consume indefinitely from the 'test' queue.""" | |
connected = False | |
while True: | |
# If not connected, attempt to connect... | |
if not connected: | |
try: | |
await mqc.connect() | |
connected = True | |
except exc.NoBrokersError: | |
await asyncio.sleep(5) | |
continue | |
# Consume | |
try: | |
msgs = await mqc.consume(TOPIC) | |
# print(msgs) | |
except exc.NoBrokersError: | |
connected = False | |
async def producer(): | |
connected = False | |
while True: | |
# If not connected to kafka, attempt to connect... | |
if not connected: | |
try: | |
await mqp.connect() | |
connected = True | |
except exc.NoBrokersError: | |
await asyncio.sleep(5) | |
continue | |
# Pull items off our queue to produce | |
if len(produce_queue) == 0: | |
await asyncio.sleep(0.01) | |
continue | |
for _ in range(0, len(produce_queue)): | |
msg = produce_queue.popleft() | |
try: | |
await mqp.produce(TOPIC, msg) | |
except exc.NoBrokersError: | |
connected = False | |
# Put the item back on the queue so we can push it when we reconnect | |
produce_queue.appendleft(msg) | |
app = tornado.web.Application([]) | |
def main(): | |
app.listen(8888) | |
loop = IOLoop.current() | |
loop.add_handler(stdin, inputer, IOLoop.READ) | |
loop.spawn_callback(consumer) | |
loop.spawn_callback(producer) | |
try: | |
loop.start() | |
except KeyboardInterrupt: | |
loop.stop() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(venv) $ python app.py | |
"a" | |
ERROR:kiel.clients.consumer:Error deserializing message: '''' | |
Traceback (most recent call last): | |
File "/Users/stomsa/dev/kiel-consume-bug/venv/lib/python3.6/site-packages/kiel/clients/consumer.py", line 189, in deserialize_messages | |
value = self.deserializer(msg.value) | |
File "/Users/stomsa/.pyenv/versions/3.6.6/lib/python3.6/json/__init__.py", line 354, in loads | |
return _default_decoder.decode(s) | |
File "/Users/stomsa/.pyenv/versions/3.6.6/lib/python3.6/json/decoder.py", line 339, in decode | |
obj, end = self.raw_decode(s, idx=_w(s, 0).end()) | |
File "/Users/stomsa/.pyenv/versions/3.6.6/lib/python3.6/json/decoder.py", line 357, in raw_decode | |
raise JSONDecodeError("Expecting value", s, err.value) from None | |
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment