Last active
January 3, 2017 16:18
-
-
Save seratch/dd95aa2e367a7df0fd9837cd9e0a8858 to your computer and use it in GitHub Desktop.
scalikejdbc-streams alpha
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
// How to run: `sbt run` | |
object ReactiveStreamsExample extends App { | |
import scalikejdbc._ | |
import org.slf4j._ | |
private val logger = LoggerFactory.getLogger("app") | |
Class.forName("org.h2.Driver") | |
ConnectionPool.singleton("jdbc:h2:file:./db/hello", "user", "pass") | |
try { | |
DB.readOnly { implicit s => | |
sql"select count(1) from members".toMap.single.apply() | |
} | |
} catch { case e: Exception => | |
DB.autoCommit { implicit s => | |
sql""" | |
create table members ( | |
id serial not null primary key, | |
name varchar(64), | |
created_at timestamp not null | |
) | |
""".execute.apply() | |
(1 to 100).foreach { i => | |
val name = s"Member ${i}" | |
sql"insert into members (name, created_at) values (${name}, current_timestamp)".update.apply() | |
} | |
} | |
} | |
import scalikejdbc.streams._ | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent._ | |
import scala.concurrent.duration.Duration | |
import java.util.concurrent.Executors | |
import java.util.concurrent.atomic.AtomicLong | |
import org.reactivestreams.example.unicast.{ AsyncSubscriber, SyncSubscriber } | |
val publisherExecutor = Executors.newFixedThreadPool(3) | |
val publisher: DatabasePublisher[(Int, String)] = { | |
implicit val publisherExecutionContext = ExecutionContext.fromExecutor(publisherExecutor) | |
DB.readOnlyStream { | |
sql"select id, name from members" | |
.map { rs => (rs.get[Int]("id"), rs.get[String]("name")) } | |
.iterator | |
} | |
} | |
val syncSubscriber = new SyncSubscriber[(Int, String)] { | |
def foreach(element: (Int, String)): Boolean = { | |
logger.info(s"threadId: ${Thread.currentThread.getId}, element: ${element}") | |
true | |
} | |
} | |
publisher.subscribe(syncSubscriber) | |
val expectedStreamSize = 10L | |
val asyncConsumedCount = Promise[Long]() | |
val asyncSubscriber: AsyncSubscriber[(Int, String)] = { | |
val subscriberExecutor = Executors.newFixedThreadPool(3) | |
new AsyncSubscriber[(Int, String)](subscriberExecutor) { | |
private[this] val consumedCount = new AtomicLong(0) | |
def whenNext(element: (Int, String)): Boolean = { | |
val count = consumedCount.incrementAndGet() | |
logger.info(s"threadId: ${Thread.currentThread.getId}, element: ${element}, count: ${count}") | |
val needMore = count < expectedStreamSize | |
if (needMore == false) { | |
asyncConsumedCount.trySuccess(consumedCount.get) | |
subscriberExecutor.shutdownNow() | |
} | |
needMore | |
} | |
} | |
} | |
publisher.subscribe(asyncSubscriber) | |
val clientExecutor = Executors.newFixedThreadPool(3) | |
implicit val mainEC = ExecutionContext.fromExecutor(clientExecutor) | |
val f = asyncConsumedCount.future.andThen { case count => | |
println(s"Result - received elements before cancellation: $count, expected: $expectedStreamSize") | |
clientExecutor.shutdownNow() | |
publisherExecutor.shutdownNow() | |
} | |
Await.result(f, Duration.Inf) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
scalaVersion := "2.12.1" | |
resolvers += "sonatype releases" at "https://oss.sonatype.org/content/repositories/releases" | |
libraryDependencies ++= Seq( | |
"org.scalikejdbc" %% "scalikejdbc" % "3.0.0-M3", | |
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.0.0-M3", | |
"org.reactivestreams" % "reactive-streams-examples" % "1.0.0", | |
"com.h2database" % "h2" % "1.4.193", | |
"org.slf4j" % "slf4j-simple" % "1.7.22" | |
) |
Author
seratch
commented
Jan 3, 2017
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment