Skip to content

Instantly share code, notes, and snippets.

@AndreSteenbergen
Created February 27, 2019 23:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndreSteenbergen/58fd2a232f47b97b666f462293523a78 to your computer and use it in GitHub Desktop.
Save AndreSteenbergen/58fd2a232f47b97b666f462293523a78 to your computer and use it in GitHub Desktop.
Kafka journal
public interface IEventTopicMapper
{
/// <summary>
/// Determines the Kafka topic and partition, as well as the message key, to use for the given persistenceId.
/// </summary>
/// <param name="persistenceId">The persistence id whose events are to be written or read.</param>
/// <returns>
/// A tuple whose first item is the topic/partition for the persistence id and whose second item is the
/// message key (as raw bytes) used for records of that persistence id.
/// </returns>
Tuple<TopicPartition, byte[]> GetTopicPartitionForPersistenceId(string persistenceId);
}
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Schema;
using Akka.Actor;
using Akka.Persistence.Journal;
using Akka.Serialization;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;
using Confluent.Kafka.Serdes;
namespace Akka.Persistence.KafkaRocks.Journal
{
/// <summary>
/// Akka.Persistence journal that appends events to Kafka and replays them by reading
/// the topic/partition chosen by the configured <see cref="IEventTopicMapper"/>.
/// Sequence numbers exposed on replay are derived from the Kafka offset (offset + 1).
/// </summary>
public class KafkaJournal : AsyncWriteJournal
{
    private readonly KafkaRocksJournalSettings settings;
    private IProducer<byte[], byte[]> producer;
    private IEventTopicMapper topicMapper;
    private ConsumerSettings<byte[], byte[]> consumerSettings;
    private ActorMaterializer materializer;

    /// <summary>
    /// Creates the journal and reads its settings from the KafkaRocks persistence extension.
    /// </summary>
    public KafkaJournal()
    {
        settings = KafkaRocksPersistence.Get(Context.System).JournalSettings;
    }

    /// <summary>
    /// Builds the producer/consumer settings, the stream materializer and the Kafka producer.
    /// </summary>
    protected override void PreStart()
    {
        base.PreStart();

        var bootstrapServers = settings.KafkaConfig.GetString("bootstrap.servers");

        var producerSettings = ProducerSettings<byte[], byte[]>
            .Create(settings.KafkaConfig, Serializers.ByteArray, Serializers.ByteArray)
            .WithBootstrapServers(bootstrapServers);

        consumerSettings = ConsumerSettings<byte[], byte[]>
            .Create(settings.KafkaConfig, Deserializers.ByteArray, Deserializers.ByteArray)
            .WithBootstrapServers(bootstrapServers)
            .WithEofMessages(true);

        materializer = Context.Materializer();
        producer = producerSettings.CreateKafkaProducer();
        topicMapper = settings.EventTopicMapper;
    }

    /// <summary>
    /// Flushes and disposes the producer and disposes the materializer.
    /// </summary>
    protected override void PostStop()
    {
        try
        {
            // producer/materializer are still null when PreStart threw before assigning them.
            producer?.Flush();
        }
        finally
        {
            // Release resources and run the base hook even when the flush fails.
            producer?.Dispose();
            materializer?.Dispose();
            base.PostStop();
        }
    }

    /// <summary>
    /// Replays the events of <paramref name="persistenceId"/> by consuming its topic/partition.
    /// </summary>
    /// <param name="context">Actor context supplied by the persistence infrastructure (unused).</param>
    /// <param name="persistenceId">Persistence id whose events are replayed.</param>
    /// <param name="fromSequenceNr">First sequence number to replay (inclusive); maps to offset fromSequenceNr - 1.</param>
    /// <param name="toSequenceNr">Last sequence number to replay (inclusive).</param>
    /// <param name="max">Maximum number of events handed to <paramref name="recoveryCallback"/>.</param>
    /// <param name="recoveryCallback">Invoked once for every replayed event.</param>
    /// <returns>A task that completes when the replay stream has finished.</returns>
    public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
        long toSequenceNr, long max,
        Action<IPersistentRepresentation> recoveryCallback)
    {
        // Nothing can be replayed; avoid spinning up a consumer at all.
        if (max <= 0 || toSequenceNr < fromSequenceNr)
        {
            return Task.CompletedTask;
        }

        (TopicPartition topicPartition, byte[] messageKey) =
            topicMapper.GetTopicPartitionForPersistenceId(persistenceId);

        // A unique group id per replay keeps this read independent of any other consumer.
        var cSettings = consumerSettings.WithGroupId("JournalRead" + Guid.NewGuid());

        // sequenceNr = offset + 1; clamp so we never ask for a negative (non-physical) offset.
        long startOffset = fromSequenceNr <= 1 ? 0L : fromSequenceNr - 1;
        var subscription =
            Subscriptions.AssignmentWithOffset(new TopicPartitionOffset(topicPartition, startOffset));

        var source = KafkaConsumer.PlainSource(cSettings, subscription)
            // Stop at the first non-record message (e.g. end of partition) or once past toSequenceNr.
            .TakeWhile(x => x.MessageType == MessageType.ConsumerRecord && x.Record.Offset < toSequenceNr)
            // Keep only the records carrying this persistence id's key; records without a key never match.
            .Where(x => x.Record.Key != null && x.Record.Key.SequenceEqual(messageKey))
            // Apply the limit AFTER filtering so records with other keys do not consume it.
            .Take(max);

        return source.RunForeach(msg => recoveryCallback(PersistentFromBytes(msg)), materializer);
    }

    /// <summary>
    /// Returns the highest known sequence number for <paramref name="persistenceId"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this always reports <see cref="long.MaxValue"/> (the original author asked "is this ok?").
    /// Presumably the real value should be derived from the partition's latest offset — confirm and replace.
    /// </remarks>
    public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
    {
        //is this ok?
        return Task.FromResult(long.MaxValue);
    }

    /// <summary>
    /// Writes every <see cref="AtomicWrite"/> to Kafka.
    /// </summary>
    /// <param name="messages">The atomic writes to persist.</param>
    /// <returns>
    /// One entry per atomic write, in order: <c>null</c> when the write succeeded, otherwise the failure.
    /// </returns>
    protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
    {
        // ToList forces the produce calls to be issued now, in the order of the incoming writes.
        var writeTasks = messages.Select(WriteAtomicAsync).ToList();

        var result = await Task.WhenAll(writeTasks).ContinueWith(_ =>
            writeTasks.Select(FailureOf).ToImmutableList() as IImmutableList<Exception>);
        return result;
    }

    /// <summary>
    /// Produces all events of a single atomic write; never throws, failures are reported through the task.
    /// </summary>
    private Task WriteAtomicAsync(AtomicWrite atomicWrite)
    {
        try
        {
            (TopicPartition topicPartition, byte[] messageKey) =
                topicMapper.GetTopicPartitionForPersistenceId(atomicWrite.PersistenceId);
            var persistentMessages = (IImmutableList<IPersistentRepresentation>) atomicWrite.Payload;

            // Serialize everything first so a serialization error cannot leave a partially produced write.
            var kafkaMessages = persistentMessages
                .Select(message => ToKafkaMessage(message, atomicWrite.PersistenceId, messageKey))
                .ToList();

            var deliveries = new List<Task>(kafkaMessages.Count);
            foreach (var kafkaMessage in kafkaMessages)
            {
                deliveries.Add(producer.ProduceAsync(topicPartition, kafkaMessage));
            }

            return Task.WhenAll(deliveries);
        }
        catch (Exception e)
        {
            // Confine a synchronous failure to this atomic write instead of failing the whole batch.
            return Task.FromException(e);
        }
    }

    /// <summary>
    /// Maps a finished write task to its failure, or <c>null</c> when it succeeded.
    /// </summary>
    private Exception FailureOf(Task write)
    {
        if (write.IsFaulted)
        {
            return TryUnwrapException(write.Exception);
        }

        if (write.IsCanceled)
        {
            // A cancelled write did not reach Kafka and must not be reported as a success.
            return new OperationCanceledException("The Kafka write was cancelled before it completed");
        }

        return null;
    }

    /// <summary>
    /// Serializes one event into a Kafka message carrying the CLR type and persistence id as headers.
    /// </summary>
    private Message<byte[], byte[]> ToKafkaMessage(IPersistentRepresentation message, string persistenceId,
        byte[] messageKey)
    {
        var @event = message.Payload;
        var eventType = @event.GetType();
        // "FullName, AssemblyName" is what PersistentFromBytes later feeds to Type.GetType.
        var clrEventType = string.Concat(eventType.FullName, ", ",
            eventType.GetTypeInfo().Assembly.GetName().Name);
        var serializer = settings.Serialization.FindSerializerForType(eventType);

        return new Message<byte[], byte[]>
        {
            Timestamp = Timestamp.Default,
            Key = messageKey,
            Value = serializer.ToBinary(@event),
            Headers = new Headers
            {
                {"clrType", Encoding.UTF8.GetBytes(clrEventType)},
                {"persistenceId", Encoding.UTF8.GetBytes(persistenceId)},
            }
        };
    }

    /// <summary>
    /// Rebuilds a persistent representation from a consumed record using its "clrType" and
    /// "persistenceId" headers; the sequence number is the record offset + 1.
    /// </summary>
    private IPersistentRepresentation PersistentFromBytes(ConsumerMessage<byte[], byte[]> message)
    {
        var headers = message.Record.Headers.ToDictionary(x => x.Key, x => x.Value);
        var persistenceId = Encoding.UTF8.GetString(headers["persistenceId"]);
        var eventTypeString = Encoding.UTF8.GetString(headers["clrType"]);
        var eventType = Type.GetType(eventTypeString, true, true);
        var serializer = settings.Serialization.FindSerializerForType(eventType);
        var @event = serializer.FromBinary(message.Record.Value, eventType);
        return new Persistent(@event, persistenceId: persistenceId, sequenceNr: message.Record.Offset + 1);
    }

    /// <summary>
    /// Deleting is not supported by this journal.
    /// </summary>
    /// <returns>A task faulted with <see cref="NotImplementedException"/>.</returns>
    protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
    {
        // Report the failure through the returned task rather than throwing out of a Task-returning method.
        return Task.FromException(
            new NotImplementedException("Kafka can't delete messages; it is an append only log"));
    }
}
}
using System;
using Akka.Serialization;
using Akka.Configuration;
namespace Akka.Persistence.KafkaRocks
{
/// <summary>
/// Settings of the Kafka journal: the Kafka client configuration, the serialization to use
/// and the mapper that decides topic, partition and key for a persistence id.
/// </summary>
public class KafkaRocksJournalSettings
{
    /// <summary>
    /// Creates the settings and instantiates the event topic mapper.
    /// </summary>
    /// <param name="eventTopicMapperClassname">
    /// Type name, resolvable by <see cref="Type.GetType(string)"/>, of an <see cref="IEventTopicMapper"/>
    /// implementation; it is instantiated through its parameterless constructor.
    /// </param>
    /// <param name="kafkaConfig">Configuration section holding the Kafka client settings.</param>
    /// <param name="serialization">Serialization used for event payloads.</param>
    /// <exception cref="InvalidOperationException">The mapper type could not be resolved.</exception>
    public KafkaRocksJournalSettings(
        string eventTopicMapperClassname,
        Config kafkaConfig,
        Akka.Serialization.Serialization serialization)
    {
        KafkaConfig = kafkaConfig;

        // Name the offending type so a misconfiguration can be diagnosed from the exception alone.
        var mapperType = Type.GetType(eventTopicMapperClassname)
            ?? throw new InvalidOperationException(
                $"Could not resolve the event topic mapper type '{eventTopicMapperClassname}'; " +
                "check the 'eventTopicMapperClassname' setting (an assembly-qualified name may be required).");
        EventTopicMapper = (IEventTopicMapper) Activator.CreateInstance(mapperType);

        Serialization = serialization;
    }

    /// <summary>Mapper deciding topic, partition and key for a persistence id.</summary>
    public IEventTopicMapper EventTopicMapper { get; }

    /// <summary>Configuration section holding the Kafka client settings.</summary>
    public Config KafkaConfig { get; }

    /// <summary>Serialization used for event payloads.</summary>
    public Akka.Serialization.Serialization Serialization { get; }

    /// <summary>
    /// Builds the settings from the journal configuration: reads "eventTopicMapperClassname"
    /// and the "kafka" sub-section.
    /// </summary>
    /// <param name="config">The journal configuration section.</param>
    /// <param name="serialization">Serialization used for event payloads.</param>
    /// <returns>The populated settings.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public static KafkaRocksJournalSettings Create(Config config, Akka.Serialization.Serialization serialization)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        return new KafkaRocksJournalSettings(
            eventTopicMapperClassname: config.GetString("eventTopicMapperClassname"),
            kafkaConfig: config.GetConfig("kafka"),
            serialization: serialization);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment