Skip to content

Instantly share code, notes, and snippets.

@flyaruu
Last active May 9, 2019 13:35
Show Gist options
  • Save flyaruu/239a950ffda6f3fc544faffffc866f12 to your computer and use it in GitHub Desktop.
Save flyaruu/239a950ffda6f3fc544faffffc866f12 to your computer and use it in GitHub Desktop.
import java.time.Duration;
import org.junit.Test;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.spi.Connection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TestIssue {
@Test
public void testQuery() {
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration.builder()
.database("dvdrental")
.host("localhost")
.password("mysecretpassword")
.username("postgres")
.build();
PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(configuration);
ConnectionPoolConfiguration poolConfiguration = ConnectionPoolConfiguration.builder(connectionFactory)
.validationQuery("SELECT 1")
.maxIdleTime(Duration.ofSeconds(10))
.maxSize(2)
.build();
ConnectionPool pool = new ConnectionPool(poolConfiguration);
getConnection(pool)
.flatMapMany(connection->connection.createStatement("select title t,film_id f,description d from film where film_id>100 limit 35").execute())
.flatMap(e->e.map((row,rowmeta)->{
String title = row.get(0,String.class);
Integer filmId = row.get(1,Integer.class);
String description = row.get(2,String.class);
return filmDetails(pool,filmId,title,description);
}),1)
.flatMap(e->e,1)
.toIterable().forEach(System.err::println);
}
private Flux<Short> actorsFromFilm(ConnectionPool pool,Short filmId) {
return getConnection(pool)
.flatMapMany(connection->connection.createStatement("select actor_id from film_actor where film_id = $1").bind(0, filmId).execute())
.concatMap(e->e.map((row,rowmeta)->row.get("actor_id",Short.class)),1);
}
private Flux<String> filmDetails(ConnectionPool pool,int filmId, String title, String description) {
return Flux.just("Film with title: "+title+" and id: " +filmId)
.concatWith(Flux.just(" -> Description: "+description))
.concatWith(actorsFromFilm(pool,(short)filmId).map(e->" -> Actor with id: "+e))
.concatWith(Flux.just("End of film with title: "+title));
}
private Mono<Connection> getConnection(ConnectionPool pool) {
return pool.create()
.doAfterSuccessOrError(this::disposeConnection);
}
private void disposeConnection(Connection connection, Throwable ex) {
if(connection!=null) {
Mono.from(connection.close())
.subscribe();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment