Skip to content

Instantly share code, notes, and snippets.

@AndreSteenbergen
Last active September 29, 2018 22:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndreSteenbergen/700a25c862e291486c1e351b588f79ff to your computer and use it in GitHub Desktop.
Save AndreSteenbergen/700a25c862e291486c1e351b588f79ff to your computer and use it in GitHub Desktop.
Migrating Kafka topics from a source cluster to a sink cluster
using Akka;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Akka.Util;
using Confluent.Kafka;
using Es.Interfaces;
using KafkaUtils;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace JsonToMsgPackMigrator
{
/// <summary>Utility for loading HOCON configuration from disk.</summary>
public static class HoconLoader
{
    /// <summary>
    /// Reads the file at <paramref name="path"/> and parses its full text
    /// into an Akka HOCON <see cref="Config"/>.
    /// </summary>
    public static Config FromFile(string path) =>
        ConfigurationFactory.ParseString(File.ReadAllText(path));
}
// Console tool: for every source topic named "Webpages-<guid>" (excluding ones
// containing "additional"), re-reads all records as JSON-serialized IEvent<Guid>
// and re-publishes them to a target cluster serialized as MessagePack, using an
// Akka.Streams graph that merge-sorts records from all partitions by their
// Raised timestamp.
class Program
{
// Resume positions: next offset to read per source partition. Populated by
// CreateMetaMessageFlow as records are forwarded, and consulted when the outer
// while-loop re-processes a topic, so already-migrated records are skipped.
static readonly Dictionary<TopicPartition, Offset> topicPartitionOffsets = new Dictionary<TopicPartition, Offset>();
/// <summary>
/// Entry point. args[0] may name a HOCON config file (defaults to ./config.hocon).
/// Loops over all matching topics until Ctrl+C is pressed, then flushes the
/// producer and shuts the actor system down.
/// </summary>
static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
// Ctrl+C requests a cooperative stop (ev.Cancel = true keeps the process alive
// so the flush/terminate code at the bottom still runs).
Console.CancelKeyPress += (s, ev) =>
{
Console.WriteLine("Ctrl+C pressed");
ev.Cancel = true;
cts.Cancel();
};
var config = HoconLoader.FromFile(args.FirstOrDefault() ?? "./config.hocon");
var system = ActorSystem.Create("migrator");
var mat = ActorMaterializer.Create(system);
// TypeNameHandling.All: the source records embed .NET type names in the JSON.
var serializationSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
var guidSerializer = new GuidSerializer();
// Despite the "I" prefix these are concrete (de)serializer classes: JSON for
// reading from the source, MessagePack for writing to the target.
var eventJsonSerializer = new IEventSerializer<Guid>(serializationSettings);
var eventMsgPackSerializer = new IEventSerializerMsgPack<Guid>();
var kafkaSrcCfg = config.GetConfig("akka.kafka-source");
var consumerConfig = kafkaSrcCfg.GetConfig("consumer");
var kafkaPrdCfg = config.GetConfig("akka.kafka-target");
var producerConfig = kafkaPrdCfg.GetConfig("producer");
var producerSettings = ProducerSettings<Guid, IEvent<Guid>>.Create(producerConfig, guidSerializer, eventMsgPackSerializer)
.WithBootstrapServers(kafkaPrdCfg.GetString("bootstrap.servers"));
// One producer instance is created up front and reused for every topic.
var producer = producerSettings.CreateKafkaProducer();
while (!cts.Token.IsCancellationRequested)
{
// Re-fetch cluster metadata on every pass so newly created topics are picked up.
var settings = ConsumerSettings<Null, Null>.Create(consumerConfig, null, null)
.WithBootstrapServers(kafkaSrcCfg.GetString("bootstrap.servers"));
List<TopicMetadata> topicToProcess = null;
using (var adminClient = new AdminClient(settings.Properties))
{
var metadata = adminClient.GetMetadata(true, TimeSpan.FromSeconds(10));
topicToProcess = metadata.Topics.Where(t => t.Topic.StartsWith("Webpages-") && !t.Topic.Contains("additional")).ToList();
}
Console.WriteLine($"Starting to process: {topicToProcess.Count} topics");
foreach (var topic in topicToProcess)
{
// Topic names are expected to be "Webpages-<guid>"; skip anything else.
if (!Guid.TryParse(topic.Topic.Split("-", 2)[1], out Guid websiteId))
{
continue;
}
Console.WriteLine("Handling: " + topic.Topic);
if (cts.Token.IsCancellationRequested)
{
break;
}
try
{
var targetTopic = topic.Topic;
#if DEBUG
// Debug builds write to a prefixed topic so real data is untouched.
targetTopic = "DEBUG-" + targetTopic;
#endif
// topicPartitionOffsets give the starting position, needed when the outer
// loop restarts and the topic has already been partially migrated.
var tpos = topic.Partitions.Select(p =>
{
TopicPartition tp = new TopicPartition(topic.Topic, p.PartitionId);
if (!topicPartitionOffsets.TryGetValue(tp, out Offset offset))
{
offset = Offset.Beginning;
}
return new TopicPartitionOffset(tp, offset);
}).ToArray();
var subscription = Subscriptions.AssignmentWithOffset(tpos);
var consumerSettings = ConsumerSettings<Guid, IEvent<Guid>>.Create(consumerConfig, guidSerializer, eventJsonSerializer)
.WithBootstrapServers(kafkaSrcCfg.GetString("bootstrap.servers"))
.WithGroupId("jsonToMsgPackConvertor");
// Partitions for which we have not yet seen an end-of-partition marker; the
// stream completes once this set is empty.
var eofsWaiting = new HashSet<TopicPartition>(tpos.Select(x => x.TopicPartition).ToList());
var source = KafkaConsumer.PlainSource(consumerSettings, subscription)
.Where(x =>
{
// Pass through all non-record messages (e.g. EOF markers); for real records,
// drop null values and values that fail a trial MessagePack serialization.
// NOTE(review): the Serialize result is discarded — presumably only a
// convertibility check; confirm it has no side effects.
var shouldReturn = x.MessageType != MessageType.ConsumerRecord
|| (x.Record.Value != null && eventMsgPackSerializer.Serialize("", x.Record.Value) != null);
if (shouldReturn && x.MessageType == MessageType.ConsumerRecord) Console.Write(".");
return shouldReturn;
}).TakeWhile(x =>
{
// Each EOF removes its partition; when the last one arrives the stream ends
// (that final EOF element itself is excluded by TakeWhile).
if (x.MessageType == MessageType.Eof) eofsWaiting.Remove(x.Record.TopicPartition);
return eofsWaiting.Count > 0;
});
IGraph<ClosedShape, Task> graph = CreateGraphForTopic(topic, targetTopic, source, KafkaProducer.PlainSink(producerSettings, producer));
Console.WriteLine("Running graph: " + topic.Topic);
var sinkTask = mat.Materialize(graph);
// Blocking Wait is acceptable here: this is a single-purpose console Main
// that processes one topic at a time.
sinkTask.Wait();
Console.WriteLine($"Done with topic: {topic.Topic}\n");
}
catch (Exception e)
{
// Log and move on to the next topic; the outer loop will retry from the
// recorded offsets on its next pass.
Console.WriteLine(e);
}
}
}
cts.Dispose();
// Flush any records still buffered in the producer before disposing it.
var rst = producer.Flush(producerSettings.FlushTimeout);
Console.WriteLine("Producer flushed with resultcode: " + rst);
producer.Dispose();
// NOTE(review): Debugger.Break() looks like a leftover debugging aid — confirm
// it should be removed before shipping.
Debugger.Break();
system.Terminate();
system.WhenTerminated.Wait();
system.Dispose();
}
/// <summary>
/// Builds the per-topic migration graph: source -> split records/EOFs ->
/// fan out per source partition -> per-partition backpressured buffer ->
/// pairwise merge-sort by event time -> map to producer messages -> sink.
/// </summary>
/// <param name="topic">Source topic metadata (partition list is used for fan-out sizing).</param>
/// <param name="targetTopic">Name of the topic to produce into.</param>
/// <param name="source">Consumer source emitting records and EOF markers.</param>
/// <param name="webpageSink">Kafka producer sink; its materialized Task completes when the stream finishes.</param>
private static IGraph<ClosedShape, Task> CreateGraphForTopic<TMat>(
TopicMetadata topic,
string targetTopic,
Source<ConsumerMessage<Guid, IEvent<Guid>>, TMat> source,
Sink<MessageAndMeta<Guid, IEvent<Guid>>, Task> webpageSink)
{
Console.WriteLine("Creating Graph: " + topic.Topic);
// Partition ids are assumed dense 0..max; the fan-out stage needs one outlet per id.
int partitionCnt = topic.Partitions.Max(x => x.PartitionId) + 1;
return GraphDsl.Create(webpageSink, (builder, sink) =>
{
var src = builder.Add(source);
// Split the stream: Out0 = real consumer records, Out1 = EOF offsets.
var splitMessageStream = builder.Add(new PartitionWith<ConsumerMessage<Guid, IEvent<Guid>>, ConsumerMessage<Guid, IEvent<Guid>>, TopicPartitionOffset>(PartitionOnEventType));
builder.From(src.Outlet).To(splitMessageStream.In);
// EOF markers are only needed to end the source stream; discard them here.
var eofSink = builder.Add(Sink.Ignore<TopicPartitionOffset>());
builder.From(splitMessageStream.Out1).To(eofSink.Inlet);
// Fan records out by their source partition so each partition's order is kept.
var partitioner = builder.Add(new Partition<ConsumerMessage<Guid, IEvent<Guid>>>(partitionCnt, x => x.Record.Partition.Value));
builder.From(splitMessageStream.Out0).To(partitioner.In);
var currentSetOutlets = partitioner.Outlets.Cast<Outlet<ConsumerMessage<Guid, IEvent<Guid>>>>().ToList();
var bufferedOutlets = new List<Outlet<ConsumerMessage<Guid, IEvent<Guid>>>>();
// Create buffered flows for all outlets. The large backpressured buffer
// presumably keeps a slow partition from stalling the merge — TODO confirm
// the 1,000,000-element size is intentional and fits in memory.
var f = Flow.Create<ConsumerMessage<Guid, IEvent<Guid>>>().Select(x => x).Buffer(1000000, OverflowStrategy.Backpressure);
foreach (var outletToBuffer in currentSetOutlets)
{
var of = builder.Add(f);
builder.From(outletToBuffer).To(of.Inlet);
bufferedOutlets.Add(of.Outlet);
}
// Merge all partition streams into one, ordered by CompareConsumerMessages.
var sortedOutlet = MergeSortedOutlets(builder, bufferedOutlets, CompareConsumerMessages);
var sinkPrepare = builder.Add(CreateMetaMessageFlow<Guid, IEvent<Guid>>(targetTopic));
builder.From(sortedOutlet).To(sinkPrepare.Inlet);
builder.From(sinkPrepare.Outlet).To(sink.Inlet);
return ClosedShape.Instance;
});
}
/// <summary>
/// Maps a consumed record to a producer MessageAndMeta addressed at
/// <paramref name="topic"/>. Side effect: records offset+1 in
/// topicPartitionOffsets so a restarted run resumes after this record.
/// </summary>
private static Flow<ConsumerMessage<K, V>, MessageAndMeta<K, V>, NotUsed> CreateMetaMessageFlow<K, V>(string topic)
{
return Flow.Create<ConsumerMessage<K, V>>().Select(x =>
{
Console.Write(".");
// +1: next run should start at the record after this one.
topicPartitionOffsets[x.Record.TopicPartition] = x.Record.Offset + 1;
return new MessageAndMeta<K, V>
{
Message = new Message<K, V>
{
Key = x.Record.Key,
Value = x.Record.Value,
Timestamp = x.Record.Timestamp
},
Topic = topic,
// Partition.Any lets the target cluster's partitioner choose the partition.
Partition = Partition.Any
};
});
}
/// <summary>
/// Reduces a list of outlets to a single sorted outlet by repeatedly adding
/// pairwise MergeSorted stages (tournament-style); an odd outlet is carried
/// over to the next round unchanged.
/// </summary>
private static Outlet<T> MergeSortedOutlets<S, T>(GraphDsl.Builder<S> builder, List<Outlet<T>> currentSetOutlets, Func<T, T, int> fn)
{
while (currentSetOutlets.Count > 1)
{
var newOutlets = new List<Outlet<T>>();
for (int i = 0; i < currentSetOutlets.Count; i += 2)
{
if (i + 1 == currentSetOutlets.Count)
{
// Odd one out: no partner this round, promote as-is.
newOutlets.Add(currentSetOutlets[i]);
}
else
{
var merger = builder.Add(new MergeSorted<T>(fn));
builder.From(currentSetOutlets[i]).To(merger.In0);
builder.From(currentSetOutlets[i + 1]).To(merger.In1);
newOutlets.Add(merger.Out);
}
}
currentSetOutlets = newOutlets;
}
return currentSetOutlets[0];
}
/// <summary>
/// Routing predicate for PartitionWith: real consumer records go Left (kept),
/// EOF markers go Right (discarded) as their TopicPartitionOffset.
/// </summary>
private static Either<ConsumerMessage<Guid, IEvent<Guid>>, TopicPartitionOffset> PartitionOnEventType(ConsumerMessage<Guid, IEvent<Guid>> arg)
{
if (arg.MessageType == MessageType.ConsumerRecord)
{
return new Left<ConsumerMessage<Guid, IEvent<Guid>>>(arg);
}
else
{
if (arg.MessageType == MessageType.Eof)
{
return new Right<TopicPartitionOffset>(arg.Record.TopicPartitionOffset);
}
// NOTE(review): default(TopicPartitionOffset) is presumably null here
// (reference type); confirm downstream Sink.Ignore tolerates it.
return new Right<TopicPartitionOffset>(default(TopicPartitionOffset));
}
}
/// <summary>
/// Comparator for MergeSorted: EOFs order by partition id and sort after any
/// real record; real records order by their Raised event time.
/// NOTE(review): this comparator never returns 0 — equal Raised values yield 1
/// (arg2 wins). Confirm MergeSorted only needs a "which side next" decision
/// and not a full antisymmetric comparison.
/// </summary>
private static int CompareConsumerMessages(
ConsumerMessage<Guid, IEvent<Guid>> arg1,
ConsumerMessage<Guid, IEvent<Guid>> arg2)
{
// -1 means arg1 should win (be emitted first)
if (arg1.MessageType == MessageType.Eof && arg2.MessageType == MessageType.Eof)
{
if (arg1.Record.Partition < arg2.Record.Partition) return -1;
return 1;
}
if (arg2.MessageType == MessageType.Eof) return -1;
if (arg1.MessageType == MessageType.Eof) return 1;
// all other cases: order real records by event time
if (arg1.Record.Value.Raised < arg2.Record.Value.Raised)
{
return -1;
}
return 1;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment