Skip to content

Instantly share code, notes, and snippets.

@cwharris
Last active July 18, 2018 22:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cwharris/5dbb4e4971505dc9a53c71b34217d6f7 to your computer and use it in GitHub Desktop.
Save cwharris/5dbb4e4971505dc9a53c71b34217d6f7 to your computer and use it in GitHub Desktop.
Rx Grouping by continuity of a key
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
namespace ConsoleApplication1
{
/// <summary>
/// Console entry point that demonstrates how <c>GroupValuesByContinuity</c> is wired up.
/// </summary>
internal class Program
{
    /// <summary>
    /// Builds an observable that yields a single empty dictionary snapshot and applies
    /// <c>GroupValuesByContinuity</c> to it, keyed by each entry's key and projecting
    /// each entry's value. The resulting observable is never subscribed to here.
    /// </summary>
    /// <param name="args">Command-line arguments; not read.</param>
    public static void Main(string[] args)
    {
        // A single snapshot: an empty string-to-string dictionary.
        IObservable<Dictionary<string, string>> snapshots =
            Observable.Return(new Dictionary<string, string>());

        // Dictionary entries are KeyValuePair<string, string>, so the selectors
        // simply pick the pair's Key and Value.
        var groupedByContinuity = snapshots.GroupValuesByContinuity(
            entry => entry.Key,
            entry => entry.Value);
    }
}
/// <summary>
/// Rx operators for grouping values by how long their key stays continuously present
/// in a stream of collection snapshots.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Turns a stream of collection snapshots into a stream of groups, one group per
    /// continuous run of a key. A group is opened when a key shows up in a snapshot,
    /// receives the key's value for every snapshot that contains the key, and is closed
    /// when a snapshot arrives that no longer contains the key. If the key shows up
    /// again later, a new group with the same key is emitted.
    /// </summary>
    /// <typeparam name="T">Type of the items inside each snapshot.</typeparam>
    /// <typeparam name="TKey">Type of the key that identifies an item across snapshots.</typeparam>
    /// <typeparam name="TValue">Type of the value published into each group.</typeparam>
    /// <param name="source">Stream of snapshots. Keys must be unique within one snapshot.</param>
    /// <param name="keySelector">Extracts the key of an item.</param>
    /// <param name="valueSelector">Extracts the value of an item.</param>
    /// <returns>An observable of grouped observables, one per continuous run of a key.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/>, <paramref name="keySelector"/> or
    /// <paramref name="valueSelector"/> is <c>null</c>.
    /// </exception>
    public static IObservable<IGroupedObservable<TKey, TValue>> GroupValuesByContinuity<T, TKey, TValue>(
        this IObservable<ICollection<T>> source,
        Func<T, TKey> keySelector,
        Func<T, TValue> valueSelector
    )
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (keySelector == null)
        {
            throw new ArgumentNullException(nameof(keySelector));
        }

        if (valueSelector == null)
        {
            throw new ArgumentNullException(nameof(valueSelector));
        }

        // Folds the next snapshot into the tracking state: the new state holds the
        // snapshot's items plus the keys that were present before but are gone now.
        ContinuityTrackingCollection<TKey, TValue> TrackContinuity(
            ContinuityTrackingCollection<TKey, TValue> prev,
            ICollection<T> latest
        )
        {
            var items = latest.ToDictionary(keySelector, valueSelector);

            // A key is "removed" when the previous snapshot had it and the latest one
            // does not. (Lazily evaluated; the constructor materializes it into a set.)
            var removedKeys = prev.Items.Keys.Where(key => !items.ContainsKey(key));

            return new ContinuityTrackingCollection<TKey, TValue>(items, removedKeys);
        }

        return source
            // Scan (not Aggregate) so that a tracking state is produced for every
            // snapshot instead of only once when the source completes.
            .Scan(
                ContinuityTrackingCollection<TKey, TValue>.Seed,
                TrackContinuity
            )
            .SelectMany(state => ToContinuityIndications(state))
            // One group per key; the group is closed by the first removal indication
            // that arrives for that key.
            .GroupByUntil(
                x => x.Key,
                g => g.Where(x => x.Exists == false)
            )
            .SelectMany(
                g => g
                    // Removal indications only exist to close the group. They carry
                    // default(TValue) as a placeholder and must not surface as values.
                    .Where(x => x.Exists)
                    .GroupBy(
                        x => g.Key,
                        x => x.Value
                    )
            );
    }

    /// <summary>
    /// Flattens a tracking state into indications: first one "without value" indication
    /// per removed key, then one "with value" indication per current item.
    /// </summary>
    private static IEnumerable<ContinuityIndication<TKey, TValue>> ToContinuityIndications<TKey, TValue>(
        ContinuityTrackingCollection<TKey, TValue> continuityTrackingCollection
    )
    {
        foreach (var key in continuityTrackingCollection.RemovedKeys)
        {
            yield return ContinuityIndication<TKey, TValue>.WithoutValue(key);
        }

        foreach (var value in continuityTrackingCollection.Items)
        {
            yield return ContinuityIndication<TKey, TValue>.WithValue(value.Key, value.Value);
        }
    }

    /// <summary>
    /// Tracking state for one snapshot: the items it contains and the keys that
    /// disappeared relative to the previous snapshot.
    /// </summary>
    private sealed class ContinuityTrackingCollection<TKey, TValue>
    {
        /// <summary>Items of the snapshot, indexed by key.</summary>
        public IDictionary<TKey, TValue> Items { get; }

        /// <summary>Keys reported as removed for this snapshot.</summary>
        public HashSet<TKey> RemovedKeys { get; }

        /// <summary>
        /// Creates a tracking state. <paramref name="removedKeys"/> is copied into a
        /// set immediately, so a lazily evaluated sequence may be passed.
        /// </summary>
        public ContinuityTrackingCollection(
            IDictionary<TKey, TValue> items,
            IEnumerable<TKey> removedKeys
        )
        {
            Items = items;
            RemovedKeys = new HashSet<TKey>(removedKeys);
        }

        /// <summary>
        /// A fresh, empty state (no items, no removed keys). A new instance is created
        /// on every access.
        /// </summary>
        public static ContinuityTrackingCollection<TKey, TValue> Seed =>
            new ContinuityTrackingCollection<TKey, TValue>(
                new Dictionary<TKey, TValue>(),
                new List<TKey>()
            );
    }

    /// <summary>
    /// A per-key signal: either "the key exists with this value" or "the key no longer
    /// exists". Instances are created through <see cref="WithValue"/> and
    /// <see cref="WithoutValue"/>.
    /// </summary>
    private sealed class ContinuityIndication<TKey, T>
    {
        /// <summary>Key the indication refers to.</summary>
        public TKey Key { get; }

        /// <summary>
        /// Value for the key; <c>default(T)</c> when <see cref="Exists"/> is <c>false</c>.
        /// </summary>
        public T Value { get; }

        /// <summary><c>true</c> when the key is present; <c>false</c> when it was removed.</summary>
        public bool Exists { get; }

        private ContinuityIndication(TKey key, T value, bool exists)
        {
            Key = key;
            Value = value;
            Exists = exists;
        }

        /// <summary>Creates an indication that <paramref name="key"/> exists with <paramref name="value"/>.</summary>
        public static ContinuityIndication<TKey, T> WithValue(TKey key, T value)
        {
            return new ContinuityIndication<TKey, T>(key, value, true);
        }

        /// <summary>Creates an indication that <paramref name="key"/> no longer exists.</summary>
        public static ContinuityIndication<TKey, T> WithoutValue(TKey key)
        {
            return new ContinuityIndication<TKey, T>(key, default(T), false);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment