Skip to content

Instantly share code, notes, and snippets.

@MarkPflug
Created May 16, 2024 19:57
Show Gist options
  • Save MarkPflug/caab490eae67364eadcefe50b89e4552 to your computer and use it in GitHub Desktop.
Save MarkPflug/caab490eae67364eadcefe50b89e4552 to your computer and use it in GitHub Desktop.
.NET Stream Inversion
// Include nupkgs: Sylvan.Data, Sylvan.Data.Csv
using Sylvan.Data;
using Sylvan.Data.Csv;
// Number of times the three sample records are repeated, so count * 3 CSV records are written.
int count = 10000000;
const string file = "dump.csv";

// "invert" the WriteCsvToStream code into a readable stream.
using Stream stream = new InvertedStream(output => WriteCsvToStream(output, count));

// copy the readable stream to a file to simulate the consumer
using (var o = File.Create(file))
{
    stream.CopyTo(o);
}

// Process wraps an OS handle, so release it once the peak working set (bytes) has been read.
using (var proc = System.Diagnostics.Process.GetCurrentProcess())
{
    Console.WriteLine("Mem used: " + proc.PeakWorkingSet64);
}
Console.WriteLine("File size: " + new FileInfo(file).Length);
// This represents the code that pulls from the database and writes CSV data to a stream.
// Writes count * 3 CSV records (the three sample rows below, repeated count times) to s.
// When the StreamWriter is disposed at the end it flushes and closes s, which is how the
// consuming InvertedStream learns that no more data is coming.
static void WriteCsvToStream(Stream s, int count)
{
    // get some data records
    var data = new[]
    {
        new { Id = 1, Name = "Alpha", Date = new DateTime(2020, 1, 11) },
        new { Id = 2, Name = "Beta", Date = new DateTime(2022, 2, 13) },
        new { Id = 3, Name = "Delta", Date = new DateTime(2024, 11, 29) },
    };

    // repeat the 3 sample records count number of times and expose them as a
    // data reader (Sylvan.Data); the reader is IDisposable, so dispose it when done.
    using System.Data.Common.DbDataReader reader =
        Enumerable.Range(0, count)
        .SelectMany(_ => data)
        .AsDataReader();

    // Write the records to the provided Stream,
    // which will be inverted into a readable stream for the consuming code.
    using var tw = new StreamWriter(s);
    using var w = CsvDataWriter.Create(tw);
    w.Write(reader);
}
/// <summary>
/// Creates a Stream that can turn code that writes to a Stream, into a Stream that can be read from,
/// without requiring the entire stream be buffered in memory.
/// </summary>
/// <summary>
/// Creates a Stream that can turn code that writes to a Stream, into a Stream that can be read from,
/// without requiring the entire stream be buffered in memory.
/// </summary>
/// <remarks>
/// The writer delegate is started on a thread-pool task by the first call to <see cref="Read"/>.
/// Both sides exchange data through one fixed-size buffer guarded by a private lock: the writer
/// blocks while the buffer is full (or while a Flush is pending), the reader blocks while it is empty.
/// The end of the stream is reached when the writer closes the stream it was given or when the
/// writer delegate returns, whichever happens first. If the writer delegate throws, the reader first
/// receives the bytes already buffered and then an <see cref="IOException"/> wrapping the exception.
/// Designed for a single reader; concurrent calls to Read are not supported.
/// </remarks>
public sealed class InvertedStream : Stream
{
    // Monitor shared by reader and writer. A private object is used rather than the writer stream,
    // because that stream is handed to user code, which could lock on it too.
    readonly object sync = new object();
    readonly Action<Stream> writer;
    readonly InvertedWriterStream writerStream;
    readonly byte[] buffer;
    // Unread data occupies buffer[bufferPos .. bufferEnd).
    int bufferPos;
    int bufferEnd;
    // Set once the writer closes its stream or the writer delegate returns.
    bool writerComplete;
    // Exception thrown by the writer delegate; surfaced to the reader once buffered data is consumed.
    Exception? writerException;
    // Set when this (reading) stream is disposed; makes further writes fail so a blocked writer can exit.
    bool readerDisposed;
    Task? writerTask;

    /// <summary>
    /// Creates a new InvertedStream.
    /// </summary>
    /// <param name="writer">The code that writes the stream content.</param>
    /// <param name="bufferSize">The size of internal buffer to use.</param>
    /// <exception cref="ArgumentNullException"><paramref name="writer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
    public InvertedStream(Action<Stream> writer, int bufferSize = 0x10000)
    {
        if (writer == null) throw new ArgumentNullException(nameof(writer));
        // An empty buffer could never transfer a byte, and both sides would wait on each other forever.
        if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));
        this.writer = writer;
        this.writerStream = new InvertedWriterStream(this);
        this.buffer = new byte[bufferSize];
    }

    /// <inheritdoc/>
    public override bool CanRead => true;
    /// <inheritdoc/>
    public override bool CanSeek => false;
    /// <inheritdoc/>
    public override bool CanWrite => false;
    /// <inheritdoc/>
    public override long Length => throw new NotSupportedException();
    /// <inheritdoc/>
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void Flush()
    {
        // doesn't make sense to flush a reader stream, but also should be harmless to call.
    }

    /// <summary>
    /// Reads bytes produced by the writer delegate, starting the writer on the first call.
    /// Blocks until at least one byte is available or the writer has completed.
    /// </summary>
    /// <returns>
    /// The number of bytes copied (between 1 and <paramref name="count"/>), or 0 once the writer has
    /// completed and all of its data has been read. Also 0 when <paramref name="count"/> is 0.
    /// </returns>
    /// <exception cref="IOException">The writer delegate threw; its exception is the InnerException.</exception>
    /// <exception cref="ObjectDisposedException">This stream has been disposed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        // Written as a subtraction so that offset + count cannot overflow.
        if (count < 0 || count > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(count));

        lock (sync)
        {
            if (readerDisposed) throw new ObjectDisposedException(nameof(InvertedStream));
            // Started under the lock so two racing first reads cannot start two writers.
            if (writerTask == null)
            {
                writerTask = Task.Run(() => RunWriter());
            }
            if (count == 0)
            {
                return 0;
            }

            while (bufferPos == bufferEnd)
            {
                if (writerComplete)
                {
                    if (writerException != null)
                    {
                        throw new IOException("The writer delegate of the InvertedStream failed.", writerException);
                    }
                    return 0;
                }
                // Buffer is empty: let the writer run and wait until it produces data or completes.
                Monitor.PulseAll(sync);
                Monitor.Wait(sync);
                if (readerDisposed) throw new ObjectDisposedException(nameof(InvertedStream));
            }

            int len = Math.Min(bufferEnd - bufferPos, count);
            Buffer.BlockCopy(this.buffer, bufferPos, buffer, offset, len);
            bufferPos += len;
            // Space was freed: wake a writer blocked on a full buffer or a pending Flush,
            // so it can keep producing while the caller processes this data.
            Monitor.PulseAll(sync);
            return len;
        }
    }

    /// <summary>
    /// Disposes the reading side. A writer blocked in Write or Flush, or one that writes afterwards,
    /// gets an <see cref="IOException"/> so its task can finish instead of waiting forever.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            lock (sync)
            {
                readerDisposed = true;
                Monitor.PulseAll(sync);
            }
        }
        base.Dispose(disposing);
    }

    // Body of the writer task: runs the user delegate, then records completion (and any failure)
    // so that a reader waiting for data always observes the end of the stream.
    void RunWriter()
    {
        Exception? error = null;
        try
        {
            writer(writerStream);
        }
        catch (Exception ex)
        {
            // Not swallowed: handed to the reader, which rethrows it wrapped in an IOException.
            error = ex;
        }
        lock (sync)
        {
            writerException = error;
            writerComplete = true;
            Monitor.PulseAll(sync);
        }
    }

    #region NotSupported
    /// <inheritdoc/>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
    #endregion

    // Write-only stream handed to the writer delegate; it fills the enclosing stream's buffer.
    sealed class InvertedWriterStream : Stream
    {
        readonly InvertedStream s;
        bool closed;

        public InvertedWriterStream(InvertedStream s)
        {
            this.s = s;
        }

        public override bool CanRead => false;
        public override bool CanSeek => false;
        public override bool CanWrite => !closed;
        public override long Length => throw new NotSupportedException();
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }

        // Copies the bytes into the shared buffer, blocking while the buffer is full.
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
            if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
            if (count < 0 || count > buffer.Length - offset) throw new ArgumentOutOfRangeException(nameof(count));

            lock (s.sync)
            {
                while (count > 0)
                {
                    ThrowIfUnusable();
                    if (s.bufferPos == s.bufferEnd)
                    {
                        // Everything written so far has been read; reuse the whole buffer.
                        s.bufferPos = s.bufferEnd = 0;
                    }
                    else if (s.bufferPos > 0 && s.bufferEnd == s.buffer.Length)
                    {
                        // No room at the tail: shift the unread bytes to the front.
                        // Buffer.BlockCopy is safe for overlapping ranges within one array.
                        int unread = s.bufferEnd - s.bufferPos;
                        Buffer.BlockCopy(s.buffer, s.bufferPos, s.buffer, 0, unread);
                        s.bufferPos = 0;
                        s.bufferEnd = unread;
                    }

                    int space = s.buffer.Length - s.bufferEnd;
                    if (space == 0)
                    {
                        // Buffer is full of unread data: let the reader consume some of it.
                        Monitor.PulseAll(s.sync);
                        Monitor.Wait(s.sync);
                        continue;
                    }

                    int n = Math.Min(space, count);
                    Buffer.BlockCopy(buffer, offset, s.buffer, s.bufferEnd, n);
                    s.bufferEnd += n;
                    offset += n;
                    count -= n;
                }
            }
        }

        // Blocks until the reader has consumed everything currently buffered.
        public override void Flush()
        {
            lock (s.sync)
            {
                while (s.bufferPos < s.bufferEnd)
                {
                    ThrowIfUnusable();
                    Monitor.PulseAll(s.sync);
                    Monitor.Wait(s.sync);
                }
            }
        }

        // Closing the writer side marks the end of the data; buffered bytes remain readable.
        // Does not wait for the reader, so the writer task is never left blocked here.
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                lock (s.sync)
                {
                    if (!closed)
                    {
                        closed = true;
                        s.writerComplete = true;
                        Monitor.PulseAll(s.sync);
                    }
                }
            }
            base.Dispose(disposing);
        }

        // Must be called while holding s.sync.
        void ThrowIfUnusable()
        {
            if (closed) throw new ObjectDisposedException(nameof(InvertedWriterStream));
            if (s.readerDisposed) throw new IOException("The InvertedStream being read has been disposed; no more data can be written.");
        }

        #region NotSupported
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
        #endregion
    }
}
@MarkPflug
Copy link
Author

Note to self: Write should always be happening in the context of a locked Read. Are the buffer copies even necessary, or can the writer be made to write directly into the read buffer?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment