package com.spinoco.dojo
import scalaz.concurrent.Task
import scalaz.stream.{ process1, Process }
import Process._
import scala.concurrent.duration._
import scalaz.stream.io
/*
Time to put it all together.

So far, we have learned a few concepts about how to build and manipulate streams. We can use these simple concepts
to build something more interesting. Let's imagine we have a process of events that indicate user actions. We want to build
a service that will process these events and react to them.

TODO: given the process of events from `requests`, create a process that will:
- parse strings into case classes (you can use the `parseRequests` helper function)
- emit `UserLoggedIn` if the password from `UserLogin` is correct for the given user AND the user is not already logged in
  (you can use the `authenticate` function to check whether the password is correct)
- emit `UserLoggedOut` if and only if the user is already logged in

The result you should expect when your code is correct is defined in the val `expectedResult`.

When you are done, you can use `fileWithRequests` instead of `requests` as input for your service. There
are some helper functions in the scalaz-stream library for working with input streams that might help you out. Notice how
little code you have to change to swap things around.

If you have even more time, you can try to save all the messages from our service into a file.
*/
sealed trait Request
case class UserLogin(userName: String, password: String) extends Request
case class UserLogout(userName: String) extends Request
sealed trait Response
case class UserLoggedIn(userName: String) extends Response
case class UserLoggedOut(userName: String) extends Response
object DojoService {
  import Utils._

  // Helper function for authenticating users: true only if the user is known and the password matches.
  def authenticate(username: String, pass: String): Boolean = passwords.get(username).exists(_ == pass)

  // Helper function for parsing messages from the stream.
  val parseRequests: (String) => Request = parseRequestsHelper _

  // File for testing (when you are done with the in-memory representation).
  val fileWithRequests = this.getClass.getResourceAsStream("/testData.txt")

  // Requests for our service, read line by line from the test file.
  val requests: Process[Task, String] = io.linesR(fileWithRequests)

  // Ticks once per second; used below to pace the requests.
  val durations: Process[Task, Duration] = awakeEvery(1.seconds)

  val expectedResult = Vector(
    UserLoggedIn("lister"),
    UserLoggedIn("cat"),
    UserLoggedOut("lister"),
    UserLoggedIn("kryten"),
    UserLoggedOut("kryten"),
    UserLoggedOut("cat"))
  // On every tick of `durations`, takes the next request, parses it and also prints it to stdout.
  def parsedRequests: Process[Task, Request] =
    (requests zip durations)
      .map { case (r, _) => r }
      .map(parseRequests)
      .observe(io.stdOut.contramap(s => s.toString() + "\n"))
  def userStateProcess: Process[Task, Response] = {
    // State machine over the set of currently logged-in users.
    def response(loggedUsers: Set[String]): Process1[Request, Response] = {
      await1[Request] flatMap {
        // Login with a correct password by a user who is not logged in yet.
        case UserLogin(userName, password) if !loggedUsers.contains(userName) && authenticate(userName, password) =>
          emit(UserLoggedIn(userName)) fby response(loggedUsers + userName)
        // Logout of a user who is currently logged in.
        case UserLogout(userName) if loggedUsers.contains(userName) =>
          emit(UserLoggedOut(userName)) fby response(loggedUsers - userName)
        // Wrong password, repeated login, or logout without login: emit nothing and keep the state.
        case _ =>
          response(loggedUsers)
      }
    }
    parsedRequests |> response(Set.empty)
  }
}
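
/*
A plain-collections sketch of the same login/logout rules, added for illustration only. It is not part of the
original exercise. `LoginStateSketch`, `step` and `run` are hypothetical names, and the `authenticate` parameter
stands in for `DojoService.authenticate`, so the sketch depends only on the case classes defined above.
It may be useful for checking the rules on an in-memory sequence before wiring them into a Process.
*/
object LoginStateSketch {
  // One transition: returns the updated set of logged-in users and the response to emit, if any.
  def step(authenticate: (String, String) => Boolean)(
      loggedUsers: Set[String], request: Request): (Set[String], Option[Response]) =
    request match {
      case UserLogin(userName, password) if !loggedUsers.contains(userName) && authenticate(userName, password) =>
        (loggedUsers + userName, Some(UserLoggedIn(userName)))
      case UserLogout(userName) if loggedUsers.contains(userName) =>
        (loggedUsers - userName, Some(UserLoggedOut(userName)))
      case _ =>
        (loggedUsers, None)
    }

  // Folds `step` over all requests, starting with nobody logged in, and collects the emitted responses.
  def run(authenticate: (String, String) => Boolean)(requests: Seq[Request]): Vector[Response] =
    requests.foldLeft((Set.empty[String], Vector.empty[Response])) {
      case ((loggedUsers, responses), request) =>
        val (nextUsers, response) = step(authenticate)(loggedUsers, request)
        (nextUsers, responses ++ response.toList)
    }._2
}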