Created
July 17, 2020 03:27
-
-
Save noseratio/6143efc9ad2311b423551df4ccae69b5 to your computer and use it in GitHub Desktop.
Combine multiple Rx.NET observables and read them as IAsyncEnumerable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.ComponentModel; | |
using System.Linq; | |
using System.Reactive.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Windows.Forms; | |
#nullable enable | |
namespace AsyncEvents
{
    /// <summary>
    /// Sample that merges WinForms button clicks and timer ticks, exposed as Rx.NET
    /// observables, and consumes them as a single <see cref="IAsyncEnumerable{T}"/>.
    /// </summary>
    internal class Program
    {
        /// <summary>Processes the async stream of events.</summary>
        /// <param name="source">Stream of (originating component, message text) pairs.</param>
        /// <param name="token">Cancels both the enumeration and the per-item delay.</param>
        /// <returns>A task that completes when the stream ends.</returns>
        /// <exception cref="OperationCanceledException">
        /// Thrown when <paramref name="token"/> is cancelled while waiting.
        /// </exception>
        private static async Task ReadEventStreamAsync(
            IAsyncEnumerable<(Component, string)> source,
            CancellationToken token)
        {
            await foreach (var (component, text) in source.WithCancellation(token))
            {
                // e.g., delay processing
                await Task.Delay(100, token);
                Console.WriteLine($"{component.GetType().Name}: {text}");
            }
        }

        /// <summary>Creates the UI and runs the event loop.</summary>
        /// <param name="token">External cancellation; when signalled, the event loop stops and the form is closed.</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the calling thread has no current <see cref="SynchronizationContext"/>.
        /// </exception>
        private static void RunApplication(CancellationToken token)
        {
            // linked source: cancelled either by the caller's token or by closing the form
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);

            // create a form
            using var form = new Form { Text = typeof(Program).FullName, Width = 640, Height = 480 };
            form.FormClosing += (s, e) => cts.Cancel();

            // with a panel
            var panel = new TableLayoutPanel { Dock = DockStyle.Fill };
            form.Controls.Add(panel);

            // with a button
            var button = new Button { Text = "Click me", Anchor = AnchorStyles.None, AutoSize = true };
            panel.Controls.Add(button);
            Console.WriteLine("Window created.");

            // create a timer
            using var timer = new System.Windows.Forms.Timer { Interval = 1000 };
            timer.Start();

            // main event loop; it is 'async void' only because it is used as a
            // SendOrPostCallback, and it handles cancellation itself
            async void RunEventLoopAsync(object? state)
            {
                try
                {
                    // observe Click events
                    var clickObservable = Observable
                        .FromEventPattern(
                            handler => button.Click += handler,
                            handler => button.Click -= handler)
                        .Select(_ => ((Component)button, $"Clicked on {DateTime.Now}"));

                    // observe Tick events
                    var tickObservable = Observable
                        .FromEventPattern(
                            handler => timer.Tick += handler,
                            handler => timer.Tick -= handler)
                        .Select(_ => ((Component)timer, $"Ticked on {DateTime.Now}"));

                    // merge two observables
                    var mergedObservable = Observable.Merge(clickObservable, tickObservable);

                    // process events as async stream via ToAsyncEnumerable(),
                    // that's when the actual subscriptions happen, i.e.,
                    // the event handlers get connected to their corresponding events
                    await ReadEventStreamAsync(mergedObservable.ToAsyncEnumerable(), cts.Token);
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Cancelled.");
                }
                finally
                {
                    // whatever ended the loop, make sure the window goes away
                    form.Close();
                }
            }

            // fail with a descriptive error instead of a NullReferenceException
            // if no synchronization context is installed on this thread
            var uiContext = SynchronizationContext.Current
                ?? throw new InvalidOperationException(
                    "RunApplication requires a SynchronizationContext on the calling thread.");

            // queue the event loop so it starts once the message loop is pumping
            uiContext.Post(RunEventLoopAsync, null);
            Application.Run(form);
        }

        /// <summary>
        /// Entry point: configures WinForms, installs a synchronization context and
        /// runs the application with a 20-second overall timeout.
        /// </summary>
        [STAThread]
        private static void Main()
        {
            // Type.FullName is declared nullable; fall back to the simple type name
            Console.Title = typeof(Program).FullName ?? nameof(Program);
            Application.SetHighDpiMode(HighDpiMode.PerMonitorV2);
            Application.SetUnhandledExceptionMode(UnhandledExceptionMode.ThrowException);
            SynchronizationContext.SetSynchronizationContext(new WindowsFormsSynchronizationContext());

            // the token is cancelled automatically after 20 seconds
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
            RunApplication(cts.Token);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment