Skip to content

Instantly share code, notes, and snippets.

@barthap
Created July 9, 2023 21:20
Show Gist options
  • Save barthap/059f54c64e6918cf2dadacc281adf85e to your computer and use it in GitHub Desktop.
Save barthap/059f54c64e6918cf2dadacc281adf85e to your computer and use it in GitHub Desktop.
Testing new blob service db schema
diff --git a/services/blob/src/database/client.rs b/services/blob/src/database/client.rs
index 8b8098793..08122a663 100644
--- a/services/blob/src/database/client.rs
+++ b/services/blob/src/database/client.rs
@@ -19,6 +19,8 @@ use crate::constants::db::*;
use super::errors::{BlobDBError, Error as DBError};
use super::types::*;
+pub mod tests;
+
#[derive(Clone)]
pub struct DatabaseClient {
ddb: Arc<aws_sdk_dynamodb::Client>,
diff --git a/services/blob/src/main.rs b/services/blob/src/main.rs
index 15e048220..eca426942 100644
--- a/services/blob/src/main.rs
+++ b/services/blob/src/main.rs
@@ -42,8 +42,6 @@ async fn main() -> Result<()> {
},
);
- tokio::select! {
- http_result = crate::http::run_http_server(service) => http_result,
- grpc_result = crate::grpc::run_grpc_server(db, s3) => grpc_result,
- }
+ crate::database::client::tests::run_database_tests(&aws_config).await;
+ Ok(())
}
//
// services/blob/src/database/client/tests.rs
//
use std::collections::HashMap;
use anyhow::Result;
use aws_sdk_dynamodb::types::{
AttributeValue, DeleteRequest, PutRequest, WriteRequest,
};
use tonic::codegen::BoxFuture;
use tracing::{debug, error, info};
use crate::constants::db::*;
use crate::database::types::UncheckedKind;
use crate::{database::types::HolderAssignmentRow, s3::S3Path};
use super::DatabaseClient;
use crate::database::types::{BlobItemInput, BlobItemRow, DBRow, PrimaryKey};
async fn propagate_test_data(ddb: &aws_sdk_dynamodb::Client) -> Result<()> {
let data: Vec<DBRow> = vec![
// blob 1
DBRow::BlobItem(BlobItemRow {
blob_hash: "hash1".to_string(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
s3_path: S3Path::from_full_path("bucket1/object1").unwrap(),
unchecked: false,
}),
DBRow::HolderAssignment(HolderAssignmentRow {
blob_hash: "hash1".to_string(),
holder: "holder1A".to_string(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
unchecked: false,
}),
DBRow::HolderAssignment(HolderAssignmentRow {
blob_hash: "hash1".to_string(),
holder: "holder1B".to_string(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
unchecked: false,
}),
// blob 2
DBRow::BlobItem(BlobItemRow {
blob_hash: "hash2".to_string(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
s3_path: S3Path::from_full_path("bucket1/object2").unwrap(),
unchecked: false,
}),
DBRow::HolderAssignment(HolderAssignmentRow {
blob_hash: "hash2".to_string(),
holder: "holder2A".to_string(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
unchecked: false,
}),
// blob without holdrs
DBRow::BlobItem(BlobItemRow {
blob_hash: "no_holders".into(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
s3_path: S3Path::from_full_path("bucket1/object3").unwrap(),
unchecked: true,
}),
// holders for no blob
DBRow::HolderAssignment(HolderAssignmentRow {
blob_hash: "no_blob".into(),
holder: "holderX1".into(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
unchecked: true,
}),
DBRow::HolderAssignment(HolderAssignmentRow {
blob_hash: "no_blob".into(),
holder: "holderX2".into(),
created_at: chrono::Utc::now(),
last_modified: chrono::Utc::now(),
unchecked: true,
}),
];
let mut write_requests = Vec::new();
for row in data {
match row {
DBRow::BlobItem(BlobItemRow {
blob_hash,
created_at,
last_modified,
s3_path,
unchecked,
}) => {
let mut item = HashMap::new();
item.insert("blob_hash".into(), AttributeValue::S(blob_hash));
item.insert("holder".into(), AttributeValue::S("_".into()));
item.insert(
"created_at".into(),
AttributeValue::N(created_at.timestamp_millis().to_string()),
);
item.insert(
"last_modified".into(),
AttributeValue::N(last_modified.timestamp_millis().to_string()),
);
item
.insert("s3_path".into(), AttributeValue::S(s3_path.to_full_path()));
if unchecked {
item.insert("unchecked".into(), AttributeValue::S("blob".into()));
}
let put_req = PutRequest::builder().set_item(Some(item)).build();
let write_req = WriteRequest::builder().put_request(put_req).build();
write_requests.push(write_req);
}
DBRow::HolderAssignment(HolderAssignmentRow {
blob_hash,
holder,
created_at,
last_modified,
unchecked,
}) => {
let mut item = HashMap::new();
item.insert("blob_hash".into(), AttributeValue::S(blob_hash));
item.insert("holder".into(), AttributeValue::S(holder));
item.insert(
"created_at".into(),
AttributeValue::N(created_at.timestamp_millis().to_string()),
);
item.insert(
"last_modified".into(),
AttributeValue::N(last_modified.timestamp_millis().to_string()),
);
if unchecked {
item.insert("unchecked".into(), AttributeValue::S("holder".into()));
}
let put_req = PutRequest::builder().set_item(Some(item)).build();
let write_req = WriteRequest::builder().put_request(put_req).build();
write_requests.push(write_req);
}
}
}
ddb
.batch_write_item()
.request_items("blob-service-blobs", write_requests)
.send()
.await?;
Ok(())
}
async fn test_get_item(db: DatabaseClient) -> Result<(), anyhow::Error> {
let item = db.get_blob_item("hash1").await?;
assert!(item.is_some());
let item = db.get_blob_item("not_exists").await?;
assert!(item.is_none());
Ok(())
}
async fn test_put_already_existing_item(db: DatabaseClient) -> Result<()> {
let err = db
.put_blob_item(BlobItemInput {
blob_hash: "hash1".into(),
s3_path: S3Path::from_full_path("a/b")?,
})
.await
.expect_err("Did not fail");
info!("blob err: {:?}", err);
let err = db
.put_holder_assignment("hash1", "holder1A")
.await
.expect_err("Did not fail");
info!("holder err: {:?}", err);
// should succeed this time
db.put_holder_assignment("hash1", "holder1C")
.await
.expect("shouder");
Ok(())
}
async fn test_delete_holder_makes_unchecked(db: DatabaseClient) -> Result<()> {
let item = db.get_blob_item("hash1").await?.expect("no item");
assert!(!item.unchecked);
db.delete_holder_assignment("hash1", "holder1A").await?;
let item = db.get_blob_item("hash1").await?;
assert!(item.is_some());
let item = item.unwrap();
assert!(item.unchecked);
Ok(())
}
async fn test_find_unchecked(db: DatabaseClient) -> Result<()> {
let holders = db
.find_unchecked_items(UncheckedKind::Holder, chrono::Duration::zero())
.await?;
debug!(?holders, "holders");
let holders = holders.into_iter().map(|pk| pk.holder).collect::<Vec<_>>();
assert!(holders.contains(&"holderX1".into()));
assert!(holders.contains(&"holderX2".into()));
let blobs = db
.find_unchecked_items(UncheckedKind::Blob, chrono::Duration::zero())
.await?;
debug!(?blobs, "blobs");
let blobs = blobs.into_iter().map(|pk| pk.blob_hash).collect::<Vec<_>>();
assert!(blobs.contains(&"no_holders".to_string()));
Ok(())
}
async fn test_mark_checked(db: DatabaseClient) -> Result<()> {
let primary_keys = vec![
PrimaryKey::for_blob_item("hash1"), // this isn't unchecked so will be unmodified
PrimaryKey::for_blob_item("ho_holders"),
PrimaryKey {
blob_hash: "no_blob".into(),
holder: "holderX1".into(),
},
];
db.batch_mark_checked(primary_keys.clone()).await?;
let items = db.get_multiple_items(primary_keys).await?;
for item in items {
match item {
DBRow::BlobItem(BlobItemRow { unchecked, .. }) => {
assert!(!unchecked);
}
DBRow::HolderAssignment(HolderAssignmentRow { unchecked, .. }) => {
assert!(!unchecked);
}
}
}
// the unmentioned holderX2 should still be unchecked
let item = db
.get_raw_item(PrimaryKey::new("no_blob".into(), "holderX2".into()))
.await?
.expect("item should exist");
let unchecked_attr = item
.get(ATTR_UNCHECKED)
.expect("should have unchecked attr")
.as_s()
.expect("should be a string");
assert_eq!(unchecked_attr, "holder");
Ok(())
}
pub async fn run_database_tests(aws_config: &aws_types::SdkConfig) {
let tests: Vec<(&str, fn(DatabaseClient) -> BoxFuture<(), anyhow::Error>)> = vec![
(stringify!(test_get_item), |db| Box::pin(test_get_item(db))),
(stringify!(test_delete_holder_makes_unchecked), |db| {
Box::pin(test_delete_holder_makes_unchecked(db))
}),
(stringify!(test_put_already_existing_item), |db| {
Box::pin(test_put_already_existing_item(db))
}),
(stringify!(test_find_unchecked), |db| {
Box::pin(test_find_unchecked(db))
}),
(stringify!(test_mark_checked), |db| {
Box::pin(test_mark_checked(db))
}),
];
let db = DatabaseClient::new(&aws_config);
println!("Running database tests...");
for (test_name, test_fn) in tests {
println!("Running test: {}", test_name);
propagate_test_data(&db.ddb)
.await
.expect("Failed to propagate test data");
if let Err(err) = test_fn(db.clone()).await {
error!("Test failed: {:?}", err);
}
delete_all_items(&db.ddb).await.expect("Failed to clear db");
}
println!("Done!");
}
async fn delete_all_items(
ddb: &aws_sdk_dynamodb::Client,
) -> anyhow::Result<()> {
// delete all items from the table
let scan_output = ddb.scan().table_name("blob-service-blobs").send().await?;
let mut delete_requests = Vec::new();
for item in scan_output.items.unwrap_or_default() {
let req = DeleteRequest::builder()
.key(
"blob_hash",
item.get("blob_hash").expect("missing blobhash").clone(),
)
.key(
"holder",
item.get("holder").expect("missing holder").clone(),
)
.build();
let write_req = WriteRequest::builder().delete_request(req).build();
delete_requests.push(write_req);
}
if delete_requests.is_empty() {
debug!("No items to clear");
return Ok(());
}
ddb
.batch_write_item()
.request_items("blob-service-blobs", delete_requests)
.send()
.await?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment