Skip to content

Instantly share code, notes, and snippets.

@ronag
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ronag/0ab4dfc16624f0354bcd to your computer and use it in GitHub Desktop.
Save ronag/0ab4dfc16624f0354bcd to your computer and use it in GitHub Desktop.
struct loop_mapping::impl final
{
struct region
{
void* ptr = nullptr;
std::int64_t offset = 0;
std::size_t size = 0;
bool write = false;
std::array<mapped_region, 2> views = {};
void flush()
{
views[0].flush();
views[1].flush();
}
};
typedef boost::scoped_thread<boost::interrupt_and_join_if_joinable> flush_thread_t;
typedef tbb::concurrent_bounded_queue<std::shared_ptr<region>> flush_queue_t;
bool write_ = false;
file_mapping file_;
std::size_t file_size_ = 0;
std::size_t region_size_ = 0;
flush_queue_t flush_queue_;
flush_thread_t flush_thread_;
std::list<std::shared_ptr<region>> regions_;
public:
impl(std::tr2::sys::path path, bool write)
: write_(write)
, file_(path.string().c_str(), write_ ? read_write : read_only)
, file_size_(floor(fs::file_size(path), mapped_region::get_page_size()))
, region_size_(std::min(32'000'000ULL, fs::file_size(path) / 8))
, flush_thread_([this]
{
try
{
while (!boost::this_thread::interruption_requested())
{
std::shared_ptr<region> r;
while (flush_queue_.try_pop(r))
r->flush();
boost::this_thread::sleep_for(boost::chrono::duration<int, boost::milli>(500));
}
}
catch (boost::thread_interrupted&)
{
}
})
{
REQUIRE(file_size_ > mapped_region::get_page_size());
flush_queue_.set_capacity(16);
}
auto data(std::int64_t offset, std::size_t size, boost::optional<bool> write_opt) -> std::shared_ptr<void>
{
return map(offset, size, write_opt);
}
void flush(bool async)
{
std::for_each(std::begin(regions_), std::end(regions_), [=](auto r)
{
flush_queue_.push(r);
});
}
private:
auto normalize_offset(std::int64_t offset) -> std::int64_t
{
return (file_size_ + (offset % file_size_)) % file_size_;
}
auto map(std::int64_t offset, std::size_t size, boost::optional<bool> write_opt) -> std::shared_ptr<void>
{
REQUIRE(size < file_size_ / 2);
offset = normalize_offset(offset);
const auto write = write_opt.get_value_or(write_);
auto it = std::find_if(std::begin(regions_), std::end(regions_), [&](const auto& r)
{
return r->offset <= offset && r->offset + r->size >= offset + size && (!write || r->write);
});
auto r = [&]
{
if (it == std::end(regions_))
return create(offset, size, write);
auto r = *it;
regions_.erase(it);
return r;
}();
regions_.push_front(r);
while (regions_.size() > (write ? 1 : 16))
{
flush_queue_.push(std::move(regions_.back()));
regions_.pop_back();
}
return std::shared_ptr<void>(reinterpret_cast<char*>(r->ptr) + offset - r->offset, [=](auto) mutable
{
r.reset();
});
}
auto create(std::int64_t offset, std::size_t size, bool write) -> std::shared_ptr<region>
{
REQUIRE(offset >= 0);
REQUIRE(static_cast<std::size_t>(offset) < file_size_);
// We map half the region size backwards so that backwards writes/reads don't suffer to badly in terms of performance.
auto data_align = ceil(region_size_/2, mapped_region::get_page_size());
auto data_offset = floor(offset, data_align);
auto data_size = static_cast<std::size_t>(data_align*2);
REQUIRE(offset >= data_offset && offset + size <= data_offset + data_size);
REQUIRE(!write || write_);
// Calculate distance until eof.
auto view0_size = std::min<std::size_t>(data_size, file_size_ - data_offset);
// Calculate loop around distance.
auto view1_size = data_size > view0_size ? data_size - view0_size : 0;
for (auto n = 0; n < 64; ++n)
{
try
{
auto r = std::make_shared<region>();
const auto ptr0 = [&]
{
auto view = mapped_region{ file_, read_only, 0, data_size };
return reinterpret_cast<char*>(view.get_address());
}();
ASSERT(reinterpret_cast<std::intptr_t>(ptr0) % mapped_region::get_page_size() == 0);
r->views[0] = mapped_region{ file_, write ? read_write : read_only, data_offset, view0_size, ptr0 };
r->views[0].advise(mapped_region::advice_sequential);
auto ptr1 = ptr0 + r->views[0].get_size();
ASSERT(reinterpret_cast<std::intptr_t>(ptr1) % mapped_region::get_page_size() == 0);
if (view1_size > 0)
{
r->views[1] = mapped_region{ file_, write ? read_write : read_only, 0, view1_size, ptr1 };
r->views[1].advise(mapped_region::advice_sequential);
}
r->ptr = ptr0;
r->offset = data_offset;
r->size = view0_size + view1_size;
r->write = write;
ASSERT(r->ptr && r->offset <= offset && r->size >= size);
return r;
}
catch (boost::interprocess::interprocess_exception&)
{
LOG(debug) << "Mapping failed.";
boost::this_thread::sleep(boost::posix_time::milliseconds(n*n));
}
}
THROW(runtime_error_t());
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment