@Glutexo
Last active August 28, 2018 08:51
Kiel consume bug
# This is a simple Tornado application that produces to and consumes from a
# single Kafka topic. It works as follows:
# * inputer fires whenever new data arrive on stdin. It assumes the data are
#   JSON, decodes them, and puts them on a local produce_queue.
# * producer connects to Kafka and watches for new items to appear on the
#   local produce_queue, then produces those items to Kafka.
# * consumer connects to Kafka and reads messages from it. It does nothing
#   with them; it only retrieves them to demonstrate the bug.
#
# Usage (a minimal setup sketch follows this comment block):
# * Spin up a Kafka server on kafka:29092.
# * Create a topic named "test" in Kafka.
# * Create a Python 3 virtualenv.
# * Install tornado and kiel.
# * Run this app with `python app.py`.
# * Enter a valid JSON message on stdin, e.g. "abc" or {"a": "b"}.
# * See a json.decoder.JSONDecodeError being raised from
#   kiel/clients/consumer.py:deserialize_messages.
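#
# A minimal setup sketch, assuming a POSIX shell; package versions were not
# pinned in the original, so none are pinned here:
#
#   $ python3 -m venv venv
#   $ . venv/bin/activate
#   (venv) $ pip install tornado kiel
#   (venv) $ python app.py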
import asyncio
import collections
import tornado.web
from sys import stdin
from json import loads
from kiel import clients, exc
from tornado.ioloop import IOLoop
TOPIC = "test"
# Message Queue
MQ = ['kafka:29092']
# message queues
mqp = clients.Producer(MQ)
mqc = clients.SingleConsumer(MQ)
# local queue for pushing items into kafka, this queue fills up if kafka goes down
produce_queue = collections.deque([], 999)
def inputer(f, events):
    """Decode one JSON line from stdin and queue it for production."""
    data = loads(f.readline())
    produce_queue.appendleft(data)
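# Note: IOLoop.add_handler (registered in main below) invokes this handler
# with the object that was registered (here the stdin file object) and an
# events bitmask, which is why inputer can call f.readline() directly.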
async def consumer():
    """Consume indefinitely from the 'test' topic."""
    connected = False
    while True:
        # If not connected, attempt to connect...
        if not connected:
            try:
                await mqc.connect()
                connected = True
            except exc.NoBrokersError:
                await asyncio.sleep(5)
                continue
        # Consume
        try:
            msgs = await mqc.consume(TOPIC)
            # print(msgs)
        except exc.NoBrokersError:
            connected = False
async def producer():
    """Produce queued stdin items to the 'test' topic indefinitely."""
    connected = False
    while True:
        # If not connected to Kafka, attempt to connect...
        if not connected:
            try:
                await mqp.connect()
                connected = True
            except exc.NoBrokersError:
                await asyncio.sleep(5)
                continue
        # Pull items off our queue to produce
        if len(produce_queue) == 0:
            await asyncio.sleep(0.01)
            continue
        for _ in range(0, len(produce_queue)):
            msg = produce_queue.popleft()
            try:
                await mqp.produce(TOPIC, msg)
            except exc.NoBrokersError:
                connected = False
                # Put the item back on the queue so we can push it when we reconnect
                produce_queue.appendleft(msg)
app = tornado.web.Application([])

def main():
    app.listen(8888)
    loop = IOLoop.current()
    loop.add_handler(stdin, inputer, IOLoop.READ)
    loop.spawn_callback(consumer)
    loop.spawn_callback(producer)
    try:
        loop.start()
    except KeyboardInterrupt:
        loop.stop()

if __name__ == "__main__":
    main()
(venv) $ python app.py
"a"
ERROR:kiel.clients.consumer:Error deserializing message: ''''
Traceback (most recent call last):
File "/Users/stomsa/dev/kiel-consume-bug/venv/lib/python3.6/site-packages/kiel/clients/consumer.py", line 189, in deserialize_messages
value = self.deserializer(msg.value)
File "/Users/stomsa/.pyenv/versions/3.6.6/lib/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/Users/stomsa/.pyenv/versions/3.6.6/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/Users/stomsa/.pyenv/versions/3.6.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
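The logged message value is empty, and "Expecting value: line 1 column 1 (char 0)" is exactly what json.loads raises when handed an empty string. Until the bug is fixed, one possible workaround is to swap in a tolerant deserializer. This is only a sketch: the traceback's self.deserializer suggests the deserializer is pluggable, but the `deserializer` keyword argument below is an assumption to verify against the installed kiel version.

# Workaround sketch for app.py, not verified against kiel's API: replaces the
# mqc line above and skips payloads that fail to decode instead of raising.
from json import loads, JSONDecodeError

def safe_loads(raw):
    # Empty or malformed payloads become None instead of crashing the consumer.
    try:
        return loads(raw)
    except JSONDecodeError:
        return None

mqc = clients.SingleConsumer(MQ, deserializer=safe_loads)  # kwarg name assumed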