Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
akka http file server
package akkahttptest
import akka.http.Http
import akka.stream.ActorFlowMaterializer
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.http.model._
/**
 * Small command-line client: sends one default `HttpRequest` over an outgoing
 * connection to `host` on port 80 and copies the response body to standard
 * output as the chunks arrive. Any status other than 200 OK is printed instead.
 *
 * The program then blocks on standard input; pressing a key shuts the actor
 * system down.
 */
object TestClient extends App {
  implicit val system = ActorSystem("ServerTest")
  implicit val materializer = ActorFlowMaterializer()

  val host = "127.0.0.1"
  val httpClient = Http(system).outgoingConnection(host, 80)

  // Sink that handles every response produced by the connection flow.
  val printChunksConsumer = Sink.foreach[HttpResponse] { response =>
    if (response.status != StatusCodes.OK) {
      // Not a success: report the status and leave the entity untouched.
      println(response.status)
    } else {
      // Write each chunk to stdout as a side effect of the map stage; the
      // mapped elements themselves are discarded by Sink.ignore.
      val echoedChunks = response.entity.getDataBytes().map { bytes =>
        System.out.write(bytes.toArray)
        System.out.flush()
      }
      echoedChunks.to(Sink.ignore).run()
    }
  }

  val finishFuture =
    Source
      .single(HttpRequest())
      .via(httpClient)
      .runWith(printChunksConsumer)

  // Keep the JVM alive until the user presses a key, then tear everything down.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
package akkahttptest
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model.HttpEntity.ChunkStreamPart
import akka.http.model._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, Timeout}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
/**
 * Iterates over the remaining content of a read-only [[java.nio.ByteBuffer]],
 * yielding it as a sequence of `ByteString` chunks.
 *
 * Every chunk holds `chunkSize` bytes except possibly the last one, which holds
 * whatever is left. Iteration advances the position of the supplied `buffer`,
 * so the buffer must not be consumed by anyone else at the same time.
 *
 * @param buffer    the buffer to read from; must be read-only
 * @param chunkSize maximum number of bytes per chunk; must be positive
 * @throws IllegalArgumentException if `buffer` is writable or `chunkSize <= 0`
 */
class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly, "buffer must be read-only")
  require(chunkSize > 0, "chunkSize must be positive")

  /** True while the underlying buffer still has unread bytes. */
  override def hasNext: Boolean = buffer.hasRemaining

  /**
   * Returns the next chunk and moves the buffer position past it.
   *
   * @throws NoSuchElementException if the buffer has no bytes left
   */
  override def next(): ByteString = {
    // Honour the Iterator contract: an exhausted iterator must fail loudly
    // rather than hand out an endless stream of empty chunks.
    if (!buffer.hasRemaining)
      throw new NoSuchElementException("next on exhausted ByteBufferIterator")

    val size = chunkSize min buffer.remaining()
    // The slice starts at the current position; capping its limit restricts
    // it to exactly the bytes of this chunk.
    val chunk = buffer.slice()
    chunk.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(chunk)
  }
}
/**
 * Minimal HTTP file server bound to localhost:8080.
 *
 * A GET request is answered with the content of the file whose path equals the
 * request URI path, streamed as a chunked `application/octet-stream` entity in
 * chunks of at most 4096 bytes. Every other request is answered with 404.
 *
 * The program blocks on standard input; pressing a key shuts the actor system
 * down.
 *
 * NOTE(review): the request path is used verbatim as a file-system path, so a
 * client can ask for any file this process is able to read. Restrict lookups
 * to a dedicated root directory before exposing this beyond a local demo.
 */
object Main extends App {

  /**
   * Memory-maps the whole file at `path` in read-only mode.
   *
   * The channel is closed in every case, including when sizing or mapping the
   * file fails, so no file handle is leaked on the error path.
   *
   * @param path the file to map
   * @return a read-only mapping covering the file from offset 0 to its size
   */
  def map(path: Path): MappedByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
    finally channel.close()
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorFlowMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  /** Synchronous request handler used for every accepted connection. */
  val requestHandler: HttpRequest ⇒ HttpResponse = {
    case HttpRequest(GET, uri, _, _, _) =>
      val result = Try {
        // Resolved inside the Try so that a path the file system rejects is
        // turned into an error response instead of escaping the handler.
        val path = Paths.get(uri.path.toString())
        val mappedByteBuffer = map(path)
        val iterator = new ByteBufferIterator(mappedByteBuffer, 4096)
        val chunks = Source(() => iterator).map { x =>
          println("Chunk of size " + x.size)
          ChunkStreamPart(x)
        }
        HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
      } recover {
        // A file that does not exist is a client-side problem, not a server fault.
        case _: java.nio.file.NoSuchFileException =>
          HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
        case NonFatal(cause) =>
          // getMessage may be null; fall back to the exception's string form.
          val message = Option(cause.getMessage).getOrElse(cause.toString)
          HttpResponse(StatusCodes.InternalServerError, entity = message)
      }
      // Try only captures non-fatal throwables and the recover above handles
      // all of those, so this never rethrows a captured failure.
      result.get
    case _: HttpRequest ⇒ HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http(system).bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
    // foreach materializes the source
    println("Accepted new connection from " + connection.remoteAddress)
    // ... and then actually handle the connection
    connection.handleWithSyncHandler(requestHandler)
  }).run()

  // Keep serving until the user presses a key, then tear everything down.
  System.in.read()
  system.shutdown()
  system.awaitTermination()
}
@abrighton

This comment has been minimized.

Copy link

abrighton commented Nov 19, 2014

Thanks for the example!
In the last line, I had to replace "connect" with "to" to get it to compile.

@rklaehn

This comment has been minimized.

Copy link
Owner Author

rklaehn commented Nov 20, 2014

Which version of akka-http are you using? I am using 0.10-M1

@abrighton

This comment has been minimized.

Copy link

abrighton commented Nov 20, 2014

I was using 0.11. Maybe that is why.

@briantopping

This comment has been minimized.

Copy link

briantopping commented Nov 20, 2014

I just forked this with a few edits for 0.11 (my first time using Gist, please don't hate on me! :) Not sure what to do with the Sink.future. I also forked a client at https://github.com/topping/akka-http-stream-example/blob/master/akka-http-example/src/main/scala/HttpClient.scala, but it also has errors. Hmm!

@rklaehn

This comment has been minimized.

Copy link
Owner Author

rklaehn commented Nov 20, 2014

It seems like Sink.future is now called Sink.head, since it just uses the first element (head) of the stream. And connect is called to. That should help you fix your compile errors.

@abrighton

This comment has been minimized.

Copy link

abrighton commented Nov 20, 2014

Here is a modified version of your test client and server that work with 0.11:

https://gist.github.com/abrighton/acd43a6cd9c0b997c456

The chunking part seems to work. You just need to be careful not to shut down before it is done.

@rklaehn

This comment has been minimized.

Copy link
Owner Author

rklaehn commented Dec 23, 2014

I updated the example to work with 1.0-M2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.