Skip to content

Instantly share code, notes, and snippets.

@Tvaroh
Created October 6, 2016 06:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Tvaroh/f439101ea064f38f877853f16f13ade2 to your computer and use it in GitHub Desktop.
Save Tvaroh/f439101ea064f38f877853f16f13ade2 to your computer and use it in GitHub Desktop.
Eventuate-based tree CRDT attempt
case class Edge[A](nodeId: String, parentId: String, entry: A)
case class TreeCrdt[A](edges: ORSet[Edge[A]] = ORSet.apply[Edge[A]],
edgesByNodeId: Map[String, Edge[A]] = Map.empty[String, Edge[A]],
edgesByParentId: Map[String, Set[Edge[A]]] = Map.empty[String, Set[Edge[A]]])
extends CRDTFormat {
import TreeCrdt._
def value: Set[Edge[A]] =
edges.value
def createChildNode(parentId: String, nodeId: String, entry: A, timestamp: VectorTime): TreeCrdt[A] = {
val edge = Edge(nodeId, parentId, entry)
copy(
edges = edges.add(edge, timestamp),
edgesByNodeId = edgesByNodeId.updated(nodeId, edge),
edgesByParentId = edgesByParentId.updated(parentId, edgesByParentId.getOrElse(parentId, Set.empty) + edge)
)
}
def prepareDeleteSubTree(nodeId: String): Option[DeleteSubTreeOpPrepared] = {
def getChildren(edge: Edge[A]): Set[Edge[A]] =
edgesByParentId
.get(edge.nodeId)
.fold(Set.empty[Edge[A]])(children => children ++ children.flatMap(getChildren))
edgesByNodeId.get(nodeId).map { edge =>
val children = getChildren(edge)
val timestamps = children.flatMap(edges.prepareRemove)
val nodeIds = children.map(_.nodeId)
DeleteSubTreeOpPrepared(timestamps, nodeIds, edge.parentId)
}
}
def deleteSubTree(prepared: DeleteSubTreeOpPrepared): TreeCrdt[A] =
copy(
edges = edges.remove(prepared.timestamps),
edgesByNodeId = edgesByNodeId -- prepared.nodeIds,
edgesByParentId = edgesByParentId -- prepared.nodeIds - prepared.parentId
)
}
object TreeCrdt {
def apply[A]: TreeCrdt[A] =
new TreeCrdt[A]()
implicit def TreeCrdtServiceOps[A] = new CRDTServiceOps[TreeCrdt[A], Set[Edge[A]]] {
override def zero: TreeCrdt[A] =
TreeCrdt.apply[A]
override def value(crdt: TreeCrdt[A]): Set[Edge[A]] =
crdt.value
override def prepare(crdt: TreeCrdt[A], operation: Any): Option[Any] =
operation match {
case op@DeleteSubTreeOp(nodeId) =>
crdt.prepareDeleteSubTree(nodeId)
case _ =>
super.prepare(crdt, operation)
}
override def update(crdt: TreeCrdt[A], operation: Any, event: DurableEvent): TreeCrdt[A] =
operation match {
case CreateChildNodeOp(parentId, nodeId, entry) =>
crdt.createChildNode(parentId, nodeId, entry.asInstanceOf[A], event.vectorTimestamp)
case op@DeleteSubTreeOpPrepared(_, _, _) =>
crdt.deleteSubTree(op)
}
}
case class DeleteSubTreeOpPrepared(timestamps: Set[VectorTime],
nodeIds: Set[String],
parentId: String)
}
class TreeCrdtService[A](override val serviceId: String,
override val log: ActorRef)
(implicit override val system: ActorSystem,
override val ops: CRDTServiceOps[TreeCrdt[A], Set[Edge[A]]])
extends CRDTService[TreeCrdt[A], Set[Edge[A]]] {
def createChildNode(treeId: String, parentId: String, nodeId: String, entry: A): Future[Set[Edge[A]]] =
op(treeId, CreateChildNodeOp(parentId, nodeId, entry))
def deleteSubTree(treeId: String, nodeId: String): Future[Set[Edge[A]]] =
op(treeId, DeleteSubTreeOp(nodeId))
start()
}
private[crdt] case class CreateChildNodeOp(parentId: String, nodeId: String, entry: Any)
private[crdt] case class DeleteSubTreeOp(nodeId: String)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment