@rrmckinley
Last active December 24, 2015 11:19
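// Imports assumed for an early (0.1-era) scalaz-stream and Slick 1.x setup;
// adjust to the versions actually in use.
import scala.slick.jdbc.{StaticQuery => Q}
import scalaz.concurrent.Task
import scalaz.stream.{Process, Process1}
import scalaz.stream.Process._
import scalaz.stream.io.resource

val sql = """select grandparent_id, parent_id, child_id
from children
order by grandparent_id, parent_id, child_id"""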
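
// Expose a Slick result iterator as a Process that closes the iterator when done.
// An implicit Slick Session is assumed to be in scope for invoker.elements().
def elementsR[R](invoker: scala.slick.jdbc.Invoker[R]): Process[Task, R] =
  resource(Task.delay(invoker.elements()))(
    src => Task.delay(src.close)) { src =>
      Task.delay { if (src.hasNext) src.next else throw End }
  }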
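
// Group consecutive inputs into one Vector for as long as f(previous, current) holds.
def chunkWhen[I](f: (I, I) => Boolean): Process1[I, Vector[I]] = {
  def go(acc: Vector[I]): Process1[I,Vector[I]] =
    await1[I].flatMap { i =>
      acc.lastOption match {
        // Predicate failed: emit the finished chunk and start a new one with i.
        case Some(last) if ! f(last, i) => emit(acc) then go(Vector(i))
        // First element, or still inside the same group: keep accumulating.
        case _ => go(acc :+ i)
      }
    } orElse emit(acc) // input exhausted: flush whatever has accumulated
  go(Vector())
}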
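
def dbWookie(): Unit = {
  // (grandparent_id, parent_id, child_id)
  val invoker = Q.queryNA[(Int, Int, Int)](sql)
  // elementsR takes only the invoker; chunk rows that share a grandparent_id
  val p = elementsR(invoker) |> chunkWhen(_._1 == _._1)
  // collect gathers every chunk in memory before anything is printed
  p.collect.attempt.run.foreach(_.foreach(println(_)))
}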
@rrmckinley
Author

Hi, I'm giving it a go in scalaz-stream. I'm using a chunkWhen to group known-to-be-sorted data. I think the source and emit side is going well. However, I can't understand how to consume the results in a streaming way. I can get it to work with collect, but that has a builder and pulls everything into memory. How can I do this without the collect? Thanks.
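
To make the goal concrete, here is a minimal sketch of the behaviour being asked for. It uses plain Scala iterators in place of a scalaz-stream Process, so it is not the library's API, only an illustration of the idea. `chunkWhenIter` and `Demo` are hypothetical names, and the sample rows are made up. Each chunk is handed to the consumer as soon as its group ends, so only one group is in memory at a time.

```scala
// Sketch only: plain Scala iterators stand in for the scalaz-stream Process.
// `chunkWhenIter` and `Demo` are hypothetical names, not part of any library.
object Demo {
  // Lazily groups consecutive elements while f(previous, current) holds.
  // Only the chunk currently being built is held in memory.
  def chunkWhenIter[I](src: Iterator[I])(f: (I, I) => Boolean): Iterator[Vector[I]] =
    new Iterator[Vector[I]] {
      private val in = src.buffered
      def hasNext: Boolean = in.hasNext
      def next(): Vector[I] = {
        var acc = Vector(in.next())
        // Peek at the next element without consuming it.
        while (in.hasNext && f(acc.last, in.head)) acc :+= in.next()
        acc
      }
    }

  def main(args: Array[String]): Unit = {
    // (grandparent_id, parent_id, child_id), already sorted as in the query
    val rows = Iterator((1, 1, 1), (1, 1, 2), (1, 2, 3), (2, 3, 4))
    // foreach pulls one chunk at a time; nothing is collected up front
    chunkWhenIter(rows)(_._1 == _._1).foreach(println)
  }
}
```

With the sample rows this prints one Vector for grandparent 1 (three rows) and then one for grandparent 2 (one row). The equivalent in scalaz-stream would presumably be to run the process for its side effects instead of calling collect, but the exact combinator depends on the library version.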