Created
April 25, 2016 19:29
-
-
Save cramja/3b4d9683d95b80e1852402f81978bbe6 to your computer and use it in GitHub Desktop.
A snippet to show how to test the StorageManager interface using concurrency and brute-force. An even better test would be to automatically detect deadlock.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace storage_manager_test_internal { | |
static int client_id = 0; | |
static const int NUM_BLOBS_PER_CLIENT = 100; | |
static const int BLOB_SIZE_SLOTS = 1; | |
static const int CLIENT_CYCLES = 1000; | |
class StorageClient : public Thread { | |
public: | |
StorageClient(StorageManager &storage_manager) | |
: storage_manager_(storage_manager), | |
id_(client_id++) {} | |
void run() { | |
createBlobs(); | |
for (int i = 0; i < CLIENT_CYCLES; ++i) { | |
cycle(); | |
// if (id_ == 1) { | |
// std::ostringstream msg_stream; | |
// msg_stream << "Thread " << id_ << " completed cycle " << i << "\n"; | |
// LOG(INFO) << msg_stream.str(); | |
// } | |
} | |
deleteBlobs(); | |
} | |
private: | |
void createBlobs() { | |
for (int i = 0; i < NUM_BLOBS_PER_CLIENT; ++i) { | |
block_id new_blob = storage_manager_.createBlob(BLOB_SIZE_SLOTS); | |
blobs_.push_back(new_blob); | |
blobs_status_[new_blob] = false; | |
} | |
} | |
void cycle() { | |
// Shuffle the vector of references and ids. | |
std::random_shuffle(blob_refs_.begin(), blob_refs_.end()); | |
std::random_shuffle(blobs_.begin(), blobs_.end()); | |
// Release half of the checked out blobs. | |
while (blob_refs_.size() > NUM_BLOBS_PER_CLIENT/4) { | |
// TODO(marc): Blob may be marked dirty at this point. | |
blobs_status_[blob_refs_.back()->getID()] = false; | |
blob_refs_.pop_back(); | |
} | |
// Checkout blobs until we reach the threshold. | |
int blob_index = 0; | |
while (blob_refs_.size() < NUM_BLOBS_PER_CLIENT/2) { | |
CHECK(blob_index < blobs_.size()); | |
// See if we have not checked out this blob. If not, make a reference. | |
const block_id candidate_blob = blobs_[blob_index]; | |
if (!blobs_status_[candidate_blob]) { | |
blob_refs_.push_back(storage_manager_.getBlobMutable(candidate_blob)); | |
blobs_status_[candidate_blob] = true; | |
} | |
blob_index++; | |
} | |
} | |
void deleteBlobs() { | |
// Go through all the blobs and dereference them. | |
while(blob_refs_.size() > 0) { | |
blobs_status_[blob_refs_.back()->getID()] = false; | |
blob_refs_.pop_back(); | |
} | |
// Ensure everything has been checked in. | |
for (const auto& id_status : blobs_status_) { | |
CHECK(id_status.second == false); | |
} | |
// Delete all the blob files. | |
for (block_id bid : blobs_) { | |
storage_manager_.deleteBlockOrBlobFile(bid); | |
} | |
} | |
StorageManager &storage_manager_; | |
// Blobs created and owned by this client. | |
std::vector<block_id> blobs_; | |
// A value of true means that we have checked out the block. | |
std::unordered_map<block_id, bool> blobs_status_; | |
// Blob references by this client. | |
std::vector<MutableBlobReference> blob_refs_; | |
int id_; | |
}; | |
} // namespace storage_manager_test_internal | |
using namespace storage_manager_test_internal; | |
// Create a large number of threads which concurrently access the StorageManager, | |
// trying to force a bad interleaving. Test is meant to stress the storage manager | |
// but does not expose all possible interleavings. | |
TEST(StorageManagerTest, BruteForceDeadLockTest) { | |
std::chrono::time_point<std::chrono::steady_clock> start, end; | |
// Init StorageManager. | |
std::unique_ptr<StorageManager> storage_manager; | |
// Use a small number of slots. | |
storage_manager.reset(new StorageManager("temp_storage", 32)); | |
// Init some threads. | |
const int num_clients = 4; | |
PtrVector<StorageClient> clients; | |
for (int i = 0; i < num_clients; ++i) { | |
clients.push_back(new StorageClient(*storage_manager)); | |
} | |
start = std::chrono::steady_clock::now(); | |
// Start all threads. | |
for (int i = 0; i < num_clients; ++i) { | |
clients[i].start(); | |
} | |
// Wait for all threads to finish. | |
for (int i = 0; i < num_clients; ++i) { | |
clients[i].join(); | |
} | |
end = std::chrono::steady_clock::now(); | |
printf("BruteForce stress execution time: %g seconds\n", | |
std::chrono::duration<double>(end - start).count()); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment