Skip to content

Instantly share code, notes, and snippets.

@guersam
Last active August 9, 2017 12:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save guersam/9a9dd5d0202a55e2066a016ce413f180 to your computer and use it in GitHub Desktop.
Save guersam/9a9dd5d0202a55e2066a016ce413f180 to your computer and use it in GitHub Desktop.
fs2 cp949 decoder
import java.nio.charset.Charset
import fs2.{Chunk, Pipe, Pull, Pure, Stream}
object Fs2Util {

  /**
   * Windows code page 949 (Unified Hangul Code, a superset of EUC-KR).
   *
   * Looked up by its canonical JDK name on purpose: in the JDK the alias
   * "cp949" resolves to `x-IBM949` (IBM's Korean code page), not to the
   * Microsoft code page whose byte layout the boundary scanning in
   * [[cp949DecodeC]] is written for.
   */
  val cp949Charset: Charset = Charset.forName("x-windows-949")

  /**
   * Decodes a stream of CP949-encoded bytes into strings.
   *
   * A double-byte character whose two bytes arrive in different chunks is
   * reassembled before decoding.
   */
  def cp949Decode[F[_]]: Pipe[F, Byte, String] =
    _.chunks.through(cp949DecodeC)

  /**
   * Decodes a stream of CP949-encoded byte chunks into strings.
   *
   * Each incoming chunk is decoded up to its last complete character. A
   * trailing lone lead byte is carried over and prepended to the next chunk.
   * If the stream ends while a byte is still carried over, that byte is
   * decoded on its own; `new String(..., charset)` substitutes the charset's
   * replacement string for the malformed input.
   *
   * NOTE(review): the split logic assumes well-formed input. For a lead byte
   * followed by an invalid trail byte, the chunked output at a boundary may
   * differ from decoding the whole stream in one pass.
   */
  def cp949DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = {

    /** True if `b` is a CP949 lead byte (0x81-0xFE), i.e. the first byte of a double-byte character. */
    def isLeadByte(b: Byte): Boolean = {
      val unsigned = b & 0xFF
      unsigned >= 0x81 && unsigned <= 0xFE
    }

    /**
     * Returns the length of an incomplete double-byte sequence at the end of
     * `bs`: 1 if `bs` ends with a lone lead byte, otherwise 0.
     *
     * The scan has to walk forward from index 0. CP949 trail bytes overlap the
     * lead-byte range, so the last byte alone cannot tell whether it starts or
     * ends a character. This relies on `bs` starting on a character boundary,
     * which holds because the carried-over buffer is always either empty or a
     * single lead byte.
     */
    def lastIncompleteBytes(bs: Array[Byte]): Int = {
      val len = bs.length
      var cur = 0
      while (cur < len) {
        cur += (if (isLeadByte(bs(cur))) 2 else 1)
      }
      // cur overshoots len by exactly 1 when the final byte is an unpaired lead byte.
      cur - len
    }

    /**
     * Fold step over the chunks of one pulled segment.
     *
     * Decodes the carried-over buffer plus `nextBytes` up to the last complete
     * character. The decoded string is prepended to the (reversed) output list,
     * and any undecoded remainder becomes the new buffer.
     */
    def processSingleChunk(outputAndBuffer: (List[String], Chunk[Byte]), nextBytes: Chunk[Byte]): (List[String], Chunk[Byte]) = {
      val (output, buffer) = outputAndBuffer
      val allBytes = Array.concat(buffer.toArray, nextBytes.toArray)
      val splitAt = allBytes.length - lastIncompleteBytes(allBytes)
      if (splitAt == allBytes.length)
        (new String(allBytes, cp949Charset) :: output, Chunk.empty)
      else if (splitAt == 0)
        // Only a single lead byte is available: nothing to decode yet.
        (output, Chunk.bytes(allBytes))
      else
        (new String(allBytes, 0, splitAt, cp949Charset) :: output, Chunk.bytes(allBytes.drop(splitAt)))
    }

    /**
     * Pulls one segment of chunks from `s` and emits the strings decoded from
     * it, then recurses with the carried-over buffer. At end of stream, a
     * non-empty leftover buffer is flushed as a final string.
     */
    def doPull(buf: Chunk[Byte], s: Stream[Pure, Chunk[Byte]]): Pull[Pure, String, Option[Stream[Pure, Chunk[Byte]]]] = {
      s.pull.unconsChunk.flatMap {
        case Some((byteChunks, tail)) =>
          val (output, nextBuffer) = byteChunks.toList.foldLeft((List.empty[String], buf))(processSingleChunk)
          // Strings were prepended during the fold, so restore stream order.
          Pull.output(Chunk.seq(output.reverse)) >> doPull(nextBuffer, tail)
        case None if buf.nonEmpty =>
          Pull.output1(new String(buf.toArray, cp949Charset)) >> Pull.pure(None)
        case None =>
          Pull.pure(None)
      }
    }

    // The pull is written against Pure and widened to the caller's effect type.
    ((in: Stream[Pure, Chunk[Byte]]) => doPull(Chunk.empty, in).stream).covary[F]
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment