Last active
June 8, 2018 11:59
-
-
Save discosultan/a28fdaf6cb11b0df3bbd3d531403bced to your computer and use it in GitHub Desktop.
Observable.FromEventPattern verbose example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Threading.Tasks; | |
namespace Pronoodle.Products | |
{ | |
/// <summary> | |
/// An in-memory implementation of the <see cref="IProductRepository"/> contract. | |
/// </summary> | |
public class InMemoryProductRepository : IProductRepository | |
{ | |
// Must be > 0. | |
const int StreamExistingChunkSize = 100; | |
const int StreamNewChunkSize = 2; | |
static readonly TimeSpan StreamAllChunkDelay = TimeSpan.FromMilliseconds(20); | |
static readonly TimeSpan AddOrUpdateDelay = TimeSpan.FromMilliseconds(200); | |
readonly Dictionary<string, Product> _repository = new Dictionary<string, Product>(); | |
event Action<IEnumerable<Product>> _productAddedOrUpdated; | |
readonly object _syncRoot = new object(); | |
/// <summary> | |
/// Streams all the existing as well as future products (both added and modified). | |
/// </summary> | |
/// <returns>A stream of products.</returns> | |
public IObservable<IEnumerable<Product>> StreamAll() | |
{ | |
// Stream existing products. | |
return Observable | |
.Create<IEnumerable<Product>>(observer => | |
{ | |
// We need to synchronize both read and write acccess to the dictionary as well | |
// as write access to subject to enable parallel concurrency. | |
lock (_syncRoot) | |
// Stream in chunks. | |
foreach (var chunk in Chunkify(_repository.Values, StreamExistingChunkSize)) | |
observer.OnNext(chunk); | |
observer.OnCompleted(); | |
return Disposable.Empty; | |
}) | |
// Start streaming new/updated products. | |
.Concat(Observable.FromEvent<IEnumerable<Product>>( | |
h => _productAddedOrUpdated += h, | |
h => _productAddedOrUpdated -= h)) | |
// Simulate slow loading. | |
.Select(products => Observable.Timer(StreamAllChunkDelay).Select(_ => products)) | |
.Concat(); | |
} | |
/// <summary> | |
/// Adds new or updates existing products (in case products with same key exist). | |
/// </summary> | |
/// <param name="products">Products to add or update.</param> | |
/// <returns>A task signaling the completion of this async operation.</returns> | |
public async Task AddOrUpdate(IEnumerable<Product> products) | |
{ | |
if (products == null) | |
throw new ArgumentNullException(nameof(products)); | |
lock (_syncRoot) | |
{ | |
foreach (Product product in products) | |
_repository[product.Key] = product; | |
foreach (var chunk in Chunkify(products, StreamNewChunkSize)) | |
_productAddedOrUpdated?.Invoke(chunk); | |
} | |
// Simulate slow processing. | |
await Task.Delay(AddOrUpdateDelay); | |
} | |
// TODO: A good candidate for generalisation and moving to a util/extension. | |
static IEnumerable<List<Product>> Chunkify(IEnumerable<Product> products, int size) | |
{ | |
var chunk = new List<Product>(size); | |
foreach (var product in products) | |
{ | |
chunk.Add(product); | |
if (chunk.Count == size) | |
{ | |
yield return chunk; | |
chunk = new List<Product>(size); | |
} | |
} | |
// Send last chunk if partial. | |
if (chunk.Count > 0) yield return chunk; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment