Last active
August 7, 2018 12:14
-
-
Save Glutexo/9cde0d7fff501a4f67ad13bd3e0c3a91 to your computer and use it in GitHub Desktop.
Kiel produce bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Traceback (most recent call last):
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/iterables.py", line 24, in drain
    yield next_item(iterable)
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/iterables.py", line 17, in next_item
    return coll.popitem()
KeyError: 'popitem(): dictionary is empty'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/iterables.py", line 26, in drain
    raise StopIteration
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 758, in _run_callback
    ret = callback()
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
    future.result()
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "simple.py", line 41, in produce
    yield p.produce(topic, msg)
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/clients/producer.py", line 115, in produce
    yield self.flush()
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper
    yielded = next(result)
  File "/Users/Glutexo/dev/my_repo/venv/lib/python3.7/site-packages/kiel/clients/producer.py", line 152, in flush
    for topic, msgs in drain(self.unsent):
RuntimeError: generator raised StopIteration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from logging import basicConfig, debug, DEBUG
from sys import argv

from kiel import clients
from tornado import gen, ioloop
# Log at DEBUG level so the "*..." progress markers emitted via debug() are shown.
basicConfig(level=DEBUG)

# Broker address list handed to the kiel clients, and the topic that both
# consume() and produce() operate on.
brokers = ["kafka:29092"]
topic = "my_topic"
@gen.coroutine
def consume():
    """Connect a kiel ``SingleConsumer`` and print every message read from ``topic``.

    The coroutine loops forever: it never returns on its own and only ends
    when the IOLoop driving it is stopped or an exception propagates.
    """
    consumer = clients.SingleConsumer(brokers=brokers)
    debug("*connecting")
    yield consumer.connect()
    debug("*connected")

    while True:
        debug("*waiting for messages")
        batch = yield consumer.consume(topic)
        debug("*received messages")
        for message in batch:
            print(message)
@gen.coroutine
def produce():
    """Connect a kiel ``Producer`` and publish each line typed on stdin to ``topic``.

    The coroutine loops forever, alternating between reading one line with
    ``input()`` and yielding on ``Producer.produce`` for that line.
    """
    producer = clients.Producer(brokers=brokers)
    debug("*connecting")
    yield producer.connect()
    debug("*connected")

    while True:
        debug("*waiting for message")
        # NOTE: input() is a blocking call, so the IOLoop thread is stalled
        # (no other callbacks run) until a full line has been entered.
        line = input()
        debug("*received message")

        debug("*publishing message")
        yield producer.produce(topic, line)
        debug("*published message")
def run(function_name):
    """Schedule the module-level callable named ``function_name`` and run the IOLoop.

    Args:
        function_name: Name of a callable defined at module level; in this
            script that is ``"consume"`` or ``"produce"``.

    Raises:
        KeyError: If the module has no global called ``function_name``.
        TypeError: If the global with that name is not callable.
    """
    # Resolve and validate the target before touching the loop, so a typo
    # fails immediately instead of leaving an idle loop running.
    module_globals = globals()
    if function_name not in module_globals:
        raise KeyError(
            "no module-level function named {!r}".format(function_name)
        )
    function_callable = module_globals[function_name]
    if not callable(function_callable):
        raise TypeError(
            "module-level name {!r} is not callable".format(function_name)
        )

    loop = ioloop.IOLoop.instance()
    loop.add_callback(function_callable)
    try:
        loop.start()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the script; stop the loop quietly.
        loop.stop()
if __name__ == "__main__":
    # The first command-line argument selects which coroutine to run.
    # Exit with a usage hint instead of a bare IndexError when it is missing.
    if len(argv) < 2:
        raise SystemExit("usage: {} consume|produce".format(argv[0]))
    run(argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment