Skip to content

Instantly share code, notes, and snippets.

@vlovich
Created November 17, 2022 21:57
Show Gist options
  • Save vlovich/fddbd15c52a3b86648688e2fc3d66e30 to your computer and use it in GitHub Desktop.
Save vlovich/fddbd15c52a3b86648688e2fc3d66e30 to your computer and use it in GitHub Desktop.
glommio deadlock under valgrind
use std::rc::Rc;
use futures_util::future::join_all;
use glommio::*;
#[derive(Copy, Clone, Debug)]
enum Message {
Ping,
Pong,
}
fn main() {
let use_mesh = true;
let mesh = channels::channel_mesh::MeshBuilder::full(2, 1);
LocalExecutorPoolBuilder::new(glommio::PoolPlacement::MaxSpread(
mesh.nr_peers(),
CpuSet::online().ok(),
))
.on_all_shards(enclose!((mesh) move || async move {
println!("Started thread {}", executor().id());
if use_mesh {
let (senders, mut receivers) = mesh.join().await.unwrap();
let senders = Rc::new(senders);
let mut readers = Vec::new();
for (peer, stream) in receivers.streams() {
let senders = senders.clone();
readers.push(spawn_local(async move {
loop {
let msg = stream.recv().await;
match msg {
Some(msg) => {
match msg {
Message::Ping => {
println!("{}: Responding pong to {}", senders.peer_id(), peer);
senders.as_ref().send_to(peer, Message::Pong).await.unwrap();
let mut next_peer = (peer + 1) % senders.nr_consumers();
if next_peer == senders.peer_id() {
next_peer = (next_peer + 1) % senders.nr_consumers();
}
println!("{}: Sending ping to {}", senders.peer_id(), next_peer);
if let Err(e) = senders.as_ref().send_to(next_peer, Message::Ping).await {
println!("{}: Stopping - {:?}", senders.peer_id(), e);
return
}
},
Message::Pong => return,
}
},
None => return,
}
}
}));
}
if senders.peer_id() == 0 {
println!("Initializing ping from peer 0");
senders.send_to(1, Message::Ping).await.unwrap()
}
println!("Thread {} task results: {:?}", executor().id(), join_all(readers).await);
}
println!("Terminating thread {}", executor().id());
}))
.unwrap()
.join_all();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment