Last active
July 18, 2020 02:57
-
-
Save to11mtm/c9d29944184fe00bcb1be2a9dd09319d to your computer and use it in GitHub Desktop.
Distributed Data Dedupe alpha concept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using Akka.Actor; | |
using Akka.Cluster; | |
using Akka.DistributedData; | |
namespace Worker.Receiver.Deduplication.DData | |
{ | |
public abstract class DeduplicatingReceiverBase : ActorBase | |
{ | |
protected readonly DistributedData Replicator = | |
DistributedData.Get(Context.System); | |
protected abstract string ReplicatorKey { get; } | |
protected abstract int MaxAttempts { get; } | |
protected DeduplicationState PeekCurrentState(string key) | |
{ | |
return Replicator | |
.GetAsync( | |
new LWWDictionaryKey<string, DeduplicationState>( | |
ReplicatorKey)).Result.FirstOrDefault(r => r.Key == key) | |
.Value; | |
} | |
protected DeduplicationState AdvanceAndGetProcessingState(string key) | |
{ | |
Replicator.Replicator.Tell(CreateAdvanceAndGetStateCommand(key)); | |
return Replicator | |
.GetAsync( | |
new LWWDictionaryKey<string, DeduplicationState>( | |
ReplicatorKey), Dsl.ReadLocal).Result | |
.FirstOrDefault(r => r.Key == key).Value; | |
} | |
protected void MarkCompletion(string key) | |
{ | |
Replicator.Replicator.Tell(CreateUpdateStateCompletedCommand(key)); | |
} | |
protected Update CreateUpdateStateCompletedCommand(string innerKey) | |
{ | |
var cluster = Cluster.Get(Context.System); | |
return Dsl.Update( | |
key: new LWWDictionaryKey<string, DeduplicationState>( | |
ReplicatorKey), | |
initial: LWWDictionary.Create(node: cluster.SelfUniqueAddress, | |
key: innerKey, | |
value: new DeduplicationState() | |
{ | |
NumberAttempts = 0, | |
ProcessingState = DeduplicationProcessingState.Processed | |
}) | |
, consistency: new WriteMajority(TimeSpan.FromSeconds(5)), | |
modify: dds => | |
{ | |
if (dds.TryGetValue(innerKey, out DeduplicationState state)) | |
{ | |
state.ProcessingState = | |
DeduplicationProcessingState.Processed; | |
return dds.SetItem(cluster, innerKey, state); | |
} | |
else | |
{ | |
return dds.SetItem(cluster, innerKey, | |
new DeduplicationState() | |
{ | |
NumberAttempts = 0, | |
ProcessingState = DeduplicationProcessingState | |
.Processed | |
}); | |
} | |
}); | |
} | |
protected Update CreateAdvanceAndGetStateCommand(string innerKey) | |
{ | |
var cluster = Cluster.Get(Context.System); | |
return Dsl.Update( | |
key: new LWWDictionaryKey<string, DeduplicationState>( | |
ReplicatorKey), | |
initial: LWWDictionary.Create(node: cluster.SelfUniqueAddress, | |
key: innerKey, | |
value: new DeduplicationState() | |
{ | |
NumberAttempts = 0, | |
ProcessingState = | |
DeduplicationProcessingState.NotAttempted | |
} | |
) | |
, consistency: new WriteMajority(TimeSpan.FromSeconds(5)), | |
modify: dds => | |
{ | |
if (dds.TryGetValue(innerKey, out DeduplicationState state)) | |
{ | |
if (state.ProcessingState == | |
DeduplicationProcessingState.Error || | |
state.ProcessingState == | |
DeduplicationProcessingState.Processed) | |
{ | |
return dds; | |
} | |
else if (state.NumberAttempts < MaxAttempts) | |
{ | |
state.NumberAttempts = state.NumberAttempts + 1; | |
state.ProcessingState = DeduplicationProcessingState | |
.Attempted; | |
return dds.SetItem(cluster, innerKey, state); | |
} | |
else | |
{ | |
state.ProcessingState = | |
DeduplicationProcessingState.Error; | |
return dds.SetItem(cluster, innerKey, state); | |
} | |
} | |
else | |
{ | |
return dds.SetItem(cluster, innerKey, | |
new DeduplicationState() | |
{ | |
NumberAttempts = 0, | |
ProcessingState = DeduplicationProcessingState | |
.NotAttempted | |
}); | |
} | |
}); | |
} | |
} | |
public class DeduplicationState | |
{ | |
public int NumberAttempts { get; set; } | |
public DeduplicationProcessingState ProcessingState { get; set; } | |
} | |
public enum DeduplicationProcessingState | |
{ | |
NotAttempted, | |
Attempted, | |
Processed, | |
Error | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment