Skip to content

Instantly share code, notes, and snippets.

@rklaehn
Last active June 8, 2020 12:38
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save rklaehn/3f26c3f80e5870831f52 to your computer and use it in GitHub Desktop.
Save rklaehn/3f26c3f80e5870831f52 to your computer and use it in GitHub Desktop.
akka http file server
package akkahttptest
import akka.http.Http
import akka.stream.ActorFlowMaterializer
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.http.model._
object TestClient extends App {
implicit val system = ActorSystem("ServerTest")
implicit val materializer = ActorFlowMaterializer()
val host = "127.0.0.1"
val httpClient = Http(system).outgoingConnection(host, 80)
val printChunksConsumer = Sink.foreach[HttpResponse] { res =>
if(res.status == StatusCodes.OK) {
res.entity.getDataBytes().map { chunk =>
System.out.write(chunk.toArray)
System.out.flush()
}.to(Sink.ignore).run()
} else
println(res.status)
}
val finishFuture = Source.single(HttpRequest()).via(httpClient).runWith(printChunksConsumer)
System.in.read()
system.shutdown()
system.awaitTermination()
}
package akkahttptest
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import akka.actor.ActorSystem
import akka.http.Http
import akka.http.model.HttpEntity.ChunkStreamPart
import akka.http.model._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, Timeout}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
class ByteBufferIterator(buffer:ByteBuffer, chunkSize:Int) extends Iterator[ByteString] {
require(buffer.isReadOnly)
require(chunkSize > 0)
override def hasNext = buffer.hasRemaining
override def next(): ByteString = {
val size = chunkSize min buffer.remaining()
val temp = buffer.slice()
temp.limit(size)
buffer.position(buffer.position() + size)
ByteString(temp)
}
}
object Main extends App {
def map(path: Path) : MappedByteBuffer = {
val channel = FileChannel.open(path, StandardOpenOption.READ)
val result = channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
channel.close()
result
}
implicit val system = ActorSystem()
implicit val materializer = ActorFlowMaterializer()
implicit val askTimeout: Timeout = 500.millis
import HttpMethods._
val requestHandler: HttpRequest ⇒ HttpResponse = {
case HttpRequest(GET, uri, headers, _, _) =>
val path = Paths.get(uri.path.toString())
val result = Try {
val mappedByteBuffer = map(path)
val iterator = new ByteBufferIterator(mappedByteBuffer, 4096)
val chunks = Source(() => iterator).map { x =>
println("Chunk of size " + x.size)
ChunkStreamPart(x)
}
HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
} recover {
case NonFatal(cause) =>
HttpResponse(StatusCodes.InternalServerError, entity = cause.getMessage)
}
result.get
case _: HttpRequest ⇒ HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
}
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http(system).bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
// foreach materializes the source
println("Accepted new connection from " + connection.remoteAddress)
// ... and then actually handle the connection
connection.handleWithSyncHandler(requestHandler)
}).run()
System.in.read()
system.shutdown()
system.awaitTermination()
}
@abrighton
Copy link

Thanks for the example!
In the last line, I had to replace "connect" with "to" to get it to compile.

@rklaehn
Copy link
Author

rklaehn commented Nov 20, 2014

Which version of akka-http are you using? I am using 0.10-M1

@abrighton
Copy link

I was using 0.11. Maybe that is why.

@briantopping
Copy link

I just forked this with a few edits for 0.11 (my first time using Gist, please don't hate on me! :) Not sure what to do with the Sink.future. I also forked a client at https://github.com/topping/akka-http-stream-example/blob/master/akka-http-example/src/main/scala/HttpClient.scala, but it also has errors. Hmm!

@rklaehn
Copy link
Author

rklaehn commented Nov 20, 2014

It seems like Sink.future is now called Sink.head, since it just uses the first element (head) of the stream. And connect is called to. That should help you fix your compile errors.

@abrighton
Copy link

Here is a modified version of your test client and server that work with 0.11:

https://gist.github.com/abrighton/acd43a6cd9c0b997c456

The chunking part seems to work. You just need to be careful not to shutdown before it is done.

@rklaehn
Copy link
Author

rklaehn commented Dec 23, 2014

I updated the example to work with 1.0-M2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment