Created
January 21, 2017 21:52
-
-
Save danielearwicker/ccb74cdf25d1c30e2caeef37617a6472 to your computer and use it in GitHub Desktop.
Cudo (Create, Update, Delete, Observe) - subscribers get Create events to recreate current contents, followed by subsequent updates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Cudo | |
{ | |
public class CudoEvent<TRecord> | |
{ | |
public CudoEvent(CudoEventType type, TRecord record) | |
{ | |
Type = type; | |
Record = record; | |
} | |
public CudoEventType Type { get; } | |
public TRecord Record { get; } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Cudo | |
{ | |
public enum CudoEventType | |
{ | |
Create, | |
Update, | |
Delete | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace Cudo | |
{ | |
public class CudoTable<TId, TRecord> | |
{ | |
private readonly object _locker = new object(); | |
// What this essentially is: | |
private readonly Dictionary<TId, TRecord> _records = new Dictionary<TId, TRecord>(); | |
// Use a subject to broadcast our events to subscribers | |
private readonly ISubject<CudoEvent<TRecord>> _events; | |
// Get an ID from a record | |
private readonly Func<TRecord, TId> _getId; | |
// Produce the event that adds a record (used to initialise a subscriber) | |
private static CudoEvent<TRecord> EventForRecord(TRecord record) | |
{ | |
return new CudoEvent<TRecord>(CudoEventType.Create, record); | |
} | |
// Passed to Observable.Defer | |
private IObservable<CudoEvent<TRecord>> Subscribe() | |
{ | |
lock (_locker) | |
{ | |
return _records.Values // Current set of records | |
.Select(EventForRecord) // sequence of Create events | |
.ToList() // freeze it | |
.ToObservable() // for subscription | |
.Concat(_events) // followed by hot event stream | |
.ObserveOn(Scheduler.Default); | |
} | |
} | |
public CudoTable(Func<TRecord, TId> getId) | |
{ | |
_getId = getId; | |
_events = new Subject<CudoEvent<TRecord>>(); | |
// Use Defer, so each subscriber gets an observable built on demand | |
EventStream = Observable.Defer(Subscribe); | |
} | |
// How clients "read" this table | |
public IObservable<CudoEvent<TRecord>> EventStream { get; } | |
public void Store(TRecord record) | |
{ | |
var id = _getId(record); | |
lock (_locker) | |
{ | |
TRecord existing; | |
if (_records.TryGetValue(id, out existing)) | |
{ | |
_records[id] = record; | |
_events.OnNext(new CudoEvent<TRecord>(CudoEventType.Update, record)); | |
} | |
else | |
{ | |
_records.Add(id, record); | |
_events.OnNext(new CudoEvent<TRecord>(CudoEventType.Create, record)); | |
} | |
} | |
} | |
public void Delete(TId id) | |
{ | |
lock (_locker) | |
{ | |
TRecord existing; | |
if (!_records.TryGetValue(id, out existing)) | |
{ | |
return; | |
} | |
_records.Remove(id); | |
_events.OnNext(new CudoEvent<TRecord>(CudoEventType.Delete, existing)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I have an in-memory table of objects that are constantly being created/updated/deleted.
I want to be able to subscribe as an observer so I can maintain a mirror of the objects. It's a stream of events: Create, Update, Delete. CRUD without the "Read". Instead of reading (pulling) I Observe (am pushed to), hence CUDO.
When I first subscribe to such a table's
EventStream
, I want to receive a dump ofCreate
events. This effectively replays the creation of the current state (but is not the same as a full replay - it's just the minimal events needed to replicate the current state, not all the history of create/update/delete that has been happening prior to my subscription).A plain
Subject
of events serves as the hot/live stream of changes, but this is internal (_events
).The public
EventStream
for clients to subscribe to is created withDefer
. This allows me to build each subscriber their own custom observable to which they are actually subscribing. SeeSubscribe
, which builds the observer for a subscriber. The observer returned is basically the concatenation of (a) a stream ofCreate
events to build the current state, and (b) the hot_events
stream.