Skip to content

Instantly share code, notes, and snippets.

@ph3rin
Created January 20, 2020 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ph3rin/678bb1e9cb50d107644f513101fcff3d to your computer and use it in GitHub Desktop.
Save ph3rin/678bb1e9cb50d107644f513101fcff3d to your computer and use it in GitHub Desktop.
Generic Rewindable Stream
#pragma once
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <stdexcept>
#include <type_traits>
#include <utility>
namespace rcc
{
/**
 * \brief Buffered, forward-only stream of T values that can be rewound through checkpoints.
 *
 * Derived classes supply the data by implementing consume() and
 * underlying_stream_is_valid(). Every value pulled from the derived class is kept in an
 * internal buffer so the read position can be moved back to any live checkpoint.
 * Buffered values in front of the read position are dropped only when a checkpoint is
 * committed while no other checkpoint of this stream is alive.
 *
 * \tparam T Element type; values are stored in a std::deque<T>.
 */
template <typename T>
class Stream
{
private:
    /**
     * \brief Scope guard that remembers a read position of a Stream.
     *
     * Destroying a checkpoint that was not committed moves the stream back to the
     * remembered position. Calling commit() keeps the stream where it currently is.
     * Checkpoints are move-constructible only; they cannot be copied or assigned.
     */
    class Checkpoint
    {
        friend class Stream;

    public:
        /**
         * \brief Rewinds the owning stream to the remembered position unless this
         *        checkpoint was committed or moved from.
         */
        ~Checkpoint()
        {
            if (m_stream)
            {
                m_stream->m_stream_index = m_stream_index;
                --m_stream->m_checkpoint_count;
            }
        }

        /**
         * \brief Transfers responsibility for the rewind from \p other to the new object.
         *
         * \p other is detached, so its destructor no longer touches the stream and the
         * stream's checkpoint count is unchanged by the move.
         */
        Checkpoint(Checkpoint&& other) noexcept
            : m_stream(other.m_stream), m_stream_index(other.m_stream_index)
        {
            other.m_stream = nullptr;
        }

        // Copying would register two rewinds for one checkpoint; assignment would have
        // to release the currently held checkpoint first. Neither was ever available
        // (the user-declared move constructor suppressed them); this makes it explicit.
        Checkpoint(const Checkpoint&) = delete;
        Checkpoint& operator=(const Checkpoint&) = delete;
        Checkpoint& operator=(Checkpoint&&) = delete;

        /**
         * \brief Accepts the current stream position; the destructor will not rewind.
         *
         * If this was the last live checkpoint of the stream, every buffered element in
         * front of the current position is discarded. Committing a checkpoint that was
         * already committed or moved from is a programming error: it asserts in debug
         * builds and does nothing in builds with NDEBUG.
         */
        void commit()
        {
            assert(m_stream);
            if (!m_stream)
            {
                return;
            }
            // Detach first so that the destructor can never rewind after a commit.
            Stream* const owner = m_stream;
            m_stream = nullptr;
            --owner->m_checkpoint_count;
            owner->discard_all_unchecked_before_current_stream_index();
        }

    private:
        /**
         * \brief Registers a new checkpoint with \p stream at position \p index.
         */
        Checkpoint(Stream* stream, std::size_t index) : m_stream(stream), m_stream_index(index)
        {
            ++stream->m_checkpoint_count;
        }

        /// Stream to rewind; nullptr once committed or moved from.
        Stream* m_stream;
        /// Read position to restore on destruction.
        std::size_t m_stream_index;
    };

public:
    Stream() = default;
    virtual ~Stream() = default;

    /**
     * \brief Tells whether there is nothing left to read at the current position.
     * \return true when the current position is past the buffered elements and the
     *         derived class reports that the underlying stream is no longer valid.
     * \throws std::runtime_error if the current position lies before the buffer start.
     */
    [[nodiscard]] bool end_of_stream() const
    {
        return stream_index_to_buffer_iterator() == m_buffer.cend() && !underlying_stream_is_valid();
    }

    /**
     * \brief Returns the element at the current position, pulling it from the
     *        underlying stream first if it is not buffered yet.
     * \return Reference into the internal buffer; it stays usable until the element is
     *         discarded by a commit (or the stream is assigned to or destroyed).
     * \throws std::out_of_range if the stream is at its end.
     * \throws std::runtime_error if the current position lies before the buffer start.
     */
    const T& current()
    {
        if (end_of_stream())
        {
            throw std::out_of_range("Cannot dereference the end of a stream.");
        }
        sync_underlying_stream();
        return *stream_index_to_buffer_iterator();
    }

    /**
     * \brief Advances the read position by one element.
     * \return *this, to allow chaining.
     * \throws std::out_of_range if the stream is already at its end.
     */
    Stream& move_next()
    {
        // Calling current() guarantees that every element up to and including the
        // current one has been extracted into the buffer before we step past it.
        (void)current();
        ++m_stream_index;
        return *this;
    }

    /**
     * \brief Creates a checkpoint at the current read position.
     *
     * Keep the returned object alive for as long as a rewind must remain possible;
     * discarding it immediately rewinds to the position the stream is already at.
     */
    [[nodiscard]] Checkpoint create_checkpoint()
    {
        return Checkpoint(this, m_stream_index);
    }

    /**
     * \brief true while the stream is not at its end.
     *
     * NOTE(review): left implicit on purpose; making it explicit would break callers
     * that rely on the implicit conversion to bool.
     */
    operator bool() const
    {
        return !end_of_stream();
    }

    /// Same as move_next().
    Stream& operator++()
    {
        return move_next();
    }

protected:
    /**
     * \brief Copies the buffered elements and the read position of \p other.
     *
     * The checkpoint count is deliberately NOT copied: existing checkpoints point at
     * \p other, so the new stream starts without any.
     */
    Stream(const Stream& other)
        : m_buffer(other.m_buffer),
          m_stream_index(other.m_stream_index),
          m_offset_of_buffer(other.m_offset_of_buffer)
    {
    }

    /**
     * \brief Takes over the buffered elements and the read position of \p other.
     *
     * Checkpoints created from \p other keep referring to \p other; the new stream
     * starts without any. \p other keeps its position fields and checkpoint count, but
     * its buffer is left in a moved-from state.
     */
    Stream(Stream&& other) noexcept(std::is_nothrow_move_constructible<std::deque<T>>::value)
        : m_buffer(std::move(other.m_buffer)),
          m_stream_index(other.m_stream_index),
          m_offset_of_buffer(other.m_offset_of_buffer)
    {
    }

    /**
     * \brief Replaces the buffered elements and the read position with those of \p other.
     *
     * The checkpoint count of *this is preserved because it counts the Checkpoint
     * objects that point at *this; overwriting it would make the counter underflow
     * when those checkpoints are later destroyed or committed.
     */
    Stream& operator=(const Stream& other)
    {
        if (this != &other)
        {
            m_buffer = other.m_buffer;
            m_stream_index = other.m_stream_index;
            m_offset_of_buffer = other.m_offset_of_buffer;
        }
        return *this;
    }

    /**
     * \brief Move counterpart of the copy assignment; the checkpoint count of *this is
     *        preserved for the same reason.
     */
    Stream& operator=(Stream&& other) noexcept(std::is_nothrow_move_assignable<std::deque<T>>::value)
    {
        if (this != &other)
        {
            m_buffer = std::move(other.m_buffer);
            m_stream_index = other.m_stream_index;
            m_offset_of_buffer = other.m_offset_of_buffer;
        }
        return *this;
    }

    /**
     * \brief Extracts the next element from the underlying stream.
     *
     * Only called after underlying_stream_is_valid() returned true.
     */
    virtual T consume() = 0;

    /**
     * \brief Tells whether consume() can deliver another element.
     */
    [[nodiscard]] virtual bool underlying_stream_is_valid() const = 0;

private:
    /**
     * \brief Maps the current read position to an iterator into the buffer.
     * \return Iterator to the current element, or m_buffer.cend() when the position
     *         has not been buffered yet.
     * \throws std::runtime_error if the position lies before the first buffered
     *         element, i.e. the element has already been discarded.
     */
    auto stream_index_to_buffer_iterator() const
    {
        // The position was discarded earlier and can no longer be reached.
        if (m_stream_index < m_offset_of_buffer)
        {
            throw std::runtime_error("The stream index is unreachable.");
        }
        // The position is beyond what has been buffered so far.
        if (m_stream_index >= m_offset_of_buffer + m_buffer.size())
        {
            return m_buffer.cend();
        }
        // The position is inside the buffer.
        using difference_type = typename std::deque<T>::difference_type;
        return m_buffer.cbegin() + static_cast<difference_type>(m_stream_index - m_offset_of_buffer);
    }

    /**
     * \brief Pulls elements from the underlying stream until the current position is
     *        buffered.
     * \throws std::out_of_range if the underlying stream runs out first.
     */
    void sync_underlying_stream()
    {
        while (stream_index_to_buffer_iterator() == m_buffer.cend())
        {
            if (!underlying_stream_is_valid())
            {
                throw std::out_of_range("Cannot dereference the end of stream.");
            }
            m_buffer.emplace_back(consume());
        }
    }

    /// true when no Checkpoint object currently refers to this stream.
    [[nodiscard]] bool has_no_checkpoint() const
    {
        return m_checkpoint_count == 0;
    }

    /**
     * \brief Drops every buffered element in front of the current position, but only
     *        when no checkpoint could still rewind to one of them.
     */
    void discard_all_unchecked_before_current_stream_index()
    {
        if (has_no_checkpoint())
        {
            while (m_offset_of_buffer < m_stream_index)
            {
                m_buffer.pop_front();
                ++m_offset_of_buffer;
            }
        }
    }

    /// Elements extracted from the underlying stream that have not been discarded.
    std::deque<T> m_buffer;
    /**
     * \brief Position of the element that current() yields; starts at 0 and grows by
     *        one per move_next().
     */
    std::size_t m_stream_index = 0u;
    /**
     * \brief Stream position of the first element held in m_buffer.
     */
    std::size_t m_offset_of_buffer = 0u;
    /// Number of live, uncommitted Checkpoint objects that refer to this stream.
    std::size_t m_checkpoint_count = 0u;
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment