Skip to content

Instantly share code, notes, and snippets.

@megri
Last active May 11, 2017 16:56
Show Gist options
  • Save megri/c4f5f692785b8a2ae2591d157bdb0a9e to your computer and use it in GitHub Desktop.
Save megri/c4f5f692785b8a2ae2591d157bdb0a9e to your computer and use it in GitHub Desktop.
import doobie._
import doobie.imports._
import scala.concurrent.duration._
import scalaz.Nondeterminism
import scalaz.concurrent.Task
import scalaz.std.list._
// def transactor: Transactor[Task] = ???
case class Notification2(invoice_number: Long, email: String)
case class Event2(invoice_number: Long, event_code: String)
val setup =
for {
_ <- sql"DROP TABLE IF EXISTS notification2".update.run
_ <- sql"DROP TABLE IF EXISTS event2".update.run
_ <- sql"CREATE TABLE notification2 (invoice_number BIGINT, email VARCHAR NOT NULL)".update.run
_ <- sql"CREATE TABLE event2 (invoice_number BIGINT, event_code VARCHAR NOT NULL)".update.run
_ <- sql"INSERT INTO notification2(invoice_number, email) VALUES(123, 'foo@bar.com')".update.run
} yield ()
setup.transact(transactor).unsafePerformSync
val transactionTask = {
def log(s: String) = Capture[ConnectionIO].apply(println(s))
val unhandledQ =
sql"""SELECT * FROM notification2 n WHERE n.invoice_number NOT IN (
SELECT e.invoice_number FROM event2 e WHERE e.event_code = 'NotificationDispatched'
) FOR UPDATE""".query[Notification2]
def simulatedRequestResponse(ns: List[Notification2]) =
Capture[ConnectionIO].apply(
Task
.schedule(ns.map(n => Event2(n.invoice_number, "NotificationDispatched")), 1.second)
.unsafePerformSync
)
val insertEventQ =
Update[Event2]("""INSERT INTO event2 (invoice_number, event_code) VALUES (?, ?)""")
val transaction = for {
_ <- log("Retrieving unhandled notifications from database")
unhandled <- unhandledQ.list
_ <- log(s"Unhandled notifications: $unhandled")
_ <- log("Simulating request")
response <- simulatedRequestResponse(unhandled)
_ <- log("Inserting events")
updatedCount <- insertEventQ.updateMany(response)
} yield updatedCount
transaction.transact(transactor)
}
Nondeterminism[Task].gatherUnordered(
List(
Task.fork(transactionTask),
Task.fork(transactionTask),
)
).unsafePerformSync
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment