Skip to content

Instantly share code, notes, and snippets.

@seratch
Last active January 3, 2017 16:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seratch/dd95aa2e367a7df0fd9837cd9e0a8858 to your computer and use it in GitHub Desktop.
Save seratch/dd95aa2e367a7df0fd9837cd9e0a8858 to your computer and use it in GitHub Desktop.
scalikejdbc-streams alpha
package example
// How to run: `sbt run`
object ReactiveStreamsExample extends App {
import scalikejdbc._
import org.slf4j._
private val logger = LoggerFactory.getLogger("app")
Class.forName("org.h2.Driver")
ConnectionPool.singleton("jdbc:h2:file:./db/hello", "user", "pass")
try {
DB.readOnly { implicit s =>
sql"select count(1) from members".toMap.single.apply()
}
} catch { case e: Exception =>
DB.autoCommit { implicit s =>
sql"""
create table members (
id serial not null primary key,
name varchar(64),
created_at timestamp not null
)
""".execute.apply()
(1 to 100).foreach { i =>
val name = s"Member ${i}"
sql"insert into members (name, created_at) values (${name}, current_timestamp)".update.apply()
}
}
}
import scalikejdbc.streams._
import scala.concurrent.ExecutionContext
import scala.concurrent._
import scala.concurrent.duration.Duration
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong
import org.reactivestreams.example.unicast.{ AsyncSubscriber, SyncSubscriber }
val publisherExecutor = Executors.newFixedThreadPool(3)
val publisher: DatabasePublisher[(Int, String)] = {
implicit val publisherExecutionContext = ExecutionContext.fromExecutor(publisherExecutor)
DB.readOnlyStream {
sql"select id, name from members"
.map { rs => (rs.get[Int]("id"), rs.get[String]("name")) }
.iterator
}
}
val syncSubscriber = new SyncSubscriber[(Int, String)] {
def foreach(element: (Int, String)): Boolean = {
logger.info(s"threadId: ${Thread.currentThread.getId}, element: ${element}")
true
}
}
publisher.subscribe(syncSubscriber)
val expectedStreamSize = 10L
val asyncConsumedCount = Promise[Long]()
val asyncSubscriber: AsyncSubscriber[(Int, String)] = {
val subscriberExecutor = Executors.newFixedThreadPool(3)
new AsyncSubscriber[(Int, String)](subscriberExecutor) {
private[this] val consumedCount = new AtomicLong(0)
def whenNext(element: (Int, String)): Boolean = {
val count = consumedCount.incrementAndGet()
logger.info(s"threadId: ${Thread.currentThread.getId}, element: ${element}, count: ${count}")
val needMore = count < expectedStreamSize
if (needMore == false) {
asyncConsumedCount.trySuccess(consumedCount.get)
subscriberExecutor.shutdownNow()
}
needMore
}
}
}
publisher.subscribe(asyncSubscriber)
val clientExecutor = Executors.newFixedThreadPool(3)
implicit val mainEC = ExecutionContext.fromExecutor(clientExecutor)
val f = asyncConsumedCount.future.andThen { case count =>
println(s"Result - received elements before cancellation: $count, expected: $expectedStreamSize")
clientExecutor.shutdownNow()
publisherExecutor.shutdownNow()
}
Await.result(f, Duration.Inf)
}
scalaVersion := "2.12.1"
resolvers += "sonatype releases" at "https://oss.sonatype.org/content/repositories/releases"
libraryDependencies ++= Seq(
"org.scalikejdbc" %% "scalikejdbc" % "3.0.0-M3",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.0.0-M3",
"org.reactivestreams" % "reactive-streams-examples" % "1.0.0",
"com.h2database" % "h2" % "1.4.193",
"org.slf4j" % "slf4j-simple" % "1.7.22"
)
@seratch
Copy link
Author

seratch commented Jan 3, 2017

$ sbt run
[info] Loading global plugins from /Users/kazuhirosera/.sbt/0.13/plugins
[info] Set current project to reactive-streams (in build file:/Users/kazuhirosera/tmp/reactive-streams/)
[info] Compiling 1 Scala source to /Users/kazuhirosera/tmp/reactive-streams/target/scala-2.12/classes...
[info] Running example.ReactiveStreamsExample
[run-main-0] INFO scalikejdbc.streams.DatabasePublisher - Database stream requested by subscriber: example.ReactiveStreamsExample$$anon$2@7d2fdef2 is ready
[run-main-0] INFO scalikejdbc.streams.DatabasePublisher - Database stream requested by subscriber: example.ReactiveStreamsExample$$anon$1@a475d59 is ready
[pool-5-thread-1] INFO app - threadId: 35, element: (1,Member 1)
[pool-5-thread-1] INFO app - threadId: 35, element: (2,Member 2)
[pool-6-thread-2] INFO app - threadId: 38, element: (1,Member 1), count: 1
[pool-5-thread-1] INFO app - threadId: 35, element: (3,Member 3)
[pool-5-thread-1] INFO app - threadId: 35, element: (4,Member 4)
[pool-5-thread-1] INFO app - threadId: 35, element: (5,Member 5)
[pool-5-thread-1] INFO app - threadId: 35, element: (6,Member 6)
[pool-5-thread-1] INFO app - threadId: 35, element: (7,Member 7)
[pool-5-thread-1] INFO app - threadId: 35, element: (8,Member 8)
[pool-5-thread-1] INFO app - threadId: 35, element: (9,Member 9)
[pool-5-thread-1] INFO app - threadId: 35, element: (10,Member 10)
[pool-5-thread-1] INFO app - threadId: 35, element: (11,Member 11)
[pool-5-thread-1] INFO app - threadId: 35, element: (12,Member 12)
[pool-6-thread-3] INFO app - threadId: 40, element: (2,Member 2), count: 2
[pool-5-thread-1] INFO app - threadId: 35, element: (13,Member 13)
[pool-5-thread-1] INFO app - threadId: 35, element: (14,Member 14)
[pool-5-thread-1] INFO app - threadId: 35, element: (15,Member 15)
[pool-5-thread-1] INFO app - threadId: 35, element: (16,Member 16)
[pool-5-thread-1] INFO app - threadId: 35, element: (17,Member 17)
[pool-6-thread-1] INFO app - threadId: 36, element: (3,Member 3), count: 3
[pool-5-thread-1] INFO app - threadId: 35, element: (18,Member 18)
[pool-5-thread-1] INFO app - threadId: 35, element: (19,Member 19)
[pool-5-thread-1] INFO app - threadId: 35, element: (20,Member 20)
[pool-5-thread-1] INFO app - threadId: 35, element: (21,Member 21)
[pool-5-thread-1] INFO app - threadId: 35, element: (22,Member 22)
[pool-6-thread-2] INFO app - threadId: 38, element: (4,Member 4), count: 4
[pool-5-thread-1] INFO app - threadId: 35, element: (23,Member 23)
[pool-5-thread-1] INFO app - threadId: 35, element: (24,Member 24)
[pool-6-thread-3] INFO app - threadId: 40, element: (5,Member 5), count: 5
[pool-5-thread-1] INFO app - threadId: 35, element: (25,Member 25)
[pool-6-thread-1] INFO app - threadId: 36, element: (6,Member 6), count: 6
[pool-5-thread-1] INFO app - threadId: 35, element: (26,Member 26)
[pool-6-thread-2] INFO app - threadId: 38, element: (7,Member 7), count: 7
[pool-5-thread-1] INFO app - threadId: 35, element: (27,Member 27)
[pool-5-thread-1] INFO app - threadId: 35, element: (28,Member 28)
[pool-6-thread-3] INFO app - threadId: 40, element: (8,Member 8), count: 8
[pool-5-thread-1] INFO app - threadId: 35, element: (29,Member 29)
[pool-5-thread-1] INFO app - threadId: 35, element: (30,Member 30)
[pool-5-thread-1] INFO app - threadId: 35, element: (31,Member 31)
[pool-6-thread-1] INFO app - threadId: 36, element: (9,Member 9), count: 9
[pool-5-thread-1] INFO app - threadId: 35, element: (32,Member 32)
[pool-5-thread-1] INFO app - threadId: 35, element: (33,Member 33)
[pool-5-thread-1] INFO app - threadId: 35, element: (34,Member 34)
[pool-6-thread-2] INFO app - threadId: 38, element: (10,Member 10), count: 10
[pool-5-thread-1] INFO app - threadId: 35, element: (35,Member 35)
[pool-5-thread-1] INFO app - threadId: 35, element: (36,Member 36)
[pool-5-thread-1] INFO app - threadId: 35, element: (37,Member 37)
[pool-5-thread-1] INFO app - threadId: 35, element: (38,Member 38)
[pool-5-thread-1] INFO app - threadId: 35, element: (39,Member 39)
[pool-5-thread-1] INFO app - threadId: 35, element: (40,Member 40)
[pool-5-thread-1] INFO app - threadId: 35, element: (41,Member 41)
[pool-5-thread-1] INFO app - threadId: 35, element: (42,Member 42)
[pool-5-thread-1] INFO app - threadId: 35, element: (43,Member 43)
[pool-5-thread-1] INFO app - threadId: 35, element: (44,Member 44)
[pool-5-thread-1] INFO app - threadId: 35, element: (45,Member 45)
[pool-5-thread-1] INFO app - threadId: 35, element: (46,Member 46)
[pool-5-thread-1] INFO app - threadId: 35, element: (47,Member 47)
[pool-5-thread-1] INFO app - threadId: 35, element: (48,Member 48)
[pool-5-thread-1] INFO app - threadId: 35, element: (49,Member 49)
[pool-5-thread-1] INFO app - threadId: 35, element: (50,Member 50)
[pool-5-thread-1] INFO app - threadId: 35, element: (51,Member 51)
[pool-5-thread-1] INFO app - threadId: 35, element: (52,Member 52)
[pool-5-thread-1] INFO app - threadId: 35, element: (53,Member 53)
[pool-5-thread-1] INFO app - threadId: 35, element: (54,Member 54)
[pool-5-thread-1] INFO app - threadId: 35, element: (55,Member 55)
[pool-5-thread-1] INFO app - threadId: 35, element: (56,Member 56)
[pool-5-thread-1] INFO app - threadId: 35, element: (57,Member 57)
[pool-5-thread-1] INFO app - threadId: 35, element: (58,Member 58)
[pool-5-thread-1] INFO app - threadId: 35, element: (59,Member 59)
[pool-6-thread-2] INFO scalikejdbc.streams.DatabaseSubscription - Subscription#cancel() called from subscriber: example.ReactiveStreamsExample$$anon$1@a475d59
[pool-5-thread-1] INFO app - threadId: 35, element: (60,Member 60)
[pool-5-thread-1] INFO app - threadId: 35, element: (61,Member 61)
[pool-5-thread-1] INFO app - threadId: 35, element: (62,Member 62)
[pool-5-thread-1] INFO app - threadId: 35, element: (63,Member 63)
[pool-5-thread-1] INFO app - threadId: 35, element: (64,Member 64)
[pool-5-thread-1] INFO app - threadId: 35, element: (65,Member 65)
[pool-5-thread-1] INFO app - threadId: 35, element: (66,Member 66)
[pool-5-thread-1] INFO app - threadId: 35, element: (67,Member 67)
[pool-5-thread-1] INFO app - threadId: 35, element: (68,Member 68)
[pool-5-thread-1] INFO app - threadId: 35, element: (69,Member 69)
[pool-5-thread-1] INFO app - threadId: 35, element: (70,Member 70)
[pool-5-thread-1] INFO app - threadId: 35, element: (71,Member 71)
[pool-5-thread-1] INFO app - threadId: 35, element: (72,Member 72)
[pool-5-thread-1] INFO app - threadId: 35, element: (73,Member 73)
[pool-5-thread-1] INFO app - threadId: 35, element: (74,Member 74)
[pool-5-thread-2] INFO scalikejdbc.streams.DatabaseSubscription - Cancellation from subscriber: example.ReactiveStreamsExample$$anon$1@a475d59 detected
[pool-5-thread-1] INFO app - threadId: 35, element: (75,Member 75)
[pool-5-thread-1] INFO app - threadId: 35, element: (76,Member 76)
[pool-5-thread-1] INFO app - threadId: 35, element: (77,Member 77)
[pool-5-thread-1] INFO app - threadId: 35, element: (78,Member 78)
[pool-5-thread-1] INFO app - threadId: 35, element: (79,Member 79)
[pool-5-thread-1] INFO app - threadId: 35, element: (80,Member 80)
[pool-5-thread-1] INFO app - threadId: 35, element: (81,Member 81)
[pool-5-thread-1] INFO app - threadId: 35, element: (82,Member 82)
[pool-5-thread-1] INFO app - threadId: 35, element: (83,Member 83)
[pool-5-thread-1] INFO app - threadId: 35, element: (84,Member 84)
[pool-5-thread-1] INFO app - threadId: 35, element: (85,Member 85)
[pool-5-thread-1] INFO app - threadId: 35, element: (86,Member 86)
[pool-5-thread-1] INFO app - threadId: 35, element: (87,Member 87)
[pool-5-thread-1] INFO app - threadId: 35, element: (88,Member 88)
[pool-5-thread-1] INFO app - threadId: 35, element: (89,Member 89)
[pool-5-thread-1] INFO app - threadId: 35, element: (90,Member 90)
[pool-5-thread-1] INFO app - threadId: 35, element: (91,Member 91)
[pool-5-thread-1] INFO app - threadId: 35, element: (92,Member 92)
[pool-5-thread-1] INFO app - threadId: 35, element: (93,Member 93)
[pool-5-thread-1] INFO app - threadId: 35, element: (94,Member 94)
[pool-5-thread-1] INFO app - threadId: 35, element: (95,Member 95)
[pool-5-thread-1] INFO app - threadId: 35, element: (96,Member 96)
[pool-5-thread-1] INFO app - threadId: 35, element: (97,Member 97)
[pool-5-thread-1] INFO app - threadId: 35, element: (98,Member 98)
[pool-5-thread-1] INFO app - threadId: 35, element: (99,Member 99)
[pool-5-thread-1] INFO app - threadId: 35, element: (100,Member 100)
Result - received elements before cancellation: Success(10), expected: 10
[success] Total time: 9 s, completed 2017/01/03 19:48:40

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment