Demo: optimized monero scanning design
// Copyright (c) 2022, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
//    conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
//    of conditions and the following disclaimer in the documentation and/or other
//    materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
//    used to endorse or promote products derived from this software without specific
//    prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// NOT FOR PRODUCTION

//paired header
//local headers
#include "async/misc_utils.h"
#include "async/threadpool.h"
#include "common/variant.h"
#include "misc_log_ex.h"
#include "ringct/rctTypes.h"
#include "seraphis_main/scan_core_types.h"
#include "seraphis_main/scan_ledger_chunk_async.h"
#include "seraphis_main/scan_ledger_chunk_simple.h"
#include "seraphis_main/scanning_context.h"
//third party headers
//standard headers
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <future>
#include <iterator>
#include <memory>
#include <utility>
#include <vector>

#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "seraphis"
namespace sp
{
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
struct PendingChunk final
{
    scanning::PendingChunkContext pending_context;
    scanning::PendingChunkData pending_data;
};

using PendingChunkVariant = tools::variant<PendingChunk, scanning::ChunkContext>;

bool is_end_chunk(const PendingChunkVariant &variant)
{
    struct visitor final : public tools::variant_static_visitor<bool>
    {
        using variant_static_visitor::operator();  //for blank overload
        bool operator()(const PendingChunk &chunk) const
        {
            return async::future_is_ready(chunk.pending_context.chunk_context) &&
                chunk_is_empty(chunk.pending_context.chunk_context.get());
        }
        bool operator()(const scanning::ChunkContext &context) const
        {
            return chunk_is_empty(context);
        }
    };

    return variant.visit(visitor{});
}

const scanning::ChunkContext* try_ref_chunk_context(const PendingChunkVariant &variant)
{
    struct visitor final : public tools::variant_static_visitor<const scanning::ChunkContext*>
    {
        using variant_static_visitor::operator();  //for blank overload
        const scanning::ChunkContext* operator()(const PendingChunk &chunk) const
        {
            if (!async::future_is_ready(chunk.pending_context.chunk_context))
                return nullptr;
            return &chunk.pending_context.chunk_context.get();
        }
        const scanning::ChunkContext* operator()(const scanning::ChunkContext &context) const
        {
            return &context;
        }
    };

    return variant.visit(visitor{});
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
PendingChunk launch_chunk_task(const std::uint64_t chunk_start_index, const std::uint64_t requested_chunk_size)
{
    async::Threadpool &threadpool{async::get_default_threadpool()};

    // prepare chunk task
    std::promise<void> context_stop_signal{};
    std::promise<void> data_stop_signal{};
    std::promise<scanning::ChunkContext> chunk_context_handle{};
    std::promise<scanning::ChunkData> chunk_data_handle{};
    std::shared_future<scanning::ChunkContext> chunk_context_future = chunk_context_handle.get_future().share();
    std::shared_future<scanning::ChunkData> chunk_data_future = chunk_data_handle.get_future().share();
    async::join_signal_t context_join_signal = threadpool.make_join_signal();
    async::join_signal_t data_join_signal = threadpool.make_join_signal();
    async::join_token_t context_join_token = threadpool.get_join_token(context_join_signal);
    async::join_token_t data_join_token = threadpool.get_join_token(data_join_signal);

    auto task =
        [
            &threadpool,
            l_context_stop_flag = context_stop_signal.get_future().share(),
            l_data_stop_flag = data_stop_signal.get_future().share(),
            l_chunk_start_index = chunk_start_index,
            l_requested_chunk_size = requested_chunk_size,
            l_chunk_context = std::move(chunk_context_handle),
            l_chunk_data = std::move(chunk_data_handle),
            l_context_join_token = context_join_token,
            l_data_join_token = data_join_token
        ]
        () mutable
        {
            // check if canceled
            if (async::future_is_ready(l_context_stop_flag))
                return;

            // daemon query + parse response
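            // note: raw_chunk_data_t, get_chunk(), and process_chunk_data() are assumed placeholders for the
            //   daemon query / chunk-scanning steps; they are not defined in this demo file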
            raw_chunk_data_t raw_chunk_data{
                    get_chunk(std::move(l_context_stop_flag),
                        l_chunk_start_index,
                        l_requested_chunk_size,
                        threadpool)
                };

            // set context
            l_chunk_context.set_value(raw_chunk_data.context);
            l_context_join_token = nullptr;

            // check if canceled
            if (async::future_is_ready(l_data_stop_flag))
                return;

            // find-received-scan raw data
            // set data
            // - note: process chunk data can 'do nothing' if the chunk is empty (i.e. don't launch any tasks)
            l_chunk_data.set_value(process_chunk_data(std::move(l_data_stop_flag), raw_chunk_data.data, threadpool));
            l_data_join_token = nullptr;
        };

    // launch the task
    threadpool.submit(async::make_simple_task(async::DefaultPriorityLevels::MEDIUM, std::move(task)));

    // return pending chunk for caller to deal with as needed
    async::join_condition_t chunk_context_join_condition{
            threadpool.get_join_condition(std::move(context_join_signal), std::move(context_join_token))
        };
    async::join_condition_t chunk_data_join_condition{
            threadpool.get_join_condition(std::move(data_join_signal), std::move(data_join_token))
        };

    return PendingChunk{
            .pending_context = scanning::PendingChunkContext{
                    .stop_signal            = std::move(context_stop_signal),
                    .chunk_context          = std::move(chunk_context_future),
                    .context_join_condition = std::move(chunk_context_join_condition)
                },
            .pending_data = scanning::PendingChunkData{
                    .stop_signal         = std::move(data_stop_signal),
                    .chunk_data          = std::move(chunk_data_future),
                    .data_join_condition = std::move(chunk_data_join_condition)
                }
        };
}
////
// WARNING: if the chunk size increment exceeds the max chunk size obtainable from the raw chunk data source, then
//          this will be less efficient because it will need to 'gap fill' continuously
///
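// Illustrative example (numbers assumed, not from the original): with chunk_size_increment = 1000 but a data
// source that caps each response at 100 blocks, a request for [start, start + 1000) only yields
// [start, start + 100), so get_onchain_chunk() must repeatedly launch gap-fill tasks for the remaining
// 900 indices before the queue can advance.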
class AsyncScanningContext final : public scanning::ScanningContextLedger
{
public:
    AsyncScanningContext(const std::uint64_t pending_chunk_queue_size, const std::uint64_t chunk_size_increment) :
        m_pending_chunk_queue_size{pending_chunk_queue_size},
        m_chunk_size_increment{chunk_size_increment}
    {
        assert(m_pending_chunk_queue_size > 0);
        assert(m_chunk_size_increment > 0);
    }

    void initialize_scanning(const std::uint64_t start_index)
    {
        // cancel all pending tasks
        m_pending_chunks = {};

        // launch new chunks
        // - note: take care with increment
        for (std::uint64_t i{0}; i < m_pending_chunk_queue_size; ++i)
        {
            m_pending_chunks.emplace_back(launch_chunk_task(start_index + i*m_chunk_size_increment,
                m_chunk_size_increment));
        }

        // save end index of requested chunks for use when launching the next chunk request
        m_projected_end_index = start_index + m_pending_chunk_queue_size*m_chunk_size_increment;
    }
    std::unique_ptr<scanning::LedgerChunk> get_onchain_chunk()
    {
        assert(m_pending_chunks.size() > 0);

        // end condition
        if (m_pending_chunks.front().is_type<scanning::ChunkContext>())
            return std::make_unique<scanning::LedgerChunkEmpty>(m_pending_chunks.front().unwrap<scanning::ChunkContext>());

        /// update queue
        // todo: clean up this ugly code

        // wait until the first chunk's context is ready
        async::get_default_threadpool().work_while_waiting(
            m_pending_chunks.front().unwrap<PendingChunk>().pending_context.context_join_condition,
            async::DefaultPriorityLevels::MAX);
        assert(async::future_is_ready(
            m_pending_chunks.front().unwrap<PendingChunk>().pending_context.chunk_context));  //should be ready at this point
        // find an end chunk
        auto end_chunk_it =
            std::find_if(m_pending_chunks.begin(),
                m_pending_chunks.end(),
                [](const PendingChunkVariant &chunk) -> bool
                {
                    return is_end_chunk(chunk);
                });
        // add a simple end chunk to the end of the queue if we found an end chunk
        // - note: emplacing at the back of a std::deque invalidates iterators, so re-acquire the end chunk iterator
        if (end_chunk_it != m_pending_chunks.end() && !m_pending_chunks.back().is_type<scanning::ChunkContext>())
        {
            const auto end_chunk_index = std::distance(m_pending_chunks.begin(), end_chunk_it);

            if (const scanning::ChunkContext *context = try_ref_chunk_context(*end_chunk_it))
                m_pending_chunks.emplace_back(*context);
            else
                assert(false);  //this shouldn't happen

            end_chunk_it = std::next(m_pending_chunks.begin(), end_chunk_index);
        }
        // remove pending chunks between the end chunk we found and the end of the queue
        if (end_chunk_it != m_pending_chunks.end())
        {
            for (auto pending_it = std::next(end_chunk_it); pending_it != m_pending_chunks.end();)
            {
                // erase pending chunks
                if (pending_it->is_type<PendingChunk>())
                    pending_it = m_pending_chunks.erase(pending_it);
                else
                    ++pending_it;
            }
        }
        // otherwise, launch a new chunk request
        else
        {
            m_pending_chunks.emplace_back(launch_chunk_task(m_projected_end_index, m_chunk_size_increment));
            m_projected_end_index += m_chunk_size_increment;
        }

        // look for gaps between pending chunks, and schedule new pending chunks to fill them
        for (auto pending_it = m_pending_chunks.begin(); pending_it != m_pending_chunks.end(); ++pending_it)
        {
            // if we found an end chunk, then no need to examine the rest
            if (is_end_chunk(*pending_it))
                break;

            // skip chunks with unavailable chunk contexts
            const scanning::ChunkContext *chunk_context{try_ref_chunk_context(*pending_it)};
            if (!chunk_context)
                continue;

            // skip chunks that have no apparent gap to the next chunk
            const std::uint64_t current_chunk_end_index{chunk_context->start_index + scanning::chunk_size(*chunk_context)};
            const std::uint64_t gap_to_next_chunk{
                    (m_projected_end_index - current_chunk_end_index) % m_chunk_size_increment
                };
            if (gap_to_next_chunk == 0)
                continue;

            // skip if the next chunk is an end chunk that connects to our current chunk
            const auto next_chunk_it = std::next(pending_it);
            if (next_chunk_it != m_pending_chunks.end())
            {
                const scanning::ChunkContext *next_chunk_context{try_ref_chunk_context(*next_chunk_it)};
                if (is_end_chunk(*next_chunk_it) &&
                    next_chunk_context &&
                    next_chunk_context->start_index == current_chunk_end_index)
                    continue;
            }

            // add task to fill gap
            pending_it =
                m_pending_chunks.emplace(next_chunk_it, launch_chunk_task(current_chunk_end_index, gap_to_next_chunk));
        }

        // extract the first chunk
        PendingChunkVariant first_chunk{std::move(m_pending_chunks.front())};
        m_pending_chunks.pop_front();
        assert(first_chunk.is_type<PendingChunk>());

        return std::make_unique<scanning::AsyncLedgerChunk>(
            async::get_default_threadpool(),
            std::move(first_chunk.unwrap<PendingChunk>().pending_context),
            std::vector<scanning::PendingChunkData>{std::move(first_chunk.unwrap<PendingChunk>().pending_data)},
            std::vector<rct::key>{rct::zero()}
        );
    }

private:
    /// config
    const std::uint64_t m_pending_chunk_queue_size;
    const std::uint64_t m_chunk_size_increment;

    /// pending chunks
    std::deque<PendingChunkVariant> m_pending_chunks;
    std::uint64_t m_projected_end_index{};
};
} //namespace sp
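
Usage sketch (not part of the gist): how a caller might drive this context, with illustrative constructor values; in practice a higher-level scan state machine would consume the chunks and decide when an empty 'end chunk' terminates the loop.

// hypothetical driver loop; queue size and chunk increment are example values
sp::AsyncScanningContext scanning_context{10, 1000};  //10 pending chunks, 1000-block increments
scanning_context.initialize_scanning(0);              //begin scanning from block index 0

while (true)
{
    // blocks until the front chunk's context is ready, refilling and gap-filling the queue as it goes
    std::unique_ptr<sp::scanning::LedgerChunk> chunk{scanning_context.get_onchain_chunk()};

    // ...hand the chunk to the consumer; exit once it reports an empty end chunk
    //    (the exact emptiness check depends on the LedgerChunk interface, which is out of scope here)
    break;  //placeholder so this sketch terminates
}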
Follow-up (notes on an improved design):