Skip to content

Instantly share code, notes, and snippets.

@abdullin
Last active December 15, 2015 19:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abdullin/5308613 to your computer and use it in GitHub Desktop.
Save abdullin/5308613 to your computer and use it in GitHub Desktop.
Example of an in-memory key-value store (to be used together with the ProjectionRebuilder of Lokad.CQRS) that allows really fast rebuilds by skipping the serialization step.
/// <summary>
/// In-memory <see cref="IDocumentStore"/> that keeps entities as live objects
/// (one dictionary per bucket) and only serializes them when the contents of a
/// bucket are enumerated.
/// </summary>
public sealed class NonSerializingMemoryDocumentStore : IDocumentStore
{
    // Bucket name -> typed storage for that bucket, seen through a type-erased interface.
    readonly ConcurrentDictionary<string, IStoreInfo> _store = new ConcurrentDictionary<string, IStoreInfo>();
    readonly IDocumentStrategy _strategy;

    /// <summary>
    /// Type-erased view of a bucket, so buckets holding different entity types
    /// can live in the same dictionary.
    /// </summary>
    public interface IStoreInfo
    {
        /// <summary>Returns one record per entity held in the bucket.</summary>
        IEnumerable<DocumentRecord> Enumerate();
    }

    /// <summary>
    /// Storage for a single bucket: the entities themselves plus the delegates
    /// used to turn them into bytes and back.
    /// </summary>
    /// <typeparam name="TEntity">Type of the entities kept in this bucket.</typeparam>
    public sealed class StoreInfo<TEntity> : IStoreInfo
    {
        /// <summary>Writes an entity to a stream.</summary>
        public Action<TEntity, Stream> Serialize;

        /// <summary>Reads an entity from a stream.</summary>
        public Func<Stream, TEntity> Deserialize;

        /// <summary>Entities of this bucket, keyed by their location string.</summary>
        public readonly ConcurrentDictionary<string, TEntity> Values = new ConcurrentDictionary<string, TEntity>();

        /// <summary>
        /// Projects every stored entity into a <see cref="DocumentRecord"/>. The byte
        /// payload is produced by the delegate handed to the record, i.e. serialization
        /// happens only when that delegate is invoked.
        /// </summary>
        public IEnumerable<DocumentRecord> Enumerate()
        {
            return Values.Select(pair => new DocumentRecord(pair.Key, () => ToBytes(pair.Value)));
        }

        // Serializes a single entity into a fresh byte array.
        byte[] ToBytes(TEntity entity)
        {
            using (var buffer = new MemoryStream())
            {
                Serialize(entity, buffer);
                return buffer.ToArray();
            }
        }
    }

    /// <summary>Creates an empty store that uses the given strategy.</summary>
    /// <param name="strategy">
    /// Supplies bucket names, entity locations and the serialize/deserialize routines.
    /// </param>
    public NonSerializingMemoryDocumentStore(IDocumentStrategy strategy)
    {
        _strategy = strategy;
    }

    /// <summary>The strategy this store was created with.</summary>
    public IDocumentStrategy Strategy
    {
        get { return _strategy; }
    }

    /// <summary>
    /// Returns a writer over the bucket of <typeparamref name="TEntity"/>,
    /// creating the bucket if it does not exist yet.
    /// </summary>
    public IDocumentWriter<TKey, TEntity> GetWriter<TKey, TEntity>()
    {
        return CreateReaderWriter<TKey, TEntity>();
    }

    /// <summary>
    /// Returns a reader over the bucket of <typeparamref name="TEntity"/>,
    /// creating the bucket if it does not exist yet.
    /// </summary>
    public IDocumentReader<TKey, TEntity> GetReader<TKey, TEntity>()
    {
        return CreateReaderWriter<TKey, TEntity>();
    }

    /// <summary>Not implemented by this store.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public void WriteContents(string bucket, IEnumerable<DocumentRecord> records)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Returns the records of the given bucket, or an empty sequence when the
    /// bucket is not present in the store.
    /// </summary>
    public IEnumerable<DocumentRecord> EnumerateContents(string bucket)
    {
        IStoreInfo found;
        return _store.TryGetValue(bucket, out found)
            ? found.Enumerate()
            : Enumerable.Empty<DocumentRecord>();
    }

    /// <summary>Removes a single bucket from the store, if it is present.</summary>
    /// <param name="bucketNames">Name of the bucket to remove.</param>
    public void Reset(string bucketNames)
    {
        IStoreInfo removed;
        _store.TryRemove(bucketNames, out removed);
    }

    /// <summary>Removes every bucket from the store.</summary>
    public void ResetAll()
    {
        _store.Clear();
    }

    // The reader and the writer are the same object type over the bucket's dictionary.
    MemoryDocumentReaderWriter<TKey, TEntity> CreateReaderWriter<TKey, TEntity>()
    {
        var bucketName = _strategy.GetEntityBucket<TEntity>();
        var info = GetOrAdd<TEntity>(bucketName);
        return new MemoryDocumentReaderWriter<TKey, TEntity>(info.Values, _strategy);
    }

    // Looks up the bucket, creating it (wired to the strategy's serializer) on first use.
    // The cast assumes the bucket was created for the same TEntity.
    StoreInfo<TEntity> GetOrAdd<TEntity>(string bucket)
    {
        var info = _store.GetOrAdd(bucket, name => new StoreInfo<TEntity>
        {
            Deserialize = _strategy.Deserialize<TEntity>,
            Serialize = _strategy.Serialize
        });
        return (StoreInfo<TEntity>) info;
    }
}
/// <summary>
/// Reader and writer over a dictionary of entities. Entities are stored and
/// returned as object references; no serialization happens in this class.
/// </summary>
/// <typeparam name="TKey">Key type, translated to a location string by the strategy.</typeparam>
/// <typeparam name="TEntity">Type of the stored entities.</typeparam>
public sealed class MemoryDocumentReaderWriter<TKey, TEntity> : IDocumentReader<TKey, TEntity>, IDocumentWriter<TKey, TEntity>
{
    readonly ConcurrentDictionary<string, TEntity> _store;
    readonly IDocumentStrategy _strategy;

    /// <summary>Wraps the given dictionary.</summary>
    /// <param name="store">Dictionary that holds the entities, keyed by location.</param>
    /// <param name="strategy">Used to map keys to location strings.</param>
    public MemoryDocumentReaderWriter(ConcurrentDictionary<string, TEntity> store, IDocumentStrategy strategy)
    {
        _strategy = strategy;
        _store = store;
    }

    /// <summary>Looks up the entity stored under <paramref name="key"/>.</summary>
    /// <param name="key">Key of the entity.</param>
    /// <param name="entity">
    /// The stored entity when found; otherwise <c>default(TEntity)</c>.
    /// </param>
    /// <returns><c>true</c> when an entity exists for the key.</returns>
    public bool TryGet(TKey key, out TEntity entity)
    {
        // TryGetValue already assigns default(TEntity) to the out argument on a miss.
        return _store.TryGetValue(LocationOf(key), out entity);
    }

    /// <summary>
    /// Adds the result of <paramref name="addFactory"/> when the key is absent,
    /// otherwise replaces the stored entity with the result of <paramref name="update"/>.
    /// </summary>
    /// <param name="key">Key of the entity.</param>
    /// <param name="addFactory">Creates the entity when none is stored yet.</param>
    /// <param name="update">Receives the stored entity and returns the one to store.</param>
    /// <param name="hint">Not used by this implementation.</param>
    /// <returns>The entity that ends up stored under the key.</returns>
    public TEntity AddOrUpdate(TKey key, Func<TEntity> addFactory, Func<TEntity, TEntity> update, AddOrUpdateHint hint)
    {
        var location = LocationOf(key);
        return _store.AddOrUpdate(
            location,
            name => addFactory(),
            (name, existing) => update(existing));
    }

    /// <summary>Removes the entity stored under <paramref name="key"/>.</summary>
    /// <returns><c>true</c> when an entity was removed.</returns>
    public bool TryDelete(TKey key)
    {
        TEntity removed;
        return _store.TryRemove(LocationOf(key), out removed);
    }

    // Translates a typed key into the dictionary key via the strategy.
    string LocationOf(TKey key)
    {
        return _strategy.GetEntityLocation<TEntity>(key);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment