Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// Journal parallel writes prototype
package example
import scala.concurrent._
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
/**
 * Demo driver for the parallel-write journal prototype.
 *
 * Starts an actor system with one `Journal` and one echoing target actor,
 * then asks the journal to write the numbers 1 to 100, each addressed to
 * the target.
 */
object Parwrite extends App {
  val system = ActorSystem("example")
  implicit val timeout = Timeout(5.seconds)
  import system.dispatcher

  val journal = system.actorOf(Props[Journal])

  // Target actor: prints whatever it receives and answers the sender of the
  // message with the same payload prefixed by "re: ".
  val target = system.actorOf(Props(new Actor {
    def receive = {
      case msg =>
        println("received message " + msg)
        sender ! ("re: " + msg)
    }
  }))

  for (i <- 1 to 100) {
    val reply = journal ? Write(i, target)
    reply onSuccess {
      case _ => // replies are deliberately not printed
    }
  }
}
/**
 * Request to journal a message before it is delivered.
 *
 * @param msg        payload to be written and later forwarded
 * @param target     actor that should receive `msg` once it has been written
 * @param sequenceNr sequence number of the write; defaults to -1 and is
 *                   replaced by the `Journal` (via `copy`) before the request
 *                   is passed to a `Writer`
 */
case class Write(
  msg: Any,
  target: ActorRef,
  sequenceNr: Long = -1)
/**
 * Acknowledgement sent by a `Writer` in reply to a `Write`.
 *
 * @param msg    the payload taken from the `Write` request
 * @param target the actor the `Resequencer` forwards `msg` to
 */
case class Written(
  msg: Any,
  target: ActorRef)
/**
 * Entry point of the journal prototype.
 *
 * Every incoming `Write` is stamped with the next sequence number and handed
 * to one of five child `Writer` actors, chosen by sequence number modulo the
 * number of writers. The outcome of each write is passed to the child
 * `Resequencer` together with its sequence number, so that results are
 * released in the order the `Write` requests arrived at this actor.
 *
 * A failed write (ask timeout, failure reply, or a reply that is not a
 * `Written`) is also reported to the resequencer. Otherwise the missing
 * sequence number would leave a permanent gap and every later message would
 * stay buffered forever.
 */
class Journal extends Actor {
  import context.dispatcher

  implicit val timeout = Timeout(5.seconds)

  val resequencer = context.actorOf(Props(new Resequencer))
  val writers = List.fill(5)(context.actorOf(Props(new Writer)))
  val writersCount = writers.length

  /** Sequence number of the most recently accepted `Write`. */
  var counter = 0L

  def receive = {
    case write: Write =>
      counter += 1
      // Snapshot mutable actor state now: the future callback below runs later
      // on the dispatcher, when `counter` and `sender` may already refer to a
      // different message.
      val ctr = counter
      val sdr = sender
      val idx = (ctr % writersCount).toInt

      // mapTo turns an unexpected reply type into a failed future instead of
      // a silently unmatched success.
      val io = (writers(idx) ? write.copy(sequenceNr = ctr)).mapTo[Written]

      io onComplete {
        case scala.util.Success(written) =>
          resequencer.tell((ctr, written), sdr)
        case scala.util.Failure(cause) =>
          // Report the failure through the resequencer, addressed to the
          // original sender: the sequence keeps advancing and the sender is
          // told about the failure (in order) instead of only timing out.
          resequencer.tell((ctr, Written(Status.Failure(cause), sdr)), sdr)
      }
  }
}
/**
 * Worker that handles a single `Write` request at a time.
 *
 * The storage access is only a placeholder in this prototype; every request
 * is acknowledged immediately with a `Written` carrying the request's
 * payload and target.
 */
class Writer extends Actor {
  def receive = {
    case request: Write =>
      // write to storage backend
      // ...
      // reply successful write
      sender ! Written(request.msg, request.target)
  }
}
/**
 * Releases written messages to their targets in sequence-number order.
 *
 * Expects messages of the form `(seqnr: Long, written: Written)`. A message
 * whose number is exactly one above the last delivered number is forwarded
 * to `written.target` immediately; any other message is buffered until the
 * numbers before it have been delivered. Forwarded messages carry the sender
 * of the incoming tuple as their sender.
 */
class Resequencer extends Actor {
  import scala.collection.mutable

  /** Out-of-order results, keyed by sequence number, with the sender to use on delivery. */
  private val delayed = mutable.Map.empty[Long, (Written, ActorRef)]

  /** Highest sequence number delivered so far (numbering starts at 1). */
  private var delivered = 0L

  def receive = {
    case (seqnr: Long, written: Written) => resequence(seqnr, written, sender)
  }

  /**
   * Delivers `written` if it is next in line, otherwise buffers it, and then
   * drains every buffered entry that has become deliverable.
   *
   * @param seqnr   sequence number assigned to the write
   * @param written result to forward to `written.target`
   * @param sdr     actor to present as sender of the forwarded message
   */
  @scala.annotation.tailrec
  private def resequence(seqnr: Long, written: Written, sdr: ActorRef): Unit = {
    if (seqnr == delivered + 1) {
      delivered = seqnr
      written.target.tell(written.msg, sdr)
    } else {
      // Store the sender that was passed in rather than re-reading `sender`,
      // so the method does not depend on which message is currently being
      // processed by the actor.
      delayed(seqnr) = (written, sdr)
    }

    delayed.remove(delivered + 1) match {
      case Some((next, nextSender)) => resequence(delivered + 1, next, nextSender)
      case None                     => ()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment