Skip to content

Instantly share code, notes, and snippets.

@igor-egorov
Created April 10, 2019 12:38
Show Gist options
  • Save igor-egorov/058dd929475370fda561730791a808a7 to your computer and use it in GitHub Desktop.
Save igor-egorov/058dd929475370fda561730791a808a7 to your computer and use it in GitHub Desktop.
debuggable pending transactions storage
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
#include <sstream>
#include "interfaces/transaction.hpp"
#include "multi_sig_transactions/state/mst_state.hpp"
namespace iroha {
PendingTransactionStorageImpl::PendingTransactionStorageImpl(
StateObservable updated_batches,
BatchObservable prepared_batch,
BatchObservable expired_batch) {
updated_batches_subscription_ =
updated_batches.subscribe([this](const SharedState &batches) {
this->updatedBatchesHandler(batches);
});
prepared_batch_subscription_ =
prepared_batch.subscribe([this](const SharedBatch &preparedBatch) {
this->removeBatch(preparedBatch);
});
expired_batch_subscription_ =
expired_batch.subscribe([this](const SharedBatch &expiredBatch) {
this->removeBatch(expiredBatch);
});
}
PendingTransactionStorageImpl::~PendingTransactionStorageImpl() {
updated_batches_subscription_.unsubscribe();
prepared_batch_subscription_.unsubscribe();
expired_batch_subscription_.unsubscribe();
}
PendingTransactionStorageImpl::SharedTxsCollectionType
PendingTransactionStorageImpl::getPendingTransactions(
const AccountIdType &account_id) const {
printStorageState("getPendingTransactions");
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto creator_it = storage_.index.find(account_id);
if (storage_.index.end() != creator_it) {
auto &batch_hashes = creator_it->second;
SharedTxsCollectionType result;
auto &batches = storage_.batches;
for (const auto &batch_hash : batch_hashes) {
auto batch_it = batches.find(batch_hash);
if (batches.end() != batch_it) {
auto &txs = batch_it->second->transactions();
result.insert(result.end(), txs.begin(), txs.end());
}
}
return result;
}
return {};
}
void PendingTransactionStorageImpl::printStorageState(
std::string comment) const {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
std::stringstream out;
auto splitter = "===============================";
out << splitter << std::endl << comment << std::endl;
for (auto author : storage_.index) {
out << "Creator: " << author.first << std::endl;
for (auto hash : author.second) {
out << "\t" << hash.toString() << std::endl;
auto batch = storage_.batches.find(hash);
if (batch != storage_.batches.end()) {
out << "\tHas all signatures: " << batch->second->hasAllSignatures()
<< std::endl;
for (auto tx : batch->second->transactions()) {
out << "\t\t" << tx->toString() << std::endl;
}
} else {
out << "\tStorage is inconsistent state!" << std::endl;
}
}
}
out << splitter << std::endl;
std::cout << out.str() << std::endl;
}
std::set<PendingTransactionStorageImpl::AccountIdType>
PendingTransactionStorageImpl::batchCreators(const TransactionBatch &batch) {
std::set<AccountIdType> creators;
for (const auto &transaction : batch.transactions()) {
creators.insert(transaction->creatorAccountId());
}
return creators;
}
void PendingTransactionStorageImpl::updatedBatchesHandler(
const SharedState &updated_batches) {
printStorageState("+ updatedBatchesHandler");
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
updated_batches->iterateBatches([this](const auto &batch) {
auto hash = batch->reducedHash();
auto it = storage_.batches.find(hash);
if (storage_.batches.end() == it) {
for (const auto &creator : batchCreators(*batch)) {
storage_.index[creator].insert(hash);
}
}
storage_.batches[hash] = batch;
});
}
printStorageState("- updatedBatchesHandler");
}
void PendingTransactionStorageImpl::removeBatch(const SharedBatch &batch) {
auto creators = batchCreators(*batch);
auto hash = batch->reducedHash();
printStorageState("+ removeBatch");
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);
storage_.batches.erase(hash);
for (const auto &creator : creators) {
auto &index = storage_.index;
auto index_it = index.find(creator);
if (index.end() != index_it) {
auto &creator_set = index_it->second;
creator_set.erase(hash);
if (creator_set.empty()) {
index.erase(index_it);
}
}
}
}
printStorageState("- removeBatch");
}
} // namespace iroha
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment