@mlowicki
Created March 14, 2024 09:46
use anyhow::Result;
use clap::Parser;
use rand::{
    distributions::{Alphanumeric, DistString},
    Rng,
};
use rdkafka::{
    config::ClientConfig,
    consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext},
    error::KafkaError,
    ClientContext, Offset, TopicPartitionList,
};
use std::sync::Arc;
use tokio::{
    self,
    time::{sleep, Duration},
};
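
// Custom client/consumer context that logs librdkafka errors and commit results to stdout.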
struct RdKafkaContext {}

impl ClientContext for RdKafkaContext {
    // Called by librdkafka when a client-level error is reported.
    fn error(&self, error: KafkaError, reason: &str) {
        println!("{error:?}: {reason}");
    }
}

impl ConsumerContext for RdKafkaContext {
    // Called once an offset commit completes (or fails).
    fn commit_callback(
        &self,
        result: rdkafka::error::KafkaResult<()>,
        _offsets: &TopicPartitionList,
    ) {
        match result {
            Err(ref e) => {
                println!("commit error: {e:?}");
            }
            Ok(_) => {
                println!("commit successful");
            }
        };
    }
}
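
// Command-line arguments, parsed with clap's derive API.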
#[derive(Parser, Default, Debug)]
#[clap(
    about = "Tool for playing around with the Kafka commit API and the wrappers offered by e.g. librdkafka"
)]
struct Arguments {
    // Kafka bootstrap servers (bootstrap.servers).
    #[clap(short, long)]
    bootstrap: String,
    #[clap(short, long)]
    topic: String,
    // First partition to commit offsets for.
    #[clap(short, long, default_value_t = 0)]
    partition: i32,
    // Number of consecutive partitions (starting at `partition`) to include.
    #[clap(long, default_value_t = 1)]
    partitions: usize,
    // Offset to commit for every partition.
    #[clap(short, long, default_value_t = 0)]
    offset: i64,
    // Size range (in bytes) of the random metadata attached to each commit;
    // disabled when metadata_max <= metadata_min (the defaults).
    #[clap(long, default_value_t = 0)]
    metadata_min: usize,
    #[clap(long, default_value_t = 0)]
    metadata_max: usize,
    // Consumer group to commit offsets under (group.id).
    #[clap(short, long)]
    group: String,
    // How long to keep the process alive after triggering the commit, in milliseconds.
    #[clap(short, long, default_value_t = 1000 * 10)]
    sleep: u64,
    // Fetch and print the currently committed offsets before committing new ones.
    #[clap(long, default_value_t = false)]
    fetch: bool,
    // Timeout for fetching committed offsets, in milliseconds.
    #[clap(short, long, default_value_t = 1000 * 10)]
    fetch_timeout: u64,
}
#[tokio::main]
async fn main() -> Result<()> {
    let args = Arguments::parse();
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", args.bootstrap.clone())
        .set("group.id", args.group.clone());
    println!("config: {:#?}\n", config);
    let consumer: Arc<BaseConsumer<RdKafkaContext>> =
        Arc::new(config.create_with_context(RdKafkaContext {})?);
    if args.fetch {
        // `committed_offsets()` blocks, so run it on tokio's blocking thread pool.
        tokio::task::spawn_blocking({
            let consumer = consumer.clone();
            let mut topic_partition_list = TopicPartitionList::with_capacity(args.partitions);
            for offset in 0..args.partitions {
                topic_partition_list.add_partition(&args.topic, args.partition + offset as i32);
            }
            move || -> Result<()> {
                println!("fetching committed offsets..");
                let committed_offsets = consumer.committed_offsets(
                    topic_partition_list,
                    Duration::from_millis(args.fetch_timeout),
                )?;
                println!("committed offsets: {committed_offsets:?}\n");
                Ok(())
            }
        })
        .await??;
    }
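    // Build the list of (topic, partition, offset) entries to commit,
    // optionally with random metadata attached to each entry.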
    let mut topic_partition_list = TopicPartitionList::with_capacity(args.partitions);
    for offset in 0..args.partitions {
        let mut tp =
            topic_partition_list.add_partition(&args.topic, args.partition + offset as i32);
        if args.metadata_max.saturating_sub(args.metadata_min) > 0 {
            // Attach a random alphanumeric string of the requested size as commit metadata.
            let mut rng = rand::thread_rng();
            let metadata_size = rng.gen_range(args.metadata_min..args.metadata_max);
            let metadata = Alphanumeric.sample_string(&mut rng, metadata_size);
            println!("metadata ({} bytes)", metadata.len());
            tp.set_metadata(metadata);
        }
        tp.set_offset(Offset::Offset(args.offset))?;
    }
    assert!(topic_partition_list.count() == args.partitions);
    consumer.commit(&topic_partition_list, CommitMode::Async)?;
    println!("\ncommit triggered");
    tokio::spawn({
        let consumer = consumer.clone();
        async move {
            loop {
                if let Some(Err(e)) = consumer.poll(Duration::ZERO) {
                    println!("unexpected error returned by `consumer.poll()`: {e:?}");
                }
                sleep(Duration::from_secs(1)).await;
            }
        }
    });
    sleep(Duration::from_millis(args.sleep)).await;
    println!("done");
    Ok(())
}
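
An illustrative invocation, assuming the file is dropped into a Cargo binary crate with the anyhow, clap, rand, rdkafka and tokio dependencies (broker address, topic and group below are placeholders):

    cargo run -- --bootstrap localhost:9092 --topic test --group test-group --partitions 3 --offset 42 --fetch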