Skip to content

Instantly share code, notes, and snippets.

@sabine
Created December 30, 2021 17:41
Show Gist options
  • Save sabine/83c8e8360bd71cfe263d37b67f529db5 to your computer and use it in GitHub Desktop.
Save sabine/83c8e8360bd71cfe263d37b67f529db5 to your computer and use it in GitHub Desktop.
fn main() {
let nc = match nats::connect("127.0.0.1:4222") {
Ok(nc) => nc,
Err(err) => panic!("Error: {}", err),
};
nc.delete_stream("file_processing");
let stream = nats::jetstream::StreamConfig {
name: "file_processing".to_string(),
subjects: Some(vec!["resize_image".to_string()]),
retention: nats::jetstream::RetentionPolicy::Limits,
max_consumers: -1,
storage: nats::jetstream::StorageType::File,
..nats::jetstream::StreamConfig::default()
};
let resp = match nc.create_stream(stream) {
Ok(resp) => resp,
Err(err) => panic!("Error: {}", err),
};
println!("Response: {:?}", resp);
for n in 1..5 {
let payload = format!("resize_image:{}", n);
match nc.publish("resize_image", &payload) {
Ok(_) => {
println!("Published: {}", payload)
}
Err(err) => panic!("Error: {}", err),
};
}
let consumer = nats::jetstream::ConsumerConfig {
deliver_subject: None,
durable_name: Some("file_processing".to_string()),
deliver_policy: nats::jetstream::DeliverPolicy::All,
..nats::jetstream::ConsumerConfig::default()
};
let mut consumer = nc.create_consumer("file_processing", consumer).unwrap();
println!("created consumer");
loop {
let msg = consumer.process_timeout(|msg| {
println!("got message {:?}", msg);
msg.double_ack(nats::jetstream::AckKind::Ack);
Ok(msg.data.to_vec())
});
println!("msg: {:?}", msg);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment