Last active
December 30, 2015 21:09
-
-
Save abdullin/7885433 to your computer and use it in GitHub Desktop.
Projection replayer for Lokad.CQRS projects (legacy)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#region Copyright (c) 2006-2013 LOKAD SAS. All rights reserved | |
// This document is shared under BSD license | |
#endregion | |
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Runtime.Serialization; | |
using System.Security.Cryptography; | |
using System.Text; | |
using System.Threading; | |
using Lokad.Cqrs; | |
using Lokad.Cqrs.KeyValueStorage; | |
using Mono.Cecil; | |
namespace Hub.Worker | |
{ | |
public class StartupProjectionRebuilder : IStartupTask | |
{ | |
// Destination store: its buckets are reset and rewritten by ExecuteStartup.
readonly IKeyValueStore _store;
// Holds the persisted ProjectionHash entity used to decide which buckets are stale.
readonly KeyValueStoreClient _hashStore;
// Source of the records that are replayed through the projections.
readonly MessageStore _stream;
// Factory that, given a store, returns the projection objects wired to that store.
readonly Func<IKeyValueStore, IEnumerable<object>> _projectors;
readonly Logger _logger = LogManager.GetLoggerFor<StartupProjectionRebuilder>();

/// <summary>
/// Creates the rebuilder with everything <see cref="ExecuteStartup"/> needs.
/// </summary>
/// <param name="store">Store whose view buckets are rebuilt.</param>
/// <param name="hashStore">Client used to read and update the persisted projection hashes.</param>
/// <param name="stream">Message store that is enumerated during a rebuild.</param>
/// <param name="projectors">Factory producing the projection instances for a given store.</param>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public StartupProjectionRebuilder(IKeyValueStore store, KeyValueStoreClient hashStore, MessageStore stream, Func<IKeyValueStore, IEnumerable<object>> projectors)
{
    // Fail at construction time instead of with a NullReferenceException
    // somewhere in the middle of the startup task.
    if (store == null) throw new ArgumentNullException("store");
    if (hashStore == null) throw new ArgumentNullException("hashStore");
    if (stream == null) throw new ArgumentNullException("stream");
    if (projectors == null) throw new ArgumentNullException("projectors");

    _store = store;
    _hashStore = hashStore;
    _stream = stream;
    _projectors = projectors;
}
/// <summary>
/// Serializable record of the last known hash for every view bucket,
/// keyed by bucket name.
/// </summary>
[DataContract]
public sealed class ProjectionHash
{
    // Starts out empty so callers can index into it right after construction.
    Dictionary<string, string> _bucketHashes = new Dictionary<string, string>();

    /// <summary>Bucket name mapped to the hash stored for that bucket.</summary>
    [DataMember(Order = 1)]
    public Dictionary<string, string> BucketHashes
    {
        get { return _bucketHashes; }
        set { _bucketHashes = value; }
    }
}
/// <summary>
/// Pass-through wrapper around another <see cref="IKeyValueStore"/> that remembers,
/// for every writer it hands out, the entity type and the bucket that type maps to.
/// All other operations are forwarded unchanged to the wrapped store.
/// </summary>
sealed class ProjectionInspectingStore : IKeyValueStore
{
    readonly IKeyValueStore _real;

    /// <summary>One entry per <c>GetWriter</c> call, in the order the calls were made.</summary>
    public readonly List<Projection> Projections = new List<Projection>();

    public ProjectionInspectingStore(IKeyValueStore real)
    {
        _real = real;
    }

    /// <summary>Entity type requested from the store and the bucket it is written to.</summary>
    public sealed class Projection
    {
        public Type EntityType;
        public string StoreBucket;
    }

    /// <summary>
    /// Verifies the recorded registrations: at least one exists, no entity type
    /// was requested twice, and no two entries share a bucket (ignoring case).
    /// </summary>
    /// <exception cref="InvalidOperationException">Any of the checks fails.</exception>
    public void ValidateSanity()
    {
        if (Projections.Count == 0)
        {
            throw new InvalidOperationException("There were no projections registered");
        }

        // Views that more than one projection asked a writer for.
        var repeatedViews = Projections
            .GroupBy(p => p.EntityType)
            .Where(g => g.Skip(1).Any())
            .ToList();
        if (repeatedViews.Count > 0)
        {
            var report = new StringBuilder();
            report.AppendLine("Please, define only one projection per view. These views were referenced more than once:");
            foreach (var view in repeatedViews)
            {
                report.AppendLine(" " + view.Key);
            }
            report.AppendLine("NB: you can use partials or dynamics in edge cases");
            throw new InvalidOperationException(report.ToString());
        }

        // Buckets that differ only by letter case are treated as the same location.
        var sharedBuckets = Projections
            .GroupBy(p => p.StoreBucket.ToLowerInvariant())
            .Where(g => g.Skip(1).Any())
            .ToArray();
        if (sharedBuckets.Length > 0)
        {
            var report = new StringBuilder();
            report.AppendLine("Following views will be stored in same location, which will cause problems:");
            foreach (var bucket in sharedBuckets)
            {
                var entityTypes = string.Join(",", bucket.Select(p => p.EntityType));
                report.AppendFormat(" {0} : {1}", bucket.Key, entityTypes).AppendLine();
            }
            throw new InvalidOperationException(report.ToString());
        }
    }

    /// <summary>Records the requested entity type and its bucket, then returns the wrapped store's writer.</summary>
    public IKeyValueWriter<TKey, TEntity> GetWriter<TKey, TEntity>()
    {
        var entry = new Projection();
        entry.EntityType = typeof(TEntity);
        entry.StoreBucket = _real.Strategy.GetValueBucket<TEntity>();
        Projections.Add(entry);

        return _real.GetWriter<TKey, TEntity>();
    }

    public IKeyValueReader<TKey, TEntity> GetReader<TKey, TEntity>()
    {
        return _real.GetReader<TKey, TEntity>();
    }

    public IKeyValueStorageStrategy Strategy
    {
        get { return _real.Strategy; }
    }

    public IEnumerable<RawKeyValue> EnumerateRawContents(string bucket)
    {
        return _real.EnumerateRawContents(bucket);
    }

    public void WriteRawContents(string bucket, IEnumerable<RawKeyValue> records)
    {
        _real.WriteRawContents(bucket, records);
    }

    public void Reset(string bucket)
    {
        _real.Reset(bucket);
    }
}
/// <summary>
/// Builds a fingerprint of a type from its members and method bodies, as read
/// from the assembly file on disk with Mono.Cecil.
/// </summary>
/// <param name="type1">Type to fingerprint; its assembly must have a file location.</param>
/// <returns>A string of the form <c>FullName:SHA1HEX</c>.</returns>
/// <exception cref="InvalidOperationException">
/// The type could not be located in the module read from its assembly file.
/// </exception>
static string GetClassHash(Type type1)
{
    var location = type1.Assembly.Location;
    var mod = ModuleDefinition.ReadModule(location);

    // Reflection separates nested type names with '+', Cecil with '/'.
    // Without the translation a nested type is never found and the code
    // below fails with a NullReferenceException.
    var cecilName = type1.FullName.Replace('+', '/');
    var typeDefinition = mod.GetType(cecilName);
    if (typeDefinition == null)
    {
        throw new InvalidOperationException(
            string.Format("Type '{0}' was not found in module '{1}'", type1.FullName, location));
    }

    var builder = new StringBuilder();
    builder.AppendLine(typeDefinition.Name);
    ProcessMembers(builder, typeDefinition);
    // we include nested types
    foreach (var nested in typeDefinition.NestedTypes)
    {
        ProcessMembers(builder, nested);
    }

    var content = builder.ToString();
    using (var hasher = SHA1.Create())
    {
        var hash = hasher.ComputeHash(Encoding.UTF8.GetBytes(content));
        var readableHash = BitConverter.ToString(hash).Replace("-", "");
        // The reflection name is kept in the result so hashes already persisted
        // for non-nested types stay comparable.
        return string.Format("{0}:{1}", type1.FullName, readableHash);
    }
}
/// <summary>
/// Appends a textual dump of a type's methods (signature plus instructions)
/// and fields to <paramref name="builder"/>, each group ordered by its string form.
/// </summary>
/// <param name="builder">Receives the dump.</param>
/// <param name="typeDefinition">Cecil definition of the type to dump.</param>
static void ProcessMembers(StringBuilder builder, TypeDefinition typeDefinition)
{
    foreach (var md in typeDefinition.Methods.OrderBy(m => m.ToString()))
    {
        builder.AppendLine(" " + md);

        // Abstract, interface, extern and P/Invoke methods have no body;
        // Body is null for them, so only the signature is recorded.
        if (!md.HasBody)
        {
            continue;
        }

        foreach (var instruction in md.Body.Instructions)
        {
            // we don't care about offsets
            instruction.Offset = 0;
            builder.AppendLine(" " + instruction);
        }
    }

    foreach (var field in typeDefinition.Fields.OrderBy(f => f.ToString()))
    {
        builder.AppendLine(" " + field);
    }
}
/// <summary>
/// Feeds every <c>IEvent</c> found in <paramref name="records"/> to
/// <paramref name="wire"/>, stopping silently once cancellation is requested.
/// A progress notification is sent after every 50000 records.
/// </summary>
/// <param name="records">Records whose items are replayed.</param>
/// <param name="wire">Dispatcher that invokes the wired handlers.</param>
/// <param name="token">Checked once per record.</param>
/// <exception cref="InvalidOperationException">
/// A handler threw; the original exception is attached as the inner exception.
/// </exception>
static void ObserveWhileCan(IEnumerable<StoreRecord> records, RedirectToDynamicEvent wire, CancellationToken token)
{
    const int reportEvery = 50000;
    var batchTimer = Stopwatch.StartNew();
    var seen = 0;

    foreach (var record in records)
    {
        seen++;
        if (token.IsCancellationRequested)
        {
            return;
        }

        if (seen % reportEvery == 0)
        {
            // Time is reported per batch, so the timer restarts after each notification.
            var batchSeconds = Math.Round(batchTimer.Elapsed.TotalSeconds, 2);
            SystemObserver.Notify("Observing {0} {1}", seen, batchSeconds);
            batchTimer.Restart();
        }

        foreach (var message in record.Items)
        {
            if (!(message is IEvent))
            {
                continue;
            }

            try
            {
                wire.InvokeEvent(message);
            }
            catch (Exception ex)
            {
                // Name the offending message so the failure can be traced back to it.
                throw new InvalidOperationException(
                    string.Format("Failed to invoke {0}: {1}", message.GetType().Name, message.ToString()),
                    ex);
            }
        }
    }
}
/// <summary>Display name of this startup task.</summary>
public string StartupTaskName
{
    get
    {
        return "Projection rebuilder";
    }
}
/// <summary>
/// Rebuilds every view bucket whose projection fingerprint differs from the
/// persisted one, then removes buckets that no longer belong to any projection.
/// </summary>
/// <remarks>
/// Stale projections are first replayed into an in-memory store; the real store
/// is only touched after the whole stream has been observed. Obsolete buckets
/// are cleaned up even when no projection needed a rebuild.
/// </remarks>
/// <param name="token">
/// Cancels the run. During replay and obsolete-bucket cleanup the method returns
/// quietly; between bucket writes it throws.
/// </param>
/// <exception cref="InvalidOperationException">
/// The projections are inconsistent (count mismatch or failed sanity check),
/// or a handler failed during replay.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Cancellation was requested while rebuilt buckets were being written.
/// </exception>
public void ExecuteStartup(CancellationToken token)
{
    var strategy = _store.Strategy;
    var memoryContainer = new ObjectKeyValueStore(strategy);
    var tracked = new ProjectionInspectingStore(memoryContainer);

    var projections = new List<object>(_projectors(tracked));

    // Projections are paired with tracked writers by index below, so the two
    // lists must line up one-to-one.
    if (tracked.Projections.Count != projections.Count)
        throw new InvalidOperationException("Count mismatch");
    tracked.ValidateSanity();

    var persistedHashes = new Dictionary<string, string>();
    var name = "domain";
    _hashStore.GetEntity<ProjectionHash>(name).IfValue(v => persistedHashes = v.BucketHashes);

    var activeMemoryProjections = projections.Select((projection, i) =>
    {
        var proj = tracked.Projections[i];
        var bucketName = proj.StoreBucket;
        var viewType = proj.EntityType;
        // The fingerprint covers the projection code, the view class and the
        // storage strategy; the leading literal allows forcing a global rebuild.
        var projectionHash =
            "Global change on 2013-03-05\r\n" +
            GetClassHash(projection.GetType()) +
            "\r\n " + GetClassHash(viewType) + "\r\n" + GetClassHash(strategy.GetType());

        string persistedHash;
        bool needsRebuild = !persistedHashes.TryGetValue(bucketName, out persistedHash)
            || persistedHash != projectionHash;
        return new
        {
            bucketName,
            projection,
            hash = projectionHash,
            needsRebuild
        };
    }).ToArray();

    var needRebuild = activeMemoryProjections.Where(x => x.needsRebuild).ToArray();
    if (needRebuild.Length > 0)
    {
        foreach (var stale in needRebuild)
        {
            _logger.Info("{0} needs rebuild", stale.bucketName);
        }

        var watch = Stopwatch.StartNew();
        var wire = new RedirectToDynamicEvent();
        foreach (var stale in needRebuild)
        {
            wire.WireToWhen(stale.projection);
        }

        _logger.Info("Starting rebuild");
        ObserveWhileCan(_stream.EnumerateAllItems(0, int.MaxValue), wire, token);
        if (token.IsCancellationRequested)
        {
            SystemObserver.Notify("[warn] Aborting projections before anything was changed");
            return;
        }

        // Use Elapsed rather than ElapsedTicks: stopwatch ticks are counted in
        // units of Stopwatch.Frequency and are not TimeSpan ticks.
        // NOTE(review): handler time is not measured separately from total time;
        // both figures come from the same interval.
        var elapsed = watch.Elapsed;
        var timeTotal = elapsed.TotalSeconds;
        var timeInHandlers = Math.Round(elapsed.TotalSeconds, 1);
        _logger.Info("Total Elapsed: {0}sec ({1}sec in handlers)", Math.Round(timeTotal, 0), timeInHandlers);

        // update projections that need rebuild
        foreach (var b in needRebuild)
        {
            // server might shut down the process soon anyway, but we'll be
            // in partially consistent mode (not all projections updated)
            // so at least we blow up between projection buckets
            token.ThrowIfCancellationRequested();

            var bucketName = b.bucketName;
            var bucketHash = b.hash;

            _logger.Info("Wiping bucket contents {0}.{1}", name, bucketName);
            _store.Reset(bucketName);

            _logger.Info("Writing new versions {0}.{1}", name, bucketName);
            var contents = memoryContainer.EnumerateRawContents(bucketName);
            _store.WriteRawContents(bucketName, contents);

            // update hash
            _hashStore.UpdateEntityEnforcingNew<ProjectionHash>(name, x =>
            {
                x.BucketHashes[bucketName] = bucketHash;
            });
            _logger.Info("[good] Updated View bucket {0}.{1}", name, bucketName);
        }
    }

    // Clean up obsolete views. This runs whether or not anything was rebuilt,
    // so a removed projection is cleaned up even when all others are unchanged.
    var allBuckets = new HashSet<string>(activeMemoryProjections.Select(a => a.bucketName));
    var obsoleteBuckets = persistedHashes.Where(s => !allBuckets.Contains(s.Key)).ToArray();
    foreach (var hash in obsoleteBuckets)
    {
        // quit at this stage without any bad side effects
        if (token.IsCancellationRequested)
            return;

        var bucketName = hash.Key;
        _logger.Info("[warn] {0} is obsolete", bucketName);
        _store.Reset(bucketName);
        _hashStore.UpdateEntityEnforcingNew<ProjectionHash>(name, x => x.BucketHashes.Remove(bucketName));
        _logger.Info("[good] Cleaned up obsolete view bucket {0}.{1}", name, bucketName);
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment