Last active
August 29, 2015 14:27
C# stream reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ReadContext : IDisposable | |
{ | |
public Stream Stream { get; private set; } | |
public Subject<Byte[]> Subject { get; private set; } | |
public Byte[] Buffer { get; private set; } | |
public Exception Error { get; private set; } | |
public DateTimeOffset Start { get; private set; } | |
public DateTimeOffset End { get; private set; } | |
public ReadContext(Stream s) | |
{ | |
Stream = s; | |
Subject = new Subject<Byte[]>(); | |
Buffer = new Byte[1024]; | |
Subject.Subscribe(_ => | |
{ | |
} | |
, ex => | |
{ | |
End = DateTimeOffset.Now; | |
Error = ex; | |
} | |
, () => | |
{ | |
End = DateTimeOffset.Now; | |
}); | |
} | |
public IObservable<MemoryStream> Aggregate() | |
{ | |
var ms = new MemoryStream(); | |
var observable = new AsyncSubject<MemoryStream>(); | |
Subject.Aggregate(ms, (acc, bytes) => | |
{ | |
acc.Write(bytes, 0, bytes.Length); | |
return acc; | |
}) | |
.Subscribe(x => | |
{ | |
} | |
, () => | |
{ | |
ms.Seek(0, SeekOrigin.Begin); | |
observable.OnNext(ms); | |
observable.OnCompleted(); | |
}); | |
return observable; | |
} | |
public void Begin() | |
{ | |
Start = DateTimeOffset.Now; | |
BeginRead(this); | |
} | |
static void OnRead(IAsyncResult ar) | |
{ | |
var c = ar.AsyncState as ReadContext; | |
var readCount = default(int); | |
try | |
{ | |
readCount = c.Stream.EndRead(ar); | |
} | |
catch (Exception ex) | |
{ | |
c.Subject.OnError(ex); | |
return; | |
} | |
if (readCount == 0) | |
{ | |
c.Subject.OnCompleted(); | |
} | |
else | |
{ | |
var bytes = c.Buffer.Take(readCount).ToArray(); | |
c.Subject.OnNext(bytes); | |
BeginRead(c); | |
} | |
} | |
static void BeginRead(ReadContext context) | |
{ | |
try | |
{ | |
context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnRead, context); | |
} | |
catch (Exception ex) | |
{ | |
context.Subject.OnError(ex); | |
} | |
} | |
#region IDisposable Support | |
private bool disposedValue = false; // 重複する呼び出しを検出するには | |
protected virtual void Dispose(bool disposing) | |
{ | |
if (!disposedValue) | |
{ | |
if (disposing) | |
{ | |
// TODO: マネージ状態を破棄します (マネージ オブジェクト)。 | |
Subject.Dispose(); | |
Stream.Dispose(); | |
} | |
// TODO: アンマネージ リソース (アンマネージ オブジェクト) を解放し、下のファイナライザーをオーバーライドします。 | |
// TODO: 大きなフィールドを null に設定します。 | |
disposedValue = true; | |
} | |
} | |
// TODO: 上の Dispose(bool disposing) にアンマネージ リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします。 | |
// ~ReadContext() { | |
// // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。 | |
// Dispose(false); | |
// } | |
// このコードは、破棄可能なパターンを正しく実装できるように追加されました。 | |
public void Dispose() | |
{ | |
// このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。 | |
Dispose(true); | |
// TODO: 上のファイナライザーがオーバーライドされる場合は、次の行のコメントを解除してください。 | |
// GC.SuppressFinalize(this); | |
} | |
#endregion | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment