-
-
Save Laeeth/f1ad0be8d61c7d5a82ac to your computer and use it in GitHub Desktop.
Wrapper class to make an existing `File` stream usable in a vibe.d event loop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// NOTE: This class is now part of vibe.d, see http://vibed.org/api/vibe.stream.stdio/StdFileStream | |
class StdFileStream : ConnectionStream { | |
private { | |
std.stdio.File m_file; | |
TaskPipe m_readPipe; | |
TaskPipe m_writePipe; | |
Thread m_readThread; | |
Thread m_writeThread; | |
} | |
this(bool read, bool write) | |
{ | |
if (read) m_readPipe = new TaskPipe; | |
if (write) m_writePipe = new TaskPipe; | |
} | |
void setup(std.stdio.File file) | |
{ | |
logInfo("setup"); | |
m_file = file; | |
if (m_readPipe) { | |
logInfo("read thread"); | |
m_readThread = new Thread(&readThreadFunc); | |
m_readThread.name = "StdFileStream reader"; | |
m_readThread.start(); | |
} | |
if (m_writePipe) { | |
logInfo("write thread"); | |
m_writeThread = new Thread(&writeThreadFunc); | |
m_writeThread.name = "StdFileStream writer"; | |
m_writeThread.start(); | |
} | |
logInfo("done"); | |
} | |
override @property bool empty() { enforceReadable(); return m_readPipe.reader.empty; } | |
override @property ulong leastSize() | |
{ | |
enforceReadable(); | |
return m_readPipe.reader.leastSize; | |
} | |
override @property bool dataAvailableForRead() | |
{ | |
enforceReadable(); | |
return m_readPipe.reader.dataAvailableForRead; | |
} | |
override @property bool connected() const { return m_readPipe.connected; } | |
override void close() { m_writePipe.close(); } | |
override bool waitForData(Duration timeout) { return m_readPipe.waitForData(timeout); } | |
override const(ubyte)[] peek() | |
{ | |
enforceReadable(); | |
return m_readPipe.reader.peek(); | |
} | |
override void read(ubyte[] dst) | |
{ | |
enforceReadable(); | |
m_readPipe.reader.read(dst); | |
} | |
override void write(in ubyte[] bytes_) | |
{ | |
enforceWritable(); | |
m_writePipe.writer.write(bytes_); | |
} | |
override void flush() | |
{ | |
enforceWritable(); | |
m_writePipe.writer.flush(); | |
} | |
override void finalize() | |
{ | |
enforceWritable(); | |
if (!m_writePipe.connected) return; | |
flush(); | |
m_writePipe.writer.finalize(); | |
} | |
override void write(InputStream stream, ulong nbytes = 0) | |
{ | |
writeDefault(stream, nbytes); | |
} | |
void enforceReadable() { enforce(m_readPipe, "Stream is not readable!"); } | |
void enforceWritable() { enforce(m_writePipe, "Stream is not writable!"); } | |
private void readThreadFunc() | |
{ | |
bool loop_flag = false; | |
runTask({ | |
ubyte[1] buf; | |
scope(exit) { | |
if (m_file.isOpen) m_file.close(); | |
m_readPipe.writer.finalize(); | |
if (loop_flag) exitEventLoop(); | |
else loop_flag = true; | |
} | |
while (!m_file.eof) { | |
auto data = m_file.rawRead(buf); | |
if (!data.length) break; | |
m_readPipe.writer.write(data); | |
vibe.core.core.yield(); | |
} | |
}); | |
if (!loop_flag) { | |
loop_flag = true; | |
runEventLoop(); | |
} | |
} | |
private void writeThreadFunc() | |
{ | |
bool loop_flag = false; | |
runTask({ | |
ubyte[1024] buf; | |
scope(exit) { | |
if (m_file.isOpen) m_file.close(); | |
if (loop_flag) exitEventLoop(); | |
else loop_flag = true; | |
} | |
while (m_file.isOpen && !m_writePipe.reader.empty) { | |
auto len = min(buf.length, m_writePipe.reader.leastSize); | |
if (!len) break; | |
m_writePipe.reader.read(buf[0 .. len]); | |
m_file.rawWrite(buf[0 .. len]); | |
vibe.core.core.yield(); | |
} | |
}); | |
if (!loop_flag) { | |
loop_flag = true; | |
runEventLoop(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment