Created
February 27, 2019 23:27
-
-
Save AndreSteenbergen/58fd2a232f47b97b666f462293523a78 to your computer and use it in GitHub Desktop.
Kafka journal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface IEventTopicMapper
{
    /// <summary>
    /// Determines the Kafka topic and partition, as well as the message key,
    /// to use for events of a particular persistence id.
    /// </summary>
    /// <param name="persistenceId">The persistence id of the persistent actor.</param>
    /// <returns>
    /// A tuple of the <see cref="TopicPartition"/> the events are written to and
    /// the raw message key bytes used for all events of this persistence id.
    /// </returns>
    Tuple<TopicPartition, byte[]> GetTopicPartitionForPersistenceId(string persistenceId);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections.Immutable; | |
using System.Linq; | |
using System.Reflection; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.Xml.Schema; | |
using Akka.Actor; | |
using Akka.Persistence.Journal; | |
using Akka.Serialization; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
using Akka.Streams.Kafka.Dsl; | |
using Akka.Streams.Kafka.Messages; | |
using Akka.Streams.Kafka.Settings; | |
using Confluent.Kafka; | |
using Confluent.Kafka.Serdes; | |
namespace Akka.Persistence.KafkaRocks.Journal | |
{ | |
/// <summary>
/// An Akka.Persistence <see cref="AsyncWriteJournal"/> that stores events in Kafka.
/// Each persistence id is mapped (via <see cref="IEventTopicMapper"/>) to a
/// topic/partition and a message key; the Kafka offset is used as the basis for
/// the event sequence number (sequenceNr = offset + 1).
/// </summary>
public class KafkaJournal : AsyncWriteJournal
{
    // Journal settings (Kafka config, topic mapper, serialization) resolved from the extension.
    private readonly KafkaRocksJournalSettings settings;
    private IProducer<byte[], byte[]> producer;
    private IEventTopicMapper topicMapper;
    private ConsumerSettings<byte[], byte[]> consumerSettings;
    private ActorMaterializer materializer;

    public KafkaJournal()
    {
        settings = KafkaRocksPersistence.Get(Context.System).JournalSettings;
    }

    /// <summary>
    /// Builds producer/consumer settings from the configured Kafka config,
    /// creates the shared producer and the stream materializer.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();

        var producerSettings = ProducerSettings<byte[], byte[]>
            .Create(settings.KafkaConfig, Serializers.ByteArray, Serializers.ByteArray)
            .WithBootstrapServers(settings.KafkaConfig.GetString("bootstrap.servers"));

        consumerSettings = ConsumerSettings<byte[], byte[]>
            .Create(settings.KafkaConfig, Deserializers.ByteArray, Deserializers.ByteArray)
            .WithBootstrapServers(settings.KafkaConfig.GetString("bootstrap.servers"))
            // EOF messages are required: ReplayMessagesAsync relies on the end-of-partition
            // marker to terminate its otherwise unbounded consumer stream.
            .WithEofMessages(true);

        materializer = Context.Materializer();
        producer = producerSettings.CreateKafkaProducer();
        topicMapper = settings.EventTopicMapper;
    }

    /// <summary>
    /// Flushes pending produced messages and releases the producer and materializer.
    /// </summary>
    protected override void PostStop()
    {
        producer.Flush();
        producer.Dispose();
        materializer.Dispose();
        base.PostStop();
    }

    /// <summary>
    /// Replays persisted events by consuming the partition mapped to
    /// <paramref name="persistenceId"/>, starting at offset fromSequenceNr - 1,
    /// invoking <paramref name="recoveryCallback"/> per matching event.
    /// </summary>
    public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
        long toSequenceNr, long max,
        Action<IPersistentRepresentation> recoveryCallback)
    {
        (TopicPartition topicPartition, byte[] messageKey) =
            topicMapper.GetTopicPartitionForPersistenceId(persistenceId);

        //create a consumer source, and call recoveryCallback for each object with correct persistenceId
        // Fresh group id per replay so consumer-group offsets never interfere with recovery.
        var cSettings = consumerSettings.WithGroupId("JournalRead" + Guid.NewGuid());
        // sequenceNr is 1-based, Kafka offsets are 0-based, hence fromSequenceNr - 1.
        var subscription =
            Subscriptions.AssignmentWithOffset(new TopicPartitionOffset(topicPartition, fromSequenceNr - 1));

        long i = 0;
        // TakeWhile is deliberately placed BEFORE Where: the EOF marker (a
        // non-ConsumerRecord message) must reach TakeWhile to complete the stream;
        // filtering it out first would leave the source waiting indefinitely.
        // NOTE(review): ++i counts every consumer record, including records later
        // dropped by the key filter below — when a partition is shared by multiple
        // persistence ids, fewer than `max` events may reach the callback; confirm intended.
        var source = KafkaConsumer.PlainSource(cSettings, subscription)
            .TakeWhile(x => x.MessageType == MessageType.ConsumerRecord && x.Record.Offset < toSequenceNr && ++i <= max)
            .Where(x => x.MessageType == MessageType.ConsumerRecord && x.Record.Key.SequenceEqual(messageKey));

        return source.RunForeach(msg => { recoveryCallback(PersistentFromBytes(msg)); }, materializer);
    }

    /// <summary>
    /// Returns the highest stored sequence number for a persistence id.
    /// NOTE(review): this implementation always answers <see cref="long.MaxValue"/>
    /// instead of querying the partition's end offset — the original author left
    /// "is this ok?"; verify against how Akka.Persistence uses this value during
    /// recovery before relying on it.
    /// </summary>
    public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
    {
        //is this ok?
        return Task.FromResult(long.MaxValue);
    }

    /// <summary>
    /// Writes batches of events to Kafka. Each event is produced to the
    /// topic/partition mapped from its persistence id, with the CLR event type and
    /// persistence id carried in message headers for later deserialization.
    /// Returns one entry per <see cref="AtomicWrite"/> (the Akka.Persistence
    /// contract): null on success, the unwrapped exception on failure.
    /// </summary>
    protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
    {
        var writeTasks = new List<Task>();
        foreach (AtomicWrite atomicWrite in messages)
        {
            var atomicWriteTasks = new List<Task>();
            (TopicPartition topicPartition, byte[] messageKey) =
                topicMapper.GetTopicPartitionForPersistenceId(atomicWrite.PersistenceId);

            var persistentMessages = (IImmutableList<IPersistentRepresentation>) atomicWrite.Payload;
            foreach (IPersistentRepresentation message in persistentMessages)
            {
                var @event = message.Payload;
                var eventType = @event.GetType();
                // Assembly-qualified-ish name ("FullName, AssemblyName") so the
                // reader side can resolve the type with Type.GetType.
                var clrEventType = string.Concat(eventType.FullName, ", ",
                    eventType.GetTypeInfo().Assembly.GetName().Name);

                var serializer = settings.Serialization.FindSerializerForType(eventType);

                var msg = new Message<byte[], byte[]>
                {
                    Timestamp = Timestamp.Default,
                    Key = messageKey,
                    Value = serializer.ToBinary(@event),
                    Headers = new Headers
                    {
                        {"clrType", Encoding.UTF8.GetBytes(clrEventType)},
                        {"persistenceId", Encoding.UTF8.GetBytes(atomicWrite.PersistenceId)},
                    }
                };

                atomicWriteTasks.Add(producer.ProduceAsync(topicPartition, msg));
            }

            // One aggregate task per AtomicWrite so faults map 1:1 to result entries.
            writeTasks.Add(Task.WhenAll(atomicWriteTasks));
        }

        var result = await Task.WhenAll(writeTasks).ContinueWith(_ =>
            writeTasks.Select(t => t.IsFaulted ? TryUnwrapException(t.Exception) : null).ToImmutableList() as
            IImmutableList<Exception>);
        return result;
    }

    /// <summary>
    /// Reconstructs an <see cref="IPersistentRepresentation"/> from a consumed
    /// Kafka record, using the "clrType" and "persistenceId" headers written by
    /// <see cref="WriteMessagesAsync"/>. The sequence number is offset + 1.
    /// </summary>
    private IPersistentRepresentation PersistentFromBytes(ConsumerMessage<byte[], byte[]> message)
    {
        var headers = message.Record.Headers.ToDictionary(x => x.Key, x => x.Value);
        var persistenceId = Encoding.UTF8.GetString(headers["persistenceId"]);
        var eventTypeString = Encoding.UTF8.GetString(headers["clrType"]);

        var eventType = Type.GetType(eventTypeString, true, true);
        var serializer = settings.Serialization.FindSerializerForType(eventType);
        var @event = serializer.FromBinary(message.Record.Value, eventType);

        return new Persistent(@event, persistenceId: persistenceId, sequenceNr: message.Record.Offset + 1);
    }

    /// <summary>
    /// Not supported: Kafka is an append-only log, so messages cannot be deleted.
    /// </summary>
    protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
    {
        throw new NotImplementedException("Kafka can't delete messages; it is an append only log");
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Akka.Serialization; | |
using Akka.Configuration; | |
namespace Akka.Persistence.KafkaRocks | |
{ | |
/// <summary>
/// Immutable settings for the Kafka journal: the Kafka HOCON config, the
/// <see cref="IEventTopicMapper"/> instance (created by reflection from a
/// configured class name), and the actor system's serialization subsystem.
/// </summary>
public class KafkaRocksJournalSettings
{
    /// <summary>
    /// Creates the settings and instantiates the configured topic mapper.
    /// </summary>
    /// <param name="eventTopicMapperClassname">Assembly-qualified type name of an <see cref="IEventTopicMapper"/> implementation with a parameterless constructor.</param>
    /// <param name="kafkaConfig">The "kafka" sub-config (must include "bootstrap.servers").</param>
    /// <param name="serialization">The actor system's serialization subsystem.</param>
    /// <exception cref="ArgumentException">When <paramref name="eventTopicMapperClassname"/> is null or blank.</exception>
    /// <exception cref="ArgumentNullException">When <paramref name="kafkaConfig"/> or <paramref name="serialization"/> is null.</exception>
    /// <exception cref="InvalidOperationException">When the mapper type cannot be resolved or does not implement <see cref="IEventTopicMapper"/>.</exception>
    public KafkaRocksJournalSettings(
        string eventTopicMapperClassname,
        Config kafkaConfig,
        Akka.Serialization.Serialization serialization)
    {
        if (string.IsNullOrWhiteSpace(eventTopicMapperClassname))
            throw new ArgumentException(
                "eventTopicMapperClassname must be a non-empty type name.",
                nameof(eventTopicMapperClassname));

        KafkaConfig = kafkaConfig ?? throw new ArgumentNullException(nameof(kafkaConfig));
        Serialization = serialization ?? throw new ArgumentNullException(nameof(serialization));

        // Resolve and instantiate the mapper eagerly so a misconfigured class name
        // fails at startup with a descriptive message instead of a bare
        // InvalidOperationException (original) or InvalidCastException later.
        var mapperType = Type.GetType(eventTopicMapperClassname)
            ?? throw new InvalidOperationException(
                $"Could not resolve event topic mapper type '{eventTopicMapperClassname}'.");

        EventTopicMapper = Activator.CreateInstance(mapperType) as IEventTopicMapper
            ?? throw new InvalidOperationException(
                $"Type '{eventTopicMapperClassname}' does not implement {nameof(IEventTopicMapper)}.");
    }

    public IEventTopicMapper EventTopicMapper { get; }
    public Config KafkaConfig { get; }
    public Akka.Serialization.Serialization Serialization { get; }

    /// <summary>
    /// Builds settings from the journal's HOCON <paramref name="config"/>:
    /// reads "eventTopicMapperClassname" and the "kafka" sub-config.
    /// </summary>
    public static KafkaRocksJournalSettings Create(Config config, Akka.Serialization.Serialization serialization)
    {
        if (config == null)
            throw new ArgumentNullException(nameof(config));

        return new KafkaRocksJournalSettings(
            eventTopicMapperClassname: config.GetString("eventTopicMapperClassname"),
            kafkaConfig: config.GetConfig("kafka"),
            serialization: serialization);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment