@edenhill
edenhill / latest_msg_consumer.py
Last active April 7, 2020 16:50
Start consumer at latest message
#!/usr/bin/env python
from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_END
import sys
if __name__ == '__main__':
    broker = "localhost:51895"
    topics = ["test"]
    group = "mygroup"
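The preview above cuts off before the interesting part. The "start at latest" logic amounts to an on_assign callback that rewrites each assigned partition's offset to OFFSET_END before assigning. A hedged, self-contained sketch of that callback (in real code OFFSET_END is imported from confluent_kafka; here its underlying librdkafka value, -1, is inlined):

```python
OFFSET_END = -1  # stand-in for confluent_kafka.OFFSET_END (RD_KAFKA_OFFSET_END)

def seek_to_end(consumer, partitions):
    """on_assign callback: only consume messages produced after startup."""
    for p in partitions:
        p.offset = OFFSET_END  # override the committed/stored offset
    consumer.assign(partitions)
```

With a real client this would be wired up as `consumer.subscribe(topics, on_assign=seek_to_end)`.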
@edenhill
edenhill / buffers.h
Created May 25, 2017 11:22
buffers?
/**
* @brief Buffer segment
*/
typedef struct rd_segment_s {
TAILQ_ENTRY(rd_segment_s) seg_link; /**< rbuf_segments Link */
char *seg_p; /**< Backing-store memory */
size_t seg_of; /**< Current relative write-position
* (length of payload in this segment) */
size_t seg_size; /**< Allocated size of seg_p */
@edenhill
edenhill / tailing_consumer.go
Created May 11, 2017 07:22
confluent-kafka-go example to start consuming 5 messages from the end (tail 5)
// Example function-based high-level Apache Kafka consumer
package main
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
@edenhill
edenhill / ssl.go
Created March 8, 2017 15:07
golang client SSL config
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": group,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"security.protocol": "ssl",
"ssl.ca.location": "/path/to/ca-cert-file",
"ssl.certificate.location": "/path/to/client.pem",
"ssl.key.location": "/path/to/client.key",
//"ssl.key.password": "maybe_a_password_if_needed",
@edenhill
edenhill / producev.c
Last active December 14, 2016 16:22
rd_kafka_producev() var-arg example
/*!
* Partition (int32_t)
*/
#define RD_KAFKA_V_PARTITION(partition) \
_LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), \
(int32_t)partition
/*!
* Message value/payload pointer and length (void *, size_t)
*/
#define RD_KAFKA_V_VALUE(VALUE,LEN) \
If your processing rate is high, this might not be optimal, since you'll be committing for each message; even with async commits there is some degree of performance penalty.
Another alternative is to keep `enable.auto.commit` set to true (the default) but disable the automatic offset store.
So what is the offset store?
Each time a message is passed from the client to your application, its offset is stored for a future commit, and the next intervalled commit will then use this stored offset. If the stored offset has not changed since the last commit, nothing happens.
So by setting `enable.auto.offset.store` to false you keep the convenient intervalled auto-commit behaviour, but you control which offsets are actually eligible for commit.
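A minimal sketch of that pattern with confluent-kafka-python (the broker address, group id and handler are placeholders; the consumer itself would be constructed as `Consumer(conf)`):

```python
# Keep intervalled auto-commit, but only make an offset eligible for
# commit after its message has actually been processed.
conf = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "mygroup",
    "enable.auto.commit": True,         # keep the intervalled auto-commit...
    "enable.auto.offset.store": False,  # ...but store offsets manually
}

def process_one(consumer, handle):
    """Poll one message; process it, then store its offset so the next
    intervalled auto-commit picks it up."""
    msg = consumer.poll(1.0)
    if msg is None:
        return False
    if msg.error():
        raise RuntimeError(msg.error())
    handle(msg)                  # process first...
    consumer.store_offsets(msg)  # ...then mark the offset for commit
    return True
```

If processing raises, the offset is never stored, so the message will be redelivered after a restart rather than silently committed.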
@edenhill
edenhill / lrk.h
Created November 16, 2016 18:57
compile time type checking
#define _LRK_TYPECHECK(RET,TYPE,ARG) \
({ \
if (0) { \
TYPE __t RD_UNUSED = (ARG); \
} \
RET; \
})
#define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \
({ \
if (0) { \
@edenhill
edenhill / testsocket.c
Created October 26, 2016 09:14
Test sockets
/**
* @brief Test sockets
*
* By plugging in to librdkafka's socket_cb and connect_cb the test framework
* adds an interim socket between the socket exposed to librdkafka (tsk_fd)
* and the socket connecting to the broker (tsk_intfd).
* A thread is created to pass data between the two sockets according
* to test parameters.
* This allows the following network simulations:
* - connection close (abrupt or timeout)
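The interim-socket idea described in that comment can be sketched in plain Python (a stand-in for illustration, not the librdkafka test framework): a pump thread shuttles bytes between the app-facing socket and the peer-facing socket, and simulation hooks (delays, drops, abrupt close) would live inside the pump.

```python
import socket
import threading

def make_interim_pair():
    """Return (app_sock, peer): bytes written to app_sock reach peer only
    after passing through a relay thread, which can simulate faults."""
    app_sock, relay_in = socket.socketpair()   # app <-> relay
    relay_out, peer = socket.socketpair()      # relay <-> "broker"

    def pump():
        while True:
            data = relay_in.recv(4096)
            if not data:                # app closed: propagate the close
                relay_out.close()
                return
            relay_out.sendall(data)     # delay/drop hooks would go here

    threading.Thread(target=pump, daemon=True).start()
    return app_sock, peer
```

(This covers only one direction; the real framework pumps both ways and drives the hooks from test parameters.)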
@edenhill
edenhill / str_rep.c
Last active September 15, 2016 20:25
str reps
add_str_rep (const char *typestr, const char *(*rep) (void *obj)) {
    LIST_INSERT..(&str_reps, {typestr, rep});
}
const char *to_str (const char *typestr, void *obj) {
    str_rep = LIST_FIND(&str_reps, trim_stuff(typestr) /* remove "const", "*", etc.. */);
    if (!str_rep)
        return rsprintf("(%s)%p", typestr, obj);
{
rd_ts_t r = 1234;
if (rkb->rkb_rk->rk_conf.api_version_request &&
(r = rd_interval(&rkb->rkb_ApiVersion_fail_intvl, 0, 0)) > 0) {
/* Use ApiVersion to query broker for supported API versions. */
rd_rkb_dbg(rkb, PROTOCOL, "X", "Enabling ApiVersion");
rd_kafka_broker_feature_enable(rkb, RD_KAFKA_FEATURE_APIVERSION);
} else {
rd_rkb_dbg(rkb, PROTOCOL, "X", "Not enabling: %"PRId64, r);
}