Reading from an AMQP 1.0 queue (Apache Qpid C++ broker) with Apache Beam, performing some processing, then writing the results out to Apache Kafka
// Build the pipeline options from the command-line arguments.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

// Read messages from the AMQP 1.0 queue.
PCollection<Message> output = pipeline.apply(AmqpIO.read()
    .withAddresses(Collections.singletonList("amqp://172.17.0.2:5672/myQueue")));

// Process each message (UppercaseFn is a DoFn<Message, String> defined elsewhere).
PCollection<String> words = output.apply(ParDo.of(new UppercaseFn()));

// Write only the values (no keys) to the Kafka topic.
words.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("172.17.0.4:9092")
    .withTopic("beam.results")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .values());
pipeline.run();
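
UppercaseFn is not shown in the snippet above. As a rough sketch only, the string handling it performs might look like the following, assuming the message payload arrives as text or as UTF-8 bytes. The class name UppercaseLogic and its method are hypothetical, not part of Beam or Qpid. The Beam DoFn wrapper (a @ProcessElement method that pulls the body value out of the Message and emits the result) is deliberately left out so that the example compiles with the JDK alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Hypothetical helper holding the string logic an UppercaseFn might delegate to. */
public final class UppercaseLogic {
    private UppercaseLogic() {}

    /**
     * Converts a message body value to upper-case text.
     * Assumption: the body is a String, a UTF-8 encoded byte[], or some other
     * object whose toString() is meaningful; null becomes an empty string.
     */
    public static String toUpperCase(Object body) {
        if (body == null) {
            return "";
        }
        String text = (body instanceof byte[])
            ? new String((byte[]) body, StandardCharsets.UTF_8)
            : body.toString();
        // Locale.ROOT avoids locale-specific surprises (e.g. the Turkish dotless i).
        return text.toUpperCase(Locale.ROOT);
    }
}
```

Keeping the transformation in a plain static method means it can be unit-tested without starting a pipeline; the DoFn then only has to extract the body and call it.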