Skip to content

Instantly share code, notes, and snippets.

@bboyle1234
Created August 22, 2019 09:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bboyle1234/ddfea297465ce0faa3039491c3645713 to your computer and use it in GitHub Desktop.
Save bboyle1234/ddfea297465ce0faa3039491c3645713 to your computer and use it in GitHub Desktop.
ReactiveExperiments
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Nito.AsyncEx;
using Rx.Net.Plus;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp5 {
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
#pragma warning disable ConfigureAwaitEnforcer // ConfigureAwaitEnforcer
interface IDeviceView : ICloneable {
Guid DeviceId { get; }
}
class DeviceTotalView : IDeviceView {
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceVoltagesUpdateView : IDeviceView {
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceCurrentsUpdateView : IDeviceView {
public Guid DeviceId { get; set; }
public int Current { get; set; }
public object Clone() => this.MemberwiseClone();
}
class DeviceUpdateEvent {
public DeviceTotalView View;
public IDeviceView LastUpdate;
}
static class DeviceUtils {
public static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {
if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");
var view = (DeviceTotalView)previousUpdate.View.Clone();
switch (update) {
case DeviceVoltagesUpdateView x: {
view.Voltage = x.Voltage;
break;
}
case DeviceCurrentsUpdateView x: {
view.Currents = x.Current;
break;
}
}
return new DeviceUpdateEvent { View = view, LastUpdate = update };
}
}
interface IDeviceService {
/// <summary>
/// Gets an observable that produces aggregated update events for the device with the given deviceId.
/// On subscription, the most recent event is immediately pushed to the subscriber.
/// There can be multiple subscribers.
/// </summary>
IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);
}
public static class AsyncExExtensions {
public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {
return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);
}
}
class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {
readonly AsyncProducerConsumerQueue<T> Queue;
long _isConnected = 0;
ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;
public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {
Queue = queue;
}
public IDisposable Connect() {
if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");
var cts = new CancellationTokenSource();
var token = cts.Token;
Task.Run(async () => {
try {
while (true) {
token.ThrowIfCancellationRequested();
var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);
foreach (var observer in Observers)
observer.OnNext(@event);
}
} catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {
foreach (var observer in Observers)
observer.OnCompleted();
}
});
return Disposable.Create(() => {
cts.Cancel();
cts.Dispose();
});
}
readonly object subscriberListMutex = new object();
public IDisposable Subscribe(IObserver<T> observer) {
lock (subscriberListMutex) {
Observers = Observers.Add(observer);
}
return Disposable.Create(() => {
lock (subscriberListMutex) {
Observers = Observers.Remove(observer);
}
});
}
}
class DeviceService : IDeviceService {
readonly IObservable<IDeviceView> Source;
public DeviceService(IConnectableObservable<IDeviceView> source) {
/// When injected here, "source" is cold in the sense that it won't produce events until the first time it is subscribed.
/// However, for all subsequent subscriptions, it will be "hot" in the sense that it won't deliver any historical events, only new events.
/// "source" will throw an exception if its "Subscribe" method is called more than once as it is intended to have only one observer and
/// be read all the way from the beginning.
Source = source;
/// Callers of the "Subscribe" method below will expect data to be preloaded and will expect to be immediately delivered the most
/// recent event. So we need to immediately subscribe to "source" and start preloading the aggregate streams.
/// I'm assuming there is going to need to be a groupby to split the stream by device id.
var groups = source.GroupBy(x => x.DeviceId);
/// Now somehow we need to perform the aggregrate function on each grouping.
/// And create an observable that immediately delivers the most recent aggregated event when "Subscribe" is called below.
source.Connect();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
/// How do we implement this? The observable that we return must be pre-loaded with the latest
throw new NotImplementedException();
}
}
[TestClass]
public class Tests {
[TestMethod]
public async Task Test1() {
DeviceUpdateEvent deviceView1 = null;
DeviceUpdateEvent deviceView2 = null;
var input = new AsyncProducerConsumerQueue<IDeviceView>();
var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });
var service = new DeviceService(source);
service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
/// I believe there is no need to pause here because the Subscribe method calls above
/// block until the events have all been pushed into the subscribers above.
Assert.AreEqual(deviceView1.View.DeviceId, id1);
Assert.AreEqual(deviceView2.View.DeviceId, id2);
Assert.AreEqual(deviceView1.View.Voltage, 2);
Assert.AreEqual(deviceView2.View.Voltage, 100);
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
await Task.Delay(100); /// Give the event time to propagate.
Assert.AreEqual(deviceView2.View.Voltage, 101);
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" />
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" />
<PackageReference Include="MSTest.TestFramework" Version="1.4.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="Nito.AsyncEx" Version="5.0.0" />
<PackageReference Include="Rx.Net.Plus" Version="1.1.10" />
<PackageReference Include="System.Reactive" Version="4.1.6" />
</ItemGroup>
</Project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment