Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ByteStream for fs2 - type-tagged byte encoding for Stream[F, Byte]
package teikametrics
import java.nio.charset.{Charset, StandardCharsets}
import akka.util.ByteString
import fs2.{Chunk, Pipe, RaiseThrowable, Stream}
import teikametrics.ByteEncoding._
/** A stream of bytes tagged with the encoding `BE` that the bytes claim to use.
  *
  * `BE` only exists at the type level; it selects which implicit `Recoder` / `Decoder`
  * instances may be applied to the stream.
  *
  * @param bytes     The raw streamed bytes
  * @param chunkSize A chunk size to use for streaming operations, in order to use a consistent
  *                  value and minimize array copying. Passed on as the size hint to recoders
  *                  and decoders.
  */
case class ByteStream[F[_], BE <: ByteEncoding](
    bytes: Stream[F, Byte],
    chunkSize: Option[Int]
) {

  /** Converts the bytes from encoding `BE` to `BE2`, keeping the same `chunkSize`. */
  def recode[BE2 <: ByteEncoding](
      implicit rc: Recoder[F, BE, BE2]
  ): ByteStream[F, BE2] = {
    val recodePipe = rc.recodeSized(chunkSize)
    ByteStream[F, BE2](recodePipe(bytes), chunkSize)
  }

  /** Decodes the bytes into a stream of `Entity` using the implicit decoder for `BE`. */
  def decode[Entity](implicit dc: Decoder[F, BE, Entity]): Stream[F, Entity] = {
    val decodePipe = dc.decodeSized(chunkSize)
    decodePipe(bytes)
  }

  /** Like `recode`, but only available when `BE` is a compression wrapping `BE2`. */
  def decompress[BE2 <: ByteEncoding](
      implicit rc: Recoder[F, BE, BE2],
      ev: BE <:< ByteCompression[BE2]
  ): ByteStream[F, BE2] = {
    // `ev` is a compile-time constraint only; binding it keeps it from being flagged as unused.
    val _ = ev
    recode[BE2](rc)
  }
}
// NB deliberately not sealed so use sites can define their own custom encodings if needed
/** Marker for a byte encoding; used as the type-level tag on `ByteStream`, `Decoder` and `Recoder`. */
trait ByteEncoding
/** Marker for a compression encoding whose uncompressed content is encoded as `BE`. */
trait ByteCompression[BE <: ByteEncoding] extends ByteEncoding
object ByteEncoding extends ByteEncodingInstances {
// All `sealed trait` because this is always used as a phantom type - we never want to instantiate
/** Tag for bytes whose encoding is not known (presumably; no instances are defined for it here). */
sealed trait Unknown extends ByteEncoding
/** Gzip-compressed bytes whose uncompressed content is encoded as `BE` */
sealed trait Gzip[BE <: ByteEncoding] extends ByteCompression[BE]
/** UTF-8 text */
sealed trait Utf8 extends ByteEncoding
/** Utf8 but with a leading BOM */
sealed trait Utf8Bom extends ByteEncoding
/** Text in the CP1252 charset */
sealed trait Cp1252 extends ByteEncoding
/** Text in the ISO-8859-1 charset */
sealed trait Latin1 extends ByteEncoding
/** Decodes streamed bytes into a stream of `Entity`
  *
  * @tparam BE     the encoding the incoming bytes are expected to use
  * @tparam Entity the type of the decoded elements
  */
trait Decoder[F[_], BE <: ByteEncoding, Entity] { outer =>

  /** The decoding pipe used when no chunk size hint is given. */
  def decode: Pipe[F, Byte, Entity]

  /** Decode with a suggested chunk size. Implementations are free to ignore the hint,
    * but consistent sizing may lead to better performance */
  def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity]

  /** Uses the sized variant when a hint is present and plain `decode` otherwise. */
  final def decodeSized(sizeHint: Option[Int]): Pipe[F, Byte, Entity] =
    sizeHint match {
      case Some(size) => decodeSized(size)
      case None       => decode
    }

  /** A decoder that runs this decoder and then sends its output through `pipe`. */
  def through[Entity2](
      pipe: Pipe[F, Entity, Entity2]
  ): Decoder[F, BE, Entity2] =
    new Decoder[F, BE, Entity2] {
      override def decode: Pipe[F, Byte, Entity2] =
        bytes => pipe(outer.decode(bytes))

      override def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity2] =
        bytes => pipe(outer.decodeSized(sizeHint)(bytes))
    }

  /** A decoder for `BE2` obtained by first recoding `BE2` into `BE` and then applying this decoder. */
  def contraRecode[BE2 <: ByteEncoding](
      implicit rc: Recoder[F, BE2, BE]
  ): Decoder[F, BE2, Entity] =
    rc.decodeTo[Entity](outer)
}
object Decoder extends DecoderInstances {

  /** Implicit summoner */
  def apply[F[_], BE <: ByteEncoding, Entity](
      implicit dc: Decoder[F, BE, Entity]
  ): Decoder[F, BE, Entity] = dc

  /** Builds a `String` decoder that decodes every incoming chunk independently with `charset`.
    *
    * NB only single-byte charsets should be passed - UTF8 can break depending on chunk boundaries,
    * because a character may be split across two chunks.
    *
    * @param charset a single-byte charset
    * @throws IllegalArgumentException if `charset` is (or may be) a multibyte charset
    */
  def forCharset[F[_], BE <: ByteEncoding](
      charset: Charset
  ): Decoder[F, BE, String] = {
    require(
      !isMultibyte(charset),
      "Multibyte charsets cannot be used safely - use fromPipe instead with a specialized pipe"
    )
    Decoder.fromPipe[BE](_.mapChunks { c =>
      Chunk.singleton(new String(c.toBytes.toArray, charset))
    })
  }

  /** Charsets that are always rejected by `forCharset`. */
  private val multibyteCharsets = Set(StandardCharsets.UTF_8,
                                      StandardCharsets.UTF_16,
                                      StandardCharsets.UTF_16BE,
                                      StandardCharsets.UTF_16LE)

  /** True when `charset` cannot be shown to use exactly one byte per character.
    *
    * The explicit set above cannot list every multibyte charset (UTF-32, Shift_JIS, GBK, ...), so
    * the charset's encoder is consulted as well. Charsets that do not support encoding cannot be
    * checked this way and are conservatively treated as multibyte.
    */
  private def isMultibyte(charset: Charset): Boolean =
    multibyteCharsets.contains(charset) ||
      !charset.canEncode() ||
      charset.newEncoder().maxBytesPerChar() > 1.0f

  /** Constructs a `Decoder` based on any Pipe.
    *
    * The returned decoder ignores size hints
    *
    * @tparam BE The encoding that you claim the pipe handles. Caller must be certain the encoding matches the pipe,
    *            or the resulting decoder may return failed streams when applied.
    * @example {{{
    * // Note this decoder is actually available already
    * implicit val dc: Decoder[F, Utf8, String] = Decoder.fromPipe[Utf8](fs2.text.utf8Decode[F])
    * }}}
    */
  def fromPipe[BE <: ByteEncoding]: FromPipe[BE] = new FromPipe[BE]

  /** Like `fromPipe` but instead of ignoring size hints, uses them and has a default when not set */
  def fromPipeSized[BE <: ByteEncoding]: FromPipeSized[BE] =
    new FromPipeSized[BE]

  /** Partially-applied builder: fixes `BE` so that `F` and `Entity` can be inferred from the pipe. */
  class FromPipe[BE <: ByteEncoding] {
    def apply[F[_], Entity](
        pipe: Pipe[F, Byte, Entity]
    ): Decoder[F, BE, Entity] =
      new PipeDecoder[F, BE, Entity](pipe, _ => pipe)
  }

  /** Partially-applied builder for decoders whose pipe depends on a chunk size.
    *
    * `defaultSize` is the size used when the decoder is run without a size hint.
    */
  class FromPipeSized[BE <: ByteEncoding] {
    def apply[F[_], Entity](
        defaultSize: Int,
        pipe: Int => Pipe[F, Byte, Entity]
    ): Decoder[F, BE, Entity] =
      new SizedPipeDecoder[F, BE, Entity](defaultSize, pipe)
  }
}
/** A `Decoder` whose output is again bytes: turns bytes encoded as `In` into bytes encoded as `Out`. */
trait Recoder[F[_], In <: ByteEncoding, Out <: ByteEncoding]
    extends Decoder[F, In, Byte] { self =>

  /** The recoding pipe; by default the same pipe as `decode`. */
  def recode: Pipe[F, Byte, Byte] = decode

  /** The recoding pipe for a given size hint; by default the same pipe as `decodeSized`. */
  def recodeSized(sizeHint: Int): Pipe[F, Byte, Byte] =
    decodeSized(sizeHint)

  /** Uses the sized variant when a hint is present and plain `recode` otherwise. */
  final def recodeSized(sizeHint: Option[Int]): Pipe[F, Byte, Byte] =
    sizeHint match {
      case Some(size) => recodeSized(size)
      case None       => recode
    }

  /** Chains this recoder with `rc`, giving a recoder straight from `In` to `Out2`. */
  def andThen[Out2 <: ByteEncoding](
      rc: Recoder[F, Out, Out2]
  ): Recoder[F, In, Out2] =
    new Recoder[F, In, Out2] {
      override def decode: Pipe[F, Byte, Byte] = {
        val first = self.decode
        val second = rc.decode
        bytes => second(first(bytes))
      }

      override def decodeSized(sizeHint: Int): Pipe[F, Byte, Byte] = {
        val first = self.decodeSized(sizeHint)
        val second = rc.decodeSized(sizeHint)
        bytes => second(first(bytes))
      }
    }

  /** Chains this recoder with a decoder for `Out`, giving a decoder that accepts `In` bytes. */
  def decodeTo[Entity](
      implicit dc: Decoder[F, Out, Entity]
  ): Decoder[F, In, Entity] =
    new Decoder[F, In, Entity] {
      override def decode: Pipe[F, Byte, Entity] = {
        val recodeStep = self.decode
        val decodeStep = dc.decode
        bytes => decodeStep(recodeStep(bytes))
      }

      override def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity] = {
        val recodeStep = self.decodeSized(sizeHint)
        val decodeStep = dc.decodeSized(sizeHint)
        bytes => decodeStep(recodeStep(bytes))
      }
    }
}

object Recoder extends RecoderInstances {

  /** Implicit summoner */
  def apply[F[_], BE <: ByteEncoding, BE2 <: ByteEncoding](
      implicit rc: Recoder[F, BE, BE2]
  ): Recoder[F, BE, BE2] = rc
}
}
trait ByteEncodingInstances { this: ByteEncoding.type =>
/** Implicit `Decoder` instances; mixed into the `Decoder` companion object. */
trait DecoderInstances {

  /** Decodes UTF-8 bytes to strings with fs2's own UTF-8 pipe. */
  implicit def utf8Decoder[F[_]]: Decoder[F, Utf8, String] = {
    val utf8Pipe: Pipe[F, Byte, String] = fs2.text.utf8Decode[F]
    Decoder.fromPipe[Utf8](utf8Pipe)
  }

  /** Decodes CP1252 bytes to strings, chunk by chunk. */
  implicit def cp1252Decoder[F[_]]: Decoder[F, Cp1252, String] =
    Decoder.forCharset[F, Cp1252](Charset.forName("CP1252"))

  /** Decodes ISO-8859-1 bytes to strings, chunk by chunk. */
  implicit def latin1Decoder[F[_]]: Decoder[F, Latin1, String] =
    Decoder.forCharset[F, Latin1](StandardCharsets.ISO_8859_1)

  // Loses encoding tracking, but needed for certain operations.
  implicit def bytestringDecoder[F[_], BE <: ByteEncoding]
    : Decoder[F, BE, ByteString] =
    Decoder.fromPipe[BE](Fs2AkkaCompat.byteToByteString)

  /** Decodes gzip-wrapped bytes by first recoding `Gzip[BE]` to `BE` and then decoding `BE`. */
  implicit def gunzipDecoder[
      F[_]: RaiseThrowable,
      BE <: ByteEncoding,
      Entity
  ](
      implicit dc: Decoder[F, BE, Entity],
      rc: Recoder[F, Gzip[BE], BE]
  ): Decoder[F, Gzip[BE], Entity] =
    new Decoder[F, Gzip[BE], Entity] {
      override def decode: Pipe[F, Byte, Entity] =
        compressed => {
          val inflated = rc.recode(compressed)
          dc.decode(inflated)
        }

      override def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity] =
        compressed => {
          val inflated = rc.recodeSized(sizeHint)(compressed)
          dc.decodeSized(sizeHint)(inflated)
        }
    }
}
/** Implicit `Recoder` instances; mixed into the `Recoder` companion object. */
trait RecoderInstances {

  /** Recoding an encoding to itself leaves the bytes untouched. */
  implicit def identityRecoder[F[_], BE <: ByteEncoding]: Recoder[F, BE, BE] =
    new IdentityRecoder[F, BE]

  /** Unwraps gzip-compressed `BE` bytes. */
  implicit def gunzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
    : Recoder[F, Gzip[BE], BE] =
    new GunzipRecoder[F, BE]

  /** Wraps `BE` bytes in gzip compression. */
  implicit def gzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
    : Recoder[F, BE, Gzip[BE]] =
    new GzipRecoder[F, BE]

  /** Turns BOM-prefixed UTF-8 into plain UTF-8 via `Fs2Utils.stripUtf8Bom`. */
  implicit def stripUtf8Bom[F[_]]: Recoder[F, Utf8Bom, Utf8] =
    new Recoder[F, Utf8Bom, Utf8] {
      override def decode: Pipe[F, Byte, Byte] =
        Fs2Utils.stripUtf8Bom[F]

      // The size hint is not used: the same pipe is returned either way.
      override def decodeSized(sizeHint: Int): Pipe[F, Byte, Byte] =
        decode
    }
}
/** A `Decoder` backed directly by pipes.
  *
  * @param pipe     the pipe returned by `decode`
  * @param sizePipe builds the pipe returned by `decodeSized` from the given size
  */
protected class PipeDecoder[F[_], BE <: ByteEncoding, Entity](
    pipe: Pipe[F, Byte, Entity],
    sizePipe: Int => Pipe[F, Byte, Entity]
) extends Decoder[F, BE, Entity] {

  final override def decode: Pipe[F, Byte, Entity] =
    pipe

  final override def decodeSized(chunkSize: Int): Pipe[F, Byte, Entity] =
    sizePipe.apply(chunkSize)
}
/** A `PipeDecoder` whose unsized `decode` is the sized pipe built with `defaultSize`.
  *
  * @param defaultSize the size used when no size hint is supplied
  * @param sizePipe    builds the decoding pipe for a given size
  */
protected class SizedPipeDecoder[F[_], BE <: ByteEncoding, Entity](
    defaultSize: Int,
    sizePipe: Int => Pipe[F, Byte, Entity]
) extends PipeDecoder[F, BE, Entity](sizePipe.apply(defaultSize), sizePipe)
/** Recoder from an encoding to itself: both pipes return the input stream unchanged. */
private class IdentityRecoder[F[_], BE <: ByteEncoding]
    extends PipeDecoder[F, BE, Byte](
      bytes => bytes,
      _ => bytes => bytes
    )
    with Recoder[F, BE, BE]
/** Recoder from `Gzip[BE]` to `BE` built on `fs2.compress.gunzip`.
  *
  * The size hint is passed to `gunzip` as its argument; 4096 is used when no hint is supplied.
  */
private class GunzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
    extends SizedPipeDecoder[F, Gzip[BE], Byte](
      4096,
      size => fs2.compress.gunzip[F](size)
    )
    with Recoder[F, Gzip[BE], BE]
/** Recoder from `BE` to `Gzip[BE]` built on `fs2.compress.gzip`.
  *
  * The size hint is passed to `gzip` as its argument; 4096 is used when no hint is supplied.
  */
private class GzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
    extends SizedPipeDecoder[F, BE, Byte](
      4096,
      size => fs2.compress.gzip[F](size)
    )
    with Recoder[F, BE, Gzip[BE]]
}
import fs2._
import teikametrics.ByteEncoding._
/** A more type-safe version of `S3Key => Stream[F, A]`, to avoid byte
  * encoding errors while also being easy to stub for testing.
  *
  * Classes can use this dependency to become ignorant of what encoding their data source uses,
  * leaving that to a higher composition layer.
  */
trait S3Downloader[F[_], Entity] { self =>

  /** Streams the entities stored under `key`. */
  def getEntity(key: S3Key): Stream[F, Entity]

  /** A downloader that sends everything this downloader produces through `pipe`. */
  final def through[Entity2](
      pipe: Pipe[F, Entity, Entity2]
  ): S3Downloader[F, Entity2] =
    new S3Downloader[F, Entity2] {
      override def getEntity(key: S3Key): Stream[F, Entity2] =
        pipe(self.getEntity(key))
    }
}
/** An S3Downloader based on some form of ByteStream with a known encoding */
trait ByteS3Downloader[F[_], Entity] extends S3Downloader[F, Entity] { self =>

  /** The encoding of the bytes returned by `getBytes`; fixed by the implementation. */
  protected type BE <: ByteEncoding

  /** Fetches the encoding-tagged bytes stored under `key`. */
  protected def getBytes(key: S3Key): ByteStream[F, BE]

  /** The decoder used to turn `BE` bytes into `Entity` values. */
  protected implicit def dc: Decoder[F, BE, Entity]

  /** Fetches the bytes for `key` and decodes them with `dc`. */
  override def getEntity(key: S3Key): Stream[F, Entity] = {
    val raw = getBytes(key)
    raw.decode[Entity](dc)
  }
}
/** Factories for stub downloaders that ignore the requested key. */
object DummyS3Downloader {

  /** A downloader that emits `items` for every key. */
  def apply[F[_], E](items: Seq[E]): DummyS3Downloader[F, E] =
    new DummyS3Downloader[F, E](Stream.emits(items).covary[F])

  /** A downloader that emits nothing for every key. */
  def empty[F[_], E]: DummyS3Downloader[F, E] =
    new DummyS3Downloader[F, E](Stream.empty)

  /** A downloader whose stream fails with an `IllegalStateException` for every key. */
  def error[F[_]: RaiseThrowable, E]: DummyS3Downloader[F, E] =
    new DummyS3Downloader[F, E](
      Stream.raiseError[F](
        // The message names the method and factory that actually exist in this file.
        new IllegalStateException("getEntity() called on DummyS3Downloader.error")))
}

/** An `S3Downloader` that returns the same fixed stream regardless of the key.
  *
  * @param value the stream returned from every `getEntity` call
  */
class DummyS3Downloader[F[_], Entity](value: Stream[F, Entity])
    extends S3Downloader[F, Entity] {
  override def getEntity(key: S3Key): Stream[F, Entity] = value
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment