-
-
Save ronag/0ab4dfc16624f0354bcd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
struct loop_mapping::impl final | |
{ | |
struct region | |
{ | |
void* ptr = nullptr; | |
std::int64_t offset = 0; | |
std::size_t size = 0; | |
bool write = false; | |
std::array<mapped_region, 2> views = {}; | |
void flush() | |
{ | |
views[0].flush(); | |
views[1].flush(); | |
} | |
}; | |
typedef boost::scoped_thread<boost::interrupt_and_join_if_joinable> flush_thread_t; | |
typedef tbb::concurrent_bounded_queue<std::shared_ptr<region>> flush_queue_t; | |
bool write_ = false; | |
file_mapping file_; | |
std::size_t file_size_ = 0; | |
std::size_t region_size_ = 0; | |
flush_queue_t flush_queue_; | |
flush_thread_t flush_thread_; | |
std::list<std::shared_ptr<region>> regions_; | |
public: | |
impl(std::tr2::sys::path path, bool write) | |
: write_(write) | |
, file_(path.string().c_str(), write_ ? read_write : read_only) | |
, file_size_(floor(fs::file_size(path), mapped_region::get_page_size())) | |
, region_size_(std::min(32'000'000ULL, fs::file_size(path) / 8)) | |
, flush_thread_([this] | |
{ | |
try | |
{ | |
while (!boost::this_thread::interruption_requested()) | |
{ | |
std::shared_ptr<region> r; | |
while (flush_queue_.try_pop(r)) | |
r->flush(); | |
boost::this_thread::sleep_for(boost::chrono::duration<int, boost::milli>(500)); | |
} | |
} | |
catch (boost::thread_interrupted&) | |
{ | |
} | |
}) | |
{ | |
REQUIRE(file_size_ > mapped_region::get_page_size()); | |
flush_queue_.set_capacity(16); | |
} | |
auto data(std::int64_t offset, std::size_t size, boost::optional<bool> write_opt) -> std::shared_ptr<void> | |
{ | |
return map(offset, size, write_opt); | |
} | |
void flush(bool async) | |
{ | |
std::for_each(std::begin(regions_), std::end(regions_), [=](auto r) | |
{ | |
flush_queue_.push(r); | |
}); | |
} | |
private: | |
auto normalize_offset(std::int64_t offset) -> std::int64_t | |
{ | |
return (file_size_ + (offset % file_size_)) % file_size_; | |
} | |
auto map(std::int64_t offset, std::size_t size, boost::optional<bool> write_opt) -> std::shared_ptr<void> | |
{ | |
REQUIRE(size < file_size_ / 2); | |
offset = normalize_offset(offset); | |
const auto write = write_opt.get_value_or(write_); | |
auto it = std::find_if(std::begin(regions_), std::end(regions_), [&](const auto& r) | |
{ | |
return r->offset <= offset && r->offset + r->size >= offset + size && (!write || r->write); | |
}); | |
auto r = [&] | |
{ | |
if (it == std::end(regions_)) | |
return create(offset, size, write); | |
auto r = *it; | |
regions_.erase(it); | |
return r; | |
}(); | |
regions_.push_front(r); | |
while (regions_.size() > (write ? 1 : 16)) | |
{ | |
flush_queue_.push(std::move(regions_.back())); | |
regions_.pop_back(); | |
} | |
return std::shared_ptr<void>(reinterpret_cast<char*>(r->ptr) + offset - r->offset, [=](auto) mutable | |
{ | |
r.reset(); | |
}); | |
} | |
auto create(std::int64_t offset, std::size_t size, bool write) -> std::shared_ptr<region> | |
{ | |
REQUIRE(offset >= 0); | |
REQUIRE(static_cast<std::size_t>(offset) < file_size_); | |
// We map half the region size backwards so that backwards writes/reads don't suffer to badly in terms of performance. | |
auto data_align = ceil(region_size_/2, mapped_region::get_page_size()); | |
auto data_offset = floor(offset, data_align); | |
auto data_size = static_cast<std::size_t>(data_align*2); | |
REQUIRE(offset >= data_offset && offset + size <= data_offset + data_size); | |
REQUIRE(!write || write_); | |
// Calculate distance until eof. | |
auto view0_size = std::min<std::size_t>(data_size, file_size_ - data_offset); | |
// Calculate loop around distance. | |
auto view1_size = data_size > view0_size ? data_size - view0_size : 0; | |
for (auto n = 0; n < 64; ++n) | |
{ | |
try | |
{ | |
auto r = std::make_shared<region>(); | |
const auto ptr0 = [&] | |
{ | |
auto view = mapped_region{ file_, read_only, 0, data_size }; | |
return reinterpret_cast<char*>(view.get_address()); | |
}(); | |
ASSERT(reinterpret_cast<std::intptr_t>(ptr0) % mapped_region::get_page_size() == 0); | |
r->views[0] = mapped_region{ file_, write ? read_write : read_only, data_offset, view0_size, ptr0 }; | |
r->views[0].advise(mapped_region::advice_sequential); | |
auto ptr1 = ptr0 + r->views[0].get_size(); | |
ASSERT(reinterpret_cast<std::intptr_t>(ptr1) % mapped_region::get_page_size() == 0); | |
if (view1_size > 0) | |
{ | |
r->views[1] = mapped_region{ file_, write ? read_write : read_only, 0, view1_size, ptr1 }; | |
r->views[1].advise(mapped_region::advice_sequential); | |
} | |
r->ptr = ptr0; | |
r->offset = data_offset; | |
r->size = view0_size + view1_size; | |
r->write = write; | |
ASSERT(r->ptr && r->offset <= offset && r->size >= size); | |
return r; | |
} | |
catch (boost::interprocess::interprocess_exception&) | |
{ | |
LOG(debug) << "Mapping failed."; | |
boost::this_thread::sleep(boost::posix_time::milliseconds(n*n)); | |
} | |
} | |
THROW(runtime_error_t()); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment