Skip to content

Instantly share code, notes, and snippets.

@fwrq41251
Created September 5, 2022 02:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fwrq41251/91675f4697db255705b6a8d94ebd0bb7 to your computer and use it in GitHub Desktop.
Save fwrq41251/91675f4697db255705b6a8d94ebd0bb7 to your computer and use it in GitHub Desktop.
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package actorbintree
import akka.actor._
import scala.collection.immutable.Queue
/** Message protocol understood by the [[BinaryTreeSet]] actor and its nodes. */
object BinaryTreeSet {

  /** Common shape of every request sent to the tree: who asked, the request
    * identifier and the element the request is about.
    */
  sealed trait Operation {
    def requester: ActorRef
    def id: Int
    def elem: Int
  }

  /** Common shape of every answer: it carries the `id` of the request it answers. */
  trait OperationReply {
    def id: Int
  }

  /** Asks the tree to add `elem`. `requester` is notified with an
    * [[OperationFinished]] carrying the same `id` once the insert is done.
    */
  case class Insert(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Asks the tree whether `elem` is currently present. `requester` is notified
    * with a [[ContainsResult]] carrying the same `id`.
    */
  case class Contains(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Asks the tree to drop `elem`. `requester` is notified with an
    * [[OperationFinished]] carrying the same `id` once the removal is done.
    */
  case class Remove(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Asks the tree to perform garbage collection. */
  case object GC

  /** Answer to the [[Contains]] request identified by `id`; `result` is true
    * exactly when the element is present in the tree.
    */
  case class ContainsResult(id: Int, result: Boolean) extends OperationReply

  /** Signals that the insert or remove request identified by `id` completed. */
  case class OperationFinished(id: Int) extends OperationReply
}
/** Entry-point actor of the set. It forwards operations to the current root
  * node and, on `GC`, copies the tree into a fresh root while buffering any
  * operations that arrive in the meantime.
  */
class BinaryTreeSet extends Actor {
  import BinaryTreeNode._
  import BinaryTreeSet._

  /** Spawns a new root node as a child of this actor: element 0, created in
    * the removed state.
    */
  def createRoot: ActorRef = context.actorOf(BinaryTreeNode.props(0, initiallyRemoved = true))

  var root: ActorRef = createRoot

  // Operations received while a garbage collection is in progress, kept in
  // arrival order so they can be replayed against the new root.
  var pendingQueue = Queue.empty[Operation]

  def receive: Receive = normal

  /** Steady-state behaviour: accepts `Operation` and `GC` messages. */
  val normal: Receive = {
    case GC =>
      // Start copying into a brand-new tree and switch to the collecting behaviour.
      val freshRoot = createRoot
      root ! CopyTo(freshRoot)
      context.become(garbageCollecting(freshRoot))
    case operation: Operation =>
      root ! operation
  }

  /** Behaviour while a garbage collection is running.
    *
    * @param newRoot root of the new tree that the old tree is being copied into
    */
  def garbageCollecting(newRoot: ActorRef): Receive = {
    case CopyFinished =>
      // The old tree is fully copied: retire it, swap roots, resume normal
      // service and replay everything buffered during the collection.
      val buffered = pendingQueue
      root ! PoisonPill
      root = newRoot
      context.become(normal)
      buffered.foreach(newRoot ! _)
      pendingQueue = Queue.empty[Operation]
    case operation: Operation =>
      pendingQueue = pendingQueue.enqueue(operation)
    case GC =>
      // A collection is already under way; further GC requests are ignored.
      ()
  }
}
/** Companion of [[BinaryTreeNode]]: child positions, the copy protocol used
  * during garbage collection, and the `Props` factory.
  */
object BinaryTreeNode {

  /** Side of a node on which a subtree hangs. */
  trait Position
  case object Left extends Position
  case object Right extends Position

  /** Asks a node to copy itself and its subtrees into the tree rooted at `treeNode`. */
  case class CopyTo(treeNode: ActorRef)

  /** Acknowledgement sent by a node to its parent once the node and all of
    * its children have finished being copied.
    */
  case object CopyFinished

  /** Builds the `Props` for a node holding `elem`, passing both arguments on
    * to the [[BinaryTreeNode]] constructor.
    */
  def props(elem: Int, initiallyRemoved: Boolean): Props =
    Props(classOf[BinaryTreeNode], elem, initiallyRemoved)
}
/** One node of the actor-based binary search tree.
  *
  * Each node owns a single element. Removal is a soft delete: the node stays
  * in the tree and only its `removed` flag is set. Values smaller than `elem`
  * live in the `Left` subtree, all other values in the `Right` subtree.
  *
  * @param elem             the element owned by this node
  * @param initiallyRemoved whether the node starts out flagged as removed
  */
class BinaryTreeNode(val elem: Int, initiallyRemoved: Boolean) extends Actor {
  import BinaryTreeNode._
  import BinaryTreeSet._

  var subtrees = Map[Position, ActorRef]()
  var removed = initiallyRemoved

  def receive: Receive = normal

  /** Handles `Operation` messages and `CopyTo` requests. */
  val normal: Receive = {
    case msg @ Insert(requester, id, value) =>
      if (value == elem) {
        // Re-inserting this node's own element simply revives it.
        removed = false
        requester ! OperationFinished(id)
      } else {
        val position = getPosition(value)
        subtrees.get(position) match {
          case Some(child) =>
            child ! msg
          case None =>
            val subTree = context.actorOf(BinaryTreeNode.props(value, initiallyRemoved = false))
            subtrees = subtrees + (position -> subTree)
            requester ! OperationFinished(id)
        }
      }

    case msg @ Remove(requester, id, value) =>
      if (value == elem) {
        removed = true
        requester ! OperationFinished(id)
      } else {
        subtrees.get(getPosition(value)) match {
          case Some(child) => child ! msg
          // Nothing stored on that side: the value is absent, so removal is a no-op.
          case None => requester ! OperationFinished(id)
        }
      }

    case msg @ Contains(requester, id, value) =>
      if (value == elem) {
        // This node owns the value. Insert and Remove never forward a value
        // equal to `elem` into a subtree, so the answer is decided here and
        // there is no need to walk the right subtree when the node is removed.
        requester ! ContainsResult(id, result = !removed)
      } else {
        subtrees.get(getPosition(value)) match {
          case Some(child) => child ! msg
          case None => requester ! ContainsResult(id, result = false)
        }
      }

    case CopyTo(newTree) =>
      val children = subtrees.values.toSet
      if (children.isEmpty && removed) {
        // Nothing to copy and nobody to wait for.
        context.parent ! CopyFinished
      } else {
        // A removed node has no insert to wait for, so it counts as confirmed.
        context.become(copying(children, insertConfirmed = removed))
        if (!removed) {
          // The id is arbitrary: `copying` accepts any OperationFinished.
          newTree ! Insert(self, 0, elem)
        }
        children.foreach(_ ! CopyTo(newTree))
      }
  }

  /** Side of this node on which `value` belongs: `Left` when it is smaller
    * than `elem`, `Right` otherwise.
    */
  private def getPosition(value: Int): Position = if (value < elem) Left else Right

  /** Behaviour while this node is being copied into a new tree.
    *
    * @param expected        children whose `CopyFinished` is still outstanding
    * @param insertConfirmed whether the insert of this node's own element into
    *                        the new tree has been confirmed
    */
  def copying(expected: Set[ActorRef], insertConfirmed: Boolean): Receive = {
    case OperationFinished(_) =>
      if (expected.isEmpty) {
        context.parent ! CopyFinished
      } else {
        context.become(copying(expected, insertConfirmed = true))
      }

    case CopyFinished =>
      val remaining = expected - sender()
      if (remaining.isEmpty && insertConfirmed) {
        context.parent ! CopyFinished
      } else {
        context.become(copying(remaining, insertConfirmed))
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment