Skip to content

Instantly share code, notes, and snippets.

@PierreMage
Last active February 22, 2016 16:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PierreMage/30a281aed21ea869f87c to your computer and use it in GitHub Desktop.
Save PierreMage/30a281aed21ea869f87c to your computer and use it in GitHub Desktop.
Testing Scalding type-safe API
/**
 * Scalding job reporting, per user, the number of sessions together with the average session
 * duration and the average number of actions per session.
 *
 * Arguments:
 *  - `input`: CSV of `(Int, Int, String, String)` rows; the first column is used as the event
 *    timestamp, the second as the user id, and the two string columns are ignored.
 *  - `output`: CSV receiving `(Int, Int, Double, Double)` rows produced by
 *    `SessionsSummary.stats()`.
 *  - `maxIdleTimeInMillis` (optional, default `"100"`): idle threshold handed to
 *    `extractSessions`.
 */
class SessionsStatsJob(args: Args) extends Job(args) {
  import ExternalOperations._
  import SessionsStatsJob._

  val maxIdleTimeInMillis: Int = args.getOrElse("maxIdleTimeInMillis", "100").toInt
  val input: String = args("input")
  val output: String = args("output")

  // Project each CSV row onto the two columns the session logic needs.
  val events: TypedPipe[Event] =
    TypedPipe
      .from(TypedCsv[(Int, Int, String, String)](input))
      .map { case (ts, uid, _, _) => Event(ts, uid) }

  events
    .extractSessions(maxIdleTimeInMillis)
    .summarizeSessions
    .write(TypedCsv[(Int, Int, Double, Double)](output))
}
/** Domain model and pure, in-memory aggregation logic used by the sessions statistics job. */
object SessionsStatsJob {

  /** One user action: when it happened and which user performed it. */
  case class Event(timestamp: Int, userId: Int)

  /**
   * A run of actions by a single user.
   *
   * @param userId          owner of the session
   * @param start           timestamp of the first action
   * @param end             timestamp of the last action added so far
   * @param numberOfActions number of actions in the session
   */
  case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) {

    /** Time between the first and the last action, i.e. `end - start`. */
    val duration: Int = end - start

    /** Returns this session extended by `event`: `end` becomes the event's timestamp and one action is counted. */
    def add(event: Event): Session = {
      val newEnd = event.timestamp
      copy(numberOfActions = numberOfActions + 1, end = newEnd)
    }
  }

  /**
   * Running totals over a user's sessions.
   *
   * @param userId           user the totals belong to
   * @param numberOfSessions how many sessions were added
   * @param duration         sum of the added sessions' durations
   * @param numberOfActions  sum of the added sessions' action counts
   */
  case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) {

    /** Returns new totals that also account for `session` (its `userId` is not checked). */
    def add(session: Session): SessionsSummary =
      SessionsSummary(
        userId,
        numberOfSessions + 1,
        duration + session.duration,
        numberOfActions + session.numberOfActions
      )

    /**
     * @return `(userId, numberOfSessions, averageDuration, averageNumberOfActions)`; the averages
     *         are NaN when `numberOfSessions` is 0 (floating-point 0.0 / 0).
     */
    def stats(): (Int, Int, Double, Double) = {
      val sessionCount = numberOfSessions.toDouble
      (userId, numberOfSessions, duration / sessionCount, numberOfActions / sessionCount)
    }
  }

  object SessionsSummary {

    /** Groups `sessions` by user and folds each group into one summary, yielding one summary per distinct user id. */
    def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] =
      for ((uid, userSessions) <- sessions.groupBy(_.userId))
        yield userSessions.foldLeft(SessionsSummary(uid, 0, 0, 0))(_ add _)
  }
}
/**
 * Pipeline stages of the sessions statistics job, exposed as extension methods on typed pipes
 * so that each stage can be tested on its own.
 */
object ExternalOperations {
  import SessionsStatsJob._

  /** Adds `extractSessions` to a pipe of events. */
  trait ExtractSessions {
    def events: TypedPipe[Event]

    /**
     * Splits each user's events into sessions.
     *
     * Events are grouped by `userId` and processed in ascending `timestamp` order. An event whose
     * timestamp is strictly less than `maxIdleTimeInMillis` after the current session's `end`
     * extends that session; any other event opens a new one-action session.
     *
     * @param maxIdleTimeInMillis exclusive upper bound on the gap between two actions of the same session
     * @return one list per user, with the most recent session first (sessions are prepended)
     */
    def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] =
      events
        .groupBy(_.userId)
        // Reducers give no ordering guarantee on the grouped values, but the fold below compares
        // each event with the latest session's `end` and `Session.add` overwrites `end`, so it
        // must see every user's events chronologically.
        .sortBy(_.timestamp)
        .foldLeft(List.empty[Session]) {
          case (current :: earlier, event) if (event.timestamp - current.end) < maxIdleTimeInMillis =>
            current.add(event) :: earlier
          case (sessions, Event(timestamp, userId)) =>
            Session(userId, timestamp, timestamp, 1) :: sessions
        }
        .values
  }

  implicit class ExtractSessionsWrapper(val events: TypedPipe[Event]) extends ExtractSessions

  /** Adds `summarizeSessions` to a pipe of session lists. */
  trait SummarizeSessions {
    def sessions: TypedPipe[List[Session]]

    /**
     * Turns every list of sessions into `SessionsSummary.stats()` tuples, one per user id found
     * in that list. Lists are summarized independently, so a user whose sessions are spread over
     * several lists gets several rows; `extractSessions` emits one list per user.
     */
    def summarizeSessions: TypedPipe[(Int, Int, Double, Double)] =
      sessions.flatMap(SessionsSummary.fromSessions).map(_.stats())
  }

  implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]])
    extends SummarizeSessions
}
/** Specs for the individual pipeline stages (BDD style) and for the assembled job (JobTest). */
class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl {
  import SessionsStatsJob._

  "ExternalOperations" should {
    import ExternalOperations._

    "extract sessions" in {
      // With a 10 ms threshold, user 1's gaps of 9, 9 and 8 ms keep one session, while the final
      // 10 ms gap (not strictly below the threshold) starts a new one.
      Given {
        List(
          Event(1001, 1),
          Event(1010, 1),
          Event(1019, 1),
          Event(1027, 1),
          Event(1027, 2),
          Event(1029, 2),
          Event(1037, 1)
        )
      } When {
        events: TypedPipe[Event] => events.extractSessions(10)
      } Then {
        _.toSet shouldBe Set(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      }
    }

    "summarize sessions" in {
      // User 1: durations 0 + 26 over 2 sessions -> 13.0; actions 1 + 4 -> 2.5.
      // User 2: a single 2 ms session with 2 actions.
      Given {
        List(
          List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
          List(Session(2, 1027, 1029, 2))
        )
      } When {
        sessions: TypedPipe[List[Session]] => sessions.summarizeSessions
      } Then {
        // Double literals keep the expected set typed as (Int, Int, Double, Double) instead of
        // widening to AnyVal and relying on boxed Int/Double equality.
        _.toSet shouldBe Set((1, 2, 13.0, 2.5), (2, 1, 2.0, 2.0))
      }
    }
  }

  "SessionsStatsJob" should {
    "calculate average session duration and average number of actions per session" in {
      val events = Seq(
        (1000, 1, "get", "session1_event1"),
        (1010, 1, "click", "session1_event2"),
        (1020, 1, "click", "session1_event3"),
        (1030, 1, "put", "session1_event4"),
        (1100, 2, "get", "session1_event1"),
        (1110, 2, "click", "session1_event2"),
        (1160, 2, "put", "session1_event3"),
        (1200, 1, "get", "session2_event1"),
        (1210, 1, "click", "session2_event2"),
        (1260, 1, "put", "session2_event3")
      )
      // User 1: 1030 -> 1200 is a 170 ms gap (>= 100), so two sessions: 30 ms / 4 actions and
      // 60 ms / 3 actions -> averages 45.0 and 3.5. User 2: one 60 ms session with 3 actions.
      val expectedAverages = Set(
        (1, 2, 45.0, 3.5),
        (2, 1, 60.0, 3.0)
      )
      JobTest[SessionsStatsJob]
        .arg("input", "events")
        .arg("output", "averages")
        .arg("maxIdleTimeInMillis", "100")
        .source(TypedCsv[(Int, Int, String, String)]("events"), events)
        .sink[(Int, Int, Double, Double)](TypedCsv[(Int, Int, Double, Double)]("averages")) {
          _.toSet shouldBe expectedAverages
        }
        .run
        .finish
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment