Skip to content

Instantly share code, notes, and snippets.

@Glutexo
Last active August 7, 2018 12:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Glutexo/9cde0d7fff501a4f67ad13bd3e0c3a91 to your computer and use it in GitHub Desktop.
Save Glutexo/9cde0d7fff501a4f67ad13bd3e0c3a91 to your computer and use it in GitHub Desktop.
Kiel produce bug
Traceback (most recent call last):
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/iterables.py", line 24, in drain
yield next_item(iterable)
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/iterables.py", line 17, in next_item
return coll.popitem()
KeyError: 'popitem(): dictionary is empty'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/iterables.py", line 26, in drain
raise StopIteration
StopIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 758, in _run_callback
ret = callback()
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/stack_context.py", line 300, in null_wrapper
return fn(*args, **kwargs)
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
future.result()
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "simple.py", line 41, in produce
yield p.produce(topic, msg)
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
yielded = self.gen.throw(*exc_info)
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/clients/producer.py", line 115, in produce
yield self.flush()
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper
yielded = next(result)
File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/clients/producer.py", line 152, in flush
for topic, msgs in drain(self.unsent):
RuntimeError: generator raised StopIteration
from kiel import clients
from logging import basicConfig, debug, DEBUG
from tornado import gen, ioloop
from sys import argv
basicConfig(level=DEBUG)
brokers = ["kafka:29092"]
topic = "my_topic"
@gen.coroutine
def consume():
c = clients.SingleConsumer(brokers=brokers)
debug("*connecting")
yield c.connect()
debug("*connected")
while True:
debug("*waiting for messages")
msgs = yield c.consume(topic)
debug("*received messages")
for msg in msgs:
print(msg)
@gen.coroutine
def produce():
p = clients.Producer(brokers=brokers)
debug("*connecting")
yield p.connect()
debug("*connected")
while True:
debug("*waiting for message")
msg = input()
debug("*received message")
debug("*publishing message")
yield p.produce(topic, msg)
debug("*published message")
def run(function_name):
loop = ioloop.IOLoop.instance()
function_callable = globals()[function_name]
loop.add_callback(function_callable)
try:
loop.start()
except KeyboardInterrupt:
loop.stop()
if __name__ == "__main__":
run(argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment