Skip to content

Instantly share code, notes, and snippets.

@edenhill
Last active June 3, 2016 13:12
Show Gist options
  • Save edenhill/9bfa5d2c3d1d05ba6ce78fadee6612ac to your computer and use it in GitHub Desktop.
Save edenhill/9bfa5d2c3d1d05ba6ce78fadee6612ac to your computer and use it in GitHub Desktop.
/**
* Handle delivery reports
*/
static void handle_drs (rd_kafka_event_t *rkev) {
const rd_kafka_message_t *rkmessage;
while ((rkmessage = rd_kafka_event_message(rkev))) {
TEST_SAYL(3,"Got rkmessage %s [%"PRId32"] @ %"PRId64": %s\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset,
rd_kafka_err2str(rkmessage->err));
if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
TEST_FAIL("Message delivery failed: %s\n",
rd_kafka_err2str(rkmessage->err));
}
}
int main_0039_event (int argc, char **argv) {
/* ......... */
/* Wait for messages to be delivered */
TIMING_START(&t_delivery, "DELIVERY");
while (rd_kafka_outq_len(rk) > 0) {
rd_kafka_event_t *rkev;
rkev = rd_kafka_event_poll(rk, 100);
switch (rd_kafka_event_type(rkev))
{
case RD_KAFKA_EVENT_DR:
TEST_SAY("%s event with %zd messages\n",
rd_kafka_event_name(rkev),
rd_kafka_event_message_count(rkev));
handle_drs(rkev);
break;
default:
TEST_SAY("Unhandled event: %s\n",
rd_kafka_event_name(rkev));
break;
}
rd_kafka_event_destroy(rkev);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment