Skip to content

Instantly share code, notes, and snippets.

@avantgardnerio
Created December 3, 2022 00:31
Show Gist options
  • Save avantgardnerio/cdf54a95d7afb32b66ee67cd03cfe184 to your computer and use it in GitHub Desktop.
Save avantgardnerio/cdf54a95d7afb32b66ee67cd03cfe184 to your computer and use it in GitHub Desktop.
/// Spins up a five-node cluster over Unix domain sockets inside a scratch
/// directory, waits for the first four nodes (by sorted id) to agree on a
/// leader, adds the remaining node as a learner, and then runs a small
/// create/insert/select round trip through a node that is not the leader.
///
/// The server futures and the test body are polled together from a single
/// `FuturesUnordered`. The function returns as soon as the first of them
/// completes, and fails if that first completion was not the test body.
#[tokio::test]
async fn multi_node_test() {
    // Start from an empty scratch directory. Removal is best-effort because
    // the directory does not exist on a first run.
    let base_dir = env::temp_dir().join("coldb");
    let _ = fs::remove_dir_all(&base_dir);
    fs::create_dir_all(&base_dir).unwrap();

    let node_cnt = 5;
    let mut futures: FuturesUnordered<BoxFuture<_>> = FuturesUnordered::new();

    // One socket path per node id: <base_dir>/<id>.
    let eps: HashMap<NodeId, String> = (1..=node_cnt)
        .map(|id| {
            let path = base_dir.join(id.to_string());
            (id, path.to_str().unwrap().to_string())
        })
        .collect();

    // Have to create all sockets before running any server
    let sockets: HashMap<NodeId, UnixListener> = eps
        .iter()
        .map(|(id, path)| (*id, UnixListener::bind(path).unwrap()))
        .collect();

    // Build one instance per node and queue its serve future. Nothing is
    // polled until `futures.next()` is awaited at the bottom of the test.
    let mut services: HashMap<NodeId, Arc<ColDbInstance>> = HashMap::new();
    for (id, uds) in sockets {
        let service = Arc::new(ColDbInstance::new(id, eps.clone()).await.unwrap());
        futures.push(Box::pin(serve(service.clone(), uds)));
        services.insert(id, service);
    }

    // The lowest `voter_count` ids are treated as voters; the rest are added
    // as learners once a leader is known.
    let voter_count = min(eps.len(), 4);
    let node_ids: Vec<_> = eps.keys().sorted().cloned().collect();
    let voter_ids = node_ids[..voter_count].to_vec();

    // Set as the very last step of the test body so that, after the race
    // below, we can tell whether the body or a server future finished first.
    let test_finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let finished_flag = Arc::clone(&test_finished);

    let test = async move {
        println!("Test: Waiting for a leader...");
        let mut leader_id = 0u64;
        for id in voter_ids.iter() {
            let service = services.get(id).unwrap();
            leader_id = service.wait_for_leader().await.unwrap();
            println!("Test: Node {}: has leader {}", id, leader_id);
        }
        let leader = services.get(&leader_id).unwrap();
        let metrics = leader.get_metrics();
        println!("Test: Cluster is stable metrics={:?}", metrics);
        sleep(Duration::from_millis(500)).await;

        for id in &node_ids[voter_count..] {
            println!("Test: Adding node {} to main cluster as a learner", id);
            let res = leader.raft.add_learner(*id, true).await.unwrap();
            let metrics = leader.get_metrics();
            println!("Test: add_learner log_id={:?} metrics={:?}", res, metrics);
        }

        let learner_id = node_ids.iter().find(|id| !voter_ids.contains(*id)).unwrap();
        let learner = services.get(learner_id).unwrap();

        // Connect the client to a voter that is *not* the leader.
        let not_leader = voter_ids.iter().find(|id| **id != leader_id).unwrap();
        let path = base_dir.join(not_leader.to_string()).to_str().unwrap().to_string();
        let mut client = test_client(path.as_str()).await;

        update(&mut client, "create table person (id int primary key, name string)").await;
        update(&mut client, "insert into person (id, name) values (1, 'Hopper'), (2, 'Kay')").await;
        let rbs = query(&mut client, "select * from person").await;
        // NOTE(review): the header and data rows below are narrower than the
        // border rows; confirm `assert_batches` normalizes column padding.
        let expected = r#"
+----+--------+
| id | name |
+----+--------+
| 1 | Hopper |
| 2 | Kay |
+----+--------+
"#;
        assert_batches(expected, rbs);

        sleep(Duration::from_secs(5)).await;
        println!("Test: shutting down learner={:?}", learner.get_metrics());
        println!("Test: shutting down leader={:?}", leader.get_metrics());

        finished_flag.store(true, std::sync::atomic::Ordering::SeqCst);
    };
    futures.push(Box::pin(test));

    // Drive every queued future until the first one completes.
    let _ = futures.next().await;

    // Without this check a server future returning early would end the test
    // successfully before any assertion in the body had run.
    assert!(
        test_finished.load(std::sync::atomic::Ordering::SeqCst),
        "a server future completed before the test body finished"
    );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment