Skip to content

Instantly share code, notes, and snippets.

@ousttrue
Last active August 29, 2015 14:27
Show Gist options
  • Save ousttrue/0fc720bbd5ac9f435d68 to your computer and use it in GitHub Desktop.
Save ousttrue/0fc720bbd5ac9f435d68 to your computer and use it in GitHub Desktop.
C# stream reader
class ReadContext : IDisposable
{
public Stream Stream { get; private set; }
public Subject<Byte[]> Subject { get; private set; }
public Byte[] Buffer { get; private set; }
public Exception Error { get; private set; }
public DateTimeOffset Start { get; private set; }
public DateTimeOffset End { get; private set; }
public ReadContext(Stream s)
{
Stream = s;
Subject = new Subject<Byte[]>();
Buffer = new Byte[1024];
Subject.Subscribe(_ =>
{
}
, ex =>
{
End = DateTimeOffset.Now;
Error = ex;
}
, () =>
{
End = DateTimeOffset.Now;
});
}
public IObservable<MemoryStream> Aggregate()
{
var ms = new MemoryStream();
var observable = new AsyncSubject<MemoryStream>();
Subject.Aggregate(ms, (acc, bytes) =>
{
acc.Write(bytes, 0, bytes.Length);
return acc;
})
.Subscribe(x =>
{
}
, () =>
{
ms.Seek(0, SeekOrigin.Begin);
observable.OnNext(ms);
observable.OnCompleted();
});
return observable;
}
public void Begin()
{
Start = DateTimeOffset.Now;
BeginRead(this);
}
static void OnRead(IAsyncResult ar)
{
var c = ar.AsyncState as ReadContext;
var readCount = default(int);
try
{
readCount = c.Stream.EndRead(ar);
}
catch (Exception ex)
{
c.Subject.OnError(ex);
return;
}
if (readCount == 0)
{
c.Subject.OnCompleted();
}
else
{
var bytes = c.Buffer.Take(readCount).ToArray();
c.Subject.OnNext(bytes);
BeginRead(c);
}
}
static void BeginRead(ReadContext context)
{
try
{
context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnRead, context);
}
catch (Exception ex)
{
context.Subject.OnError(ex);
}
}
#region IDisposable Support
private bool disposedValue = false; // 重複する呼び出しを検出するには
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: マネージ状態を破棄します (マネージ オブジェクト)。
Subject.Dispose();
Stream.Dispose();
}
// TODO: アンマネージ リソース (アンマネージ オブジェクト) を解放し、下のファイナライザーをオーバーライドします。
// TODO: 大きなフィールドを null に設定します。
disposedValue = true;
}
}
// TODO: 上の Dispose(bool disposing) にアンマネージ リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします。
// ~ReadContext() {
// // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
// Dispose(false);
// }
// このコードは、破棄可能なパターンを正しく実装できるように追加されました。
public void Dispose()
{
// このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
Dispose(true);
// TODO: 上のファイナライザーがオーバーライドされる場合は、次の行のコメントを解除してください。
// GC.SuppressFinalize(this);
}
#endregion
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment