Skip to content

Instantly share code, notes, and snippets.

@trane
Last active May 19, 2017 16:16
Show Gist options
  • Save trane/76e16a1cc1cfd1fa55a7a597ada84f3e to your computer and use it in GitHub Desktop.
Save trane/76e16a1cc1cfd1fa55a7a597ada84f3e to your computer and use it in GitHub Desktop.
package domino.server.sync
import java.io.InputStream
import java.nio.ByteBuffer
import domino.replicator.functional.{Id, Monad}
import domino.replicator.functional.Syntax._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
/**
 * A source of bytes whose reads are expressed in an effect type `F`.
 *
 * @tparam F the effect/container type in which read results are returned
 */
trait Reader[F[_]] {
/**
 * Requests a chunk of bytes.
 *
 * The implementations in this file return `None` once the source is
 * exhausted and `Some(buffer)` otherwise, using `n` as an upper bound on the
 * chunk size.
 *
 * @param n requested number of bytes
 * @return the chunk that was read, or `None`, wrapped in `F`
 */
def read(n: Int): F[Option[ByteBuffer]]
/**
 * Disposes of the reader. Side-effecting: the stream-backed implementation
 * in this file closes its underlying stream; the null reader does nothing.
 */
def discard: Unit
}
/**
 * Constructors and helpers for [[Reader]] instances.
 */
object Reader {
  /** Identity type constructor; lets [[NullReader]] return plain values. */
  type Null[A] = A

  /** Allows `ExecutionContext` to be requested through a context bound (`[_: EC]`). */
  type EC[_] = ExecutionContext

  /**
   * Reads from `r` until it reports end of input (`None`) and returns all
   * bytes received, concatenated in arrival order.
   *
   * The returned buffer is ready for reading: its position is 0 and its limit
   * is the total number of bytes collected. An empty source yields an empty
   * buffer.
   *
   * @param r the reader to drain
   * @tparam F the effect type of the reader
   * @return a single buffer holding every chunk's remaining bytes
   */
  def readAll[F[_]: Monad](r: Reader[F]): F[ByteBuffer] = {
    // Copies every chunk's remaining bytes into one exactly-sized buffer.
    // `duplicate()` is used so the chunks' own positions are left untouched.
    def concat(chunks: Vector[ByteBuffer]): ByteBuffer = {
      val merged = ByteBuffer.allocate(chunks.map(_.remaining()).sum)
      chunks.foreach(chunk => merged.put(chunk.duplicate()))
      merged.flip()
      merged
    }

    // Chunks are collected first and merged once at the end. Putting into a
    // fixed zero-capacity buffer would overflow on the first non-empty chunk.
    def loop(chunks: Vector[ByteBuffer]): F[ByteBuffer] =
      r.read(Int.MaxValue).flatMap {
        case Some(chunk) => loop(chunks :+ chunk)
        case None        => Monad[F].pure(concat(chunks))
      }

    loop(Vector.empty)
  }

  /** A reader that is always at end of input. */
  private[sync] object NullReader extends Reader[Null] {
    override def read(n: Int): Null[Option[ByteBuffer]] = None
    override def discard: Unit = ()
  }

  /**
   * A [[Reader]] backed by a `java.io.InputStream`.
   *
   * @param is         the stream to read from; closed by `discard`
   * @param bufferSize maximum number of bytes returned by a single `read`;
   *                   must be positive
   * @throws IllegalArgumentException if `bufferSize` is not positive
   */
  private[sync] case class InputStreamReader[F[_]: Monad](is: InputStream, bufferSize: Int) extends Reader[F] {
    // A zero-length read returns 0 instead of EOF, so a non-positive buffer
    // size could never reach end of input.
    require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")

    private val EOF = -1

    /**
     * Reads at most `min(n, bufferSize)` bytes from the stream.
     *
     * @param n upper bound on the number of bytes to read
     * @return `None` at end of stream, otherwise a buffer whose remaining
     *         bytes are exactly the bytes that were read
     */
    override def read(n: Int): F[Option[ByteBuffer]] = {
      val len = Math.min(n, bufferSize)
      // A negative `n` is still passed through to `is.read`, which rejects it.
      val arr = new Array[Byte](Math.max(len, 0))
      Monad[F].pure {
        // Ask only for what `arr` can hold, and expose only the bytes that
        // were actually read rather than the whole (zero-padded) array.
        is.read(arr, 0, len) match {
          case EOF   => None
          case count => Some(ByteBuffer.wrap(arr, 0, count))
        }
      }
    }

    /** Closes the underlying stream. */
    override def discard: Unit = is.close()
  }

  /** Builds a `Future`-based reader over `is`; requires an implicit `ExecutionContext`. */
  def asyncInputReader[_: EC](is: InputStream, bufferSize: Int): Reader[Future] =
    new InputStreamReader[Future](is, bufferSize)

  /** Builds a reader over `is` whose results are returned in `Id`. */
  def syncInputReader(is: InputStream, bufferSize: Int): Reader[Id] =
    new InputStreamReader[Id](is, bufferSize)

  /** The reader that never yields any bytes. */
  val empty = NullReader
}
import java.io.OutputStream
import java.nio.ByteBuffer
import domino.replicator.functional.Monad
import scala.language.higherKinds
/**
 * A sink for bytes whose writes are expressed in an effect type `F`.
 *
 * @tparam F the effect/container type in which the write is returned
 */
trait Writer[F[_]] {
/**
 * Writes bytes taken from `buf`.
 *
 * @param buf the buffer supplying the bytes; the implementation in this file
 *            consumes bytes from it by advancing its position
 * @return a unit value wrapped in `F`
 */
def write(buf: ByteBuffer): F[Unit]
}
/**
 * Constructors for [[Writer]] instances.
 */
object Writer {
  /**
   * Builds a writer that forwards bytes to `os`.
   *
   * @param os         the destination stream
   * @param bufferSize size of the intermediate copy buffer; must be positive
   * @tparam F the effect type of the writer
   * @throws IllegalArgumentException if `bufferSize` is not positive
   */
  def outWriter[F[_]: Monad](os: OutputStream, bufferSize: Int): Writer[F] =
    OutputStreamWriter[F](os, bufferSize)

  /**
   * A [[Writer]] backed by a `java.io.OutputStream`.
   *
   * NOTE(review): the scratch array is shared by every `write` call on an
   * instance, so overlapping writes on one instance would interfere with each
   * other. Confirm that callers sequence their writes.
   *
   * @param os         the destination stream
   * @param bufferSize size of the intermediate copy buffer; must be positive
   */
  private[sync] case class OutputStreamWriter[F[_]: Monad](os: OutputStream, bufferSize: Int) extends Writer[F] {
    // A zero-sized scratch array would make the drain loop below spin forever.
    require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")

    private[this] val bytes = new Array[Byte](bufferSize)

    /**
     * Writes every remaining byte of `buf` to the stream, at most `bufferSize`
     * bytes per underlying `OutputStream.write` call. On completion `buf` has
     * no remaining bytes.
     *
     * @param buf the buffer to drain
     */
    override def write(buf: ByteBuffer): F[Unit] =
      Monad[F].pure {
        // Loop until drained. A single pass would silently drop everything
        // beyond the first `bufferSize` bytes.
        while (buf.hasRemaining) {
          val n = Math.min(bytes.length, buf.remaining())
          buf.get(bytes, 0, n)
          os.write(bytes, 0, n)
        }
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment