@afiore
Last active August 29, 2015 14:22
akka persistence workshop
package pt.akka.workshop
import java.util.UUID
import akka.actor.{ActorLogging, Actor}
import akka.persistence.{RecoveryCompleted, SnapshotOffer, PersistentActor}
/*
Our task is to implement a service actor for a voting management API.
A voting is two items competing against each other for users' votes.
The voting ends when one of the items reaches the maximum number of votes that was specified when the voting was created.
It should be possible to retrieve results for all present and past votings.
The API is:
POST /votings - where body is a json with fields: "itemAId":string, "itemBId":string, "maxVotes":int
POST /votings/<votingid> - where body is a json with fields: "votingId":string, "itemId":string, "userId":string
GET /votings/<votingid> - returns a json with fields: "winningItemId":string (optional), "votes":int, "finished":boolean
                   ----------------------------
                   |          Voting          |
   0   User1       |                          |
  /|\   --------   |   --------   --------    |
  / \   | Vote |   |   |Item A|   |Item B|    |
        --------   |   --------   --------    |
        Item A --> |     V: 4       V: 3      |
                   |                          |
                   ----------------------------
Goals:
Path 1 (mandatory):
- creating votings, gathering votes and returning results
- basic error handling (voting or item does not exist, vote for a finished voting, duplicate item in a voting)
- all information that is needed to conform to the API must be preserved between application restarts.
(hence akka-persistence)
Path 2:
- it is illegal to create two votings with the same two items
- it is illegal for a user to vote more than once in a single voting
- state snapshots are used to allow for faster recovery
Path 3 (harder):
- a child actor is spawned to manage the state of each voting that is in progress - with its persistence.
- to handle increased load, the VotingsManager actor needs to be partitioned
- use persistAsync instead of persist and deal with the consequences;)
*/
object VotingsManager {
  case class CreateVoting(itemAId: String, itemBId: String, maxVotes: Int)
  case class VotingCreated(votingId: String)
  case class Vote(votingId: String, itemId: String, userId: String)

  /**
   * Confirmation of the vote
   * @param votes number of accumulated votes for the item
   */
  case class VoteDone(votes: Int)
  case class GetResult(votingId: String)

  object VoteStore {
    var votings = Map.empty[String, CreateVoting]
    private var store = Map.empty[String, Map[String, String]] // VotingId -> Map(UserId -> ItemId)

    def addVoting(vid: String, v: CreateVoting): Unit = votings = votings + (vid -> v)

    /** Returns the leading item and its vote count, or None if the voting does not exist. A tie resolves to item B. */
    def getResult(votingId: String): Option[(String, Int)] =
      votings.get(votingId).map { v =>
        val z = ((v.itemAId, 0), (v.itemBId, 0))
        // Count the votes per item; votes for items outside this voting are ignored.
        val counts = store.getOrElse(votingId, Map.empty[String, String]).foldLeft(z) {
          case (((itemA, n), t2), (_, itemId)) if itemId == itemA =>
            ((itemA, n + 1), t2)
          case ((t1, (itemB, n)), (_, itemId)) if itemId == itemB =>
            (t1, (itemB, n + 1))
          case (acc, _) => acc
        }
        // Pick the item with more accumulated votes.
        counts match {
          case (a @ (_, n), b @ (_, m)) =>
            if (n > m) a else b
        }
      }

    /** Records the user's vote; a later vote by the same user in the same voting replaces the earlier one. */
    def update(v: Vote): Unit = {
      val votes = store.getOrElse(v.votingId, Map.empty[String, String]) + (v.userId -> v.itemId)
      store = store + (v.votingId -> votes)
    }
  }

  /**
   * @param winningItemId id of the winning item or None, if it is a draw
   *                      (a draw is only possible if max number of votes is not reached yet).
   * @param votes number of votes of the winning item or votes in a draw
   * @param finished is the voting finished? (was max number of votes reached)
   */
  case class VotingResult(winningItemId: Option[String], votes: Int, finished: Boolean)
}
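
/** Event persisted when a voting is created: the generated id together with the voting's definition. */
case class VotingStored(votingId: String, voting: VotingsManager.CreateVoting)

class VotingsManager extends PersistentActor with ActorLogging {
  import VotingsManager._

  override def receiveCommand: Receive = {
    case v: CreateVoting =>
      val replyTo = sender()
      val id = UUID.randomUUID().toString
      // Persist the event first and change state in the handler,
      // so that live state and replayed state are built the same way.
      persist(VotingStored(id, v)) { event =>
        VoteStore.addVoting(event.votingId, event.voting)
        replyTo ! VotingCreated(event.votingId)
        log.debug(s"Created a new voting with id: ${event.votingId}")
      }
    case v: Vote =>
      // Persist the vote so that it survives a restart.
      persist(v) { vote => VoteStore.update(vote) }
    case GetResult(votingId) =>
      val msg = VoteStore.getResult(votingId) match {
        case Some((itemId, n)) =>
          // The voting is finished once the leading item has reached maxVotes.
          val finished = VoteStore.votings.get(votingId).exists(voting => n >= voting.maxVotes)
          VotingResult(Some(itemId), n, finished)
        case None => VotingResult(None, 0, finished = false)
      }
      sender() ! msg
  }

  override def receiveRecover: Receive = {
    case VotingStored(votingId, voting) =>
      log.debug(s"Recovering voting: $votingId")
      VoteStore.addVoting(votingId, voting)
    case v: Vote =>
      VoteStore.update(v)
    case SnapshotOffer(_, snapshot) =>
      log.debug(s"Received snapshot offer: $snapshot")
    case RecoveryCompleted =>
      log.info("Recovery of VotingsManager completed.")
    case e =>
      log.error(s"Received unknown event: $e")
  }

  override def persistenceId: String = "VotingsManager"
}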
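
/*
A minimal sketch, not part of the original solution: one possible way to turn per-item vote
counts into the result described by VotingResult (no winner on a draw, finished once an item
reaches maxVotes) and to reject the basic error cases from Path 1.
TallySketch, Tally and Outcome are hypothetical names. Outcome mirrors VotingResult only so that
the sketch compiles on its own, without Akka.
*/
object TallySketch {
  final case class Outcome(winningItemId: Option[String], votes: Int, finished: Boolean)

  /** Vote counts for the two items of a single voting. */
  final case class Tally(itemAId: String, itemBId: String, maxVotes: Int, votesA: Int = 0, votesB: Int = 0) {

    /** A voting is finished as soon as either item has reached maxVotes. */
    def finished: Boolean = votesA >= maxVotes || votesB >= maxVotes

    /** Registers a vote; Left carries an error message when the vote is rejected. */
    def vote(itemId: String): Either[String, Tally] =
      if (finished) Left("voting is finished")
      else if (itemId == itemAId) Right(copy(votesA = votesA + 1))
      else if (itemId == itemBId) Right(copy(votesB = votesB + 1))
      else Left(s"item $itemId does not belong to this voting")

    /** Current result: no winner while the counts are equal, otherwise the leading item. */
    def outcome: Outcome =
      if (votesA == votesB) Outcome(None, votesA, finished)
      else if (votesA > votesB) Outcome(Some(itemAId), votesA, finished)
      else Outcome(Some(itemBId), votesB, finished)
  }
}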