Skip to content

Instantly share code, notes, and snippets.

@jarutis
Created January 19, 2015 19:35
Show Gist options
Save jarutis/3957b00fbce586e6393a to your computer and use it in GitHub Desktop.
import com.twitter.scalding.typed._
/** A single event for one user.
  *
  * @param id   identifier the events are grouped by (e.g. "user1")
  * @param time event timestamp as a Long; NOTE(review): the sample values look like
  *             epoch milliseconds — confirm with the producer of the data
  */
final case class NextEvent(id: String, time: Long)
// First batch of sample events: user1 appears twice, user2 once.
val events1: List[NextEvent] = List(
  NextEvent(id = "user1", time = 1421280000498L),
  NextEvent(id = "user1", time = 1421280000769L),
  NextEvent(id = "user2", time = 1421280000819L)
)

// Second batch: user1 and user2 again, plus user3, who is not in the first batch.
val events2: List[NextEvent] = List(
  NextEvent(id = "user1", time = 1421280000079L),
  NextEvent(id = "user2", time = 1421280000498L),
  NextEvent(id = "user3", time = 1421280000548L)
)

// The batches in order; each inner list is later turned into its own TypedPipe.
val eventList: List[List[NextEvent]] = events1 :: events2 :: Nil
/** Reduces a pipe of events to the earliest `time` seen for each `id`.
  *
  * Each event is projected to an `(id, time)` pair and grouped by `id`. Within a group,
  * `sortWithTake(1)` with an ascending `<` comparison keeps only the smallest time.
  * Despite the name, only the timestamp is kept per id, not the whole [[NextEvent]].
  *
  * @param pipe events to reduce
  * @return the grouped, keyed result of `sortWithTake(1)` (id -> earliest time)
  */
def firstEvent(pipe: TypedPipe[NextEvent]) = {
  val timesById = pipe.map(event => event.id -> event.time)
  timesById.group.sortWithTake(1)(_ < _)
}
// One grouped pipe per input batch: id -> earliest time within that batch.
val groupedEvents = eventList.map(events => firstEvent(TypedPipe.from(events)))

// Works: joining when each operand is spelled out explicitly.
groupedEvents.head.outerJoin(groupedEvents(1))
MultiJoin(groupedEvents.head, groupedEvents(1))

// Open question: how can every element of `groupedEvents` be joined without
// listing each one separately, i.e. something like `MultiJoin(groupedEvents)`?
// NOTE(review): MultiJoin appears to take each grouped pipe as a separate argument,
// so a List cannot be passed as-is. A fold over the list would need a common value
// type at every step; confirm against the Scalding typed API.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment