Last active
February 22, 2016 16:34
-
-
Save PierreMage/30a281aed21ea869f87c to your computer and use it in GitHub Desktop.
Testing Scalding type-safe API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Scalding job that turns a CSV of user events into per-user session statistics.
 *
 * Job arguments, as read below:
 *  - `input`: location of the source CSV, typed as `(Int, Int, String, String)`; only the
 *    first two columns (timestamp, user id) are used.
 *  - `output`: location of the result CSV, typed as `(Int, Int, Double, Double)`.
 *  - `maxIdleTimeInMillis`: optional idle threshold passed to `extractSessions`; "100" is
 *    used when the argument is absent.
 */
class SessionsStatsJob(args: Args) extends Job(args) {
  import ExternalOperations._
  import SessionsStatsJob._

  val maxIdleTimeInMillis: Int = args.getOrElse("maxIdleTimeInMillis", "100").toInt
  val input: String = args("input")
  val output: String = args("output")

  // Keep only the timestamp and user id columns; the two String columns are dropped.
  val events: TypedPipe[Event] =
    TypedPipe
      .from(TypedCsv[(Int, Int, String, String)](input))
      .map { row => Event(row._1, row._2) }

  // Events -> per-user session lists -> per-user statistics rows -> CSV sink.
  events
    .extractSessions(maxIdleTimeInMillis)
    .summarizeSessions
    .write(TypedCsv[(Int, Int, Double, Double)](output))
}
/** Domain model used by the sessions statistics job: events, sessions and per-user summaries. */
object SessionsStatsJob {

  /** A single user action, identified by its timestamp and the user who performed it. */
  case class Event(timestamp: Int, userId: Int)

  /**
   * A run of actions by one user.
   *
   * @param userId          the user the session belongs to
   * @param start           timestamp of the first event in the session
   * @param end             timestamp of the event most recently added to the session
   * @param numberOfActions how many events the session contains
   */
  case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) {

    /** Elapsed time between the first and the most recently added event. */
    val duration: Int = end - start

    /** Returns a session extended by `event`: `end` moves to the event's timestamp and the action count grows by one. */
    def add(event: Event): Session =
      Session(userId, start, event.timestamp, numberOfActions + 1)
  }

  /**
   * Running totals over the sessions of one user.
   *
   * @param userId           the user being summarized
   * @param numberOfSessions number of sessions accumulated so far
   * @param duration         sum of the accumulated sessions' durations
   * @param numberOfActions  sum of the accumulated sessions' action counts
   */
  case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) {

    /** Returns a summary with `session` folded into the totals. */
    def add(session: Session): SessionsSummary =
      SessionsSummary(
        userId,
        numberOfSessions + 1,
        duration + session.duration,
        numberOfActions + session.numberOfActions
      )

    /**
     * Produces the output row for this user.
     *
     * @return `(userId, numberOfSessions, average duration, average number of actions)`;
     *         both averages are floating-point divisions by `numberOfSessions`
     */
    def stats(): (Int, Int, Double, Double) = {
      val sessionCount = numberOfSessions.toDouble
      (userId, numberOfSessions, duration / sessionCount, numberOfActions / sessionCount)
    }
  }

  object SessionsSummary {

    /**
     * Groups `sessions` by user id and folds every group into one summary, starting from an
     * all-zero summary for that user.
     */
    def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] =
      sessions.groupBy(_.userId).map { case (userId, userSessions) =>
        userSessions.foldLeft(SessionsSummary(userId, 0, 0, 0))(_.add(_))
      }
  }
}
/**
 * Pipe-level steps of the sessions job. Each step lives in a trait so it can be exercised on
 * its own, and is attached to the matching `TypedPipe` through an implicit wrapper class.
 */
object ExternalOperations {
  import SessionsStatsJob._

  /** Splits a pipe of events into per-user session lists. */
  trait ExtractSessions {
    def events: TypedPipe[Event]

    /**
     * Groups events by user and folds each user's events, in ascending timestamp order, into
     * sessions.
     *
     * An event joins the current session when the gap to that session's `end` is strictly
     * smaller than `maxIdleTimeInMillis`; otherwise it opens a new single-action session.
     *
     * @param maxIdleTimeInMillis exclusive upper bound on the gap between two consecutive
     *                            events of the same session
     * @return one list per user, with the most recently started session first (new sessions
     *         are prepended)
     */
    def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] =
      events
        .groupBy { case Event(_, userId) => userId }
        // The fold below compares each event with the previous one, so it only makes sense on
        // chronologically ordered input. Grouping alone does not order the values of a key;
        // request a secondary sort on the timestamp explicitly.
        .sortBy(_.timestamp)
        .foldLeft(List.empty[Session]) {
          case (head :: tail, event @ Event(ts, _)) if (ts - head.end) < maxIdleTimeInMillis =>
            head.add(event) :: tail
          case (sessions, Event(timestamp, userId)) =>
            Session(userId, timestamp, timestamp, 1) :: sessions
        }
        .values
  }

  /** Adds `extractSessions` to any `TypedPipe[Event]`. */
  implicit class ExtractSessionsWrapper(val events: TypedPipe[Event]) extends ExtractSessions

  /** Turns per-user session lists into per-user statistics rows. */
  trait SummarizeSessions {
    def sessions: TypedPipe[List[Session]]

    /**
     * Summarizes every session list with `SessionsSummary.fromSessions` and maps each summary
     * to its `stats()` row.
     *
     * @return rows of `(userId, numberOfSessions, average duration, average number of actions)`
     */
    def summarizeSessions: TypedPipe[(Int, Int, Double, Double)] =
      sessions.flatMap(SessionsSummary.fromSessions).map(_.stats())
  }

  /** Adds `summarizeSessions` to any `TypedPipe[List[Session]]`. */
  implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]])
    extends SummarizeSessions
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Specification for the sessions job: the two pipe operations are checked in isolation with
 * the Given/When/Then DSL, and the whole job is checked end to end with `JobTest`.
 */
class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl {
  import SessionsStatsJob._

  "ExternalOperations" should {
    import ExternalOperations._

    "extract sessions" in {
      Given {
        List(
          Event(1001, 1),
          Event(1010, 1),
          Event(1019, 1),
          Event(1027, 1),
          Event(1027, 2),
          Event(1029, 2),
          Event(1037, 1)
        )
      } When {
        events: TypedPipe[Event] => events.extractSessions(10)
      } Then {
        // User 1: the gap 1027 -> 1037 equals the threshold, so it is not "idle < 10" and a
        // second session starts. The newest session comes first in each list.
        _.toSet shouldBe Set(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      }
    }

    "summarize sessions" in {
      Given {
        List(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      } When {
        sessions: TypedPipe[List[Session]] => sessions.summarizeSessions
      } Then {
        // Averages are Doubles; spell them as Doubles so the expected set keeps the element
        // type (Int, Int, Double, Double) instead of widening to AnyVal.
        _.toSet shouldBe Set((1, 2, 13.0, 2.5), (2, 1, 2.0, 2.0))
      }
    }
  }

  "SessionsStatsJob" should {
    "calculate average session duration and average number of actions per session" in {
      // Columns: timestamp, user id, and two String columns the job ignores.
      val events = Seq(
        (1000, 1, "get", "session1_event1"),
        (1010, 1, "click", "session1_event2"),
        (1020, 1, "click", "session1_event3"),
        (1030, 1, "put", "session1_event4"),
        (1100, 2, "get", "session1_event1"),
        (1110, 2, "click", "session1_event2"),
        (1160, 2, "put", "session1_event3"),
        (1200, 1, "get", "session2_event1"),
        (1210, 1, "click", "session2_event2"),
        (1260, 1, "put", "session2_event3")
      )

      // User 1: sessions of 30 and 60 time units with 4 and 3 actions -> averages 45.0 and 3.5.
      // User 2: one session of 60 time units with 3 actions.
      val expectedAverages = Set(
        (1, 2, 45.0, 3.5),
        (2, 1, 60.0, 3.0)
      )

      JobTest[SessionsStatsJob]
        .arg("input", "events")
        .arg("output", "averages")
        .arg("maxIdleTimeInMillis", "100")
        .source(TypedCsv[(Int, Int, String, String)]("events"), events)
        .sink[(Int, Int, Double, Double)](TypedCsv[(Int, Int, Double, Double)]("averages")) {
          _.toSet shouldBe expectedAverages
        }
        .run.finish
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment