Created
January 20, 2020 20:17
-
-
Save ph3rin/678bb1e9cb50d107644f513101fcff3d to your computer and use it in GitHub Desktop.
Generic Rewindable Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once

#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <stdexcept>
namespace rcc
{
    /**
     * \brief A forward-only source of T values that can be rewound through scoped checkpoints.
     *
     * Derived classes supply the values through consume() and report exhaustion
     * through underlying_stream_is_valid(). Every value pulled from the derived
     * class is kept in an internal buffer so that a Checkpoint can move the read
     * position backwards again.
     *
     * The buffer is only trimmed inside Checkpoint::commit(), and only when no
     * other checkpoint is outstanding. A stream that is read without ever
     * committing a checkpoint therefore keeps every consumed element buffered.
     */
    template <typename T>
    class Stream
    {
    private:
        /**
         * \brief Scoped rewind marker handed out by Stream::create_checkpoint().
         *
         * On destruction the checkpoint puts the stream's read position back to
         * where it was when the checkpoint was created, unless commit() was
         * called first or the checkpoint was moved from.
         */
        class Checkpoint
        {
            friend class Stream;

        public:
            /**
             * \brief Rolls the stream back to the recorded position and releases
             *        this checkpoint's slot in the stream's checkpoint counter.
             *
             * Does nothing when the checkpoint was committed or moved from.
             */
            ~Checkpoint()
            {
                if (m_stream)
                {
                    m_stream->m_stream_index = m_stream_index;
                    --m_stream->m_checkpoint_count;
                }
            }

            /**
             * \brief Takes over the rewind responsibility of \p other.
             *
             * \p other is detached, so its destructor no longer touches the
             * stream. The stream's checkpoint counter is left unchanged because
             * the number of live checkpoints stays the same.
             */
            Checkpoint(Checkpoint&& other) noexcept
                : m_stream(other.m_stream), m_stream_index(other.m_stream_index)
            {
                other.m_stream = nullptr;
            }

            // A checkpoint owns exactly one slot in the stream's counter, so it
            // can neither be duplicated nor overwritten.
            Checkpoint(const Checkpoint&) = delete;
            Checkpoint& operator=(const Checkpoint&) = delete;
            Checkpoint& operator=(Checkpoint&&) = delete;

            /**
             * \brief Keeps the stream at its current position instead of rewinding.
             *
             * Releases this checkpoint's slot and, if no other checkpoint is
             * outstanding, drops all buffered elements in front of the current
             * read position. Afterwards the checkpoint is detached.
             *
             * Committing a detached checkpoint (already committed or moved from)
             * is a programming error: it trips the assert in debug builds and is
             * ignored when asserts are compiled out.
             */
            void commit()
            {
                assert(m_stream);
                if (!m_stream)
                {
                    // Only reachable with NDEBUG; avoids dereferencing null.
                    return;
                }
                --m_stream->m_checkpoint_count;
                m_stream->discard_all_unchecked_before_current_stream_index();
                m_stream = nullptr;
            }

        private:
            /**
             * \brief Records \p index for \p stream and registers with its counter.
             */
            Checkpoint(Stream* stream, std::size_t index)
                : m_stream(stream), m_stream_index(index)
            {
                ++stream->m_checkpoint_count;
            }

            /** \brief Stream to rewind; nullptr once committed or moved from. */
            Stream* m_stream;
            /** \brief Read position to restore on rollback. */
            std::size_t m_stream_index;
        };

    public:
        Stream() = default;
        virtual ~Stream() = default;

        /**
         * \brief Tells whether there is no element left at the current read position.
         * \return true when the read position lies past the buffered elements and
         *         the derived class reports that it cannot deliver more.
         * \throws std::runtime_error if the read position lies in front of the buffer.
         */
        [[nodiscard]] bool end_of_stream() const
        {
            return stream_index_to_buffer_iterator() == m_buffer.cend() && !underlying_stream_is_valid();
        }

        /**
         * \brief Returns the element at the current read position.
         *
         * Pulls elements from consume() as needed until that position is buffered.
         * The returned reference points into the internal buffer; a later
         * Checkpoint::commit() that trims the buffer can destroy the element.
         *
         * \throws std::out_of_range when the stream is at its end.
         */
        const T& current()
        {
            if (end_of_stream())
            {
                throw std::out_of_range("Cannot dereference the end of a stream.");
            }
            sync_underlying_stream();
            return *stream_index_to_buffer_iterator();
        }

        /**
         * \brief Advances the read position by one element.
         * \return *this, to allow chaining.
         * \throws std::out_of_range when the stream is already at its end.
         */
        Stream& move_next()
        {
            // Calling current() makes sure every element up to and including the
            // current one has been extracted before the position moves past it.
            (void)current();
            ++m_stream_index;
            return *this;
        }

        /**
         * \brief Creates a checkpoint at the current read position.
         *
         * The result must be kept alive for as long as the rewind option is
         * needed: a discarded checkpoint is destroyed immediately and has no effect.
         */
        [[nodiscard]] Checkpoint create_checkpoint()
        {
            return Checkpoint(this, m_stream_index);
        }

        /** \brief true while the stream is not at its end. */
        operator bool() const
        {
            return !end_of_stream();
        }

        /** \brief Same as move_next(). */
        Stream& operator++()
        {
            return move_next();
        }

    protected:
        // NOTE(review): these defaulted operations copy m_checkpoint_count as-is
        // and do not retarget existing checkpoints, so copying or moving a stream
        // while checkpoints are outstanding should be avoided.
        Stream(const Stream& other) = default;
        Stream(Stream&& other) = default;
        Stream& operator=(const Stream& other) = default;
        Stream& operator=(Stream&&) = default;

        /** \brief Produces the next element of the underlying source. */
        virtual T consume() = 0;

        /** \brief Tells whether consume() may be called to obtain another element. */
        [[nodiscard]] virtual bool underlying_stream_is_valid() const = 0;

    private:
        using BufferIterator = typename std::deque<T>::const_iterator;
        using BufferDifference = typename std::deque<T>::difference_type;

        /**
         * \brief Maps the current read position to an iterator into the buffer.
         * \return Iterator to the buffered element, or cend() if the position has
         *         not been buffered yet.
         * \throws std::runtime_error if the position lies in front of the buffer.
         */
        BufferIterator stream_index_to_buffer_iterator() const
        {
            // The element was already discarded and cannot be reached any more.
            if (m_stream_index < m_offset_of_buffer)
            {
                throw std::runtime_error("The stream index is unreachable.");
            }
            // The position lies beyond what has been buffered so far.
            if (m_stream_index >= m_offset_of_buffer + m_buffer.size())
            {
                return m_buffer.cend();
            }
            // The position is buffered: translate it to an offset from the front.
            return m_buffer.cbegin() + static_cast<BufferDifference>(m_stream_index - m_offset_of_buffer);
        }

        /**
         * \brief Consumes elements into the buffer until the read position is buffered.
         * \throws std::out_of_range if the underlying source runs dry first.
         */
        void sync_underlying_stream()
        {
            while (stream_index_to_buffer_iterator() == m_buffer.cend())
            {
                if (!underlying_stream_is_valid())
                {
                    throw std::out_of_range("Cannot dereference the end of stream.");
                }
                m_buffer.emplace_back(consume());
            }
        }

        /** \brief true when no checkpoint is currently registered. */
        [[nodiscard]] bool has_no_checkpoint() const
        {
            return m_checkpoint_count == 0;
        }

        /**
         * \brief Drops buffered elements in front of the read position.
         *
         * Does nothing while any checkpoint is registered, because such a
         * checkpoint may still rewind to one of those elements.
         */
        void discard_all_unchecked_before_current_stream_index()
        {
            if (has_no_checkpoint())
            {
                while (m_offset_of_buffer < m_stream_index)
                {
                    m_buffer.pop_front();
                    ++m_offset_of_buffer;
                }
            }
        }

        /** \brief Elements consumed from the underlying source and not yet discarded. */
        std::deque<T> m_buffer;
        /**
         * \brief Stream index of the element that current() refers to.
         */
        std::size_t m_stream_index = 0u;
        /**
         * \brief Represents the stream index of the start of the buffer.
         */
        std::size_t m_offset_of_buffer = 0u;
        /** \brief Number of checkpoints that have neither been committed nor destroyed. */
        std::size_t m_checkpoint_count = 0u;
    };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment