Skip to content

Instantly share code, notes, and snippets.

@olix0r
Last active September 15, 2015 13:11
Show Gist options
  • Save olix0r/5ba0569ef39af48d3351 to your computer and use it in GitHub Desktop.
Save olix0r/5ba0569ef39af48d3351 to your computer and use it in GitHub Desktop.
/**
 * A single event decoded from a watch response stream.
 *
 * @param kind the event's `kind` string, as decoded from the JSON payload
 */
final case class WatchEvent(kind: String)
/**
 * Client wrapper that issues a request through `client` and exposes the
 * response body as a stream of [[WatchEvent]]s.
 *
 * @param client the service used to dispatch the request built by `mkreq()`
 */
class Api(client: Service[Request, Response]) {

  /** Builds the request sent by `watch()`. Left unimplemented (`???`) here. */
  def mkreq(): Request = ???

  /**
   * Sends the request and decodes the response body as a stream of events.
   *
   * On `Status.Ok` the response reader is handed to `Json.readStream`; for any
   * other status the returned stream fails with `UnexpectedResponse(rsp)`.
   *
   * @return a stream of the events decoded from the response body
   */
  def watch(): AsyncStream[WatchEvent] =
    for {
      rsp <- AsyncStream.fromFuture(client(mkreq()))
      // Bound as `event` rather than `watch` so it does not shadow this method's name.
      event <- rsp.status match {
        case Status.Ok =>
          Json.readStream[WatchEvent](rsp.reader)
        case _ =>
          AsyncStream.fromFuture(
            Future.exception(UnexpectedResponse(rsp)))
      }
    } yield event
}
/**
 * Helpers for decoding a sequence of JSON values of one type out of byte
 * buffers, either from a single chunk or incrementally from a `Reader`.
 */
object Json {
  private[this] val mapper =
    new ObjectMapper with ScalaObjectMapper

  private[this] lazy val factory: JsonFactory = mapper.getFactory

  /**
   * Given a chunk of bytes, read a stream of objects, and return the remaining unread buffer.
   *
   * Values are read one after another until the parser's byte offset reaches
   * the end of `chunk`, or until an `Incomplete()` failure is caught. In the
   * latter case the bytes after the last fully decoded value are returned as
   * the remainder so the caller can retry once more data has arrived.
   *
   * NOTE(review): `parse` and `Incomplete` are defined outside this snippet;
   * presumably `parse` opens a Jackson parser over `chunk` and `Incomplete`
   * matches the parser's premature-end-of-input failure -- confirm.
   *
   * @tparam T the type each JSON value is decoded as
   * @param chunk the bytes to decode
   * @return the decoded values in order, and the slice of `chunk` starting at
   *         the byte offset just past the last decoded value
   * @throws IllegalStateException if a decoded value is null or not a `T`
   */
  def readChunked[T: Manifest](chunk: Buf): (Seq[T], Buf) = {
    // Accumulate through a builder so the returned Seq is immutable.
    val objs = Vector.newBuilder[T]
    // Byte offset just past the last successfully decoded value.
    var offset = 0L
    parse(chunk) { json =>
      var reading = true
      while (reading) {
        try {
          json.readValueAs(manifest[T].runtimeClass) match {
            case obj: T if obj != null =>
              objs += obj
              offset = json.getCurrentLocation.getByteOffset
              reading = offset < chunk.length
            case _ =>
              val Buf.Utf8(chunkstr) = chunk
              throw new IllegalStateException(
                s"could not decode json object in chunk @ ${offset} bytes: ${chunkstr}"
              )
          }
        } catch {
          // Stop here; `offset` still marks the end of the last complete value.
          case Incomplete() =>
            reading = false
        }
      }
    }
    val rest = chunk.slice(offset.toInt, chunk.length)
    (objs.result(), rest)
  }

  /**
   * Incrementally decodes values of type `T` from `reader`.
   *
   * Each read of up to `bufsiz` bytes is appended to the bytes left over from
   * the previous read and passed to `readChunked`; the decoded values are
   * emitted and the new remainder is carried into the next read.
   *
   * NOTE(review): when `reader.read` yields `None` the stream ends and any
   * leftover undecoded bytes are dropped -- confirm this is intended.
   *
   * @tparam T the type each JSON value is decoded as
   * @param reader the source of bytes
   * @param bufsiz the maximum number of bytes requested per read
   * @return a stream of the decoded values in the order they were read
   */
  def readStream[T: Manifest](reader: Reader, bufsiz: Int = 8 * 1024): AsyncStream[T] = {
    // `init` holds the undecoded bytes left over from the previous read.
    def chunks(init: Buf): AsyncStream[T] =
      for {
        read <- AsyncStream.fromFuture(reader.read(bufsiz))
        buf <- AsyncStream.fromOption(read)
        item <- {
          val (items, tail) = readChunked[T](init concat buf)
          AsyncStream.fromSeq(items) concat chunks(tail)
        }
      } yield item

    chunks(Buf.Empty)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment