@UkoeHB
Last active May 17, 2023
Demo: optimized Monero scanning design
// Copyright (c) 2022, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// NOT FOR PRODUCTION
//paired header
//local headers
#include "async/misc_utils.h"
#include "async/threadpool.h"
#include "common/variant.h"
#include "misc_log_ex.h"
#include "ringct/rctTypes.h"
#include "seraphis_main/scan_core_types.h"
#include "seraphis_main/scan_ledger_chunk_async.h"
#include "seraphis_main/scan_ledger_chunk_simple.h"
#include "seraphis_main/scanning_context.h"
//third party headers
//standard headers
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <future>
#include <iterator>
#include <memory>
#include <utility>
#include <vector>
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "seraphis"
namespace sp
{
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
struct PendingChunk final
{
scanning::PendingChunkContext pending_context;
scanning::PendingChunkData pending_data;
};
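//either an in-flight pending chunk task, or a cached end-chunk context marking where the known chain currently ends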
using PendingChunkVariant = tools::variant<PendingChunk, scanning::ChunkContext>;
bool is_end_chunk(const PendingChunkVariant &variant)
{
struct visitor final : public tools::variant_static_visitor<bool>
{
using variant_static_visitor::operator(); //for blank overload
bool operator()(const PendingChunk &chunk) const
{
return async::future_is_ready(chunk.pending_context.chunk_context) &&
chunk_is_empty(chunk.pending_context.chunk_context.get());
}
bool operator()(const scanning::ChunkContext &context) const
{
return chunk_is_empty(context);
}
};
return variant.visit(visitor{});
}
const scanning::ChunkContext* try_ref_chunk_context(const PendingChunkVariant &variant)
{
struct visitor final : public tools::variant_static_visitor<const scanning::ChunkContext*>
{
using variant_static_visitor::operator(); //for blank overload
const scanning::ChunkContext* operator()(const PendingChunk &chunk) const
{
if (!async::future_is_ready(chunk.pending_context.chunk_context))
return nullptr;
return &chunk.pending_context.chunk_context.get();
}
const scanning::ChunkContext* operator()(const scanning::ChunkContext &context) const
{
return &context;
}
};
return variant.visit(visitor{});
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
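////
// Launch an asynchronous chunk-scanning task on the default threadpool.
// - The task first fulfills the chunk context promise (the block range and block ids obtained from the daemon
//   query), then fulfills the chunk data promise (the results of find-received scanning on the raw chunk data).
// - Each stage has its own stop signal so the caller can cancel it, and its own join condition that becomes ready
//   when the task releases the corresponding join token.
// - Note: get_chunk() and process_chunk_data() are placeholders for the daemon query and scanning steps, which are
//   not implemented in this demo.
///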
PendingChunk launch_chunk_task(const std::uint64_t chunk_start_index, const std::uint64_t requested_chunk_size)
{
async::Threadpool &threadpool{async::get_default_threadpool()};
// prepare chunk task
std::promise<void> context_stop_signal{};
std::promise<void> data_stop_signal{};
std::promise<scanning::ChunkContext> chunk_context_handle{};
std::promise<scanning::ChunkData> chunk_data_handle{};
std::shared_future<scanning::ChunkContext> chunk_context_future = chunk_context_handle.get_future().share();
std::shared_future<scanning::ChunkData> chunk_data_future = chunk_data_handle.get_future().share();
async::join_signal_t context_join_signal = threadpool.make_join_signal();
async::join_signal_t data_join_signal = threadpool.make_join_signal();
async::join_token_t context_join_token = threadpool.get_join_token(context_join_signal);
async::join_token_t data_join_token = threadpool.get_join_token(data_join_signal);
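// note: the join tokens are captured by the chunk task and released (reset to nullptr) as each stage completes,
//       which is what the join conditions built below wait on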
auto task =
[
&threadpool,
l_context_stop_flag = context_stop_signal.get_future().share(),
l_data_stop_flag = data_stop_signal.get_future().share(),
l_chunk_start_index = chunk_start_index,
l_requested_chunk_size = requested_chunk_size,
l_chunk_context = std::move(chunk_context_handle),
l_chunk_data = std::move(chunk_data_handle),
l_context_join_token = context_join_token,
l_data_join_token = data_join_token
]
() mutable
{
// check if canceled
if (async::future_is_ready(l_context_stop_flag))
return;
// daemon query + parse response
raw_chunk_data_t raw_chunk_data{
get_chunk(std::move(l_context_stop_flag),
l_chunk_start_index,
l_requested_chunk_size,
threadpool)
};
// set context
l_chunk_context.set_value(raw_chunk_data.context);
l_context_join_token = nullptr;
// check if canceled
if (async::future_is_ready(l_data_stop_flag))
return;
// find-received-scan raw data
// set data
// - note: process chunk data can 'do nothing' if the chunk is empty (i.e. don't launch any tasks)
l_chunk_data.set_value(process_chunk_data(std::move(l_data_stop_flag), raw_chunk_data.data, threadpool));
l_data_join_token = nullptr;
};
// launch the task
threadpool.submit(async::make_simple_task(async::DefaultPriorityLevels::MEDIUM, std::move(task)));
// return pending chunk for caller to deal with as needed
async::join_condition_t chunk_context_join_condition{
threadpool.get_join_condition(std::move(context_join_signal), std::move(context_join_token))
};
async::join_condition_t chunk_data_join_condition{
threadpool.get_join_condition(std::move(data_join_signal), std::move(data_join_token))
};
return PendingChunk{
.pending_context = scanning::PendingChunkContext{
.stop_signal = std::move(context_stop_signal),
.chunk_context = std::move(chunk_context_future),
.context_join_condition = std::move(chunk_context_join_condition)
},
.pending_data = scanning::PendingChunkData{
.stop_signal = std::move(data_stop_signal),
.chunk_data = std::move(chunk_data_future),
.data_join_condition = std::move(chunk_data_join_condition)
}
};
}
////
// WARNING: if the chunk size increment exceeds the max chunk size obtainable from the raw chunk data source, then
// this will be less efficient because it will need to 'gap fill' continuously
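// - example (hypothetical numbers): with a chunk size increment of 1000 but a data source that returns at most 100
//   blocks per request, each task only covers [start, start + 100), leaving a 900-block gap to fill after every
//   request before the scanner can advance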
///
class AsyncScanningContext final : public scanning::ScanningContextLedger
{
public:
AsyncScanningContext(const std::uint64_t pending_chunk_queue_size, const std::uint64_t chunk_size_increment) :
m_pending_chunk_queue_size{pending_chunk_queue_size},
m_chunk_size_increment{chunk_size_increment}
{
assert(m_pending_chunk_queue_size > 0);
assert(m_chunk_size_increment > 0);
}
void initialize_scanning(const std::uint64_t start_index)
{
// cancel all pending tasks
m_pending_chunks = {};
// launch new chunks
// - note: take care with increment
for (std::uint64_t i{0}; i < m_pending_chunk_queue_size; ++i)
{
m_pending_chunks.emplace_back(launch_chunk_task(start_index + i*m_chunk_size_increment,
m_chunk_size_increment));
}
// save end index of requested chunks for use when launching the next chunk request
m_projected_end_index = start_index + m_pending_chunk_queue_size*m_chunk_size_increment;
}
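////
// Get the next on-chain chunk for the scanner.
// 1. If the front of the queue is a bare end-chunk context, return an empty ledger chunk (end condition).
// 2. Work on threadpool tasks until the front pending chunk's context is ready.
// 3. If one of the pending chunks is a known end chunk, cache its context at the back of the queue and drop the
//    pending chunks scheduled after it; otherwise launch a new chunk task to keep the queue full.
// 4. Fill any gaps between known chunk contexts by launching gap-fill chunk tasks.
// 5. Pop the front pending chunk and hand it to the caller wrapped in an AsyncLedgerChunk.
///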
std::unique_ptr<scanning::LedgerChunk> get_onchain_chunk()
{
assert(m_pending_chunks.size() > 0);
// end condition
if (m_pending_chunks.front().is_type<scanning::ChunkContext>())
return std::make_unique<scanning::LedgerChunkEmpty>(m_pending_chunks.front().unwrap<scanning::ChunkContext>());
/// update queue
// todo: clean up this ugly code
// wait until the first chunk's context is ready
async::get_default_threadpool().work_while_waiting(
m_pending_chunks.front().unwrap<PendingChunk>().pending_context.context_join_condition,
async::DefaultPriorityLevels::MAX);
assert(async::future_is_ready(
m_pending_chunks.front().unwrap<PendingChunk>().pending_context.chunk_context)); //should be ready at this point
// find an end chunk
auto end_chunk_it =
std::find_if(m_pending_chunks.begin(),
m_pending_chunks.end(),
[](const PendingChunkVariant &chunk) -> bool
{
return is_end_chunk(chunk);
});
// add a simple end chunk to the end of the queue if we found an end chunk
if (end_chunk_it != m_pending_chunks.end() && !m_pending_chunks.back().is_type<scanning::ChunkContext>())
{
if (const scanning::ChunkContext *context = try_ref_chunk_context(*end_chunk_it))
m_pending_chunks.emplace_back(*context);
else
assert(false); //this shouldn't happen
}
// remove pending chunks between the end chunk we found and the end of the queue
if (end_chunk_it != m_pending_chunks.end())
{
for (auto pending_it = std::next(end_chunk_it); pending_it != m_pending_chunks.end();)
{
// erase pending chunks
if (pending_it->is_type<PendingChunk>())
pending_it = m_pending_chunks.erase(pending_it);
else
++pending_it;
}
}
// otherwise, launch a new chunk request
else
{
m_pending_chunks.emplace_back(launch_chunk_task(m_projected_end_index, m_chunk_size_increment));
m_projected_end_index += m_chunk_size_increment;
}
// look for gaps between pending chunks, and schedule new pending chunks to fill them
for (auto pending_it = m_pending_chunks.begin(); pending_it != m_pending_chunks.end(); ++pending_it)
{
// if we found an end chunk, then no need to examine the rest
if (is_end_chunk(*pending_it))
break;
// skip chunks with unavailable chunk contexts
const scanning::ChunkContext *chunk_context{try_ref_chunk_context(*pending_it)};
if (!chunk_context)
continue;
// skip chunks that have no apparent gap to the next chunk
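// - example (hypothetical numbers): with increment 100 and m_projected_end_index 1000, a chunk that started at 500
//   but only obtained 60 blocks ends at 560, so gap_to_next_chunk = (1000 - 560) % 100 = 40 blocks remain before
//   the chunk scheduled at index 600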
const std::uint64_t current_chunk_end_index{chunk_context->start_index + scanning::chunk_size(*chunk_context)};
const std::uint64_t gap_to_next_chunk{
((m_projected_end_index - current_chunk_end_index) % m_chunk_size_increment)
};
if (gap_to_next_chunk == 0)
continue;
// skip if the next chunk is an end chunk that connects to our current chunk
const auto next_chunk_it = std::next(pending_it);
if (next_chunk_it != m_pending_chunks.end())
{
const scanning::ChunkContext *next_chunk_context{try_ref_chunk_context(*next_chunk_it)};
if (is_end_chunk(*next_chunk_it) &&
next_chunk_context &&
next_chunk_context->start_index == current_chunk_end_index)
continue;
}
// add task to fill gap
pending_it =
m_pending_chunks.emplace(next_chunk_it, launch_chunk_task(current_chunk_end_index, gap_to_next_chunk));
}
// extract the first chunk
PendingChunkVariant first_chunk{std::move(m_pending_chunks.front())};
m_pending_chunks.pop_front();
assert(first_chunk.is_type<PendingChunk>());
return std::make_unique<scanning::AsyncLedgerChunk>(
async::get_default_threadpool(),
std::move(first_chunk.unwrap<PendingChunk>().pending_context),
std::vector<scanning::PendingChunkData>{std::move(first_chunk.unwrap<PendingChunk>().pending_data)},
std::vector<rct::key>{rct::zero()}
);
}
private:
/// config
const std::uint64_t m_pending_chunk_queue_size;
const std::uint64_t m_chunk_size_increment;
/// pending chunks
std::deque<PendingChunkVariant> m_pending_chunks;
std::uint64_t m_projected_end_index{};
};
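////
// Usage sketch (hypothetical driver loop; the scanning state machine that consumes these chunks is not part of
// this demo):
//   AsyncScanningContext scanning_context{10, 100};  //up to 10 pending chunks, 100 blocks requested per chunk
//   scanning_context.initialize_scanning(first_unscanned_block_index);
//   std::unique_ptr<scanning::LedgerChunk> chunk{scanning_context.get_onchain_chunk()};
//   //...call get_onchain_chunk() repeatedly, consuming each chunk, until an empty end chunk is returned
///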
} //namespace sp
UkoeHB commented May 16, 2023

Follow-up (notes on an improved design; a rough code sketch follows the list):

  • instead of spawning 10 tasks to start, spawn 1 task
  • the task has a shared pointer to an atomic counter, which increments when a task starts and decrements when it ends
  • at the start of a task, submit the task's follow-up task (the next chunk) to the threadpool IF the counter is less than the max number of concurrent tasks allowed
  • if the follow-up was not submitted at the start of a task, submit it at the end as the task continuation; if there is a gap filler, then submit the gap filler as the continuation and have the gap filler submit the follow-up if it's needed (the gap-filler task should have the same contents as a normal task, so this is easy enough)
  • task results go in a token queue (I have one in /async)
  • the control thread collects results from the result queue and stitches them together; if the next chunk needed is missing, then the control thread work-waits on trying to get a result from the queue (you'll need to figure out a solid way to know when a result is missing)
  • to abort when an end chunk is encountered, use a stop signal: a shared future which, when set, will contain the end chunk's block index; worker tasks can check that future, and if it is set and the index is below a task's chunk start index, cancel the task
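
A minimal standalone sketch of the counter-gated, self-submitting chunk task described above (all names here, e.g. ScanSharedState, submit_task, and chunk_task, are hypothetical stand-ins; the real design would use async::Threadpool, its join machinery, and the token queue in /async instead of detached threads and a mutex-guarded deque):

#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>

struct ChunkResult
{
    std::uint64_t start_index;
    //...scanned chunk contents would go here
};

struct ScanSharedState
{
    std::atomic<std::uint64_t> num_active_tasks{0};
    std::uint64_t max_concurrent_tasks{10};
    std::shared_future<std::uint64_t> end_index;  //stop signal: set to the end chunk's block index once known
    std::mutex result_mutex;
    std::deque<ChunkResult> results;              //stand-in for the token queue in /async
};

//stand-in for threadpool submission
void submit_task(std::function<void()> task) { std::thread{std::move(task)}.detach(); }

void chunk_task(std::shared_ptr<ScanSharedState> state, const std::uint64_t start_index, const std::uint64_t chunk_size)
{
    state->num_active_tasks.fetch_add(1);

    //cancel if the end chunk is already known and sits below this task's start index
    if (state->end_index.valid() &&
        state->end_index.wait_for(std::chrono::seconds{0}) == std::future_status::ready &&
        state->end_index.get() < start_index)
    {
        state->num_active_tasks.fetch_sub(1);
        return;
    }

    //submit the follow-up task (next chunk) up front if we are below the concurrency cap (racy but bounded)
    bool submitted_follow_up{false};
    if (state->num_active_tasks.load() < state->max_concurrent_tasks)
    {
        submit_task([state, next = start_index + chunk_size, chunk_size] { chunk_task(state, next, chunk_size); });
        submitted_follow_up = true;
    }

    //...query the daemon and scan the chunk here, then push the result for the control thread to stitch together
    {
        std::lock_guard<std::mutex> lock{state->result_mutex};
        state->results.push_back(ChunkResult{start_index});
    }

    //otherwise submit the follow-up as the task continuation (a gap-filler task would be spliced in here instead
    //if the daemon returned fewer blocks than requested)
    if (!submitted_follow_up)
        submit_task([state, next = start_index + chunk_size, chunk_size] { chunk_task(state, next, chunk_size); });

    state->num_active_tasks.fetch_sub(1);
}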
