Skip to content

Instantly share code, notes, and snippets.

@abdullin
Last active December 30, 2015 21:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save abdullin/7885433 to your computer and use it in GitHub Desktop.
Save abdullin/7885433 to your computer and use it in GitHub Desktop.
Projection replayer for Lokad.CQRS projects (legacy)
#region Copyright (c) 2006-2013 LOKAD SAS. All rights reserved
// This document is shared under BSD license
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using Lokad.Cqrs;
using Lokad.Cqrs.KeyValueStorage;
using Mono.Cecil;
namespace Hub.Worker
{
public class StartupProjectionRebuilder : IStartupTask
{
readonly IKeyValueStore _store;
readonly KeyValueStoreClient _hashStore;
readonly MessageStore _stream;
readonly Func<IKeyValueStore, IEnumerable<object>> _projectors;
readonly Logger _logger = LogManager.GetLoggerFor<StartupProjectionRebuilder>();
public StartupProjectionRebuilder(IKeyValueStore store, KeyValueStoreClient hashStore, MessageStore stream, Func<IKeyValueStore, IEnumerable<object>> projectors)
{
_store = store;
_hashStore = hashStore;
_stream = stream;
_projectors = projectors;
}
[DataContract]
public sealed class ProjectionHash
{
[DataMember(Order = 1)]
public Dictionary<string, string> BucketHashes { get; set; }
public ProjectionHash()
{
BucketHashes = new Dictionary<string, string>();
}
}
sealed class ProjectionInspectingStore : IKeyValueStore
{
readonly IKeyValueStore _real;
public ProjectionInspectingStore(IKeyValueStore real)
{
_real = real;
}
public readonly List<Projection> Projections = new List<Projection>();
public sealed class Projection
{
public Type EntityType;
public string StoreBucket;
}
public void ValidateSanity()
{
if (Projections.Count == 0)
throw new InvalidOperationException("There were no projections registered");
var viewsWithMultipleProjections = Projections.GroupBy(e => e.EntityType).Where(g => g.Count() > 1).ToList();
if (viewsWithMultipleProjections.Count > 0)
{
var builder = new StringBuilder();
builder.AppendLine("Please, define only one projection per view. These views were referenced more than once:");
foreach (var projection in viewsWithMultipleProjections)
{
builder.AppendLine(" " + projection.Key);
}
builder.AppendLine("NB: you can use partials or dynamics in edge cases");
throw new InvalidOperationException(builder.ToString());
}
var viewsWithSimilarBuckets = Projections
.GroupBy(e => e.StoreBucket.ToLowerInvariant())
.Where(g => g.Count() > 1)
.ToArray();
if (viewsWithSimilarBuckets.Length > 0)
{
var builder = new StringBuilder();
builder.AppendLine("Following views will be stored in same location, which will cause problems:");
foreach (var i in viewsWithSimilarBuckets)
{
var @join = string.Join(",", i.Select(x => x.EntityType));
builder.AppendFormat(" {0} : {1}", i.Key, @join).AppendLine();
}
throw new InvalidOperationException(builder.ToString());
}
}
public IKeyValueWriter<TKey, TEntity> GetWriter<TKey, TEntity>()
{
Projections.Add(new Projection()
{
EntityType = typeof(TEntity),
StoreBucket = _real.Strategy.GetValueBucket<TEntity>()
});
return _real.GetWriter<TKey, TEntity>();
}
public IKeyValueReader<TKey, TEntity> GetReader<TKey, TEntity>()
{
return _real.GetReader<TKey, TEntity>();
}
public IKeyValueStorageStrategy Strategy
{
get { return _real.Strategy; }
}
public IEnumerable<RawKeyValue> EnumerateRawContents(string bucket)
{
return _real.EnumerateRawContents(bucket);
}
public void WriteRawContents(string bucket, IEnumerable<RawKeyValue> records)
{
_real.WriteRawContents(bucket, records);
}
public void Reset(string bucket)
{
_real.Reset(bucket);
}
}
static string GetClassHash(Type type1)
{
var location = type1.Assembly.Location;
var mod = ModuleDefinition.ReadModule(location);
var builder = new StringBuilder();
var type = type1;
var typeDefinition = mod.GetType(type.FullName);
builder.AppendLine(typeDefinition.Name);
ProcessMembers(builder, typeDefinition);
// we include nested types
foreach (var nested in typeDefinition.NestedTypes)
{
ProcessMembers(builder, nested);
}
var content = builder.ToString();
using (var hasher = SHA1.Create())
{
var hash = hasher.ComputeHash(Encoding.UTF8.GetBytes(content));
var readableHash = BitConverter.ToString(hash).Replace("-", "");
return string.Format("{0}:{1}", type.FullName, readableHash);
}
}
static void ProcessMembers(StringBuilder builder, TypeDefinition typeDefinition)
{
foreach (var md in typeDefinition.Methods.OrderBy(m => m.ToString()))
{
builder.AppendLine(" " + md);
foreach (var instruction in md.Body.Instructions)
{
// we don't care about offsets
instruction.Offset = 0;
builder.AppendLine(" " + instruction);
}
}
foreach (var field in typeDefinition.Fields.OrderBy(f => f.ToString()))
{
builder.AppendLine(" " + field);
}
}
static void ObserveWhileCan(IEnumerable<StoreRecord> records, RedirectToDynamicEvent wire, CancellationToken token)
{
var watch = Stopwatch.StartNew();
int count = 0;
foreach (var record in records)
{
count += 1;
if (token.IsCancellationRequested)
return;
if (count % 50000 == 0)
{
SystemObserver.Notify("Observing {0} {1}", count,
Math.Round(watch.Elapsed.TotalSeconds, 2));
watch.Restart();
}
foreach (var message in record.Items)
{
if (message is IEvent)
{
try
{
wire.InvokeEvent(message);
}
catch (Exception ex)
{
var error = string.Format("Failed to invoke {0}: {1}", message.GetType().Name, message.ToString());
throw new InvalidOperationException(error, ex);
}
}
}
}
}
public string StartupTaskName { get { return "Projection rebuilder"; } }
public void ExecuteStartup(CancellationToken token)
{
var strategy = _store.Strategy;
var memoryContainer = new ObjectKeyValueStore(strategy);
var tracked = new ProjectionInspectingStore(memoryContainer);
var projections = new List<object>();
projections.AddRange(_projectors(tracked));
if (tracked.Projections.Count != projections.Count())
throw new InvalidOperationException("Count mismatch");
tracked.ValidateSanity();
var persistedHashes = new Dictionary<string, string>();
var name = "domain";
_hashStore.GetEntity<ProjectionHash>(name).IfValue(v => persistedHashes = v.BucketHashes);
var activeMemoryProjections = projections.Select((projection, i) =>
{
var proj = tracked.Projections[i];
var bucketName = proj.StoreBucket;
var viewType = proj.EntityType;
var projectionHash =
"Global change on 2013-03-05\r\n" +
GetClassHash(projection.GetType()) +
"\r\n " + GetClassHash(viewType) + "\r\n" + GetClassHash(strategy.GetType());
bool needsRebuild = !persistedHashes.ContainsKey(bucketName) || persistedHashes[bucketName] != projectionHash;
return new
{
bucketName,
projection,
hash = projectionHash,
needsRebuild
};
}).ToArray();
foreach (var memoryProjection in activeMemoryProjections)
{
if (memoryProjection.needsRebuild)
{
_logger.Info("{0} needs rebuild", memoryProjection.bucketName);
}
}
var needRebuild = activeMemoryProjections.Where(x => x.needsRebuild).ToArray();
if (needRebuild.Length == 0)
{
return;
}
var watch = Stopwatch.StartNew();
var wire = new RedirectToDynamicEvent();
needRebuild.ForEach(x => wire.WireToWhen(x.projection));
var handlersWatch = Stopwatch.StartNew();
_logger.Info("Starting rebuild");
ObserveWhileCan(_stream.EnumerateAllItems(0, int.MaxValue), wire, token);
if (token.IsCancellationRequested)
{
SystemObserver.Notify("[warn] Aborting projections before anything was changed");
return;
}
var timeTotal = watch.Elapsed.TotalSeconds;
var handlerTicks = handlersWatch.ElapsedTicks;
var timeInHandlers = Math.Round(TimeSpan.FromTicks(handlerTicks).TotalSeconds, 1);
_logger.Info("Total Elapsed: {0}sec ({1}sec in handlers)", Math.Round(timeTotal, 0), timeInHandlers);
// update projections that need rebuild
foreach (var b in needRebuild)
{
// server might shut down the process soon anyway, but we'll be
// in partially consistent mode (not all projections updated)
// so at least we blow up between projection buckets
token.ThrowIfCancellationRequested();
var bucketName = b.bucketName;
var bucketHash = b.hash;
_logger.Info("Wiping bucket contents {0}.{1}", name, bucketName);
_store.Reset(bucketName);
_logger.Info("Writing new versions {0}.{1}", name, bucketName);
var contents = memoryContainer.EnumerateRawContents(bucketName);
_store.WriteRawContents(bucketName, contents);
// update hash
_hashStore.UpdateEntityEnforcingNew<ProjectionHash>(name, x =>
{
x.BucketHashes[bucketName] = bucketHash;
});
_logger.Info("[good] Updated View bucket {0}.{1}", name, bucketName);
}
// Clean up obsolete views
var allBuckets = new HashSet<string>(activeMemoryProjections.Select(p => p.bucketName));
var obsoleteBuckets = persistedHashes.Where(s => !allBuckets.Contains(s.Key)).ToArray();
foreach (var hash in obsoleteBuckets)
{
// quit at this stage without any bad side effects
if (token.IsCancellationRequested)
return;
var bucketName = hash.Key;
_logger.Info("[warn] {0} is obsolete", bucketName);
_store.Reset(bucketName);
_hashStore.UpdateEntityEnforcingNew<ProjectionHash>(name, x => x.BucketHashes.Remove(bucketName));
_logger.Info("[good] Cleaned up obsolete view bucket {0}.{1}", name, bucketName);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment