Skip to content

Instantly share code, notes, and snippets.

@cjhanks
Created December 5, 2017 01:31
Show Gist options
  • Save cjhanks/3bd6ebdaf088554c23bbf2d91aec6ddb to your computer and use it in GitHub Desktop.
Save cjhanks/3bd6ebdaf088554c23bbf2d91aec6ddb to your computer and use it in GitHub Desktop.
Blocking Chanel
#include "queue.h"
#include <iostream>
std::size_t
BlockingPipe::Read(std::uint8_t* data, std::size_t length)
{
std::size_t rc = length;
while (length > 0) {
auto& node = Head();
std::size_t read_length = std::min(length, node.bytes.size() - node.used);
std::copy(node.bytes.begin() + node.used,
node.bytes.begin() + node.used + read_length,
data);
if (read_length + node.used >= node.bytes.size())
PopFront();
else
node.used += read_length;
data += read_length;
length -= read_length;
}
return rc;
}
void
BlockingPipe::Write(std::vector<std::uint8_t>&& bytes)
{
Node node(std::move(bytes));
std::lock_guard<std::mutex> guard(list_lock);
nodes.emplace_back(std::move(node));
++nodes_size;
list_condition.notify_one();
}
void
BlockingPipe::Write(const std::vector<std::uint8_t>& bytes)
{
Node node(bytes);
std::lock_guard<std::mutex> guard(list_lock);
nodes.emplace_back(std::move(node));
++nodes_size;
list_condition.notify_one();
}
BlockingPipe::Node&
BlockingPipe::Head()
{
std::unique_lock<std::mutex> guard(list_lock);
list_condition.wait(guard, [&]{ return nodes_size > 0; });
return nodes.front();
}
void
BlockingPipe::PopFront()
{
std::lock_guard<std::mutex> guard(list_lock);
nodes.pop_front();
--nodes_size;
}
#if 1
BlockingPipe pipe;
void
PUB()
{
for (std::size_t i = 0; i < 256 ; ++i) {
std::vector<std::uint8_t> data;
for (std::size_t j = 0; j < 32; ++j) {
data.push_back(i);
}
pipe.Write(std::move(data));
}
}
#include <cassert>
#include <thread>
int
main()
{
std::thread T(PUB);
std::size_t i = 0;
std::vector<std::uint8_t> buffer(17);
for (std::size_t n = 0; n < 256 * 32; ++n) {
pipe.Read(&buffer[0], buffer.size());
for (std::size_t k = 0; k < buffer.size(); ++k) {
std::size_t value = i++ / 32;
std::cerr << value << ":" << unsigned(buffer[k]) << "\n";
assert(value == buffer[k]);
}
}
}
#endif
#ifndef QUEUE_H_
#define QUEUE_H_
#include <atomic>
#include <condition_variable>
#include <list>
#include <mutex>
#include <vector>
class BlockingPipe {
public:
BlockingPipe() = default;
std::size_t
Read(std::uint8_t* data, std::size_t length);
void
Write(std::vector<std::uint8_t>&& bytes);
void
Write(const std::vector<std::uint8_t>& bytes);
private:
struct Node {
Node()
: used(0)
{}
Node(const std::vector<std::uint8_t>& bytes)
: used(0)
, bytes(bytes)
{}
Node(std::vector<std::uint8_t>&& bytes)
: used(0)
, bytes(std::move(bytes))
{}
std::size_t used;
std::vector<std::uint8_t> bytes;
};
std::condition_variable list_condition;
std::mutex list_lock;
std::list<Node> nodes;
std::atomic<std::size_t> nodes_size;
Node&
Head();
void
PopFront();
};
#endif // QUEUE_H_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment