Skip to content

Instantly share code, notes, and snippets.

@tgnm
Created March 24, 2014 10:30
Show Gist options
  • Save tgnm/9737831 to your computer and use it in GitHub Desktop.
Save tgnm/9737831 to your computer and use it in GitHub Desktop.
Observable stream that tries to control back pressure
public IObservable<byte[]> GetFileData(string filePath, IObservable<bool> continueReading)
{
AsyncSubject<byte[]> ret = new AsyncSubject<byte[]>();
Observable.Start(() =>
{
using (var stream = new MemoryStream())
{
int streamPos = 0;
byte[] buffer = new byte[128];
continueReading.Subscribe(_ =>
{
stream.Read(buffer, streamPos, buffer.Length);
streamPos += buffer.Length;
ret.OnNext(buffer);
}, Scheduler.CurrentThread);
}
});
return ret;
}
public void StartSyncing()
{
Upload(GetFileData(_dbPath), new AsyncSubject<bool>());
}
public void Upload(IObservable<byte[]> fileStream, AsyncSubject<bool> continueReading )
{
fileStream.Subscribe(bytes =>
{
using (var dropboxStream = new MemoryStream())
{
dropboxStream.Write(bytes);
continueReading.OnNext(true);
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment