@luminoso
Last active January 21, 2020 00:17
Kafka producer send with a stats callback to track message delivery
import json

# Running delivery statistics, shared with the producer callbacks.
stats = {
    "errors": 0,
    "successes": 0,
    "min_offset": None,
    "max_offset": None,
    "errorTypes": {}
}


def errback_listener(error, **kargs):
    # Called when a send fails; count the failure by exception class name.
    error_name = type(error).__name__
    if error_name not in kargs["stats"]["errorTypes"]:
        kargs["stats"]["errorTypes"][error_name] = 0
    kargs["stats"]["errorTypes"][error_name] += 1
    kargs["stats"]["errors"] += 1


# producer, task and record are defined elsewhere in the calling code.
future = producer.send(task["kafka"]["topic"], key=bytes('no_partition', encoding='utf-8'), value=bytes(json.dumps(record), encoding='utf-8'))
future.add_errback(errback_listener, stats=stats)
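The stats dict has "successes", "min_offset" and "max_offset" fields, but only an error callback is registered, so those fields never change. Below is a minimal sketch of a matching success callback. It assumes the producer is kafka-python's KafkaProducer, whose send() future passes a RecordMetadata object with an offset attribute to success callbacks. The names callback_listener and demo_stats are hypothetical and not part of the original gist, and a SimpleNamespace stands in for RecordMetadata so the sketch runs without a broker.

```python
from types import SimpleNamespace

# Separate dict for this sketch, same shape as the gist's stats.
demo_stats = {
    "errors": 0,
    "successes": 0,
    "min_offset": None,
    "max_offset": None,
    "errorTypes": {},
}


def callback_listener(record_metadata, **kargs):
    # Hypothetical success callback: count the delivery and track the
    # lowest and highest offsets acknowledged so far.
    s = kargs["stats"]
    s["successes"] += 1
    offset = record_metadata.offset
    if s["min_offset"] is None or offset < s["min_offset"]:
        s["min_offset"] = offset
    if s["max_offset"] is None or offset > s["max_offset"]:
        s["max_offset"] = offset


# With a real producer (assumed kafka-python) the wiring would be:
#   future.add_callback(callback_listener, stats=demo_stats)
# Here we simulate three acknowledgements arriving out of order.
for fake_offset in (12, 10, 15):
    callback_listener(SimpleNamespace(offset=fake_offset), stats=demo_stats)
```

Offsets are per partition, so the min/max range is only meaningful when all records land in one partition, which the constant 'no_partition' key should achieve with the default partitioner. With kafka-python, calling producer.flush() before reading the stats gives the pending sends a chance to complete and fire their callbacks.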