Created
April 8, 2019 19:59
-
-
Save Daenyth/eee68e71dbd68c3c6792d2380f52c528 to your computer and use it in GitHub Desktop.
ByteStream for fs2 - type-tagged byte encoding for Stream[F, Byte]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics | |
import java.nio.charset.{Charset, StandardCharsets} | |
import akka.util.ByteString | |
import fs2.{Chunk, Pipe, RaiseThrowable, Stream} | |
import teikametrics.ByteEncoding._ | |
/** A stream of raw bytes tagged, at the type level only, with the encoding `BE` it claims to use.
  *
  * The tag is a claim made by whoever constructs the value; nothing here inspects the bytes to verify it.
  *
  * @param bytes     The underlying byte stream
  * @param chunkSize A chunk size to use for streaming operations, in order to use a consistent value
  *                  and minimize array copying. `None` lets each recoder/decoder use its own default.
  */
case class ByteStream[F[_], BE <: ByteEncoding](
    bytes: Stream[F, Byte],
    chunkSize: Option[Int]
) {

  /** Converts the bytes to encoding `BE2` using the implicit recoder, keeping the same `chunkSize`. */
  def recode[BE2 <: ByteEncoding](
      implicit rc: Recoder[F, BE, BE2]): ByteStream[F, BE2] = {
    val recodePipe: Pipe[F, Byte, Byte] = rc.recodeSized(chunkSize)
    ByteStream[F, BE2](bytes.through(recodePipe), chunkSize)
  }

  /** Decodes the bytes into a stream of `Entity` using the implicit decoder for `BE`. */
  def decode[Entity](implicit dc: Decoder[F, BE, Entity]): Stream[F, Entity] = {
    val decodePipe: Pipe[F, Byte, Entity] = dc.decodeSized(chunkSize)
    bytes.through(decodePipe)
  }

  /** Same as `recode`, but only compiles when `BE` is a compression wrapping `BE2`. */
  def decompress[BE2 <: ByteEncoding](
      implicit rc: Recoder[F, BE, BE2],
      ev: BE <:< ByteCompression[BE2]): ByteStream[F, BE2] = {
    // `ev` is compile-time evidence only; binding it avoids an unused-parameter warning
    val _ = ev
    recode[BE2](rc)
  }
}
/** Marker for a byte encoding; used as a phantom type tag.
  *
  * NB deliberately not sealed so use sites can define their own custom encodings if needed
  */
trait ByteEncoding

/** Marker for an encoding that is a compressed form of the encoding `BE`. */
trait ByteCompression[BE <: ByteEncoding] extends ByteEncoding
/** Built-in encoding tags plus the `Decoder` / `Recoder` type classes that operate on them. */
object ByteEncoding extends ByteEncodingInstances {
  // All `sealed trait` because this is always used as a phantom type - we never want to instantiate
  sealed trait Unknown extends ByteEncoding
  sealed trait Gzip[BE <: ByteEncoding] extends ByteCompression[BE]
  sealed trait Utf8 extends ByteEncoding

  /** Utf8 but with a leading BOM */
  sealed trait Utf8Bom extends ByteEncoding
  sealed trait Cp1252 extends ByteEncoding
  sealed trait Latin1 extends ByteEncoding

  /** Decodes streamed bytes into a stream of `Entity` */
  trait Decoder[F[_], BE <: ByteEncoding, Entity] { outer =>

    /** The decoding pipe used when no chunk size hint is available */
    def decode: Pipe[F, Byte, Entity]

    /** Decode with a suggested chunk size. Implementations are free to ignore the hint,
      * but consistent sizing may lead to better performance */
    def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity]

    /** Uses `decodeSized(Int)` when a hint is present, otherwise falls back to `decode` */
    final def decodeSized(sizeHint: Option[Int]): Pipe[F, Byte, Entity] =
      sizeHint.fold(decode)(decodeSized)

    /** A decoder for the same encoding whose output is further transformed by `pipe` */
    def through[Entity2](
        pipe: Pipe[F, Entity, Entity2]
    ): Decoder[F, BE, Entity2] =
      new Decoder[F, BE, Entity2] {
        override def decode: Pipe[F, Byte, Entity2] =
          _.through(outer.decode).through(pipe)
        override def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity2] =
          _.through(outer.decodeSized(sizeHint)).through(pipe)
      }

    /** A decoder for `BE2` that first recodes `BE2` bytes into `BE` and then applies this decoder */
    def contraRecode[BE2 <: ByteEncoding](
        implicit rc: Recoder[F, BE2, BE]
    ): Decoder[F, BE2, Entity] =
      rc.decodeTo(this)
  }

  object Decoder extends DecoderInstances {

    /** Implicit summoner */
    def apply[F[_], BE <: ByteEncoding, Entity](
        implicit dc: Decoder[F, BE, Entity]): Decoder[F, BE, Entity] = dc

    /** Builds a `String` decoder that converts each incoming chunk of bytes into one `String`.
      *
      * NB only single-byte charsets should be passed - UTF8 can break depending on chunk boundaries,
      * because every chunk is decoded on its own.
      *
      * @param charset The charset to decode with; must be single-byte
      * @throws IllegalArgumentException if `charset` is in the known multibyte set, or if its encoder
      *                                  reports that a character may need more than one byte
      */
    def forCharset[F[_], BE <: ByteEncoding](
        charset: Charset
    ): Decoder[F, BE, String] = {
      require(
        !isMultibyte(charset),
        "Multibyte charsets cannot be used safely - use fromPipe instead with a specialized pipe"
      )
      Decoder.fromPipe[BE](_.mapChunks { c =>
        Chunk.singleton(new String(c.toBytes.toArray, charset))
      })
    }

    /** Charsets that are always rejected by `forCharset` */
    private val multibyteCharsets: Set[Charset] = Set(StandardCharsets.UTF_8,
                                                      StandardCharsets.UTF_16,
                                                      StandardCharsets.UTF_16BE,
                                                      StandardCharsets.UTF_16LE)

    /** True when `charset` is known, or can be shown, to use more than one byte per character.
      *
      * Beyond the fixed set above, any charset whose encoder declares `maxBytesPerChar > 1` is treated
      * as multibyte. Charsets that do not support encoding cannot be probed this way and are let through,
      * which matches the behavior before this check existed.
      */
    private def isMultibyte(charset: Charset): Boolean =
      multibyteCharsets.contains(charset) ||
        (charset.canEncode && charset.newEncoder().maxBytesPerChar() > 1.0f)

    /** Constructs a `Decoder` based on any Pipe.
      *
      * The returned decoder ignores size hints
      *
      * @tparam BE The encoding that you claim the pipe handles. Caller must be certain the encoding matches the pipe,
      *            or the resulting decoder may return failed streams when applied.
      * @example {{{
      * // Note this decoder is actually available already
      * implicit val dc: Decoder[F, Utf8, String] = Decoder.fromPipe[Utf8](fs2.text.utf8Decode[F])
      * }}}
      */
    def fromPipe[BE <: ByteEncoding]: FromPipe[BE] = new FromPipe[BE]

    /** Like `fromPipe` but instead of ignoring size hints, uses them and has a default when not set */
    def fromPipeSized[BE <: ByteEncoding]: FromPipeSized[BE] =
      new FromPipeSized[BE]

    /** Partially-applied helper so callers only have to spell out `BE` */
    class FromPipe[BE <: ByteEncoding] {
      def apply[F[_], Entity](
          pipe: Pipe[F, Byte, Entity]): Decoder[F, BE, Entity] =
        new PipeDecoder[F, BE, Entity](pipe, _ => pipe)
    }

    /** Partially-applied helper so callers only have to spell out `BE` */
    class FromPipeSized[BE <: ByteEncoding] {
      def apply[F[_], Entity](
          defaultSize: Int,
          pipe: Int => Pipe[F, Byte, Entity]): Decoder[F, BE, Entity] =
        new PipeDecoder[F, BE, Entity](pipe(defaultSize), pipe)
    }
  }

  /** A byte-to-byte `Decoder` that converts bytes encoded as `In` into bytes encoded as `Out` */
  trait Recoder[F[_], In <: ByteEncoding, Out <: ByteEncoding]
      extends Decoder[F, In, Byte] { self =>

    /** Alias for `decode` */
    def recode: Pipe[F, Byte, Byte] = decode

    /** Alias for `decodeSized(Int)` */
    def recodeSized(sizeHint: Int): Pipe[F, Byte, Byte] =
      decodeSized(sizeHint)

    /** Uses `recodeSized(Int)` when a hint is present, otherwise falls back to `recode` */
    final def recodeSized(sizeHint: Option[Int]): Pipe[F, Byte, Byte] =
      sizeHint.fold(recode)(recodeSized)

    /** Chains this recoder with `rc`, giving a recoder from `In` to `Out2` */
    def andThen[Out2 <: ByteEncoding](
        rc: Recoder[F, Out, Out2]
    ): Recoder[F, In, Out2] =
      new Recoder[F, In, Out2] {
        override def decode: Pipe[F, Byte, Byte] =
          self.decode.andThen(rc.decode)
        override def decodeSized(sizeHint: Int): Pipe[F, Byte, Byte] =
          self.decodeSized(sizeHint).andThen(rc.decodeSized(sizeHint))
      }

    /** Chains this recoder with a decoder for `Out`, giving a decoder for `In` */
    def decodeTo[Entity](
        implicit dc: Decoder[F, Out, Entity]): Decoder[F, In, Entity] =
      new Decoder[F, In, Entity] {
        override def decode: Pipe[F, Byte, Entity] =
          self.decode.andThen(dc.decode)
        override def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity] =
          self.decodeSized(sizeHint).andThen(dc.decodeSized(sizeHint))
      }
  }

  object Recoder extends RecoderInstances {

    /** Implicit summoner */
    def apply[F[_], BE <: ByteEncoding, BE2 <: ByteEncoding](
        implicit rc: Recoder[F, BE, BE2]
    ): Recoder[F, BE, BE2] = rc
  }
}
/** Implicit instances and helper classes backing `ByteEncoding.Decoder` and `ByteEncoding.Recoder`.
  *
  * The self-type restricts this trait to being mixed into the `ByteEncoding` object itself.
  */
trait ByteEncodingInstances { this: ByteEncoding.type =>

  /** Implicit decoders */
  trait DecoderInstances {
    implicit def utf8Decoder[F[_]]: Decoder[F, Utf8, String] = {
      val utf8Pipe: Pipe[F, Byte, String] = fs2.text.utf8Decode[F]
      Decoder.fromPipe[Utf8](utf8Pipe)
    }

    implicit def cp1252Decoder[F[_]]: Decoder[F, Cp1252, String] = {
      val cp1252: Charset = Charset.forName("CP1252")
      Decoder.forCharset(cp1252)
    }

    implicit def latin1Decoder[F[_]]: Decoder[F, Latin1, String] = {
      val latin1: Charset = StandardCharsets.ISO_8859_1
      Decoder.forCharset(latin1)
    }

    // Loses encoding tracking, but needed for certain operations.
    implicit def bytestringDecoder[F[_], BE <: ByteEncoding]
      : Decoder[F, BE, ByteString] =
      Decoder.fromPipe[BE](Fs2AkkaCompat.byteToByteString)

    /** Decoder for gzipped `BE`: recodes `Gzip[BE]` bytes to `BE` with `rc`, then decodes with `dc` */
    implicit def gunzipDecoder[
        F[_]: RaiseThrowable,
        BE <: ByteEncoding,
        Entity
    ](
        implicit dc: Decoder[F, BE, Entity],
        rc: Recoder[F, Gzip[BE], BE]
    ): Decoder[F, Gzip[BE], Entity] =
      new Decoder[F, Gzip[BE], Entity] {
        override def decode: Pipe[F, Byte, Entity] = { compressed =>
          val recoded = compressed.through(rc.recode)
          recoded.through(dc.decode)
        }

        override def decodeSized(sizeHint: Int): Pipe[F, Byte, Entity] = {
          compressed =>
            val recoded = compressed.through(rc.recodeSized(sizeHint))
            recoded.through(dc.decodeSized(sizeHint))
        }
      }
  }

  /** Implicit recoders */
  trait RecoderInstances {
    implicit def identityRecoder[F[_], BE <: ByteEncoding]: Recoder[F, BE, BE] =
      new IdentityRecoder[F, BE]

    implicit def gunzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
      : Recoder[F, Gzip[BE], BE] =
      new GunzipRecoder[F, BE]

    implicit def gzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
      : Recoder[F, BE, Gzip[BE]] =
      new GzipRecoder[F, BE]

    implicit def stripUtf8Bom[F[_]]: Recoder[F, Utf8Bom, Utf8] =
      new Recoder[F, Utf8Bom, Utf8] {
        override def decode: Pipe[F, Byte, Byte] =
          Fs2Utils.stripUtf8Bom[F]

        // The size hint is not used: the same pipe is returned either way
        override def decodeSized(sizeHint: Int): Pipe[F, Byte, Byte] =
          decode
      }
  }

  /** A `Decoder` assembled from a fixed pipe (no hint) and a function from size hint to pipe */
  protected class PipeDecoder[F[_], BE <: ByteEncoding, Entity](
      pipe: Pipe[F, Byte, Entity],
      sizePipe: Int => Pipe[F, Byte, Entity])
      extends Decoder[F, BE, Entity] {

    final override def decode: Pipe[F, Byte, Entity] =
      pipe

    final override def decodeSized(chunkSize: Int): Pipe[F, Byte, Entity] =
      sizePipe(chunkSize)
  }

  /** A `PipeDecoder` whose hint-less pipe is `sizePipe` applied to `defaultSize` */
  protected class SizedPipeDecoder[F[_], BE <: ByteEncoding, Entity](
      defaultSize: Int,
      sizePipe: Int => Pipe[F, Byte, Entity])
      extends PipeDecoder[F, BE, Entity](sizePipe(defaultSize), sizePipe)

  /** Recoder that passes the stream through untouched, whatever the size hint */
  private class IdentityRecoder[F[_], BE <: ByteEncoding]
      extends PipeDecoder[F, BE, Byte](identity[Stream[F, Byte]],
                                       _ => identity[Stream[F, Byte]])
      with Recoder[F, BE, BE]

  /** Recoder built on `fs2.compress.gunzip`; the size hint (4096 when absent) is passed as its argument */
  private class GunzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
      extends SizedPipeDecoder[F, Gzip[BE], Byte](
        4096,
        size => fs2.compress.gunzip[F](size))
      with Recoder[F, Gzip[BE], BE]

  /** Recoder built on `fs2.compress.gzip`; the size hint (4096 when absent) is passed as its argument */
  private class GzipRecoder[F[_]: RaiseThrowable, BE <: ByteEncoding]
      extends SizedPipeDecoder[F, BE, Byte](
        4096,
        size => fs2.compress.gzip[F](size))
      with Recoder[F, BE, Gzip[BE]]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs2._ | |
import teikametrics.ByteEncoding._ | |
/** A more type-safe alternative to a bare `S3Key => Stream[F, A]` function, intended to avoid byte
  * encoding errors while remaining easy to stub in tests.
  *
  * Code depending on this trait does not need to know which encoding its data source uses;
  * that decision is left to a higher composition layer.
  */
trait S3Downloader[F[_], Entity] { self =>

  /** Streams the entities stored under `key` */
  def getEntity(key: S3Key): Stream[F, Entity]

  /** A downloader whose output is this downloader's output passed through `pipe` */
  final def through[Entity2](
      pipe: Pipe[F, Entity, Entity2]
  ): S3Downloader[F, Entity2] =
    new S3Downloader[F, Entity2] {
      override def getEntity(key: S3Key): Stream[F, Entity2] = {
        val upstream: Stream[F, Entity] = self.getEntity(key)
        upstream.through(pipe)
      }
    }
}
/** An S3Downloader that fetches an encoding-tagged `ByteStream` and decodes it into `Entity`.
  *
  * Implementations fix the encoding via the `BE` type member and supply the matching decoder.
  */
trait ByteS3Downloader[F[_], Entity] extends S3Downloader[F, Entity] { self =>

  /** The encoding of the bytes returned by `getBytes` */
  protected type BE <: ByteEncoding

  /** Fetches the raw, encoding-tagged bytes for `key` */
  protected def getBytes(key: S3Key): ByteStream[F, BE]

  /** Decoder turning `BE`-encoded bytes into `Entity` */
  protected implicit def dc: Decoder[F, BE, Entity]

  def getEntity(key: S3Key): Stream[F, Entity] = {
    val raw: ByteStream[F, BE] = getBytes(key)
    raw.decode[Entity](dc)
  }
}
/** Factories for stub downloaders that ignore the requested key */
object DummyS3Downloader {

  /** A downloader that emits `items`, in order, for every key */
  def apply[F[_], E](items: Seq[E]): DummyS3Downloader[F, E] =
    new DummyS3Downloader[F, E](Stream.emits(items).covary[F])

  /** A downloader that emits nothing for every key */
  def empty[F[_], E]: DummyS3Downloader[F, E] =
    new DummyS3Downloader[F, E](Stream.empty)

  /** A downloader whose stream fails with an `IllegalStateException` for every key */
  def error[F[_]: RaiseThrowable, E]: DummyS3Downloader[F, E] =
    new DummyS3Downloader[F, E](
      Stream.raiseError[F](
        // Names the method and factory that actually exist on this type
        new IllegalStateException(
          "getEntity() called on DummyS3Downloader.error")))
}
/** Stub `S3Downloader` that returns the same pre-built stream regardless of the key requested.
  *
  * @param value The stream handed back by every `getEntity` call
  */
class DummyS3Downloader[F[_], Entity](value: Stream[F, Entity])
    extends S3Downloader[F, Entity] {

  // `key` is intentionally ignored
  override def getEntity(key: S3Key): Stream[F, Entity] =
    value
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment