Skip to content

Instantly share code, notes, and snippets.

@marvin-hansen
Created December 14, 2023 10:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marvin-hansen/ade251236cfea67c9a8c4099842ef155 to your computer and use it in GitHub Desktop.
Save marvin-hansen/ade251236cfea67c9a8c4099842ef155 to your computer and use it in GitHub Desktop.
Weirdest messaging bug
use std::error::Error;
use std::time::Duration;
use fluvio::{Compression, Fluvio, Offset, RecordKey, TopicProducerConfigBuilder};
/// Name of the Fluvio topic that both `produce` and `consume` operate on.
const TOPIC: &str = "echo";
/// Runs the producer and the consumer as two concurrent Tokio tasks and waits for both.
///
/// # Errors
///
/// Returns the first error reported by either task, or the join error if a task
/// panicked or was cancelled before it could finish.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send>> {
    let produce_handle = tokio::spawn(produce());
    let consume_handle = tokio::spawn(consume());

    match tokio::try_join!(produce_handle, consume_handle) {
        // `try_join!` only short-circuits on a join failure; each task's own
        // `Result` still has to be checked, otherwise task errors vanish silently.
        Ok((produced, consumed)) => {
            produced?;
            consumed?;
            Ok(())
        }
        // A task panicked or was cancelled: report it instead of exiting successfully.
        Err(join_error) => Err(Box::new(join_error) as Box<dyn Error + Send>),
    }
}
/// Produces 16 keyed records (`"Key {i}"` / `"Value {i}"` for `i` in `0..16`),
/// followed by an unkeyed "Done!" record.
///
/// Every record is flushed right after it is sent. Once "Done!" has been flushed the
/// task sleeps for 500 ms before returning, to let the consumer catch up.
///
/// # Panics
///
/// Panics if connecting to Fluvio, building the producer configuration, creating the
/// producer, sending a record, or flushing fails.
async fn produce() -> Result<(), Box<dyn Error + Send>> {
    let fluvio = Fluvio::connect()
        .await
        .expect("Failed to connect to Fluvio");

    let config = TopicProducerConfigBuilder::default()
        .batch_size(100)
        .linger(Duration::from_millis(100))
        .compression(Compression::Gzip)
        .build()
        .expect("Failed to create topic producer config");

    let producer = fluvio
        .topic_producer_with_config(TOPIC, config)
        .await
        .expect("Failed to create a producer");

    for i in 0..16 {
        println!("Sending record {i}");
        producer
            .send(format!("Key {i}"), format!("Value {i}"))
            .await
            .expect("Failed to send record");
        // Flush after each record so it is not held back by batching/linger.
        producer.flush().await.expect("Failed to flush");
    }

    // The unkeyed "Done!" record is the sentinel the consumer stops on.
    println!("Sending record Done!");
    producer
        .send(RecordKey::NULL, "Done!")
        .await
        .expect("Failed to send Done!");
    producer.flush().await.expect("Failed to flush");

    println!("Done sending records. Wait a moment to let the consumer catch up.");
    wait(Duration::from_millis(500)).await;

    Ok(())
}
/// Consumes events until a "Done!" event is read, printing every record on the way.
///
/// NOTE(review): the stream starts at `Offset::beginning()`, so records already stored
/// in the topic are delivered first. A "Done!" left over from an earlier run would end
/// this loop before the current producer has finished — confirm whether starting from
/// the end of the log is what is actually wanted here.
///
/// # Errors
///
/// Returns an error if the consumer or its stream cannot be created, or if the stream
/// yields an error while reading.
async fn consume() -> Result<(), Box<dyn Error + Send>> {
    use futures::StreamExt;

    let consumer = fluvio::consumer(TOPIC, 0).await?;
    let mut stream = consumer.stream(Offset::beginning()).await?;

    while let Some(next) = stream.next().await {
        let record = match next {
            Ok(record) => record,
            // Surface read failures instead of treating them as a normal end of stream.
            Err(err) => {
                let boxed: Box<dyn Error + Send> = Box::new(err);
                return Err(boxed);
            }
        };

        let key = record.get_key().map(|key| key.as_utf8_lossy_string());
        let value = record.get_value().as_utf8_lossy_string();
        println!("Got record: key={key:?}, value={value}");

        // "Done!" is the producer's sentinel: stop consuming once it arrives.
        if value == "Done!" {
            return Ok(());
        }
    }

    Ok(())
}
/// Asynchronously sleeps for `duration` by awaiting `tokio::time::sleep`.
pub async fn wait(duration: std::time::Duration) {
    let pause = tokio::time::sleep(duration);
    pause.await;
}
Sending record 0
Got record: key=Some("Key 0"), value=Value 0
Got record: key=Some("Key 1"), value=Value 1
Got record: key=Some("Key 2"), value=Value 2
Got record: key=Some("Key 3"), value=Value 3
Got record: key=Some("Key 4"), value=Value 4
Got record: key=Some("Key 5"), value=Value 5
Got record: key=Some("Key 6"), value=Value 6
Got record: key=Some("Key 7"), value=Value 7
Got record: key=Some("Key 8"), value=Value 8
Got record: key=Some("Key 9"), value=Value 9
Got record: key=None, value=Done!
Sending record 1
Sending record 2
Sending record 3
Sending record 4
Sending record 5
Sending record 6
Sending record 7
Sending record 8
Sending record 9
Sending record 10
Sending record 11
Sending record 12
Sending record 13
Sending record 14
Sending record 15
Sending record Done!
Done sending records. Wait a moment to let the consumer catch up.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment