@matzew
Created April 25, 2018 18:13

Reading from an AMQP 1.0 queue (Apache Qpid CPP) with Apache Beam, performing some processing, then writing the results out to Apache Kafka

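        // Parse command-line arguments (runner selection, etc.) into pipeline options.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(PipelineOptions.class);

        Pipeline pipeline = Pipeline.create(options);

        // Read AMQP 1.0 messages (org.apache.qpid.proton.message.Message) from the queue.
        PCollection<Message> output = pipeline.apply(AmqpIO.read()
                .withAddresses(Collections.singletonList("amqp://172.17.0.2:5672/myQueue")));

        // UppercaseFn is a DoFn<Message, String> defined elsewhere (not shown in this gist).
        PCollection<String> words = output.apply(ParDo.of(new UppercaseFn()));

        // Write only the values (no keys) to the Kafka topic.
        words.apply(KafkaIO.<String, String>write().withBootstrapServers("172.17.0.4:9092")
                .withTopic("beam.results").withValueSerializer(StringSerializer.class)
                .withKeySerializer(StringSerializer.class).values());

        pipeline.run();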
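
The gist does not include UppercaseFn, so its real body is unknown. As a rough sketch of the conversion such a DoFn might perform, the hypothetical helper below (the class name UppercaseBody and method toUpper are assumptions, not part of the gist) turns a message payload into an uppercase String. It uses only the JDK so it can be tried without Beam or Qpid Proton on the classpath. Mapping a null payload to an empty string is an assumed choice here; skipping such messages would be an equally reasonable one.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper, not from the gist: the kind of logic UppercaseFn might delegate to.
final class UppercaseBody {
    private UppercaseBody() {}

    // Converts a payload value to an uppercase String.
    // Assumption: a null payload becomes "", binary payloads are UTF-8 text.
    static String toUpper(Object body) {
        if (body == null) {
            return "";
        }
        String text = (body instanceof byte[])
                ? new String((byte[]) body, StandardCharsets.UTF_8)
                : body.toString();
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        return text.toUpperCase(Locale.ROOT);
    }
}
```

Inside a DoFn<Message, String>, the @ProcessElement method could pass the message payload to this helper, for example the value wrapped by an AmqpValue body, and emit the result; UppercaseBody.toUpper("hello beam") returns "HELLO BEAM".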