Skip to content

Instantly share code, notes, and snippets.

@raboof
Created August 23, 2014 17:57
Show Gist options
  • Save raboof/f5d3a500bdfc758dfb9b to your computer and use it in GitHub Desktop.
Save raboof/f5d3a500bdfc758dfb9b to your computer and use it in GitHub Desktop.
import akka.actor.ActorPath
/**
 * Layers caller-chosen transmission identifiers on top of the numeric
 * delivery ids handed out by `AtLeastOnceDelivery.deliver`.
 *
 * Every call to [[transmit]] records the delivery id generated for that
 * message under the given transmission id; [[confirmTransmission]] then
 * confirms all delivery ids recorded under that transmission id at once.
 *
 * NOTE(review): the id bookkeeping lives only in this in-memory map; presumably
 * it is rebuilt by calling `transmit` again while replaying events — confirm.
 *
 * @tparam A type of the transmission identifier (used as a map key)
 */
trait AtLeastOnceTransmission[A] extends akka.persistence.AtLeastOnceDelivery {

  /** Delivery ids that were sent but not yet confirmed, grouped by transmission id. */
  private var unconfirmed = Map[A, Set[Long]]()

  /**
   * Sends `message` to `destination` via `deliver` and remembers the generated
   * delivery id under `transmissionId`.
   *
   * @param destination    path of the actor the message is delivered to
   * @param transmissionId caller-chosen identifier later passed to [[confirmTransmission]]
   * @param message        payload handed to `deliver` unchanged
   */
  def transmit(destination: ActorPath, transmissionId: A, message: Any): Unit = {
    deliver(destination, { deliveryId =>
      unconfirmed = add(unconfirmed, transmissionId, deliveryId)
      message
    })
  }

  /**
   * Confirms every delivery recorded under `transmissionId` and forgets the
   * transmission id.
   *
   * @param transmissionId identifier previously passed to [[transmit]]
   * @return `false` when nothing is recorded for `transmissionId`; otherwise
   *         `true` only if `confirmDelivery` returned `true` for every recorded
   *         delivery id
   */
  def confirmTransmission(transmissionId: A): Boolean = {
    unconfirmed.get(transmissionId) match {
      case Some(deliveryIds) =>
        unconfirmed = unconfirmed - transmissionId
        // Confirm ALL ids. `forall` would stop at the first `false`, leaving the
        // remaining deliveries unconfirmed after their bookkeeping was already
        // dropped. `confirmDelivery` is evaluated before `&&` so it always runs.
        deliveryIds.foldLeft(true) { (allConfirmed, deliveryId) =>
          super.confirmDelivery(deliveryId) && allConfirmed
        }
      case None => false
    }
  }

  /**
   * Returns a copy of `map` in which `value` has been added to the set stored
   * under `key`; a new singleton set is created when `key` is absent.
   *
   * @param map   multimap to extend (not modified)
   * @param key   key whose set receives the value
   * @param value element to add
   * @return the updated multimap
   */
  def add[B, C](map: Map[B, Set[C]], key: B, value: C): Map[B, Set[C]] =
    map.updated(key, map.getOrElse(key, Set.empty[C]) + value)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment