Created
December 14, 2023 10:21
-
-
Save marvin-hansen/ade251236cfea67c9a8c4099842ef155 to your computer and use it in GitHub Desktop.
Most weirdo messaging bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::error::Error; | |
use std::time::Duration; | |
use fluvio::{Compression, Fluvio, Offset, RecordKey, TopicProducerConfigBuilder}; | |
const TOPIC: &str = "echo"; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn Error + Send>> { | |
let produce_handle = tokio::spawn(produce()); | |
let consume_handle = tokio::spawn(consume()); | |
match tokio::try_join!(produce_handle, consume_handle) { | |
Ok(_) => {} | |
Err(e) => { | |
println!(" Failed to start produce_handle and consume_handle: {:?}", e); | |
} | |
} | |
Ok(()) | |
} | |
/// Produces 10 "Hello, Fluvio" events, followed by a "Done!" event | |
async fn produce() -> Result<(), Box<dyn Error + Send>> { | |
let fluvio = Fluvio::connect().await.unwrap(); | |
let config = TopicProducerConfigBuilder::default() | |
.batch_size(100) | |
.linger(Duration::from_millis(100)) | |
.compression(Compression::Gzip) | |
.build() | |
.expect("Failed to create topic producer config"); | |
let producer = fluvio | |
.topic_producer_with_config(TOPIC, config) | |
.await | |
.expect("Failed to create a producer"); | |
for i in 0..16 { | |
println!("Sending record {i}"); | |
producer | |
.send(format!("Key {i}"), format!("Value {i}")) | |
.await.expect("Failed to send record"); | |
producer.flush().await.expect("Failed to flush"); | |
} | |
println!("Sending record Done!"); | |
producer.send(RecordKey::NULL, "Done!").await.expect("Failed to send Done!"); | |
producer.flush().await.expect("Failed to flush"); | |
println!("Done sending records. Wait a moment to let the consumer catch up."); | |
wait(Duration::from_millis(500)).await; | |
Ok(()) | |
} | |
/// Consumes events until a "Done!" event is read | |
async fn consume() -> Result<(), Box<dyn Error + Send>> { | |
use futures::StreamExt; | |
let consumer = fluvio::consumer(TOPIC, 0).await?; | |
let mut stream = consumer.stream(Offset::beginning()).await?; | |
while let Some(Ok(record)) = stream.next().await { | |
let key = record.get_key().map(|key| key.as_utf8_lossy_string()); | |
let value = record.get_value().as_utf8_lossy_string(); | |
println!("Got record: key={key:?}, value={value}"); | |
if value == "Done!" { | |
return Ok(()); | |
} | |
} | |
Ok(()) | |
} | |
// Wait util | |
pub async fn wait(duration: std::time::Duration) { | |
tokio::time::sleep(duration).await | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sending record 0 | |
Got record: key=Some("Key 0"), value=Value 0 | |
Got record: key=Some("Key 1"), value=Value 1 | |
Got record: key=Some("Key 2"), value=Value 2 | |
Got record: key=Some("Key 3"), value=Value 3 | |
Got record: key=Some("Key 4"), value=Value 4 | |
Got record: key=Some("Key 5"), value=Value 5 | |
Got record: key=Some("Key 6"), value=Value 6 | |
Got record: key=Some("Key 7"), value=Value 7 | |
Got record: key=Some("Key 8"), value=Value 8 | |
Got record: key=Some("Key 9"), value=Value 9 | |
Got record: key=None, value=Done! | |
Sending record 1 | |
Sending record 2 | |
Sending record 3 | |
Sending record 4 | |
Sending record 5 | |
Sending record 6 | |
Sending record 7 | |
Sending record 8 | |
Sending record 9 | |
Sending record 10 | |
Sending record 11 | |
Sending record 12 | |
Sending record 13 | |
Sending record 14 | |
Sending record 15 | |
Sending record Done! | |
Done sending records. Wait a moment to let the consumer catch up. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment