Last active
September 15, 2015 13:11
-
-
Save olix0r/5ba0569ef39af48d3351 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A single event decoded from a watch response stream.
 *
 * @param kind the event's `kind` field, kept as the raw string
 */
final case class WatchEvent(kind: String)
/**
 * Client for an API that answers a single request with a stream of JSON-encoded
 * `WatchEvent`s.
 *
 * @param client service used to send the request built by `mkreq`
 */
class Api(client: Service[Request, Response]) {

  /** Builds the request issued by `watch`. Not implemented yet. */
  def mkreq(): Request = ???

  /**
   * Sends the request from `mkreq` and decodes the response body into events.
   *
   * @return the events read from the response's reader when the status is
   *         `Status.Ok`; for any other status, a stream that fails with
   *         `UnexpectedResponse(rsp)`
   */
  def watch(): AsyncStream[WatchEvent] =
    for {
      rsp <- AsyncStream.fromFuture(client(mkreq()))
      // `event` rather than `watch`, so the binding does not shadow this method.
      event <- rsp.status match {
        case Status.Ok =>
          Json.readStream[WatchEvent](rsp.reader)
        case _ =>
          AsyncStream.fromFuture(
            Future.exception(UnexpectedResponse(rsp)))
      }
    } yield event
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Helpers for decoding a sequence of JSON objects out of byte buffers and readers,
 * backed by a Jackson `ObjectMapper`.
 */
object Json {
  // NOTE(review): no Jackson module is registered on this mapper. Decoding Scala case
  // classes normally requires jackson-module-scala's DefaultScalaModule; confirm
  // whether it needs to be registered here.
  private[this] val mapper =
    new ObjectMapper with ScalaObjectMapper

  private[this] lazy val factory: JsonFactory = mapper.getFactory

  /**
   * Given a chunk of bytes, read a stream of objects, and return the remaining unread buffer.
   *
   * Objects are decoded one after another until the chunk is exhausted or the parser
   * signals `Incomplete`; the bytes following the last fully decoded object are returned
   * so the caller can prepend them to the next chunk.
   *
   * @tparam T type each JSON value is decoded as
   * @param chunk bytes to decode; may end in the middle of an object
   * @return the decoded objects, in order, and the undecoded tail of `chunk`
   * @throws IllegalStateException if a decoded value is null or not a `T`
   */
  def readChunked[T: Manifest](chunk: Buf): (Seq[T], Buf) = {
    val objs = mutable.Buffer.empty[T]
    // Byte offset just past the last object that was decoded completely.
    var offset = 0L

    // An empty chunk has nothing to decode, so the parser is not involved at all.
    if (chunk.length > 0) {
      val targetClass = manifest[T].runtimeClass
      parse(chunk) { json =>
        var reading = true
        while (reading) {
          try {
            json.readValueAs(targetClass) match {
              case obj: T if obj != null =>
                objs.append(obj)
                offset = json.getCurrentLocation.getByteOffset
                reading = offset < chunk.length

              case _ =>
                val Buf.Utf8(chunkstr) = chunk
                throw new IllegalStateException(
                  s"could not decode json object in chunk @ ${offset} bytes: ${chunkstr}"
                )
            }
          } catch {
            // The chunk ends mid-object: stop and leave the partial bytes in the tail.
            case Incomplete() =>
              reading = false
          }
        }
      }
    }

    val rest = chunk.slice(offset.toInt, chunk.length)
    // Return an immutable snapshot instead of exposing the working buffer.
    (objs.toList, rest)
  }

  /**
   * Decode the contents of `reader` as a stream of JSON objects.
   *
   * Each read of up to `bufsiz` bytes is appended to the bytes left undecoded by the
   * previous read and passed to `readChunked`.
   *
   * NOTE(review): when the reader yields `None`, any bytes still held in `init` are
   * not decoded. This looks intentional for a truncated trailing object; confirm.
   *
   * @tparam T type each JSON value is decoded as
   * @param reader source of bytes
   * @param bufsiz number of bytes requested per read
   */
  def readStream[T: Manifest](reader: Reader, bufsiz: Int = 8 * 1024): AsyncStream[T] = {
    // `init` carries the undecoded tail of the previous chunk.
    def chunks(init: Buf): AsyncStream[T] =
      for {
        read <- AsyncStream.fromFuture(reader.read(bufsiz))
        buf <- AsyncStream.fromOption(read)
        item <- {
          val (items, tail) = readChunked[T](init concat buf)
          AsyncStream.fromSeq(items) concat chunks(tail)
        }
      } yield item

    chunks(Buf.Empty)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment