Skip to content

Instantly share code, notes, and snippets.

@mp911de
Created June 20, 2019 21:42
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mp911de/f300d9a55768eb8a7d845eb96e068f3f to your computer and use it in GitHub Desktop.
Save mp911de/f300d9a55768eb8a7d845eb96e068f3f to your computer and use it in GitHub Desktop.
R2DBC with Akka Streams
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>biz.paluch.r2dbc</groupId>
<artifactId>scala-r2dbc-example</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<repositories>
<repository>
<id>spring-milestone</id>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-bom</artifactId>
<version>Arabba-M8</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- scala-maven-plugin determines the Scala version to use from this dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.12</artifactId>
<version>2.5.23</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-client</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Configure maven-compiler-plugin to use the desired Java version -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Use build-helper-maven-plugin to add Scala source and test source directories -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<!-- Use scala-maven-plugin for Scala support -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<!-- Need to specify this explicitly, otherwise plugin won't be called when doing e.g. mvn compile -->
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
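<!-- Use maven-assembly-plugin to bundle the application and all of its dependencies into a single JAR (jar-with-dependencies) during the package phase -->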
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import io.r2dbc.client.R2dbc
import io.r2dbc.spi.ConnectionFactories
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Example that drives R2DBC from Akka Streams.
 *
 * The program:
 *  1. creates a `counter` table through the R2DBC client,
 *  2. runs an Akka Streams pipeline that turns each number in `1 to 100` into one
 *     `INSERT` (each wrapped as a `Source` via `Source.fromPublisher`) and sums the
 *     reported update counts,
 *  3. streams the `desc` column of every row back and prints it.
 *
 * The materializer and actor system are shut down in a `finally` block so that a failed
 * or timed-out step does not leave the actor system running.
 *
 * @author Mark Paluch
 */
object R2dbcWithAkkaStreams extends App {

  implicit val system: ActorSystem = ActorSystem("ExampleSystem")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val source: Source[Int, NotUsed] = Source(1 to 100)

  val cf = ConnectionFactories.get("r2dbc:h2:mem:///akka-test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE")
  val r2dbc = new R2dbc(cf)

  try {
    // A bit of setup: block until the DDL statement has completed before inserting.
    r2dbc.withHandle(handle => handle.execute("CREATE TABLE counter (id INTEGER PRIMARY KEY, desc VARCHAR(255))")).blockLast()

    println("Insert 100 rows into counter")

    // flatMapConcat runs the per-element sources one after another; the fold sums the
    // update count emitted by each INSERT.
    val insertRows = source.flatMapConcat(it => {
      Source.fromPublisher[Int](r2dbc.withHandle(handle => {
        handle.createUpdate("INSERT INTO counter (id, desc) VALUES(?1, ?2)")
          .bind("?1", it)
          // Use the same "?n" marker style as the SQL text and the first bind.
          .bind("?2", s"desc $it")
          .execute()
          .map(_.toInt)
      }))
    }).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right).run()

    Await.result(insertRows, 60.seconds)

    println("Insert count:")
    // The future is complete at this point, so `value` holds its Try and is printed as such.
    insertRows.value.foreach(println(_))
    println()

    println("SELECT desc FROM counter:")

    val selectAll = Source.fromPublisher[String](r2dbc.withHandle(handle => {
      handle.select("SELECT desc FROM counter")
        .mapRow(row => row.get("desc", classOf[String]))
    }))
      .runForeach(println(_))

    Await.result(selectAll, 60.seconds)
  } finally {
    // Always release the stream materializer and actor system, even when a step above failed.
    mat.shutdown()
    system.terminate()
  }
}
@mp911de
Copy link
Author

mp911de commented Jun 20, 2019

Prints:

Insert 100 rows into counter
Insert count:
Success(100)

SELECT desc FROM counter:
desc 1
desc 2
desc 3
desc 4
desc 5
desc 6
desc 7
desc 8
desc 9
desc 10
desc 11
desc 12
desc 13
desc 14
desc 15
desc 16
desc 17
desc 18
desc 19
desc 20
desc 21
desc 22
desc 23
desc 24
desc 25
desc 26
desc 27
desc 28
desc 29
desc 30
desc 31
desc 32
desc 33
desc 34
desc 35
desc 36
desc 37
desc 38
desc 39
desc 40
desc 41
desc 42
desc 43
desc 44
desc 45
desc 46
desc 47
desc 48
desc 49
desc 50
desc 51
desc 52
desc 53
desc 54
desc 55
desc 56
desc 57
desc 58
desc 59
desc 60
desc 61
desc 62
desc 63
desc 64
desc 65
desc 66
desc 67
desc 68
desc 69
desc 70
desc 71
desc 72
desc 73
desc 74
desc 75
desc 76
desc 77
desc 78
desc 79
desc 80
desc 81
desc 82
desc 83
desc 84
desc 85
desc 86
desc 87
desc 88
desc 89
desc 90
desc 91
desc 92
desc 93
desc 94
desc 95
desc 96
desc 97
desc 98
desc 99
desc 100

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment