Skip to content

Instantly share code, notes, and snippets.

@olix0r
Last active May 15, 2017 03:31
Show Gist options
  • Save olix0r/08fe4836c9dd533542c0 to your computer and use it in GitHub Desktop.
Save olix0r/08fe4836c9dd533542c0 to your computer and use it in GitHub Desktop.
testing streaming
import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import com.twitter.finagle._
import com.twitter.finagle.util.DefaultTimer
import com.twitter.io.{Buf, Reader}
import com.twitter.util._
object StreamProxy {
implicit val timer = DefaultTimer.twitter
val CHUNKS = 60
// the "downstream" service generates some data every second for a minute
val app = Service.mk[http.Request, http.Response] { req =>
val writer = Reader.writable()
def chunkLoop(n: Int): Unit =
if (n == CHUNKS) {
println("write: close")
writer.close()
} else {
Future.sleep(1.second).before {
val chunk = s"chunk $n"
println(s"write> $chunk")
writer.write(Buf.Utf8(s"$chunk\n")).respond {
case Return(_) => chunkLoop(n+1)
case Throw(e) => println(s"write: error ${e.getMessage}")
}
}
}
chunkLoop(0) // runs asynchronously without blocking response
Future.value(http.Response(req.version, http.Status.Ok, writer))
}
val appServer = Http.server.withStreaming(true).serve("127.1:8081", app)
// we proxy requests to it (without doing anything to the response bodies)
val appClient = Http.client.withStreaming(true).newService("/$/inet/127.1/8081")
val proxy = Service.mk[http.Request, http.Response] { req =>
// insert whatever proxy logic you want here
println(s"proxying req: $req")
appClient(req)
}
val proxyServer = Http.server.withStreaming(true).serve("127.1:8080", proxy)
val proxyClient = Http.client.withStreaming(true).newService("/$/inet/127.1/8080")
val closable = Closable.sequence(proxyClient, proxyServer, appClient, appServer)
def close(): Unit = Await.result(closable.close(), 1.second)
def test(client: Service[http.Request, http.Response]): Unit = {
// When we make a request on the proxy, we should get back a response immediately.
val rsp = Await.result(client(http.Request()), 500.millis)
assert(rsp.status == http.Status.Ok)
// then we should receive all of the chunks asynchronously
// there's probably a better way to do this with AsyncStream
val reader = rsp.reader
def readLoop(): Future[Unit] =
reader.read(100).transform {
case Return(Some(Buf.Utf8(chunk))) =>
println(s"read> $chunk")
readLoop()
case Return(None) =>
println(s"read: closed")
Future.Unit
case Throw(e) =>
println(s"read: error ${e.getMessage}")
Future.Unit
}
val done = readLoop()
Await.result(done, 1.minute + 10.seconds)
}
def testDirect(): Unit = test(appClient)
def testProxied(): Unit = test(proxyClient)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment