Skip to content

Instantly share code, notes, and snippets.

@rklaehn
Last active June 8, 2020 12:38
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save rklaehn/3f26c3f80e5870831f52 to your computer and use it in GitHub Desktop.
Save rklaehn/3f26c3f80e5870831f52 to your computer and use it in GitHub Desktop.
akka http file server
package akkahttptest
import akka.http.Http
import akka.stream.ActorFlowMaterializer
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.http.model._
/**
 * Minimal HTTP client: sends one default `HttpRequest` to `host` on port 80 and
 * streams the response body to standard output, or prints the status code when
 * the response is not `200 OK`. Runs until a byte is read from stdin.
 *
 * NOTE(review): this connects to port 80, whereas `Main` in this gist binds
 * port 8080 — confirm which server this client is meant to talk to.
 */
object TestClient extends App {
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = ActorFlowMaterializer()

  val host = "127.0.0.1"
  val httpClient = Http(system).outgoingConnection(host, 80)

  // For every response: dump the body bytes to stdout on success, otherwise print the status.
  val printChunksConsumer = Sink.foreach[HttpResponse] { response =>
    response.status match {
      case StatusCodes.OK =>
        response.entity.getDataBytes()
          .map { bytes =>
            System.out.write(bytes.toArray)
            // Flush after each chunk so output appears as data arrives.
            System.out.flush()
          }
          .to(Sink.ignore)
          .run()
      case other =>
        println(other)
    }
  }

  val finishFuture = Source.single(HttpRequest()).via(httpClient).runWith(printChunksConsumer)

  // Block the main thread until the user presses a key, then tear the actor system down.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
package akkahttptest
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model.HttpEntity.ChunkStreamPart
import akka.http.model._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, Timeout}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
/**
 * Iterates over the remaining content of a read-only [[java.nio.ByteBuffer]] in
 * chunks of at most `chunkSize` bytes, advancing the buffer's position as it goes.
 *
 * The iterator mutates the position of the buffer it is given, so each iterator
 * should be handed its own buffer view (e.g. `buffer.duplicate()`).
 *
 * @param buffer    read-only buffer to consume, starting at its current position
 * @param chunkSize maximum number of bytes per emitted chunk; must be positive
 * @throws IllegalArgumentException if `buffer` is not read-only or `chunkSize` is not positive
 */
class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly, "buffer must be read-only")
  require(chunkSize > 0, "chunkSize must be positive")

  override def hasNext: Boolean = buffer.hasRemaining

  /**
   * Returns the next chunk; every chunk has `chunkSize` bytes except possibly the last.
   *
   * @throws NoSuchElementException if the buffer has no remaining bytes
   */
  override def next(): ByteString = {
    // Honour the usual Iterator convention instead of silently returning an empty chunk.
    if (!buffer.hasRemaining)
      throw new NoSuchElementException("next on exhausted ByteBufferIterator")
    val size = chunkSize min buffer.remaining()
    // slice() starts at the current position; limiting it yields exactly this chunk.
    val chunk = buffer.slice()
    chunk.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(chunk)
  }
}
/**
 * Minimal HTTP file server: answers every GET request by memory-mapping the file
 * named by the request path and streaming it back as a chunked
 * `application/octet-stream` response. Listens on localhost:8080 until a byte is
 * read from stdin.
 *
 * NOTE(security): the request path is used verbatim as a filesystem path, so any
 * file readable by this process can be downloaded. Restrict it to a root
 * directory before exposing this beyond a local test.
 */
object Main extends App {
  /**
   * Memory-maps the whole file at `path` in read-only mode.
   *
   * The channel is always closed, even if mapping fails; per the
   * `FileChannel.map` contract the mapping stays valid after the channel is closed.
   */
  def map(path: Path): MappedByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
    finally channel.close()
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorFlowMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  /** Builds the chunked response that streams the file at `path` in 4096-byte chunks. */
  private def fileResponse(path: Path): HttpResponse = {
    val mappedByteBuffer = map(path)
    // Build a fresh iterator over an independent buffer view for every
    // materialization, so the entity can be streamed more than once.
    val chunks = Source(() => new ByteBufferIterator(mappedByteBuffer.duplicate(), 4096)).map { x =>
      println(s"Chunk of size ${x.size}")
      ChunkStreamPart(x)
    }
    HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
  }

  val requestHandler: HttpRequest ⇒ HttpResponse = {
    case HttpRequest(GET, uri, _, _, _) =>
      // Path parsing happens inside the Try so that an invalid path becomes an
      // error response instead of an exception escaping the handler.
      Try(fileResponse(Paths.get(uri.path.toString()))).recover {
        case _: java.nio.file.NoSuchFileException =>
          HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
        case NonFatal(cause) =>
          // getMessage may be null; fall back to the exception's string form.
          val message = Option(cause.getMessage).getOrElse(cause.toString)
          HttpResponse(StatusCodes.InternalServerError, entity = message)
      }.get // safe: Try only captures non-fatal errors and recover handles all of them
    case _: HttpRequest ⇒ HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http(system).bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
    // foreach materializes the source
    println(s"Accepted new connection from ${connection.remoteAddress}")
    // ... and then actually handle the connection
    connection.handleWithSyncHandler(requestHandler)
  }).run()

  // Block the main thread until the user presses a key, then tear the actor system down.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
@abrighton
Copy link

Here is a modified version of your test client and server that work with 0.11:

https://gist.github.com/abrighton/acd43a6cd9c0b997c456

The chunking part seems to work. You just need to be careful not to shut down before it is done.

@rklaehn
Copy link
Author

rklaehn commented Dec 23, 2014

I updated the example to work with 1.0-M2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment