Skip to content

Instantly share code, notes, and snippets.

@frmdstryr
Created December 6, 2019 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frmdstryr/617710ab4eface00cc4681e85556e401 to your computer and use it in GitHub Desktop.
Save frmdstryr/617710ab4eface00cc4681e85556e401 to your computer and use it in GitHub Desktop.
Zig read tests
// Run with zig run --release-fast readtest.zig
const std = @import("std");
const net = std.net;
const os = std.os;
const math = std.math;
const mem = std.mem;
const File = std.fs.File;
const assert = std.debug.assert;
//pub const io_mode = .evented;
// Size in bytes of the intermediate buffers used by the raw readers
// (`readRaw`, `readRawStream`, `readRawIntoBuffer`) and by `BufferedReader`.
const buffer_size = 16384;
// Selects which reader `main` runs; the value must match one of the cases in
// the `switch` inside `main`, otherwise compilation fails with "Invalid choice".
const READ_FN = 7;
// Entry point of the read benchmark.
//
// Listens on 127.0.0.1:9002, spawns a child shell pipeline (`dd | nc`, wrapped
// in `timeout`) that connects back and streams zero bytes, accepts that
// connection, and drains it with the reader selected by `READ_FN`.
//
// Returns any error raised while parsing the address, listening, spawning the
// child process, accepting the connection, or reading from it.
pub fn main() anyerror!void {
    // One allocator for everything in main, matching the `page_allocator`
    // used elsewhere in this file.
    const allocator = std.heap.page_allocator;
    const req_listen_addr = try net.Address.parseIp4("127.0.0.1", 9002);
    //std.event.Loop.instance.?.beginOneEvent();
    var server = net.StreamServer.init(.{});
    defer server.deinit();
    try server.listen(req_listen_addr);
    std.debug.warn("listening at {}\n", server.listen_address);

    // READ_FN is a compile-time constant, so exactly one reader is selected
    // and an unknown value is rejected at compile time.
    const readFn = switch (READ_FN) {
        1 => readUnbuffered,
        2 => readSlice,
        3 => readBufferedInStream,
        4 => readRaw,
        5 => readRawStream,
        6 => readPassthroughStream, // ~2.5GB/s
        7 => readBufferedReaderSingle, // ~600MB/s
        8 => readBufferedReaderDirect, // ~1.4GB/s
        9 => readBufferedReaderDirectInline, // ~1.3 GB/s
        else => @compileError("Invalid choice"),
    };

    // The data source: `timeout` interrupts the pipeline after 10 seconds.
    const cmd = [_][]const u8{
        "timeout", "-s", "SIGINT", "10s", "bash", "-c",
        "dd bs=64k count=10G if=/dev/zero iflag=count_bytes | nc -v -N 127.0.0.1 9002",
    };
    var process = try std.ChildProcess.init(cmd[0..], allocator);
    defer process.deinit();
    try process.spawn();
    // Deferred calls run in reverse order: the child is reaped before
    // `process.deinit()` releases it.
    defer waitIgnore(process);

    const conn = try server.accept();
    // Close the accepted socket on every exit path. Because this defer is
    // registered last it runs first, so the child sees the connection close
    // before we wait on it.
    defer conn.file.close();
    std.debug.warn("connected to {}\n", conn.address);
    std.debug.warn("Gonna send it: ");
    try readFn(conn);
}
// Waits for `process` to terminate and discards its termination status.
//
// A failure to wait is reported on stderr and otherwise ignored. It must not
// be treated as `unreachable`: waiting can genuinely fail, and an incorrect
// `unreachable` is undefined behaviour in release-fast builds.
pub fn waitIgnore(process: *std.ChildProcess) void {
    _ = process.wait() catch |err| {
        std.debug.warn("waiting for child process failed: {}\n", @errorName(err));
        return;
    };
}
// Drains the connection one byte at a time straight from the file's
// unbuffered in-stream.
//
// Returns normally once the peer closes the connection, like the other
// readers in this file; any other read error is propagated.
pub fn readUnbuffered(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readUnbuffered\n");
    const in_stream = &conn.file.inStream().stream;
    while (true) {
        // `readByte` reports end of stream as an error; treat that as a
        // clean finish instead of failing the whole run.
        _ = in_stream.readByte() catch |err| switch (err) {
            error.EndOfStream => return,
            else => |e| return e,
        };
    }
}
// Drains the connection in chunks of up to 8096 bytes through the file's
// unbuffered in-stream. Returns when a read yields zero bytes; read errors
// are propagated.
pub fn readSlice(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readSlice\n");
    var file_stream = conn.file.inStream();
    const source = &file_stream.stream;
    var chunk: [8096]u8 = undefined;
    // A zero-length read ends the loop, and with it the function.
    while ((try source.read(chunk[0..])) != 0) {}
}
// Drains the connection one byte at a time through the standard library's
// `BufferedInStreamCustom` with an 8096-byte buffer.
//
// Returns normally once the peer closes the connection, like the other
// readers in this file; any other read error is propagated.
pub fn readBufferedInStream(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readBufferedInStream\n");
    const in_stream = &std.io.BufferedInStreamCustom(8096, File.ReadError).init(
        &conn.file.inStream().stream,
    ).stream;
    while (true) {
        // `readByte` reports end of stream as an error; treat that as a
        // clean finish instead of failing the whole run.
        _ = in_stream.readByte() catch |err| switch (err) {
            error.EndOfStream => return,
            else => |e| return e,
        };
    }
}
// Reads the connection one byte at a time through
// `std.io.BufferedInStream(File)` constructed directly from the connection's
// file.
// NOTE(review): this reader is not listed in the `READ_FN` switch in `main`,
// so it is never selected as written.
// NOTE(review): the loop has no explicit exit; presumably it ends when
// `readByte` returns an error at end of stream -- confirm against the
// `BufferedInStream` implementation this was written for.
pub fn readBufferedInStreamMixin(conn: net.StreamServer.Connection) !void {
std.debug.warn("readBufferedInStreamMixin\n");
const in_stream = &std.io.BufferedInStream(File).init(conn.file);
while (true) {
// The byte itself is discarded; only the cost of reading is of interest.
var c = try in_stream.readByte();
}
}
// Drains the connection one byte at a time through the generic stream
// interface of this file's `BufferedReader`, i.e. via the `readFn` pointer
// installed by `BufferedReader.init`.
//
// Returns normally once the peer closes the connection, like the other
// readers in this file; any other read error is propagated.
pub fn readBufferedReader(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readBufferedReader\n");
    const in_stream = &BufferedReader.init(&conn.file.inStream().stream).stream;
    while (true) {
        // `readByte` reports end of stream as an error; treat that as a
        // clean finish instead of failing the whole run.
        _ = in_stream.readByte() catch |err| switch (err) {
            error.EndOfStream => return,
            else => |e| return e,
        };
    }
}
// Drains the connection one byte at a time by calling
// `BufferedReader.readFnSingle` directly on the reader's embedded stream.
// Returns when a read yields zero bytes; read errors are propagated.
pub fn readBufferedReaderSingle(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readBufferedReaderSingle\n");
    var file_stream = conn.file.inStream();
    var reader = BufferedReader.init(&file_stream.stream);
    // One-byte destination; its contents are never inspected.
    var sink: [1]u8 = undefined;
    while ((try BufferedReader.readFnSingle(&reader.stream, sink[0..])) != 0) {}
}
// Drains the connection one byte at a time by calling
// `BufferedReader.readDirect` on the reader itself, bypassing the embedded
// stream interface. Returns when a read yields zero bytes; read errors are
// propagated.
pub fn readBufferedReaderDirect(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readBufferedReaderDirect\n");
    const stream = &conn.file.inStream().stream;
    var reader = BufferedReader.init(stream);
    while (true) {
        // One-byte destination; its contents are never inspected.
        var dest: [1]u8 = undefined;
        const c = try BufferedReader.readDirect(&reader, dest[0..]);
        if (c == 0) return;
    }
}
// Same as `readBufferedReaderDirect`, except that the call to
// `BufferedReader.readDirect` is forced inline with `@inlineCall`.
// Returns when a read yields zero bytes; read errors are propagated.
pub fn readBufferedReaderDirectInline(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readBufferedReaderDirectInline\n");
    const stream = &conn.file.inStream().stream;
    var reader = BufferedReader.init(stream);
    while (true) {
        // One-byte destination; its contents are never inspected.
        var dest: [1]u8 = undefined;
        const c = try @inlineCall(BufferedReader.readDirect, &reader, dest[0..]);
        if (c == 0) return;
    }
}
// Drains the connection in chunks of up to 8096 bytes through a
// `PassthroughStream` wrapped around the file's in-stream. Returns when a
// read yields zero bytes; read errors are propagated.
pub fn readPassthroughStream(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readPassthroughStream\n");
    var file_stream = conn.file.inStream();
    var passthrough = PassthroughStream.init(&file_stream.stream);
    var chunk: [8096]u8 = undefined;
    // A zero-length read ends the loop, and with it the function.
    while ((try passthrough.stream.read(chunk[0..])) != 0) {}
}
// Drains the connection with hand-rolled buffering on top of `os.read`:
// refills a `buffer_size` chunk whenever it is exhausted and otherwise hands
// out one byte per loop iteration. Returns when a refill yields zero bytes;
// read errors are propagated.
pub fn readRaw(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readRaw\n");
    var chunk: [buffer_size]u8 = undefined;
    // `pos == filled` means the chunk is exhausted. Starting both at
    // `buffer_size` forces a refill on the very first iteration.
    var pos: usize = buffer_size;
    var filled: usize = buffer_size;
    // One-byte destination; its contents are never inspected.
    var sink: [1]u8 = undefined;
    while (true) : (pos += 1) {
        if (pos == filled) {
            filled = try os.read(conn.file.handle, chunk[0..]);
            if (filled == 0) return;
            pos = 0;
        }
        sink[0] = chunk[pos];
    }
}
// Same hand-rolled buffering as `readRaw`, but refills go through the file's
// in-stream `read` instead of calling `os.read` on the handle. Returns when a
// refill yields zero bytes; read errors are propagated.
pub fn readRawStream(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readRawStream\n");
    var file_stream = conn.file.inStream();
    const source = &file_stream.stream;
    var chunk: [buffer_size]u8 = undefined;
    // `pos == filled` means the chunk is exhausted. Starting both at
    // `buffer_size` forces a refill on the very first iteration.
    var pos: usize = buffer_size;
    var filled: usize = buffer_size;
    // One-byte destination; its contents are never inspected.
    var sink: [1]u8 = undefined;
    while (true) : (pos += 1) {
        if (pos == filled) {
            filled = try source.read(chunk[0..]);
            if (filled == 0) return;
            pos = 0;
        }
        sink[0] = chunk[pos];
    }
}
// Drains the connection with `os.read` into a `buffer_size` chunk and appends
// every byte of each chunk to a growable `std.Buffer`, which is reset before
// each refill.
//
// Returns when a refill yields zero bytes. Read errors and allocation
// failures from the growable buffer are propagated.
pub fn readRawIntoBuffer(conn: net.StreamServer.Connection) !void {
    std.debug.warn("readRawIntoBuffer\n");
    var buffer: [buffer_size]u8 = undefined;
    var buf = try std.Buffer.initCapacity(std.heap.page_allocator, 5000);
    // Release the growable buffer on every exit path, including errors.
    defer buf.deinit();
    // Starting both indices at `buffer_size` forces a refill on the first
    // iteration.
    var start_index: usize = buffer_size;
    var end_index: usize = buffer_size;
    while (true) {
        if (start_index == end_index) {
            start_index = 0;
            // Drop the bytes collected from the previous chunk.
            try buf.resize(0);
            end_index = try os.read(conn.file.handle, buffer[0..]);
            if (end_index == 0) return;
        }
        try buf.appendByte(buffer[start_index]);
        start_index += 1;
    }
}
// An in-stream wrapper that forwards every read to an underlying stream and
// then performs a token amount of per-byte work on the result. It exists to
// measure the overhead of one extra stream layer without any buffering.
const PassthroughStream = struct {
    pub const Stream = File.InStream.Stream;

    // The wrapped stream that actually supplies the data.
    unbuffered_in_stream: *Stream,
    // The stream interface handed to callers; `readFn` recovers the wrapper
    // from a pointer to this field.
    stream: Stream,

    // Wraps `unbuffered_in_stream`. The caller keeps ownership of the
    // underlying stream, which must outlive the returned value.
    pub fn init(unbuffered_in_stream: *Stream) PassthroughStream {
        return PassthroughStream{
            .unbuffered_in_stream = unbuffered_in_stream,
            .stream = Stream{ .readFn = readFn },
        };
    }

    // Reads into `dest` from the underlying stream and returns the number of
    // bytes read (0 at end of stream). Errors from the underlying stream are
    // propagated.
    fn readFn(in_stream: *Stream, dest: []u8) !usize {
        var self = @fieldParentPtr(PassthroughStream, "stream", in_stream);
        const n = try self.unbuffered_in_stream.read(dest);
        // Walk backwards over the bytes that were actually read. Decrement
        // before indexing so the index stays within `dest[0..n]` and the
        // loop terminates when it reaches zero.
        var i: usize = n;
        while (i > 0) {
            i -= 1;
            // Shoot the breeze
            self = @fieldParentPtr(PassthroughStream, "stream", in_stream);
            _ = dest[i];
        }
        return n;
    }
};
// A buffered in-stream over another stream, with a fixed internal buffer of
// `buffer_size` bytes. It offers three read implementations so their cost can
// be compared: `readFnSingle` (one byte per call, reached through the stream
// interface), `readDirect` (one byte per call, called on the reader itself)
// and `readFn` (general-purpose, any destination length).
const BufferedReader = struct {
pub const Stream = File.InStream.Stream;
// The wrapped stream that the buffer is refilled from.
unbuffered_in_stream: *Stream,
// The stream interface handed to callers; the stream-based read functions
// recover the reader from a pointer to this field via `@fieldParentPtr`.
stream: Stream,
// Internal storage; the unread bytes are `buffer[start_index..end_index]`.
buffer: [buffer_size]u8,
start_index: usize,
end_index: usize,
// Incremented once per byte handed out by `readFnSingle` and `readDirect`.
ncalls: usize = 0,
// Creates a reader over `unbuffered_in_stream` with an empty buffer.
// The stream interface is wired to `readFnSingle`, not to `readFn`.
pub fn init(unbuffered_in_stream: *Stream) BufferedReader {
return BufferedReader{
.unbuffered_in_stream = unbuffered_in_stream,
.buffer = undefined,
// Initialize these two fields to buffer_size so that
// in `readFn` we treat the state as being able to read
// more from the unbuffered stream. If we set them to 0
// and 0, the code would think we already hit EOF.
.start_index = buffer_size,
.end_index = buffer_size,
.stream = Stream{ .readFn = readFnSingle },
};
}
// Stream-interface read that always delivers exactly one byte into `dest[0]`.
// Returns 1 on success, or 0 when a refill from the underlying stream yields
// no data; errors from the underlying stream are propagated.
// NOTE(review): `dest[0]` is written without checking `dest.len`, so callers
// must pass a destination of at least one byte.
pub fn readFnSingle(in_stream: *Stream, dest: []u8) !usize {
const self = @fieldParentPtr(BufferedReader, "stream", in_stream);
// Buffer exhausted: refill from the underlying stream.
if (self.start_index == self.end_index) {
self.start_index = 0;
self.end_index = try self.unbuffered_in_stream.read(self.buffer[0..]);
if (self.end_index == 0) return 0;
}
dest[0] = self.buffer[self.start_index];
self.start_index += 1;
self.ncalls += 1;
return 1;
}
// Same logic as `readFnSingle`, but takes the reader itself instead of
// recovering it from the embedded stream pointer.
// Returns 1 on success, or 0 when a refill yields no data; errors from the
// underlying stream are propagated.
// NOTE(review): `dest[0]` is written without checking `dest.len`, so callers
// must pass a destination of at least one byte.
pub fn readDirect(self: *BufferedReader, dest: []u8) !usize {
//const self = @fieldParentPtr(BufferedReader, "stream", in_stream);
// Buffer exhausted: refill from the underlying stream.
if (self.start_index == self.end_index) {
self.start_index = 0;
self.end_index = try self.unbuffered_in_stream.read(self.buffer[0..]);
if (self.end_index == 0) return 0;
}
dest[0] = self.buffer[self.start_index];
self.start_index += 1;
self.ncalls += 1;
return 1;
}
// General-purpose buffered read for a destination of any length.
// Returns the number of bytes written to `dest`, which is less than
// `dest.len` only when the underlying stream stops supplying data or when a
// large request is forwarded and the underlying read comes back short.
// Errors from the underlying stream are propagated.
// Not installed by `init`; `readFnSingle` is used as the stream's `readFn`.
pub fn readFn(in_stream: *Stream, dest: []u8) !usize {
const self = @fieldParentPtr(BufferedReader, "stream", in_stream);
// Hot path for one byte reads
if (dest.len == 1 and self.end_index > self.start_index) {
dest[0] = self.buffer[self.start_index];
self.start_index += 1;
return 1;
}
// Number of bytes already written to `dest`.
var dest_index: usize = 0;
while (true) {
const dest_space = dest.len - dest_index;
if (dest_space == 0) {
return dest_index;
}
const amt_buffered = self.end_index - self.start_index;
if (amt_buffered == 0) {
assert(self.end_index <= buffer_size);
// Make sure the last read actually gave us some data
if (self.end_index == 0) {
// reading from the unbuffered stream returned nothing
// so we have nothing left to read.
return dest_index;
}
// we can read more data from the unbuffered stream
if (dest_space < buffer_size) {
self.start_index = 0;
self.end_index = try self.unbuffered_in_stream.read(self.buffer[0..]);
// Shortcut
if (self.end_index >= dest_space) {
mem.copy(u8, dest[dest_index..], self.buffer[0..dest_space]);
self.start_index = dest_space;
return dest.len;
}
// Otherwise fall through: `amt_buffered` still holds the pre-refill value
// of 0, so the copy below moves nothing and the next loop iteration
// re-evaluates the refilled buffer.
} else {
// asking for so much data that buffering is actually less efficient.
// forward the request directly to the unbuffered stream
const amt_read = try self.unbuffered_in_stream.read(dest[dest_index..]);
return dest_index + amt_read;
}
}
// Copy as much as both the destination and the buffered data allow.
const copy_amount = math.min(dest_space, amt_buffered);
const copy_end_index = self.start_index + copy_amount;
mem.copy(u8, dest[dest_index..], self.buffer[self.start_index..copy_end_index]);
self.start_index = copy_end_index;
dest_index += copy_amount;
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment