Created
March 14, 2024 09:46
-
-
Save mlowicki/c3b942f5545faced93dc414e01a2da70 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Result; | |
use clap::Parser; | |
use rand::{ | |
distributions::{Alphanumeric, DistString}, | |
Rng, | |
}; | |
use rdkafka::{ | |
config::ClientConfig, | |
consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext}, | |
error::KafkaError, | |
ClientContext, Offset, TopicPartitionList, | |
}; | |
use std::sync::Arc; | |
use tokio::{ | |
self, | |
time::{sleep, Duration}, | |
}; | |
struct RdKafkaContext {} | |
impl ClientContext for RdKafkaContext { | |
fn error(&self, error: KafkaError, reason: &str) { | |
println!("{error:?}: {reason}"); | |
} | |
} | |
impl ConsumerContext for RdKafkaContext { | |
fn commit_callback( | |
&self, | |
result: rdkafka::error::KafkaResult<()>, | |
_offsets: &TopicPartitionList, | |
) { | |
match result { | |
Err(ref e) => { | |
println!("commit error: {e:?}"); | |
} | |
Ok(_) => { | |
println!("commit successful"); | |
} | |
}; | |
} | |
} | |
#[derive(Parser, Default, Debug)] | |
#[clap( | |
about = "Tool for play around with Kafka Commit API and its wrappers offered by e.g. librdkafka" | |
)] | |
struct Arguments { | |
#[clap(short, long)] | |
bootstrap: String, | |
#[clap(short, long)] | |
topic: String, | |
#[clap(short, long, default_value_t = 0)] | |
partition: i32, | |
#[clap(long, default_value_t = 1)] | |
partitions: usize, | |
#[clap(short, long, default_value_t = 0)] | |
offset: i64, | |
#[clap(long, default_value_t = 0)] | |
metadata_min: usize, | |
#[clap(long, default_value_t = 0)] | |
metadata_max: usize, | |
#[clap(short, long)] | |
group: String, | |
#[clap(short, long, default_value_t = 1000 * 10)] | |
sleep: u64, | |
#[clap(long, default_value_t = false)] | |
fetch: bool, | |
#[clap(short, long, default_value_t = 1000 * 10)] | |
fetch_timeout: u64, | |
} | |
#[tokio::main] | |
async fn main() -> Result<()> { | |
let args = Arguments::parse(); | |
let mut config = ClientConfig::new(); | |
config | |
.set("bootstrap.servers", args.bootstrap.clone()) | |
.set("group.id", args.group.clone()); | |
println!("config: {:#?}\n", config); | |
let consumer: Arc<BaseConsumer<RdKafkaContext>> = | |
Arc::new(config.create_with_context(RdKafkaContext {})?); | |
if args.fetch { | |
tokio::task::spawn_blocking({ | |
let consumer = consumer.clone(); | |
let mut topic_partition_list = TopicPartitionList::with_capacity(args.partitions); | |
for offset in 0..args.partitions { | |
topic_partition_list | |
.add_partition(&args.topic.clone(), args.partition + offset as i32); | |
} | |
move || -> Result<()> { | |
println!("fetching committed offsets.."); | |
let committed_offsets = consumer.committed_offsets( | |
topic_partition_list, | |
Duration::from_millis(args.fetch_timeout), | |
)?; | |
println!("committed offsets: {committed_offsets:?}\n"); | |
Ok(()) | |
} | |
}) | |
.await??; | |
} | |
let mut topic_partition_list = TopicPartitionList::with_capacity(args.partitions); | |
for offset in 0..args.partitions { | |
let mut tp = | |
topic_partition_list.add_partition(&args.topic.clone(), args.partition + offset as i32); | |
if args.metadata_max.saturating_sub(args.metadata_min) > 0 { | |
let mut rng = rand::thread_rng(); | |
let metadata_size = rng.gen_range(args.metadata_min..args.metadata_max); | |
let metadata = Alphanumeric.sample_string(&mut rng, metadata_size); | |
println!("metadata ({} bytes)", metadata.len()); | |
tp.set_metadata(metadata); | |
} | |
tp.set_offset(Offset::Offset(args.offset))?; | |
} | |
assert!(topic_partition_list.count() == args.partitions); | |
consumer.commit(&topic_partition_list, CommitMode::Async)?; | |
println!("\ncommit triggered"); | |
tokio::spawn({ | |
let consumer = consumer.clone(); | |
async move { | |
loop { | |
if let Some(Err(e)) = consumer.poll(Duration::ZERO) { | |
println!("unexpected error returned by `consumer.poll()`: {e:?}"); | |
} | |
sleep(Duration::from_secs(1)).await; | |
} | |
} | |
}); | |
sleep(Duration::from_millis(args.sleep)).await; | |
println!("done"); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment