-
-
Save barthap/059f54c64e6918cf2dadacc281adf85e to your computer and use it in GitHub Desktop.
Testing new blob service db schema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs | |
index 8b8098793..08122a663 100644 | |
--- a/services/blob/src/database/client.rs | |
+++ b/services/blob/src/database/client.rs | |
@@ -19,6 +19,8 @@ use crate::constants::db::*; | |
use super::errors::{BlobDBError, Error as DBError}; | |
use super::types::*; | |
+pub mod tests; | |
+ | |
#[derive(Clone)] | |
pub struct DatabaseClient { | |
ddb: Arc<aws_sdk_dynamodb::Client>, | |
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs | |
index 15e048220..eca426942 100644 | |
--- a/services/blob/src/main.rs | |
+++ b/services/blob/src/main.rs | |
@@ -42,8 +42,6 @@ async fn main() -> Result<()> { | |
}, | |
); | |
- tokio::select! { | |
- http_result = crate::http::run_http_server(service) => http_result, | |
- grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result, | |
- } | |
+ crate::database::client::tests::run_database_tests(&aws_config).await; | |
+ Ok(()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// services/blob/src/database/client/tests.rs | |
// | |
use std::collections::HashMap; | |
use anyhow::Result; | |
use aws_sdk_dynamodb::types::{ | |
AttributeValue, DeleteRequest, PutRequest, WriteRequest, | |
}; | |
use tonic::codegen::BoxFuture; | |
use tracing::{debug, error, info}; | |
use crate::constants::db::*; | |
use crate::database::types::UncheckedKind; | |
use crate::{database::types::HolderAssignmentRow, s3::S3Path}; | |
use super::DatabaseClient; | |
use crate::database::types::{BlobItemInput, BlobItemRow, DBRow, PrimaryKey}; | |
async fn propagate_test_data(ddb: &aws_sdk_dynamodb::Client) -> Result<()> { | |
let data: Vec<DBRow> = vec![ | |
// blob 1 | |
DBRow::BlobItem(BlobItemRow { | |
blob_hash: "hash1".to_string(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
s3_path: S3Path::from_full_path("bucket1/object1").unwrap(), | |
unchecked: false, | |
}), | |
DBRow::HolderAssignment(HolderAssignmentRow { | |
blob_hash: "hash1".to_string(), | |
holder: "holder1A".to_string(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
unchecked: false, | |
}), | |
DBRow::HolderAssignment(HolderAssignmentRow { | |
blob_hash: "hash1".to_string(), | |
holder: "holder1B".to_string(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
unchecked: false, | |
}), | |
// blob 2 | |
DBRow::BlobItem(BlobItemRow { | |
blob_hash: "hash2".to_string(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
s3_path: S3Path::from_full_path("bucket1/object2").unwrap(), | |
unchecked: false, | |
}), | |
DBRow::HolderAssignment(HolderAssignmentRow { | |
blob_hash: "hash2".to_string(), | |
holder: "holder2A".to_string(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
unchecked: false, | |
}), | |
// blob without holdrs | |
DBRow::BlobItem(BlobItemRow { | |
blob_hash: "no_holders".into(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
s3_path: S3Path::from_full_path("bucket1/object3").unwrap(), | |
unchecked: true, | |
}), | |
// holders for no blob | |
DBRow::HolderAssignment(HolderAssignmentRow { | |
blob_hash: "no_blob".into(), | |
holder: "holderX1".into(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
unchecked: true, | |
}), | |
DBRow::HolderAssignment(HolderAssignmentRow { | |
blob_hash: "no_blob".into(), | |
holder: "holderX2".into(), | |
created_at: chrono::Utc::now(), | |
last_modified: chrono::Utc::now(), | |
unchecked: true, | |
}), | |
]; | |
let mut write_requests = Vec::new(); | |
for row in data { | |
match row { | |
DBRow::BlobItem(BlobItemRow { | |
blob_hash, | |
created_at, | |
last_modified, | |
s3_path, | |
unchecked, | |
}) => { | |
let mut item = HashMap::new(); | |
item.insert("blob_hash".into(), AttributeValue::S(blob_hash)); | |
item.insert("holder".into(), AttributeValue::S("_".into())); | |
item.insert( | |
"created_at".into(), | |
AttributeValue::N(created_at.timestamp_millis().to_string()), | |
); | |
item.insert( | |
"last_modified".into(), | |
AttributeValue::N(last_modified.timestamp_millis().to_string()), | |
); | |
item | |
.insert("s3_path".into(), AttributeValue::S(s3_path.to_full_path())); | |
if unchecked { | |
item.insert("unchecked".into(), AttributeValue::S("blob".into())); | |
} | |
let put_req = PutRequest::builder().set_item(Some(item)).build(); | |
let write_req = WriteRequest::builder().put_request(put_req).build(); | |
write_requests.push(write_req); | |
} | |
DBRow::HolderAssignment(HolderAssignmentRow { | |
blob_hash, | |
holder, | |
created_at, | |
last_modified, | |
unchecked, | |
}) => { | |
let mut item = HashMap::new(); | |
item.insert("blob_hash".into(), AttributeValue::S(blob_hash)); | |
item.insert("holder".into(), AttributeValue::S(holder)); | |
item.insert( | |
"created_at".into(), | |
AttributeValue::N(created_at.timestamp_millis().to_string()), | |
); | |
item.insert( | |
"last_modified".into(), | |
AttributeValue::N(last_modified.timestamp_millis().to_string()), | |
); | |
if unchecked { | |
item.insert("unchecked".into(), AttributeValue::S("holder".into())); | |
} | |
let put_req = PutRequest::builder().set_item(Some(item)).build(); | |
let write_req = WriteRequest::builder().put_request(put_req).build(); | |
write_requests.push(write_req); | |
} | |
} | |
} | |
ddb | |
.batch_write_item() | |
.request_items("blob-service-blobs", write_requests) | |
.send() | |
.await?; | |
Ok(()) | |
} | |
async fn test_get_item(db: DatabaseClient) -> Result<(), anyhow::Error> { | |
let item = db.get_blob_item("hash1").await?; | |
assert!(item.is_some()); | |
let item = db.get_blob_item("not_exists").await?; | |
assert!(item.is_none()); | |
Ok(()) | |
} | |
async fn test_put_already_existing_item(db: DatabaseClient) -> Result<()> { | |
let err = db | |
.put_blob_item(BlobItemInput { | |
blob_hash: "hash1".into(), | |
s3_path: S3Path::from_full_path("a/b")?, | |
}) | |
.await | |
.expect_err("Did not fail"); | |
info!("blob err: {:?}", err); | |
let err = db | |
.put_holder_assignment("hash1", "holder1A") | |
.await | |
.expect_err("Did not fail"); | |
info!("holder err: {:?}", err); | |
// should succeed this time | |
db.put_holder_assignment("hash1", "holder1C") | |
.await | |
.expect("shouder"); | |
Ok(()) | |
} | |
async fn test_delete_holder_makes_unchecked(db: DatabaseClient) -> Result<()> { | |
let item = db.get_blob_item("hash1").await?.expect("no item"); | |
assert!(!item.unchecked); | |
db.delete_holder_assignment("hash1", "holder1A").await?; | |
let item = db.get_blob_item("hash1").await?; | |
assert!(item.is_some()); | |
let item = item.unwrap(); | |
assert!(item.unchecked); | |
Ok(()) | |
} | |
async fn test_find_unchecked(db: DatabaseClient) -> Result<()> { | |
let holders = db | |
.find_unchecked_items(UncheckedKind::Holder, chrono::Duration::zero()) | |
.await?; | |
debug!(?holders, "holders"); | |
let holders = holders.into_iter().map(|pk| pk.holder).collect::<Vec<_>>(); | |
assert!(holders.contains(&"holderX1".into())); | |
assert!(holders.contains(&"holderX2".into())); | |
let blobs = db | |
.find_unchecked_items(UncheckedKind::Blob, chrono::Duration::zero()) | |
.await?; | |
debug!(?blobs, "blobs"); | |
let blobs = blobs.into_iter().map(|pk| pk.blob_hash).collect::<Vec<_>>(); | |
assert!(blobs.contains(&"no_holders".to_string())); | |
Ok(()) | |
} | |
async fn test_mark_checked(db: DatabaseClient) -> Result<()> { | |
let primary_keys = vec![ | |
PrimaryKey::for_blob_item("hash1"), // this isn't unchecked so will be unmodified | |
PrimaryKey::for_blob_item("ho_holders"), | |
PrimaryKey { | |
blob_hash: "no_blob".into(), | |
holder: "holderX1".into(), | |
}, | |
]; | |
db.batch_mark_checked(primary_keys.clone()).await?; | |
let items = db.get_multiple_items(primary_keys).await?; | |
for item in items { | |
match item { | |
DBRow::BlobItem(BlobItemRow { unchecked, .. }) => { | |
assert!(!unchecked); | |
} | |
DBRow::HolderAssignment(HolderAssignmentRow { unchecked, .. }) => { | |
assert!(!unchecked); | |
} | |
} | |
} | |
// the unmentioned holderX2 should still be unchecked | |
let item = db | |
.get_raw_item(PrimaryKey::new("no_blob".into(), "holderX2".into())) | |
.await? | |
.expect("item should exist"); | |
let unchecked_attr = item | |
.get(ATTR_UNCHECKED) | |
.expect("should have unchecked attr") | |
.as_s() | |
.expect("should be a string"); | |
assert_eq!(unchecked_attr, "holder"); | |
Ok(()) | |
} | |
pub async fn run_database_tests(aws_config: &aws_types::SdkConfig) { | |
let tests: Vec<(&str, fn(DatabaseClient) -> BoxFuture<(), anyhow::Error>)> = vec![ | |
(stringify!(test_get_item), |db| Box::pin(test_get_item(db))), | |
(stringify!(test_delete_holder_makes_unchecked), |db| { | |
Box::pin(test_delete_holder_makes_unchecked(db)) | |
}), | |
(stringify!(test_put_already_existing_item), |db| { | |
Box::pin(test_put_already_existing_item(db)) | |
}), | |
(stringify!(test_find_unchecked), |db| { | |
Box::pin(test_find_unchecked(db)) | |
}), | |
(stringify!(test_mark_checked), |db| { | |
Box::pin(test_mark_checked(db)) | |
}), | |
]; | |
let db = DatabaseClient::new(&aws_config); | |
println!("Running database tests..."); | |
for (test_name, test_fn) in tests { | |
println!("Running test: {}", test_name); | |
propagate_test_data(&db.ddb) | |
.await | |
.expect("Failed to propagate test data"); | |
if let Err(err) = test_fn(db.clone()).await { | |
error!("Test failed: {:?}", err); | |
} | |
delete_all_items(&db.ddb).await.expect("Failed to clear db"); | |
} | |
println!("Done!"); | |
} | |
async fn delete_all_items( | |
ddb: &aws_sdk_dynamodb::Client, | |
) -> anyhow::Result<()> { | |
// delete all items from the table | |
let scan_output = ddb.scan().table_name("blob-service-blobs").send().await?; | |
let mut delete_requests = Vec::new(); | |
for item in scan_output.items.unwrap_or_default() { | |
let req = DeleteRequest::builder() | |
.key( | |
"blob_hash", | |
item.get("blob_hash").expect("missing blobhash").clone(), | |
) | |
.key( | |
"holder", | |
item.get("holder").expect("missing holder").clone(), | |
) | |
.build(); | |
let write_req = WriteRequest::builder().delete_request(req).build(); | |
delete_requests.push(write_req); | |
} | |
if delete_requests.is_empty() { | |
debug!("No items to clear"); | |
return Ok(()); | |
} | |
ddb | |
.batch_write_item() | |
.request_items("blob-service-blobs", delete_requests) | |
.send() | |
.await?; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment