Created
December 3, 2022 00:31
-
-
Save avantgardnerio/cdf54a95d7afb32b66ee67cd03cfe184 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[tokio::test] | |
async fn multi_node_test() { | |
let base_dir = env::temp_dir().join("coldb"); | |
let _ = fs::remove_dir_all(&base_dir); | |
fs::create_dir_all(&base_dir).unwrap(); | |
let node_cnt = 5; | |
let mut futures: FuturesUnordered<BoxFuture<_>> = FuturesUnordered::new(); | |
let eps: HashMap<NodeId, String> = (1..=node_cnt) | |
.map(|id| { | |
let path = base_dir.join(id.to_string()); | |
let path = path.to_str().unwrap().to_string(); | |
(id, path) | |
}) | |
.collect(); | |
// Have to create all sockets before running any server | |
let sockets: HashMap<NodeId, UnixListener> = eps | |
.iter() | |
.map(|(id, path)| { | |
let uds = UnixListener::bind(path.clone()).unwrap(); | |
(*id, uds) | |
}) | |
.collect(); | |
let mut services: HashMap<NodeId, Arc<ColDbInstance>> = HashMap::new(); | |
for (id, uds) in sockets.into_iter() { | |
let service = Arc::new(ColDbInstance::new(id, eps.clone()).await.unwrap()); | |
futures.push(Box::pin(serve(service.clone(), uds))); | |
services.insert(id, service); | |
} | |
let voter_count = min(eps.len(), 4); | |
let node_ids: Vec<_> = eps.keys().sorted().cloned().collect(); | |
let voter_ids: Vec<_> = node_ids[..voter_count].iter().cloned().collect(); | |
let test = async move { | |
println!("Test: Waiting for a leader..."); | |
let mut leader_id = 0u64; | |
for id in voter_ids.iter() { | |
let service = services.get(id).unwrap(); | |
leader_id = service.wait_for_leader().await.unwrap(); | |
println!("Test: Node {}: has leader {}", id, leader_id); | |
} | |
let leader = services.get(&leader_id).unwrap(); | |
let metrics = leader.get_metrics(); | |
println!("Test: Cluster is stable metrics={:?}", metrics); | |
sleep(Duration::from_millis(500)).await; | |
for id in node_ids[voter_count..node_ids.len()].iter() { | |
println!("Test: Adding node {} to main cluster as a learner", id); | |
let res = leader.raft.add_learner(*id, true).await.unwrap(); | |
let metrics = leader.get_metrics(); | |
println!("Test: add_learner log_id={:?} metrics={:?}", res, metrics); | |
} | |
let learner_id = node_ids.iter().find(|id| !voter_ids.contains(*id)).unwrap(); | |
let learner = services.get(learner_id).unwrap(); | |
let not_leader = voter_ids.iter().find(|id| **id != leader_id).unwrap(); | |
let path = base_dir.join(not_leader.to_string()).to_str().unwrap().to_string(); | |
let mut client = test_client(path.as_str()).await; | |
update(&mut client, "create table person (id int primary key, name string)").await; | |
update(&mut client, "insert into person (id, name) values (1, 'Hopper'), (2, 'Kay')").await; | |
let rbs = query(&mut client, "select * from person").await; | |
let expected = r#" | |
+----+--------+ | |
| id | name | | |
+----+--------+ | |
| 1 | Hopper | | |
| 2 | Kay | | |
+----+--------+ | |
"#; | |
assert_batches(expected, rbs); | |
sleep(Duration::from_secs(5)).await; | |
println!("Test: shutting down learner={:?}", learner.get_metrics()); | |
println!("Test: shutting down leader={:?}", leader.get_metrics()); | |
}; | |
futures.push(Box::pin(test)); | |
let _ = futures.next().await; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment