Created
July 15, 2017 17:30
-
-
Save BenjaminHolland/65da9acbcbea64a02e794533c6beeb10 to your computer and use it in GitHub Desktop.
Reactive Observable Asynchronous Stream Adapter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using RJCP.IO.Ports; | |
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ConsoleApp4 | |
{ | |
/// <summary>
/// Adapts a <see cref="Stream"/> into an observable sequence of bytes.
/// </summary>
public static class StreamObservable
{
    /// <summary>Maximum number of bytes requested from the stream per read.</summary>
    private const int BufferSize = 128;

    /// <summary>Pause inserted before every read.</summary>
    private static readonly TimeSpan ReadInterval = TimeSpan.FromMilliseconds(100);

    /// <summary>
    /// Waits for <see cref="ReadInterval"/> and then reads up to <see cref="BufferSize"/> bytes
    /// from <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="ct">Token that cancels both the delay and the read.</param>
    /// <returns>
    /// A freshly allocated array holding exactly the bytes that were read.
    /// The array is empty when the read returned zero bytes.
    /// </returns>
    private static async Task<byte[]> ReadBufferFromStreamAsync(Stream stream, CancellationToken ct)
    {
        // The delay performs real asynchronous work, so everything after it may continue on a
        // different context than the caller's. It also paces successive reads.
        await Task.Delay(ReadInterval, ct).ConfigureAwait(false);

        var buffer = new byte[BufferSize];

        // Ask for the whole buffer; the stream may still hand back fewer bytes than requested.
        int read = await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);

        // The buffer is local to this call, so a completely filled buffer can be returned as-is.
        if (read == buffer.Length)
        {
            return buffer;
        }

        // Trim to the number of bytes actually read so callers never see unused padding.
        var trimmed = new byte[read];
        Buffer.BlockCopy(buffer, 0, trimmed, 0, read);
        return trimmed;
    }

    /// <summary>
    /// Creates an observable that emits the bytes of <paramref name="stream"/> one at a time.
    /// The sequence completes once the stream is no longer readable or a read returns zero bytes.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="ownsStream">
    /// When <c>true</c>, the stream is disposed as soon as a subscription terminates or is
    /// unsubscribed. When <c>false</c>, the stream is left open for the caller to manage.
    /// </param>
    /// <returns>An observable sequence of the bytes read from the stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    public static IObservable<byte> ToObservable(this Stream stream, bool ownsStream = true)
    {
        if (stream == null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        // Defer gives every subscription its own loop state, so one subscription finishing
        // cannot break another subscription to the same observable.
        return Observable.Defer(() =>
        {
            // Size of the most recent read; -1 means that nothing has been read yet.
            int lastRead = -1;

            return Observable
                .While(
                    // Keep reading until the stream closes or a read comes back empty.
                    () => stream.CanRead && Volatile.Read(ref lastRead) != 0,
                    Observable
                        .FromAsync(ct => ReadBufferFromStreamAsync(stream, ct))
                        // Record the read size so the loop condition can see it.
                        .Do(buffer => Volatile.Write(ref lastRead, buffer.Length)))
                // Flatten the buffers into individual bytes.
                .SelectMany(bytes => bytes)
                // Clean up after ourselves.
                .Finally(() =>
                {
                    if (ownsStream)
                    {
                        stream.Dispose();
                    }
                });
        });
    }
}
/// <summary>
/// Console demo: merges the byte sequences of two in-memory streams and prints every byte.
/// </summary>
class Program
{
    /// <summary>
    /// Subscribes to the merged sequence and blocks until it completes or fails.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var first = new MemoryStream(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
        var second = new MemoryStream(new byte[] { 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 });
        IObservable<byte> merged = first.ToObservable().Merge(second.ToObservable());

        using (var finished = new ManualResetEventSlim())
        using (merged.Subscribe(
            onNext: value => Console.WriteLine(value),
            // Signal on failure as well; otherwise an error would leave Main waiting forever.
            onError: ex => { Console.WriteLine(ex.Message); finished.Set(); },
            onCompleted: () => { Console.WriteLine("Done!"); finished.Set(); }))
        {
            // The callbacks run on other threads, so keep the process alive until they finish.
            finished.Wait();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment