Skip to content

Instantly share code, notes, and snippets.

@BenjaminHolland
Created July 15, 2017 17:30
Show Gist options
  • Save BenjaminHolland/65da9acbcbea64a02e794533c6beeb10 to your computer and use it in GitHub Desktop.
Save BenjaminHolland/65da9acbcbea64a02e794533c6beeb10 to your computer and use it in GitHub Desktop.
Reactive Observable Asynchronous Stream Adapter
using RJCP.IO.Ports;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp4
{
public static class StreamObservable
{
private static async Task<byte[]> ReadBufferFromStreamAsync(Stream stream,CancellationToken ct)
{
int BUFFER_SIZE = 128;
byte[] _buffer = new byte[128];
//Task.Delay actually does async work, so the continuation runs on a (possibly) different context than the one it started on.
await Task.Delay(100).ConfigureAwait(false);
//This read doesn't actually do any asynchronous work, so the continuation runs immediately.
int read = await stream.ReadAsync(_buffer, 0, 1,ct).ConfigureAwait(false);
//Copy the read data into a different buffer so the internal read buffer can't be modified
byte[] outBuffer;
if (read != BUFFER_SIZE)
{
outBuffer = new byte[read];
Buffer.BlockCopy(_buffer, 0, outBuffer, 0, read);
}
else
{
outBuffer = (byte[])_buffer.Clone();
}
return outBuffer;
}
public static IObservable<byte> ToObservable(this Stream stream,bool ownsStream=true)
{
//Create a subject we can use to feed data back into our buffer stream.
var _readFeedback = new BehaviorSubject<int>(-1);
//Wrap up our disposal nicely so our observable query does't need blocks.
//(we can dispose both resources in one line later on)
var disposableResources = Disposable.Create(() =>
{
if (ownsStream) stream.Dispose();
_readFeedback.Dispose();
});
//Create and return an observable that will read bytes from the stream until there are no more bytes.
return
Observable.While(() => stream.CanRead && _readFeedback.FirstAsync().Wait() != 0, //Continue reading until the read doesn't succeed
Observable.FromAsync(ct => ReadBufferFromStreamAsync(stream, ct)). //Read a buffer from the stream
Do(buffer => _readFeedback.OnNext(buffer.Length))). //Inform the buffer stream about the result of the last read.
SelectMany(bytes => bytes). //Flatten out the buffers.
Finally(() =>disposableResources.Dispose()); //Clean up after ourselves;
}
}
class Program
{
static void Main(string[] args)
{
MemoryStream ms = new MemoryStream(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
MemoryStream ms2 = new MemoryStream(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 });
var full = ms.ToObservable().Merge(ms2.ToObservable());
ManualResetEventSlim mrs = new ManualResetEventSlim();
IDisposable sub = full.
Subscribe(
onNext: value => Console.WriteLine(value),
onError: ex => Console.WriteLine(ex.Message),
onCompleted: () => { Console.WriteLine("Done!"); mrs.Set(); });
mrs.Wait();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment