Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yiwu-arbug/34c561d0daf4f9c293bb7b2825d77bf2 to your computer and use it in GitHub Desktop.
Save yiwu-arbug/34c561d0daf4f9c293bb7b2825d77bf2 to your computer and use it in GitHub Desktop.
diff --git a/src/blob_file_size_collector.cc b/src/blob_file_size_collector.cc
index e3825a8..c73c1f5 100644
--- a/src/blob_file_size_collector.cc
+++ b/src/blob_file_size_collector.cc
@@ -55,6 +55,7 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */,
if (!s.ok()) {
return s;
}
+ printf("add key %lu %lu %lu\n", index.file_number, index.blob_handle.size, index.blob_handle.offset);
auto iter = blob_files_size_.find(index.file_number);
if (iter == blob_files_size_.end()) {
diff --git a/src/db_impl.cc b/src/db_impl.cc
index 24d23be..0ecf239 100644
--- a/src/db_impl.cc
+++ b/src/db_impl.cc
@@ -1,5 +1,7 @@
#include "db_impl.h"
+#include "util/sync_point.h"
+
#include "base_db_listener.h"
#include "blob_file_builder.h"
#include "blob_file_iterator.h"
@@ -166,6 +168,10 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
// Avoid flush here because we haven't replaced the table factory yet.
db_options_.avoid_flush_during_recovery = true;
s = DB::Open(db_options_, dbname_, base_descs, handles, &db_);
+ TEST_SYNC_POINT_CALLBACK("TitanDBImpl::Open:AfterFirstOpen", static_cast<void*>(db_));
+ printf("first open\n");
+ TEST_SYNC_POINT("TitanDBImpl::Open:AfterFirstOpen:1");
+ TEST_SYNC_POINT("TitanDBImpl::Open:AfterFirstOpen:2");
if (s.ok()) {
for (size_t i = 0; i < descs.size(); i++) {
auto handle = (*handles)[i];
@@ -679,6 +685,13 @@ void TitanDBImpl::OnCompactionCompleted(
calc_bfs(compaction_job_info.input_files, -1, false);
calc_bfs(compaction_job_info.output_files, 1, true);
+ for (auto& input : inputs) {
+ printf("input %lu\n", input);
+ }
+ for (auto& output : outputs) {
+ printf("output %lu\n", output);
+ }
+
{
MutexLock l(&mutex_);
auto bs = vset_->GetBlobStorage(compaction_job_info.cf_id).lock();
diff --git a/src/db_impl.h b/src/db_impl.h
index d19b3a2..dd558d8 100644
--- a/src/db_impl.h
+++ b/src/db_impl.h
@@ -87,6 +87,8 @@ class TitanDBImpl : public TitanDB {
void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info);
void StartBackgroundTasks();
+
+ Status TEST_StartGC(uint32_t column_family_id);
private:
class FileManager;
@@ -128,7 +130,6 @@ class TitanDBImpl : public TitanDB {
static void BGWorkGC(void* db);
void BackgroundCallGC();
Status BackgroundGC(LogBuffer* log_buffer);
- Status TEST_StartGC(uint32_t column_family_id);
void PurgeObsoleteFiles();
diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc
index 5896524..43f407f 100644
--- a/src/titan_db_test.cc
+++ b/src/titan_db_test.cc
@@ -6,6 +6,7 @@
#include "blob_file_reader.h"
#include "db_impl.h"
#include "db_iter.h"
+#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h"
@@ -697,6 +698,47 @@ TEST_F(TitanDBTest, BlobRunModeBasic) {
version.clear();
}
+TEST_F(TitanDBTest, CompactionCrash) {
+ options_.disable_auto_compactions = true;
+ options_.disable_background_gc = true;
+ options_.min_blob_size = 0;
+ options_.blob_file_discardable_ratio = 0.01;
+ Open();
+ ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
+ ASSERT_OK(db_->Put(WriteOptions(), "bar", "v1"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_OK(db_impl_->TEST_StartGC(db_->DefaultColumnFamily()->GetID()));
+ Close();
+
+ DB* inner_db = nullptr;
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "TitanDBImpl::Open:AfterFirstOpen",
+ [&](void* arg) { inner_db = reinterpret_cast<DB*>(arg); });
+
+ SyncPoint::GetInstance()->LoadDependency({
+ {"TitanDBImpl::Open:AfterFirstOpen:1",
+ "CompactionCrash:1"},
+ {"CompactionCrash:2",
+ "TitanDBImpl::Open:AfterFirstOpen:2"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ auto t = new port::Thread([&]() {
+ TEST_SYNC_POINT("CompactionCrash:1");
+ printf("compact\n");
+ ASSERT_OK(inner_db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ TEST_SYNC_POINT("CompactionCrash:2");
+ });
+
+ Open();
+ printf("verifying\n");
+ ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3"));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+}
+
} // namespace titandb
} // namespace rocksdb
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment