-
-
Save Tvaroh/f439101ea064f38f877853f16f13ade2 to your computer and use it in GitHub Desktop.
Eventuate-based tree CRDT attempt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
case class Edge[A](nodeId: String, parentId: String, entry: A) | |
case class TreeCrdt[A](edges: ORSet[Edge[A]] = ORSet.apply[Edge[A]], | |
edgesByNodeId: Map[String, Edge[A]] = Map.empty[String, Edge[A]], | |
edgesByParentId: Map[String, Set[Edge[A]]] = Map.empty[String, Set[Edge[A]]]) | |
extends CRDTFormat { | |
import TreeCrdt._ | |
def value: Set[Edge[A]] = | |
edges.value | |
def createChildNode(parentId: String, nodeId: String, entry: A, timestamp: VectorTime): TreeCrdt[A] = { | |
val edge = Edge(nodeId, parentId, entry) | |
copy( | |
edges = edges.add(edge, timestamp), | |
edgesByNodeId = edgesByNodeId.updated(nodeId, edge), | |
edgesByParentId = edgesByParentId.updated(parentId, edgesByParentId.getOrElse(parentId, Set.empty) + edge) | |
) | |
} | |
def prepareDeleteSubTree(nodeId: String): Option[DeleteSubTreeOpPrepared] = { | |
def getChildren(edge: Edge[A]): Set[Edge[A]] = | |
edgesByParentId | |
.get(edge.nodeId) | |
.fold(Set.empty[Edge[A]])(children => children ++ children.flatMap(getChildren)) | |
edgesByNodeId.get(nodeId).map { edge => | |
val children = getChildren(edge) | |
val timestamps = children.flatMap(edges.prepareRemove) | |
val nodeIds = children.map(_.nodeId) | |
DeleteSubTreeOpPrepared(timestamps, nodeIds, edge.parentId) | |
} | |
} | |
def deleteSubTree(prepared: DeleteSubTreeOpPrepared): TreeCrdt[A] = | |
copy( | |
edges = edges.remove(prepared.timestamps), | |
edgesByNodeId = edgesByNodeId -- prepared.nodeIds, | |
edgesByParentId = edgesByParentId -- prepared.nodeIds - prepared.parentId | |
) | |
} | |
object TreeCrdt { | |
def apply[A]: TreeCrdt[A] = | |
new TreeCrdt[A]() | |
implicit def TreeCrdtServiceOps[A] = new CRDTServiceOps[TreeCrdt[A], Set[Edge[A]]] { | |
override def zero: TreeCrdt[A] = | |
TreeCrdt.apply[A] | |
override def value(crdt: TreeCrdt[A]): Set[Edge[A]] = | |
crdt.value | |
override def prepare(crdt: TreeCrdt[A], operation: Any): Option[Any] = | |
operation match { | |
case op@DeleteSubTreeOp(nodeId) => | |
crdt.prepareDeleteSubTree(nodeId) | |
case _ => | |
super.prepare(crdt, operation) | |
} | |
override def update(crdt: TreeCrdt[A], operation: Any, event: DurableEvent): TreeCrdt[A] = | |
operation match { | |
case CreateChildNodeOp(parentId, nodeId, entry) => | |
crdt.createChildNode(parentId, nodeId, entry.asInstanceOf[A], event.vectorTimestamp) | |
case op@DeleteSubTreeOpPrepared(_, _, _) => | |
crdt.deleteSubTree(op) | |
} | |
} | |
case class DeleteSubTreeOpPrepared(timestamps: Set[VectorTime], | |
nodeIds: Set[String], | |
parentId: String) | |
} | |
class TreeCrdtService[A](override val serviceId: String, | |
override val log: ActorRef) | |
(implicit override val system: ActorSystem, | |
override val ops: CRDTServiceOps[TreeCrdt[A], Set[Edge[A]]]) | |
extends CRDTService[TreeCrdt[A], Set[Edge[A]]] { | |
def createChildNode(treeId: String, parentId: String, nodeId: String, entry: A): Future[Set[Edge[A]]] = | |
op(treeId, CreateChildNodeOp(parentId, nodeId, entry)) | |
def deleteSubTree(treeId: String, nodeId: String): Future[Set[Edge[A]]] = | |
op(treeId, DeleteSubTreeOp(nodeId)) | |
start() | |
} | |
private[crdt] case class CreateChildNodeOp(parentId: String, nodeId: String, entry: Any) | |
private[crdt] case class DeleteSubTreeOp(nodeId: String) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment