Skip to content

Instantly share code, notes, and snippets.

@abrighton
Last active Aug 29, 2015
Embed
What would you like to do?
package test
import java.io.FileOutputStream
import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model.HttpMethods._
import akka.http.model._
import akka.io.IO
import akka.pattern.ask
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ForeachSink, Sink, Source}
import akka.util.{ByteString, Timeout}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Try, Failure, Success}
// The test client tries to get /tmp/test.txt by getting http://localhost:8549/test.txt
// and stores the result in /tmp/out.txt.
object TestClient extends App {
implicit val system = ActorSystem("ServerTest")
import system.dispatcher
implicit val materializer = FlowMaterializer()
implicit val askTimeout: Timeout = 5000.millis
// Server info
val host = "localhost"
val port = 8549
// File under /tmp to get
val file = "test.txt"
// The local file in which to store the received data
val localFile = "/tmp/out.txt"
val out = new FileOutputStream(localFile)
val sink = ForeachSink[ByteString] { bytes =>
println(s"XXX writing ${bytes.size} bytes to $localFile")
out.write(bytes.toArray)
}
val result = for {
connection IO(Http).ask(Http.Connect(host, port)).mapTo[Http.OutgoingConnection]
response sendRequest(HttpRequest(GET, uri = s"/$file"), connection)
} yield response
def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
Source(List(request -> 'NoContext))
.to(Sink(connection.requestSubscriber))
.run()
Source(connection.responsePublisher).map(_._1).runWith(Sink.head)
}
result onComplete {
case Success(res) if res.status == StatusCodes.OK
val materialized = res.entity.getDataBytes().to(sink).run()
// ensure the output file is closed (otherwise some bytes may not be written)
materialized.get(sink).onComplete {
case Success(_) =>
println("Success: closing the file")
Try(out.close())
system.shutdown()
case Failure(e) =>
println(s"Failure: ${e.getMessage}")
Try(out.close())
system.shutdown()
}
case Success(res)
println(s"Got HTTP response code ${res.status}")
system.shutdown()
case Failure(error) println(s"Error: $error")
system.shutdown()
}
}
package test
import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model.HttpEntity.ChunkStreamPart
import akka.http.model._
import akka.io.IO
import akka.pattern.ask
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, Timeout}
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
// A test HTTP server that returns files with a given name from /tmp.
// The test client tries to get /tmp/test.txt by getting http://localhost:8549/test.txt
// and stores the result in /tmp/out.txt.
object TestServer extends App {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = FlowMaterializer()
implicit val askTimeout: Timeout = 5000.millis
val host = "localhost"
val port = 8549
// Serve files from this directory
val dir = new File("/tmp")
val chunkSize = 4096
import akka.http.model.HttpMethods._
val requestHandler: HttpRequest HttpResponse = {
case HttpRequest(GET, uri, headers, _, _)
val path = makePath(dir, new File(uri.path.toString()))
println(s"Received request for $path (uri = $uri)")
val result = Try {
val mappedByteBuffer = mmap(path)
val iterator = new ByteBufferIterator(mappedByteBuffer, chunkSize)
val chunks = Source(iterator).map(ChunkStreamPart.apply)
HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
} recover {
case NonFatal(cause)
println(s"Nonfatal error: cause = ${cause.getMessage}")
HttpResponse(StatusCodes.InternalServerError, entity = cause.getMessage)
}
result.get
case _: HttpRequest HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
}
val bindingFuture = IO(Http) ? Http.Bind(host, port)
bindingFuture foreach {
case Http.ServerBinding(localAddress, connectionStream)
println(s"Listening on http:/$localAddress/")
Source(connectionStream).foreach {
case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer)
println(s"Accepted new connection from $remoteAddress")
val materialized = Source(requestProducer).map(requestHandler).to(Sink(responseConsumer)).run()
}
}
bindingFuture.recover {
case ex =>
println("error: " + ex.getMessage)
system.shutdown()
}
def mmap(path: Path): MappedByteBuffer = {
val channel = FileChannel.open(path, StandardOpenOption.READ)
val result = channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
channel.close()
result
}
class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
require(buffer.isReadOnly)
require(chunkSize > 0)
override def hasNext = buffer.hasRemaining
override def next(): ByteString = {
val size = chunkSize min buffer.remaining()
val temp = buffer.slice()
temp.limit(size)
buffer.position(buffer.position() + size)
ByteString(temp)
}
}
def makePath(dir: File, file: File): Path = {
Paths.get(dir.getPath, file.getName)
}
}
@abrighton
Copy link
Author

abrighton commented Nov 20, 2014

Note: This is based on:
val akkaStream = "com.typesafe.akka" %% "akka-stream-experimental" % "0.11"
val akkaHttp = "com.typesafe.akka" %% "akka-http-experimental" % "0.11"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment