Last active
September 29, 2018 22:06
-
-
Save AndreSteenbergen/700a25c862e291486c1e351b588f79ff to your computer and use it in GitHub Desktop.
Migrating kafka topics from source to sink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Akka; | |
using Akka.Actor; | |
using Akka.Configuration; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
using Akka.Streams.Kafka.Dsl; | |
using Akka.Streams.Kafka.Messages; | |
using Akka.Streams.Kafka.Settings; | |
using Akka.Util; | |
using Confluent.Kafka; | |
using Es.Interfaces; | |
using KafkaUtils; | |
using Newtonsoft.Json; | |
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace JsonToMsgPackMigrator | |
{ | |
/// <summary>
/// Small helper for loading Akka HOCON configuration files from disk.
/// </summary>
public static class HoconLoader
{
    /// <summary>
    /// Reads the file at <paramref name="path"/> and parses its contents as a HOCON <see cref="Config"/>.
    /// </summary>
    /// <param name="path">Path to a HOCON configuration file.</param>
    /// <returns>The parsed configuration.</returns>
    public static Config FromFile(string path) => ConfigurationFactory.ParseString(File.ReadAllText(path));
}
class Program
{
    // Next offset to resume from, per source partition. Written by the sink-side
    // flow (CreateMetaMessageFlow) and read when building the per-topic assignment,
    // so when the outer while-loop in Main re-visits a topic, already-migrated
    // messages are not copied again.
    static readonly Dictionary<TopicPartition, Offset> topicPartitionOffsets = new Dictionary<TopicPartition, Offset>();

    // Entry point. Repeatedly (until Ctrl+C) discovers all "Webpages-<guid>" topics on
    // the source cluster and streams each one to the target cluster, deserializing
    // values as JSON and re-serializing them as MessagePack, merging partitions into
    // a single ordered stream (see CreateGraphForTopic).
    static void Main(string[] args)
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (s, ev) =>
        {
            Console.WriteLine("Ctrl+C pressed");
            // Cancel the default process kill so the flush/terminate code below still runs.
            ev.Cancel = true;
            cts.Cancel();
        };

        // First CLI argument may name a HOCON file; falls back to ./config.hocon.
        var config = HoconLoader.FromFile(args.FirstOrDefault() ?? "./config.hocon");
        var system = ActorSystem.Create("migrator");
        var mat = ActorMaterializer.Create(system);

        // Source side reads JSON (with embedded .NET type names); sink side writes MessagePack.
        var serializationSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
        var guidSerializer = new GuidSerializer();
        var eventJsonSerializer = new IEventSerializer<Guid>(serializationSettings);
        var eventMsgPackSerializer = new IEventSerializerMsgPack<Guid>();

        var kafkaSrcCfg = config.GetConfig("akka.kafka-source");
        var consumerConfig = kafkaSrcCfg.GetConfig("consumer");
        var kafkaPrdCfg = config.GetConfig("akka.kafka-target");
        var producerConfig = kafkaPrdCfg.GetConfig("producer");

        // One shared producer for every topic graph; flushed and disposed at shutdown.
        var producerSettings = ProducerSettings<Guid, IEvent<Guid>>.Create(producerConfig, guidSerializer, eventMsgPackSerializer)
            .WithBootstrapServers(kafkaPrdCfg.GetString("bootstrap.servers"));
        var producer = producerSettings.CreateKafkaProducer();

        while (!cts.Token.IsCancellationRequested)
        {
            //get metadata
            var settings = ConsumerSettings<Null, Null>.Create(consumerConfig, null, null)
                .WithBootstrapServers(kafkaSrcCfg.GetString("bootstrap.servers"));
            List<TopicMetadata> topicToProcess = null;
            using (var adminClient = new AdminClient(settings.Properties))
            {
                var metadata = adminClient.GetMetadata(true, TimeSpan.FromSeconds(10));
                // Only migrate "Webpages-*" topics; "additional" topics are excluded.
                topicToProcess = metadata.Topics.Where(t => t.Topic.StartsWith("Webpages-") && !t.Topic.Contains("additional")).ToList();
            }
            Console.WriteLine($"Starting to process: {topicToProcess.Count} topics");

            foreach (var topic in topicToProcess)
            {
                // Topic names are expected to be "Webpages-<guid>"; skip anything else.
                if (!Guid.TryParse(topic.Topic.Split("-", 2)[1], out Guid websiteId))
                {
                    continue;
                }
                Console.WriteLine("Handling: " + topic.Topic);
                if (cts.Token.IsCancellationRequested)
                {
                    break;
                }
                try
                {
                    var targetTopic = topic.Topic;
#if DEBUG
                    // In debug builds write to a prefixed topic so real data is untouched.
                    targetTopic = "DEBUG-" + targetTopic;
#endif
                    // topicPartitionOffsets mark the starting point, in case the loop is restarted
                    var tpos = topic.Partitions.Select(p =>
                    {
                        TopicPartition tp = new TopicPartition(topic.Topic, p.PartitionId);
                        if (!topicPartitionOffsets.TryGetValue(tp, out Offset offset))
                        {
                            // First visit to this partition: start from the beginning.
                            offset = Offset.Beginning;
                        }
                        return new TopicPartitionOffset(tp, offset);
                    }).ToArray();

                    var subscription = Subscriptions.AssignmentWithOffset(tpos);
                    var consumerSettings = ConsumerSettings<Guid, IEvent<Guid>>.Create(consumerConfig, guidSerializer, eventJsonSerializer)
                        .WithBootstrapServers(kafkaSrcCfg.GetString("bootstrap.servers"))
                        .WithGroupId("jsonToMsgPackConvertor");

                    // The stream completes once an EOF has been seen for every assigned partition.
                    var eofsWaiting = new HashSet<TopicPartition>(tpos.Select(x => x.TopicPartition).ToList());
                    var source = KafkaConsumer.PlainSource(consumerSettings, subscription)
                        .Where(x =>
                        {
                            // Pass through non-record messages (e.g. EOF markers); for records,
                            // drop null values and anything MessagePack cannot serialize.
                            var shouldReturn = x.MessageType != MessageType.ConsumerRecord
                                || (x.Record.Value != null && eventMsgPackSerializer.Serialize("", x.Record.Value) != null);
                            if (shouldReturn && x.MessageType == MessageType.ConsumerRecord) Console.Write(".");
                            return shouldReturn;
                        }).TakeWhile(x =>
                        {
                            if (x.MessageType == MessageType.Eof) eofsWaiting.Remove(x.Record.TopicPartition);
                            return eofsWaiting.Count > 0;
                        });

                    IGraph<ClosedShape, Task> graph = CreateGraphForTopic(topic, targetTopic, source, KafkaProducer.PlainSink(producerSettings, producer));
                    Console.WriteLine("Running graph: " + topic.Topic);
                    var sinkTask = mat.Materialize(graph);
                    // Topics are migrated one at a time; block until this graph completes.
                    sinkTask.Wait();
                    Console.WriteLine($"Done with topic: {topic.Topic}\n");
                }
                catch (Exception e)
                {
                    // Log and continue with the next topic; a failed topic can be retried
                    // on the next pass of the outer loop (offsets are resumed).
                    Console.WriteLine(e);
                }
            }
        }

        // Graceful shutdown: flush outstanding produce requests before disposing.
        cts.Dispose();
        var rst = producer.Flush(producerSettings.FlushTimeout);
        Console.WriteLine("Producer flushed with resultcode: " + rst);
        producer.Dispose();
        // NOTE(review): leftover debugging aid — presumably intentional for inspecting
        // shutdown state under a debugger; consider removing for release builds.
        Debugger.Break();
        system.Terminate();
        system.WhenTerminated.Wait();
        system.Dispose();
    }

    // Builds the closed stream graph for one topic:
    //   source -> PartitionWith (records vs. EOF markers)
    //          -> records fan out per source partition -> buffered flows
    //          -> pairwise MergeSorted tournament (ordered by CompareConsumerMessages)
    //          -> CreateMetaMessageFlow (wraps records for the producer) -> webpageSink;
    // EOF markers are routed to an Ignore sink.
    // The materialized Task of webpageSink is the graph's materialized value.
    private static IGraph<ClosedShape, Task> CreateGraphForTopic<TMat>(
        TopicMetadata topic,
        string targetTopic,
        Source<ConsumerMessage<Guid, IEvent<Guid>>, TMat> source,
        Sink<MessageAndMeta<Guid, IEvent<Guid>>, Task> webpageSink)
    {
        Console.WriteLine("Creating Graph: " + topic.Topic);
        // Partition ids may be sparse; size the fan-out by the highest id seen.
        int partitionCnt = topic.Partitions.Max(x => x.PartitionId) + 1;
        return GraphDsl.Create(webpageSink, (builder, sink) =>
        {
            var src = builder.Add(source);
            var splitMessageStream = builder.Add(new PartitionWith<ConsumerMessage<Guid, IEvent<Guid>>, ConsumerMessage<Guid, IEvent<Guid>>, TopicPartitionOffset>(PartitionOnEventType));
            builder.From(src.Outlet).To(splitMessageStream.In);

            // EOF markers only end the stream (via TakeWhile upstream); discard them here.
            var eofSink = builder.Add(Sink.Ignore<TopicPartitionOffset>());
            builder.From(splitMessageStream.Out1).To(eofSink.Inlet);

            // Fan records out by their source partition so each partition keeps its order.
            var partitioner = builder.Add(new Partition<ConsumerMessage<Guid, IEvent<Guid>>>(partitionCnt, x => x.Record.Partition.Value));
            builder.From(splitMessageStream.Out0).To(partitioner.In);
            var currentSetOutlets = partitioner.Outlets.Cast<Outlet<ConsumerMessage<Guid, IEvent<Guid>>>>().ToList();
            var bufferedOutlets = new List<Outlet<ConsumerMessage<Guid, IEvent<Guid>>>>();

            //create buffered flows for all outlets
            // Large backpressured buffers keep slow partitions from stalling the merge.
            var f = Flow.Create<ConsumerMessage<Guid, IEvent<Guid>>>().Select(x => x).Buffer(1000000, OverflowStrategy.Backpressure);
            foreach (var outletToBuffer in currentSetOutlets)
            {
                var of = builder.Add(f);
                builder.From(outletToBuffer).To(of.Inlet);
                bufferedOutlets.Add(of.Outlet);
            }

            // Merge all per-partition streams into one, ordered by event Raised time.
            var sortedOutlet = MergeSortedOutlets(builder, bufferedOutlets, CompareConsumerMessages);
            var sinkPrepare = builder.Add(CreateMetaMessageFlow<Guid, IEvent<Guid>>(targetTopic));
            builder.From(sortedOutlet).To(sinkPrepare.Inlet);
            builder.From(sinkPrepare.Outlet).To(sink.Inlet);
            return ClosedShape.Instance;
        });
    }

    // Converts each consumed record into a MessageAndMeta aimed at the target topic,
    // and records the next resume offset for the record's source partition as a side
    // effect. Partition.Any lets the target cluster pick the destination partition.
    private static Flow<ConsumerMessage<K, V>, MessageAndMeta<K, V>, NotUsed> CreateMetaMessageFlow<K, V>(string topic)
    {
        return Flow.Create<ConsumerMessage<K, V>>().Select(x =>
        {
            Console.Write(".");
            // Remember where to resume this partition on the next pass of Main's loop.
            topicPartitionOffsets[x.Record.TopicPartition] = x.Record.Offset + 1;
            return new MessageAndMeta<K, V>
            {
                Message = new Message<K, V>
                {
                    Key = x.Record.Key,
                    Value = x.Record.Value,
                    Timestamp = x.Record.Timestamp
                },
                Topic = topic,
                Partition = Partition.Any
            };
        });
    }

    // Reduces N outlets to a single sorted outlet by pairwise MergeSorted stages
    // (a tournament: each round halves the number of outlets; an odd leftover
    // outlet is carried to the next round unchanged).
    private static Outlet<T> MergeSortedOutlets<S, T>(GraphDsl.Builder<S> builder, List<Outlet<T>> currentSetOutlets, Func<T, T, int> fn)
    {
        while (currentSetOutlets.Count > 1)
        {
            var newOutlets = new List<Outlet<T>>();
            for (int i = 0; i < currentSetOutlets.Count; i += 2)
            {
                if (i + 1 == currentSetOutlets.Count)
                {
                    // Odd count: the last outlet advances to the next round as-is.
                    newOutlets.Add(currentSetOutlets[i]);
                }
                else
                {
                    var merger = builder.Add(new MergeSorted<T>(fn));
                    builder.From(currentSetOutlets[i]).To(merger.In0);
                    builder.From(currentSetOutlets[i + 1]).To(merger.In1);
                    newOutlets.Add(merger.Out);
                }
            }
            currentSetOutlets = newOutlets;
        }
        return currentSetOutlets[0];
    }

    // Routes a consumed message for PartitionWith: actual records go Left (to the
    // produce pipeline), everything else goes Right (to the ignore sink). A non-EOF,
    // non-record message yields Right(default) — presumably such messages never occur
    // here; verify against the Akka.Streams.Kafka message types in use.
    private static Either<ConsumerMessage<Guid, IEvent<Guid>>, TopicPartitionOffset> PartitionOnEventType(ConsumerMessage<Guid, IEvent<Guid>> arg)
    {
        if (arg.MessageType == MessageType.ConsumerRecord)
        {
            return new Left<ConsumerMessage<Guid, IEvent<Guid>>>(arg);
        }
        else
        {
            if (arg.MessageType == MessageType.Eof)
            {
                return new Right<TopicPartitionOffset>(arg.Record.TopicPartitionOffset);
            }
            return new Right<TopicPartitionOffset>(default(TopicPartitionOffset));
        }
    }

    // Ordering function for MergeSorted: records are ordered by their event's Raised
    // time; EOF markers sort after all records so a finished partition does not block
    // the merge. NOTE(review): never returns 0 and ties break toward arg2 regardless
    // of argument order — acceptable for merge ordering, but not a general comparator.
    private static int CompareConsumerMessages(
        ConsumerMessage<Guid, IEvent<Guid>> arg1,
        ConsumerMessage<Guid, IEvent<Guid>> arg2)
    {
        //-1 if arg1 should win
        if (arg1.MessageType == MessageType.Eof && arg2.MessageType == MessageType.Eof)
        {
            if (arg1.Record.Partition < arg2.Record.Partition) return -1;
            return 1;
        }
        if (arg2.MessageType == MessageType.Eof) return -1;
        if (arg1.MessageType == MessageType.Eof) return 1;
        //all other cases
        if (arg1.Record.Value.Raised < arg2.Record.Value.Raised)
        {
            return -1;
        }
        return 1;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment