@danielearwicker
Created January 21, 2017 21:52
Cudo (Create, Update, Delete, Observe) - subscribers get Create events to recreate current contents, followed by subsequent updates
namespace Cudo
{
    public class CudoEvent<TRecord>
    {
        public CudoEvent(CudoEventType type, TRecord record)
        {
            Type = type;
            Record = record;
        }

        public CudoEventType Type { get; }
        public TRecord Record { get; }
    }
}
namespace Cudo
{
    public enum CudoEventType
    {
        Create,
        Update,
        Delete
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Cudo
{
    public class CudoTable<TId, TRecord>
    {
        private readonly object _locker = new object();

        // What this essentially is:
        private readonly Dictionary<TId, TRecord> _records = new Dictionary<TId, TRecord>();

        // Use a subject to broadcast our events to subscribers
        private readonly ISubject<CudoEvent<TRecord>> _events;

        // Get an ID from a record
        private readonly Func<TRecord, TId> _getId;

        // Produce the event that adds a record (used to initialise a subscriber)
        private static CudoEvent<TRecord> EventForRecord(TRecord record)
        {
            return new CudoEvent<TRecord>(CudoEventType.Create, record);
        }
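        // Passed to Observable.Defer
        private IObservable<CudoEvent<TRecord>> Subscribe()
        {
            return Observable.Create<CudoEvent<TRecord>>(observer =>
            {
                // Snapshot and subscribe under one lock, so that no Store or
                // Delete can slip in between the replay and the live stream
                lock (_locker)
                {
                    // Current set of records, as a sequence of Create events
                    foreach (var record in _records.Values)
                    {
                        observer.OnNext(EventForRecord(record));
                    }
                    // followed by the hot event stream
                    return _events.Subscribe(observer);
                }
            })
            // Queue deliveries, so subscriber code never runs inside the lock
            .ObserveOn(Scheduler.Default);
        }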
        public CudoTable(Func<TRecord, TId> getId)
        {
            _getId = getId;
            _events = new Subject<CudoEvent<TRecord>>();
            // Use Defer, so each subscriber gets an observable built on demand
            EventStream = Observable.Defer(Subscribe);
        }

        // How clients "read" this table
        public IObservable<CudoEvent<TRecord>> EventStream { get; }

        public void Store(TRecord record)
        {
            var id = _getId(record);
            lock (_locker)
            {
                TRecord existing;
                if (_records.TryGetValue(id, out existing))
                {
                    _records[id] = record;
                    _events.OnNext(new CudoEvent<TRecord>(CudoEventType.Update, record));
                }
                else
                {
                    _records.Add(id, record);
                    _events.OnNext(new CudoEvent<TRecord>(CudoEventType.Create, record));
                }
            }
        }

        public void Delete(TId id)
        {
            lock (_locker)
            {
                TRecord existing;
                if (!_records.TryGetValue(id, out existing))
                {
                    return;
                }
                _records.Remove(id);
                _events.OnNext(new CudoEvent<TRecord>(CudoEventType.Delete, existing));
            }
        }
    }
}
@danielearwicker
Author

I have an in-memory table of objects that are constantly being created/updated/deleted.

I want to be able to subscribe as an observer so I can maintain a mirror of the objects. It's a stream of events: Create, Update, Delete. CRUD without the "Read". Instead of reading (pulling) I Observe (am pushed to), hence CUDO.

When I first subscribe to such a table's EventStream, I want to receive a dump of Create events. This effectively replays the creation of the current state (but is not the same as a full replay - it's just the minimal events needed to replicate the current state, not all the history of create/update/delete that has been happening prior to my subscription).
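From the subscriber's side, maintaining a mirror could look roughly like the sketch below. It assumes the Cudo types from this gist are compiled into the same project; Person, PersonMirror and MirrorDemo are hypothetical names used only for illustration. The replayed Create events and the later live events go through the same handler, so the subscriber never needs to know which kind it is receiving.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Cudo; // the gist's types, assumed to be in the same project

// Hypothetical record type, for illustration only
public class Person
{
    public Person(int id, string name) { Id = id; Name = name; }
    public int Id { get; }
    public string Name { get; }
}

// Hypothetical subscriber that mirrors a CudoTable<int, Person>
public class PersonMirror : IObserver<CudoEvent<Person>>
{
    private readonly Dictionary<int, Person> _copy = new Dictionary<int, Person>();

    // Events arrive on a scheduler thread, so guard the copy with a lock
    public int Count { get { lock (_copy) { return _copy.Count; } } }

    public void OnNext(CudoEvent<Person> e)
    {
        lock (_copy)
        {
            switch (e.Type)
            {
                case CudoEventType.Create:
                case CudoEventType.Update:
                    // Replayed Creates and live changes are applied the same way
                    _copy[e.Record.Id] = e.Record;
                    break;
                case CudoEventType.Delete:
                    _copy.Remove(e.Record.Id);
                    break;
            }
        }
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class MirrorDemo
{
    public static void Main()
    {
        var table = new CudoTable<int, Person>(p => p.Id);
        table.Store(new Person(1, "Ann"));      // stored before anyone subscribes

        var mirror = new PersonMirror();
        using (table.EventStream.Subscribe(mirror))
        {
            table.Store(new Person(2, "Bob"));  // Create
            table.Store(new Person(1, "Anne")); // Update
            table.Delete(2);                    // Delete

            // Delivery is asynchronous, so give the mirror a moment to catch up
            Thread.Sleep(200);
            Console.WriteLine(mirror.Count);
        }
    }
}
```

Because EventStream delivers on Scheduler.Default, the mirror lags slightly behind the table; the Sleep here is only a crude way to wait in a demo.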

A plain Subject of events serves as the hot/live stream of changes, but this is internal (_events).

The public EventStream that clients subscribe to is created with Defer. This lets me build each subscriber its own custom observable, which is what they actually end up subscribing to. See Subscribe, which builds the observable for a subscriber. The observable returned is basically the concatenation of (a) a stream of Create events to build the current state, and (b) the hot _events stream.
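The point about Defer can be seen in isolation with a small sketch that is separate from the table code. DeferDemo and its members are hypothetical names; the only assumption is a reference to the System.Reactive package. The factory passed to Defer runs once per subscription, which is what allows each subscriber to get a snapshot taken at the moment it subscribes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferDemo
{
    // Subscribes twice to one deferred observable and records what each subscriber saw
    public static List<int> Run()
    {
        var built = 0;
        var seen = new List<int>();

        // The factory runs on every Subscribe call, not when Defer itself is called
        IObservable<int> stream = Observable.Defer(() =>
        {
            built++;
            return Observable.Return(built);
        });

        stream.Subscribe(n => seen.Add(n)); // factory runs, built becomes 1
        stream.Subscribe(n => seen.Add(n)); // factory runs again, built becomes 2
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 2"
    }
}
```

Without Defer, an observable built once in the constructor would carry whatever snapshot existed at construction time, which for a new table is empty.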
