Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Last active July 18, 2020 02:57
Show Gist options
  • Save to11mtm/c9d29944184fe00bcb1be2a9dd09319d to your computer and use it in GitHub Desktop.
Save to11mtm/c9d29944184fe00bcb1be2a9dd09319d to your computer and use it in GitHub Desktop.
Distributed Data Dedupe alpha concept
using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster;
using Akka.DistributedData;
namespace Worker.Receiver.Deduplication.DData
{
public abstract class DeduplicatingReceiverBase : ActorBase
{
protected readonly DistributedData Replicator =
DistributedData.Get(Context.System);
protected abstract string ReplicatorKey { get; }
protected abstract int MaxAttempts { get; }
protected DeduplicationState PeekCurrentState(string key)
{
return Replicator
.GetAsync(
new LWWDictionaryKey<string, DeduplicationState>(
ReplicatorKey)).Result.FirstOrDefault(r => r.Key == key)
.Value;
}
protected DeduplicationState AdvanceAndGetProcessingState(string key)
{
Replicator.Replicator.Tell(CreateAdvanceAndGetStateCommand(key));
return Replicator
.GetAsync(
new LWWDictionaryKey<string, DeduplicationState>(
ReplicatorKey), Dsl.ReadLocal).Result
.FirstOrDefault(r => r.Key == key).Value;
}
protected void MarkCompletion(string key)
{
Replicator.Replicator.Tell(CreateUpdateStateCompletedCommand(key));
}
protected Update CreateUpdateStateCompletedCommand(string innerKey)
{
var cluster = Cluster.Get(Context.System);
return Dsl.Update(
key: new LWWDictionaryKey<string, DeduplicationState>(
ReplicatorKey),
initial: LWWDictionary.Create(node: cluster.SelfUniqueAddress,
key: innerKey,
value: new DeduplicationState()
{
NumberAttempts = 0,
ProcessingState = DeduplicationProcessingState.Processed
})
, consistency: new WriteMajority(TimeSpan.FromSeconds(5)),
modify: dds =>
{
if (dds.TryGetValue(innerKey, out DeduplicationState state))
{
state.ProcessingState =
DeduplicationProcessingState.Processed;
return dds.SetItem(cluster, innerKey, state);
}
else
{
return dds.SetItem(cluster, innerKey,
new DeduplicationState()
{
NumberAttempts = 0,
ProcessingState = DeduplicationProcessingState
.Processed
});
}
});
}
protected Update CreateAdvanceAndGetStateCommand(string innerKey)
{
var cluster = Cluster.Get(Context.System);
return Dsl.Update(
key: new LWWDictionaryKey<string, DeduplicationState>(
ReplicatorKey),
initial: LWWDictionary.Create(node: cluster.SelfUniqueAddress,
key: innerKey,
value: new DeduplicationState()
{
NumberAttempts = 0,
ProcessingState =
DeduplicationProcessingState.NotAttempted
}
)
, consistency: new WriteMajority(TimeSpan.FromSeconds(5)),
modify: dds =>
{
if (dds.TryGetValue(innerKey, out DeduplicationState state))
{
if (state.ProcessingState ==
DeduplicationProcessingState.Error ||
state.ProcessingState ==
DeduplicationProcessingState.Processed)
{
return dds;
}
else if (state.NumberAttempts < MaxAttempts)
{
state.NumberAttempts = state.NumberAttempts + 1;
state.ProcessingState = DeduplicationProcessingState
.Attempted;
return dds.SetItem(cluster, innerKey, state);
}
else
{
state.ProcessingState =
DeduplicationProcessingState.Error;
return dds.SetItem(cluster, innerKey, state);
}
}
else
{
return dds.SetItem(cluster, innerKey,
new DeduplicationState()
{
NumberAttempts = 0,
ProcessingState = DeduplicationProcessingState
.NotAttempted
});
}
});
}
}
public class DeduplicationState
{
public int NumberAttempts { get; set; }
public DeduplicationProcessingState ProcessingState { get; set; }
}
public enum DeduplicationProcessingState
{
NotAttempted,
Attempted,
Processed,
Error
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment