Skip to content

Instantly share code, notes, and snippets.

@artembilan
Created November 4, 2014 13:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save artembilan/9bd32f5e899ef293b6c2 to your computer and use it in GitHub Desktop.
Save artembilan/9bd32f5e899ef293b6c2 to your computer and use it in GitHub Desktop.
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.test.reactor;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import reactor.core.Environment;
import reactor.rx.Streams;
import reactor.rx.stream.HotStream;
/**
* @author Artem Bilan
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class ReactorTests {
@Autowired
@Qualifier("reactorStreamFlow.input")
private MessageChannel reactorStreamFlowChannel;
@Autowired
@Qualifier("reactorStreamResult")
private PollableChannel reactorStreamResult;
@Autowired
@Qualifier("reactorPromiseFlow.input")
private MessageChannel reactorPromiseFlowChannel;
@Autowired
@Qualifier("reactorPromiseResult")
private PollableChannel reactorPromiseResult;
@Test
public void testReactorStreamFlow() {
List<Integer> payload = new ArrayList<>(10);
for (int i = 1; i <= 10; i++) {
payload.add(i);
}
this.reactorStreamFlowChannel.send(new GenericMessage<>(payload));
Message<?> receive = this.reactorStreamResult.receive(5000);
assertNotNull(receive);
assertThat(receive.getPayload(), instanceOf(List.class));
List<?> results = (List<?>) receive.getPayload();
for (int i = 1; i <= 10; i++) {
Object actual = results.get(i - 1);
assertEquals("S" + (i * 2), actual);
}
}
@Test
public void testReactorPromiseFlow() throws InterruptedException {
List<Integer> payload = new ArrayList<>(10);
for (int i = 1; i <= 10; i++) {
payload.add(i);
}
this.reactorPromiseFlowChannel.send(new GenericMessage<>(payload));
Message<?> receive = this.reactorPromiseResult.receive(5000);
assertNotNull(receive);
assertThat(receive.getPayload(), instanceOf(List.class));
List<?> results = (List<?>) receive.getPayload();
for (int i = 1; i <= 10; i++) {
Object actual = results.get(i - 1);
assertEquals("P" + (i * 3), actual);
}
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public Environment reactorEnv() {
return new Environment();
}
@Bean
public MessagingTemplate messagingTemplate() {
return new MessagingTemplate();
}
@Bean
public PollableChannel reactorStreamResult() {
return new QueueChannel();
}
@Bean
public HotStream<Object> reactorStream() {
HotStream<Object> stream = Streams.defer(reactorEnv());
stream.<List<String>>flatMap(data -> Streams.defer((Iterable<?>) data)
.map(v -> "S" + ((int) v * 2)).buffer())
.consume(v -> messagingTemplate().convertAndSend(reactorStreamResult(), v));
return stream;
}
@Bean
public IntegrationFlow reactorStreamFlow() {
return f -> f.handle(m -> reactorStream().broadcastNext(m.getPayload()));
}
@Bean
public IntegrationFlow reactorPromiseFlow() {
return f -> f
.handle((p, h) -> Streams.just(p)
.dispatchOn(reactorEnv())
.<Integer>split()
.<String>parallel(10, s -> s.map(v -> "P" + (v * 3)))
.toList()
.poll(3, TimeUnit.SECONDS))
.channel(c -> c.queue("reactorPromiseResult"));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment