Skip to content

Instantly share code, notes, and snippets.

@pierre3
Last active December 16, 2015 02:38
Show Gist options
  • Save pierre3/5363558 to your computer and use it in GitHub Desktop.
Save pierre3/5363558 to your computer and use it in GitHub Desktop.
async /await で複数の値を返す場合は IObservable
/// <summary>
/// Reads several files asynchronously, one after another, and publishes each
/// one as soon as its content has been read.
/// </summary>
/// <remarks>
/// Observable.Create is a good fit for returning multiple values from async code,
/// specifically the overload
/// <c>IObservable&lt;T&gt; Create&lt;T&gt;(Func&lt;IObserver&lt;T&gt;, CancellationToken, Task&gt; subscribe)</c>.
/// That overload was not available here (Rx 2.0?), so the hand-written
/// ObservableEx.Create is used instead.
/// </remarks>
/// <param name="fileNames">Paths of the files to read, processed in the given order.</param>
/// <returns>
/// A sequence of tuples where Item1 is the file name without its directory and
/// Item2 is the complete text of the file. Failures are reported through OnError.
/// </returns>
private IObservable<Tuple<string, string>> OpenFiles(string[] fileNames)
{
    return ObservableEx.Create<Tuple<string, string>>(async (observer, ct) =>
    {
        try
        {
            foreach (var name in fileNames)
            {
                // Stop before opening the next file once cancellation was requested.
                if (ct.IsCancellationRequested)
                {
                    return;
                }

                using (var reader = new System.IO.StreamReader(name))
                {
                    var text = await reader.ReadToEndAsync().ConfigureAwait(false);
                    observer.OnNext(Tuple.Create(System.IO.Path.GetFileName(name), text));
                }

                // Artificial pause, only here to make the behavior observable while testing.
                // The token is passed so that cancelling ends the wait immediately
                // instead of keeping the producer alive for the rest of the delay.
                await Task.Delay(2000, ct);
            }

            observer.OnCompleted();
        }
        catch (Exception e)
        {
            // A cancelled wait is the expected way out after the token fired;
            // it is not an error worth reporting.
            if (e is OperationCanceledException && ct.IsCancellationRequested)
            {
                return;
            }

            observer.OnError(e);
        }
    });
}
/// <summary>
/// Starts loading the given files, appends progress to StatusMessage and adds
/// every loaded file to FileContentCollection, then disposes the subscription
/// after four seconds.
/// </summary>
/// <param name="fileNames">Paths of the files to open.</param>
private async void FileOpened(string[] fileNames)
{
    StatusMessage = "*** Opened Files ***" + Environment.NewLine;

    // Callbacks for the three notification kinds of the file sequence.
    Action<Tuple<string, string>> onFileLoaded = loaded =>
    {
        StatusMessage += loaded.Item1 + Environment.NewLine;
        FileContentCollection.Add(loaded);
    };
    Action<Exception> onFailed = error => StatusMessage += "Error: " + error.Message;
    Action onFinished = () => StatusMessage += "Completed.";

    var subscription = OpenFiles(fileNames).Subscribe(onFileLoaded, onFailed, onFinished);

    // Try out cancellation: unsubscribe while the files are still being read.
    await Task.Delay(4000);
    subscription.Dispose();
}
/// <summary>
/// Observable.Create helper for asynchronous producers.
/// </summary>
public static class ObservableEx
{
    /// <summary>
    /// Creates an observable whose subscription logic is an asynchronous delegate.
    /// </summary>
    /// <typeparam name="T">Element type of the resulting sequence.</typeparam>
    /// <param name="subscribe">
    /// Invoked once per subscription with the observer and a cancellation token;
    /// the token is cancelled when the subscription is disposed.
    /// </param>
    /// <returns>
    /// An observable that forwards a fault of the returned task as OnError and
    /// its completion as OnCompleted.
    /// </returns>
    public static IObservable<T> Create<T>(Func<IObserver<T>, CancellationToken, Task> subscribe)
    {
        return Observable.Create<T>(target =>
        {
            // Owns the token source; disposing it requests cancellation.
            var cancellation = new CancellationDisposable();

            var producer = subscribe(target, cancellation.Token);

            // The task itself yields no elements; only its outcome is relayed.
            var completion = producer.ToObservable().Subscribe(
                unit => { },
                target.OnError,
                target.OnCompleted);

            return new CompositeDisposable(completion, cancellation);
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment