Skip to content

Instantly share code, notes, and snippets.

@quelgar
Last active March 15, 2019 02:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quelgar/05f360745dce8c4cc5447058e1b6aac1 to your computer and use it in GitHub Desktop.
Save quelgar/05f360745dce8c4cc5447058e1b6aac1 to your computer and use it in GitHub Desktop.
A sink that frames a ZIO stream on delimiter boundaries (e.g. splitting text into lines).
import scalaz.zio.console._
import scalaz.zio.stream._
import scalaz.zio.{Chunk, ZIO}
/** Sinks for re-framing a chunked ZIO stream: by delimiter (e.g. text lines) or by fixed group size. */
object Framing {

  /** Line-feed (`'\n'`) delimiter. */
  def lfDelimiter: Chunk[Char] = Chunk('\n')

  /** Carriage-return + line-feed (`'\r', '\n'`) delimiter. */
  def crLfDelimiter: Chunk[Char] = Chunk('\r', '\n')

  /**
   * A sink that frames a stream of chunks on `delimiter` boundaries, e.g. splitting text into lines.
   *
   * One run of the sink accumulates input chunks until either a complete `delimiter` is seen or more
   * than `maxSize` elements are buffered, then emits a single frame. The delimiter itself is not
   * included in the frame. Everything after the frame is returned as the step's leftover, for
   * `transduce` to re-feed into the next run.
   *
   * Frames never exceed `maxSize` elements. If no delimiter arrives in time, or the delimited frame
   * is longer than `maxSize`, the first `maxSize` elements are emitted and the rest (including the
   * delimiter, if one was found) goes back as leftover.
   *
   * @param maxSize   maximum number of elements in a single frame; must be positive
   * @param delimiter the separator sequence; must be non-empty
   * @throws IllegalArgumentException if `maxSize <= 0` or `delimiter` is empty
   */
  def delimited[@specialized A](maxSize: Int, delimiter: Chunk[A]): Sink[Any, Nothing, Chunk[A], Chunk[A], Chunk[A]] = {
    // A non-positive maxSize would emit empty frames and re-feed the same data forever.
    require(maxSize > 0, s"maxSize must be positive, was $maxSize")
    // An empty delimiter "matches" at index 0 every time, which also never terminates.
    require(delimiter.length > 0, "delimiter must be non-empty")

    Sink.fold(Chunk.empty: Chunk[A]) { (buffer, in) =>
      val searchBuffer = buffer ++ in
      // `buffer` never holds a complete delimiter: had it held one, the previous step would have
      // finished. So only a match that straddles the buffer/input boundary, or lies wholly inside
      // `in`, is possible. Start searching there instead of rescanning the whole buffer each time.
      val searchFrom = math.max(0, buffer.length - delimiter.length + 1)
      findDelimiter(delimiter, searchFrom)(searchBuffer) match {
        case Some((frame, remaining)) if frame.length <= maxSize =>
          Sink.Step.done(frame, Chunk.succeed(remaining))
        case _ if searchBuffer.length > maxSize =>
          // Either no delimiter yet and too much buffered, or the delimited frame is too long.
          val (frame, remaining) = searchBuffer.splitAt(maxSize)
          Sink.Step.done(frame, Chunk.succeed(remaining))
        case _ =>
          Sink.Step.more(searchBuffer)
      }
    }
  }

  /**
   * Finds the first occurrence of `delim` in `chunk` that starts at or after index `from`.
   *
   * @return the elements before the delimiter and the elements after it (the delimiter itself is
   *         dropped), or `None` if no complete occurrence exists
   */
  private def findDelimiter[@specialized A](delim: Chunk[A], from: Int)(chunk: Chunk[A]): Option[(Chunk[A], Chunk[A])] = {
    val length = delim.length

    @scala.annotation.tailrec
    def help(pos: Int): Option[(Chunk[A], Chunk[A])] =
      if (pos + length > chunk.length) None // not enough elements left for a full match
      else if (chunk.drop(pos).take(length) == delim) {
        val (matched, remaining) = chunk.splitAt(pos)
        Some((matched, remaining.drop(length)))
      } else help(pos + 1)

    help(from)
  }

  /**
   * A sink that collects consecutive elements into groups of `groupSize`.
   *
   * Each run emits a group as soon as it holds `groupSize` elements, with no leftover.
   * NOTE(review): a trailing partial group presumably surfaces via `Sink.fold`'s final state when
   * the stream ends; confirm against the ZIO version in use.
   *
   * @param groupSize number of elements per group; must be positive
   * @throws IllegalArgumentException if `groupSize <= 0`
   */
  def groupSink[A](groupSize: Int): Sink[Any, Nothing, A, A, Chunk[A]] = {
    // Without this check a non-positive size never completes a group, so the whole stream would
    // accumulate in memory as a single group.
    require(groupSize > 0, s"groupSize must be positive, was $groupSize")
    Sink.fold(Chunk.empty: Chunk[A]) { (group, a) =>
      val next = group ++ Chunk.succeed(a)
      if (next.length >= groupSize) Sink.Step.done(next, Chunk.empty)
      else Sink.Step.more(next)
    }
  }
}
/**
 * Demo: prints a small text sample regrouped into 5-character chunks, then prints the same
 * grouped stream re-framed into lines by `Framing.delimited`.
 */
object Test extends scalaz.zio.App {

  /** Renders a chunk as `[e1,e2,...] (length)`. */
  def printChunk(c: Chunk[_]): String = {
    val elements = c.mkString("[", ",", "]")
    s"$elements (${c.length})"
  }

  override def run(args: List[String]): ZIO[Test.Environment, Nothing, Int] = {
    // The trailing `|` line contributes an empty last line, so the text ends with a single '\n'.
    val sample =
      """line 1
        |line 2 ABCDEFGHIJKLMNO
        |line 3 ABCDEF
        |line 4 AB
        |""".stripMargin

    val fiveCharGroups = Stream.fromIterable(sample).transduce(Framing.groupSink[Char](5))

    val showGroups = fiveCharGroups.foreach(group => putStrLn(printChunk(group)))
    val showLines = fiveCharGroups
      .transduce(Framing.delimited(150, Framing.lfDelimiter))
      .foreach(line => putStrLn(printChunk(line)))

    for {
      _ <- showGroups
      _ <- putStrLn("-----")
      _ <- showLines
    } yield 0
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment