Skip to content

Instantly share code, notes, and snippets.

@discosultan
Last active June 8, 2018 11:59
Show Gist options
  • Save discosultan/a28fdaf6cb11b0df3bbd3d531403bced to your computer and use it in GitHub Desktop.
Save discosultan/a28fdaf6cb11b0df3bbd3d531403bced to your computer and use it in GitHub Desktop.
Observable.FromEventPattern verbose example
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace Pronoodle.Products
{
/// <summary>
/// An in-memory implementation of the <see cref="IProductRepository"/> contract.
/// </summary>
/// <remarks>
/// All access to the backing dictionary and all publication of change notifications
/// happen while holding a single private lock, so a subscriber of <see cref="StreamAll"/>
/// observes a consistent snapshot followed by every later change, in order.
/// </remarks>
public class InMemoryProductRepository : IProductRepository
{
    // Maximum number of products per chunk when replaying the existing contents. Must be > 0.
    const int StreamExistingChunkSize = 100;
    // Maximum number of products per chunk when publishing added/updated products. Must be > 0.
    const int StreamNewChunkSize = 2;
    // Artificial delay applied before each chunk is delivered to a StreamAll subscriber.
    static readonly TimeSpan StreamAllChunkDelay = TimeSpan.FromMilliseconds(20);
    // Artificial delay awaited at the end of every AddOrUpdate call.
    static readonly TimeSpan AddOrUpdateDelay = TimeSpan.FromMilliseconds(200);

    // Products indexed by their key; guarded by _syncRoot.
    readonly Dictionary<string, Product> _repository = new Dictionary<string, Product>();
    // Raised (while holding _syncRoot) with each chunk of added/updated products.
    event Action<IEnumerable<Product>> _productAddedOrUpdated;
    readonly object _syncRoot = new object();

    /// <summary>
    /// Streams all the existing as well as future products (both added and modified).
    /// </summary>
    /// <remarks>
    /// Every subscription first receives the products stored at subscription time, in chunks of at
    /// most <c>StreamExistingChunkSize</c>, and then every chunk published by
    /// <see cref="AddOrUpdate"/>. Chunks are delivered one after another, each preceded by
    /// <c>StreamAllChunkDelay</c>. The sequence does not complete on its own; dispose the
    /// subscription to stop receiving updates.
    /// </remarks>
    /// <returns>A stream of products.</returns>
    public IObservable<IEnumerable<Product>> StreamAll()
    {
        return Observable
            .Create<IEnumerable<Product>>(observer =>
            {
                Action<IEnumerable<Product>> handler = observer.OnNext;

                // The snapshot replay and the event subscription must happen inside the same
                // critical section that AddOrUpdate uses for writing and publishing. Otherwise
                // a product added between "snapshot finished" and "handler attached" would be
                // in neither the snapshot nor the live stream and would be silently lost.
                lock (_syncRoot)
                {
                    // Stream existing products in chunks. Chunkify copies the values into new
                    // lists, so the chunks stay valid after the lock is released.
                    foreach (var chunk in Chunkify(_repository.Values, StreamExistingChunkSize))
                    {
                        observer.OnNext(chunk);
                    }

                    // From here on, new/updated products are pushed by AddOrUpdate.
                    _productAddedOrUpdated += handler;
                }

                // Detach the handler when the subscriber unsubscribes so it is not leaked.
                return Disposable.Create(() => _productAddedOrUpdated -= handler);
            })
            // Simulate slow loading: delay every chunk; the parameterless Concat keeps chunk order.
            .Select(products => Observable.Timer(StreamAllChunkDelay).Select(_ => products))
            .Concat();
    }

    /// <summary>
    /// Adds new or updates existing products (in case products with same key exist).
    /// </summary>
    /// <remarks>
    /// The products are stored and then published to current <see cref="StreamAll"/> subscribers
    /// in chunks of at most <c>StreamNewChunkSize</c>. Subscribers are notified while the
    /// repository lock is held, which keeps notifications in the same order as the writes.
    /// </remarks>
    /// <param name="products">Products to add or update.</param>
    /// <returns>A task signaling the completion of this async operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="products"/> is <c>null</c>.</exception>
    public async Task AddOrUpdate(IEnumerable<Product> products)
    {
        if (products == null)
            throw new ArgumentNullException(nameof(products));

        // Enumerate the caller's sequence exactly once, and do it before taking the lock:
        // a lazy or non-repeatable sequence could otherwise store one set of products and
        // publish a different one, and arbitrary caller code would run inside the lock.
        List<Product> snapshot = products.ToList();

        lock (_syncRoot)
        {
            foreach (Product product in snapshot)
            {
                _repository[product.Key] = product;
            }

            foreach (var chunk in Chunkify(snapshot, StreamNewChunkSize))
            {
                _productAddedOrUpdated?.Invoke(chunk);
            }
        }

        // Simulate slow processing.
        await Task.Delay(AddOrUpdateDelay);
    }

    // TODO: A good candidate for generalisation and moving to a util/extension.
    /// <summary>
    /// Lazily splits <paramref name="products"/> into consecutive lists of at most
    /// <paramref name="size"/> items; only the last list may be shorter.
    /// </summary>
    /// <param name="products">Products to split. Enumerated once, on demand.</param>
    /// <param name="size">Maximum number of products per chunk. Must be > 0.</param>
    /// <returns>A lazy sequence of newly allocated chunks; empty if there are no products.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="size"/> is not positive. Because this is an iterator, the exception is
    /// raised when enumeration starts rather than when the method is called.
    /// </exception>
    static IEnumerable<List<Product>> Chunkify(IEnumerable<Product> products, int size)
    {
        // A size of 0 would never satisfy the "chunk is full" check below and would silently
        // produce one unbounded chunk, so reject it explicitly.
        if (size <= 0)
            throw new ArgumentOutOfRangeException(nameof(size), size, "Chunk size must be greater than zero.");

        var chunk = new List<Product>(size);
        foreach (var product in products)
        {
            chunk.Add(product);
            if (chunk.Count == size)
            {
                yield return chunk;
                // Start a fresh list; the one just yielded now belongs to the consumer.
                chunk = new List<Product>(size);
            }
        }

        // Send last chunk if partial.
        if (chunk.Count > 0)
            yield return chunk;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment