Skip to content

Instantly share code, notes, and snippets.

@Laeeth
Forked from s-ludwig/stdfilestream.d
Created December 3, 2015 18:23
Show Gist options
  • Save Laeeth/f1ad0be8d61c7d5a82ac to your computer and use it in GitHub Desktop.
Save Laeeth/f1ad0be8d61c7d5a82ac to your computer and use it in GitHub Desktop.
Wrapper class to make an existing `File` stream usable in a vibe.d event loop.
// NOTE: This class is now part of vibe.d, see http://vibed.org/api/vibe.stream.stdio/StdFileStream
/**
 * Adapts an existing `std.stdio.File` to the `ConnectionStream` interface.
 *
 * File I/O is performed with the blocking `rawRead`/`rawWrite` calls on
 * dedicated threads; data is exchanged with the owner of this object through
 * `TaskPipe` instances. A pipe (and its worker thread) only exists for the
 * directions requested in the constructor.
 */
class StdFileStream : ConnectionStream {
    private {
        std.stdio.File m_file;
        TaskPipe m_readPipe;    // null unless constructed with read == true
        TaskPipe m_writePipe;   // null unless constructed with write == true
        Thread m_readThread;
        Thread m_writeThread;
    }

    /**
     * Creates the pipes for the requested directions.
     *
     * Params:
     *   read  = allocate the pipe used by the read-side methods
     *   write = allocate the pipe used by the write-side methods
     */
    this(bool read, bool write)
    {
        if (read) m_readPipe = new TaskPipe;
        if (write) m_writePipe = new TaskPipe;
    }

    /**
     * Stores `file` and starts one worker thread per existing pipe.
     *
     * Params:
     *   file = the file whose contents are read and/or written
     */
    void setup(std.stdio.File file)
    {
        logInfo("setup");
        m_file = file;

        if (m_readPipe) {
            logInfo("read thread");
            m_readThread = new Thread(&readThreadFunc);
            m_readThread.name = "StdFileStream reader";
            m_readThread.start();
        }

        if (m_writePipe) {
            logInfo("write thread");
            m_writeThread = new Thread(&writeThreadFunc);
            m_writeThread.name = "StdFileStream writer";
            m_writeThread.start();
        }

        logInfo("done");
    }

    /// Forwards to the read pipe; throws if the stream is not readable.
    override @property bool empty() { enforceReadable(); return m_readPipe.reader.empty; }

    /// Forwards to the read pipe; throws if the stream is not readable.
    override @property ulong leastSize()
    {
        enforceReadable();
        return m_readPipe.reader.leastSize;
    }

    /// Forwards to the read pipe; throws if the stream is not readable.
    override @property bool dataAvailableForRead()
    {
        enforceReadable();
        return m_readPipe.reader.dataAvailableForRead;
    }

    /**
     * Reports the connection state of the read pipe when there is one,
     * otherwise that of the write pipe. Returns `false` if neither pipe
     * exists, instead of dereferencing a null pipe.
     */
    override @property bool connected() const
    {
        if (m_readPipe !is null) return m_readPipe.connected;
        if (m_writePipe !is null) return m_writePipe.connected;
        return false;
    }

    /// Closes the write pipe. Does nothing on a stream without a write pipe.
    override void close()
    {
        if (m_writePipe !is null) m_writePipe.close();
    }

    /// Forwards to the read pipe; throws if the stream is not readable.
    override bool waitForData(Duration timeout)
    {
        // Guard like every other read-side method so that a write-only
        // stream raises an exception rather than hitting a null reference.
        enforceReadable();
        return m_readPipe.waitForData(timeout);
    }

    /// Forwards to the read pipe; throws if the stream is not readable.
    override const(ubyte)[] peek()
    {
        enforceReadable();
        return m_readPipe.reader.peek();
    }

    /// Fills `dst` from the read pipe; throws if the stream is not readable.
    override void read(ubyte[] dst)
    {
        enforceReadable();
        m_readPipe.reader.read(dst);
    }

    /// Queues `bytes_` on the write pipe; throws if the stream is not writable.
    override void write(in ubyte[] bytes_)
    {
        enforceWritable();
        m_writePipe.writer.write(bytes_);
    }

    /// Flushes the writing end of the write pipe; throws if not writable.
    override void flush()
    {
        enforceWritable();
        m_writePipe.writer.flush();
    }

    /**
     * Flushes and finalizes the writing end of the write pipe.
     * Returns early if the write pipe is no longer connected.
     * Throws if the stream is not writable.
     */
    override void finalize()
    {
        enforceWritable();
        if (!m_writePipe.connected) return;
        flush();
        m_writePipe.writer.finalize();
    }

    /// Copies from `stream` using the inherited `writeDefault` helper.
    override void write(InputStream stream, ulong nbytes = 0)
    {
        writeDefault(stream, nbytes);
    }

    /// Throws with "Stream is not readable!" if there is no read pipe.
    void enforceReadable() { enforce(m_readPipe, "Stream is not readable!"); }

    /// Throws with "Stream is not writable!" if there is no write pipe.
    void enforceWritable() { enforce(m_writePipe, "Stream is not writable!"); }

    /**
     * Entry point of the reader thread: copies the file into the read pipe.
     *
     * `loop_flag` coordinates the task with the event loop started below:
     * whichever side reaches its check first sets the flag. If the task ends
     * first, the event loop is never started; if the event loop was started
     * first, the task's exit handler stops it via `exitEventLoop()`.
     */
    private void readThreadFunc()
    {
        bool loop_flag = false;
        runTask({
            // NOTE(review): a one-byte buffer is presumably used so that
            // rawRead returns as soon as any input arrives -- confirm.
            ubyte[1] buf;
            scope(exit) {
                if (m_file.isOpen) m_file.close();
                m_readPipe.writer.finalize();
                if (loop_flag) exitEventLoop();
                else loop_flag = true;
            }

            while (!m_file.eof) {
                auto data = m_file.rawRead(buf);
                if (!data.length) break;
                m_readPipe.writer.write(data);
                vibe.core.core.yield();
            }
        });

        if (!loop_flag) {
            loop_flag = true;
            runEventLoop();
        }
    }

    /**
     * Entry point of the writer thread: drains the write pipe into the file
     * in chunks of at most 1024 bytes.
     *
     * Uses the same `loop_flag` coordination as `readThreadFunc`.
     */
    private void writeThreadFunc()
    {
        bool loop_flag = false;
        runTask({
            ubyte[1024] buf;
            scope(exit) {
                if (m_file.isOpen) m_file.close();
                if (loop_flag) exitEventLoop();
                else loop_flag = true;
            }

            while (m_file.isOpen && !m_writePipe.reader.empty) {
                // leastSize is a ulong; the minimum never exceeds buf.length,
                // so narrowing to size_t is lossless and keeps the slices
                // below valid on targets where size_t is 32 bits wide.
                auto len = cast(size_t) min(buf.length, m_writePipe.reader.leastSize);
                if (!len) break;
                m_writePipe.reader.read(buf[0 .. len]);
                m_file.rawWrite(buf[0 .. len]);
                vibe.core.core.yield();
            }
        });

        if (!loop_flag) {
            loop_flag = true;
            runEventLoop();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment