Skip to content

Instantly share code, notes, and snippets.

@3noch
Forked from qzchenwl/Chan.hpp
Last active September 14, 2015 20:01
Show Gist options
  • Save 3noch/a583e43ead2a8b008ddd to your computer and use it in GitHub Desktop.
Save 3noch/a583e43ead2a8b008ddd to your computer and use it in GitHub Desktop.
Haskell's MVar and Chan in C++
#include <memory>
#include "MVar.hpp"
template <typename T>
struct Item;
template <typename T>
struct Stream
{
typedef MVar<Item<T>> type;
};
template <typename T>
struct Item
{
typedef typename Stream<T>::type StreamType;
typedef std::shared_ptr<StreamType> StreamPtr;
T m_v;
StreamPtr m_tail;
Item(const T& v, const StreamPtr& tail): m_v(v), m_tail(tail) {}
};
template <typename T>
class Chan
{
private:
typedef typename Stream<T>::type StreamType;
typedef MVar<StreamType> StreamMVar;
typedef std::shared_ptr<StreamMVar> StreamMVarPtr;
StreamMVarPtr m_readEnd;
StreamMVarPtr m_writeEnd;
public:
Chan()
{
m_readEnd = std::make_shared<StreamMVar>();
m_writeEnd = std::make_shared<StreamMVar>();
auto hole = std::make_shared<StreamType>();
m_readEnd->put(hole);
m_writeEnd->put(hole);
}
Chan(const StreamMVarPtr& readEnd, const StreamMVarPtr& writeEnd)
: m_readEnd(readEnd), m_writeEnd(writeEnd) {}
T read()
{
auto stream = m_readEnd->take();
auto item = stream->read();
m_readEnd->put(item->m_tail);
return item->m_v;
}
void write(const T& v)
{
auto newHole = std::make_shared<StreamType>();
auto oldHole = m_writeEnd->take();
oldHole->put(std::make_shared<Item<T>>(v, newHole));
m_writeEnd->put(newHole);
}
Chan<T> dup()
{
auto hole = m_writeEnd->read();
auto newReadEnd = std::make_shared<StreamMVar>(hole);
return Chan(newReadEnd, m_writeEnd);
}
};
#include <iostream>
#include <thread>
#include <chrono>
#include "Chan.hpp"
using namespace std;
class Value
{
public:
Value(int x): v(x)
{
}
int v;
};
int main(int argc, char* argv[])
{
Chan<Value> c;
thread t2([&] {
for(int i = 0; i < 1000; ++i)
{
this_thread::sleep_for(chrono::milliseconds(1000));
c.write(Value(i));
}
});
thread t1([&] {
while(true)
{
auto x = c.read();
cout << "--- read " << x.v << endl;
}
});
this_thread::sleep_for(chrono::milliseconds(5000));
Chan<Value> c2 = c.dup();
thread t3([&] {
while(true)
{
auto x = c2.read();
cout << "### read " << x.v << endl;
}
});
thread t4([&] {
for(int i = 0; i < 1000; ++i)
{
this_thread::sleep_for(chrono::milliseconds(500));
c2.write(Value(i + 1000));
}
});
t1.join();
t2.join();
t3.join();
t4.join();
cin.get();
}
#pragma once
#include <memory>
#include <condition_variable>
#include <mutex>
#include "readwrite_mutex.hpp"
template <typename T>
class MVar
{
public:
MVar(void);
MVar(const std::shared_ptr<T>& value);
~MVar(void);
std::shared_ptr<T> take();
std::shared_ptr<T> read();
void put(const std::shared_ptr<T>& value);
private:
std::mutex m_mutex;
readwrite_mutex m_rw_mutex;
std::condition_variable m_put_cond;
std::condition_variable m_take_cond;
std::condition_variable m_read_cond;
std::shared_ptr<T> m_value;
};
template <typename T>
MVar<T>::MVar(void)
{
}
template <typename T>
MVar<T>::MVar(const std::shared_ptr<T>& value)
{
put(value);
}
template <typename T>
MVar<T>::~MVar(void)
{
}
template <typename T>
std::shared_ptr<T> MVar<T>::take()
{
std::unique_lock<mutex> lock(m_mutex);
while(!m_value) { m_take_cond.wait(lock); }
std::lock_guard<write_mutex> w_lock(m_rw_mutex);
auto token_value = m_value;
m_value.reset();
m_put_cond.notify_one();
return token_value;
}
template <typename T>
std::shared_ptr<T> MVar<T>::read()
{
std::lock_guard<read_mutex> r_lock(m_rw_mutex);
std::mutex m; std::unique_lock<mutex> lock(m);
while(!m_value) { m_read_cond.wait(lock); }
return m_value;
}
template <typename T>
void MVar<T>::put(const std::shared_ptr<T>& value)
{
if (!value) return;
std::unique_lock<mutex> lock(m_mutex);
while(m_value) { m_put_cond.wait(lock); }
m_value = value;
m_read_cond.notify_all();
m_take_cond.notify_one();
}
#include <mutex>
#include <condition_variable>
#include <thread>
class base_mutex
{
public:
base_mutex(): m_state(0) {}
// Exclusive ownership
void lock_exclusive()
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_state & m_write_entered)
{
m_gate1.wait(lock);
}
m_state |= m_write_entered;
while (m_state & m_n_readers)
{
m_gate2.wait(lock);
}
}
void unlock_exclusive()
{
{
std::lock_guard<std::mutex> _(m_mutex);
m_state = 0;
}
m_gate1.notify_all();
}
// Shared ownership
void lock_shared()
{
std::unique_lock<std::mutex> lock(m_mutex);
while ((m_state & m_write_entered) || ((m_state & m_n_readers) == m_n_readers))
{
m_gate1.wait(lock);
}
auto num_readers = (m_state & m_n_readers) + 1;
m_state &= ~m_n_readers;
m_state |= num_readers;
}
void unlock_shared()
{
std::lock_guard<std::mutex> _(m_mutex);
auto num_readers = (m_state & m_n_readers) - 1;
m_state &= ~m_n_readers;
m_state |= num_readers;
if (m_state & m_write_entered)
{
if (num_readers == 0)
{
m_gate2.notify_one();
}
}
else
{
if (num_readers == m_n_readers - 1)
{
m_gate1.notify_one();
}
}
}
private:
std::mutex m_mutex;
std::condition_variable m_gate1;
std::condition_variable m_gate2;
unsigned m_state;
static const unsigned m_write_entered = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned m_n_readers = ~m_write_entered;
};
class read_mutex : public virtual base_mutex
{
public:
void lock()
{
base_mutex::lock_shared();
}
void unlock()
{
base_mutex::unlock_shared();
}
};
class write_mutex : public virtual base_mutex
{
public:
void lock()
{
base_mutex::lock_exclusive();
}
void unlock()
{
base_mutex::unlock_exclusive();
}
};
class readwrite_mutex : public read_mutex, public write_mutex
{
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment