Skip to content

Instantly share code, notes, and snippets.

@jrudolph
Last active February 13, 2023 18:09
Show Gist options
  • Star 33 You must be signed in to star a gist
  • Fork 15 You must be signed in to fork a gist
  • Save jrudolph/08d0d28e1eddcd64dbd0 to your computer and use it in GitHub Desktop.
Save jrudolph/08d0d28e1eddcd64dbd0 to your computer and use it in GitHub Desktop.
akka-http Multipart file-upload client + server example
package akka.http.scaladsl
import java.io.File
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.util.ByteString
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import akka.stream.ActorFlowMaterializer
import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.Source
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.Future
object TestMultipartFileUpload extends App {
val testConf: Config = ConfigFactory.parseString("""
akka.loglevel = INFO
akka.log-dead-letters = off""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
val testFile = new File(args(0))
def startTestServer(): Future[ServerBinding] = {
import akka.http.scaladsl.server.Directives._
val route: Route =
path("upload") {
entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) ⇒
val fileNamesFuture = formdata.parts.mapAsync(1) { p ⇒
println(s"Got part. name: ${p.name} filename: ${p.filename}")
println("Counting size...")
@volatile var lastReport = System.currentTimeMillis()
@volatile var lastSize = 0L
def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = {
val (oldSize, oldChunks) = counter
val newSize = oldSize + chunk.size
val newChunks = oldChunks + 1
val now = System.currentTimeMillis()
if (now > lastReport + 1000) {
val lastedTotal = now - lastReport
val bytesSinceLast = newSize - lastSize
val speedMBPS = bytesSinceLast.toDouble / 1000000 /* bytes per MB */ / lastedTotal * 1000 /* millis per second */
println(f"Already got $newChunks%7d chunks with total size $newSize%11d bytes avg chunksize ${newSize / newChunks}%7d bytes/chunk speed: $speedMBPS%6.2f MB/s")
lastReport = now
lastSize = newSize
}
(newSize, newChunks)
}
p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map {
case (size, numChunks) ⇒
println(s"Size is $size")
(p.name, p.filename, size)
}
}.runFold(Seq.empty[(String, Option[String], Long)])(_ :+ _).map(_.mkString(", "))
complete {
fileNamesFuture
}
}
}
Http().bindAndHandle(route, interface = "localhost", port = 0)
}
def createEntity(file: File): Future[RequestEntity] = {
require(file.exists())
val formData =
Multipart.FormData(
Source.single(
Multipart.FormData.BodyPart(
"test",
HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000)), // the chunk size here is currently critical for performance
Map("filename" -> file.getName))))
Marshal(formData).to[RequestEntity]
}
def createRequest(target: Uri, file: File): Future[HttpRequest] =
for {
e ← createEntity(file)
} yield HttpRequest(HttpMethods.POST, uri = target, entity = e)
try {
val result =
for {
ServerBinding(address) ← startTestServer()
_ = println(s"Server up at $address")
port = address.getPort
target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload"))
req ← createRequest(target, testFile)
_ = println(s"Running request, uploading test file of size ${testFile.length} bytes")
response ← Http().singleRequest(req)
responseBodyAsString ← Unmarshal(response).to[String]
} yield responseBodyAsString
result.onComplete { res ⇒
println(s"The result was $res")
system.shutdown()
}
system.scheduler.scheduleOnce(60.seconds) {
println("Shutting down after timeout...")
system.shutdown()
}
} catch {
case _: Throwable ⇒ system.shutdown()
}
}
@tindzk
Copy link

tindzk commented Sep 17, 2015

Thanks. Note that ActorFlowMaterializer was renamed to ActorMaterializer.

To increase the size of file uploads, set this option in application.conf:
akka.http.server.parsing.max-content-length = 512m

@hhimanshu
Copy link

I have the following server

object FileUploadServer {
  implicit val system = ActorSystem("fileUploadServer")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher


  val route =
    path("upload") {
      post {
        extractRequest { request =>
          println(request)
          val file = File.createTempFile("debug",".zip")
          val futureDone: Future[Long] = request.entity.dataBytes.runWith(Sink.file(file))

          onComplete(futureDone) { result =>
            complete(s"path:${file.getAbsolutePath}, size:${result.get}")
          }
        }
      }
    }

  def main(args: Array[String]) {
    val serverFuture: Future[ServerBinding] = Http().bindAndHandle(route, "localhost", 8080)

    println("FileUpload server is running at http://localhost:8080\nPress RETURN to stop ...")
    readLine()

    serverFuture
      .flatMap(_.unbind())
      .onComplete(_ => system.terminate())
  }
}

and I have a following property in application.conf

akka.http.parsing.max-content-length = 1000m

But when I try to upload a 300MB zip file, it returns only when 159MB

$ time curl -X POST -H 'Content-Type: application/octet-stream' -d @bluecoat_proxy_big.zip http://localhost:8080/upload 
path:/var/folders/_2/q9xz_8ks73d0h6hq80c7439s8_x7qx/T/debug3960545903381659311.zip, size:155858257
real    0m1.426s
user    0m0.375s
sys 0m0.324s

Any ideas?

@schikkam
Copy link

Thanks, Has it "akka.stream.io.SynchronousFileSource"? been moved to somewhere else? or deprecated?

@ariskk
Copy link

ariskk commented May 25, 2016

This needs to replace SynchronousFileSource(file, chunkSize = 100000) with FileIO.fromFile(file, chunkSize = 100000))
and ActorFlowMaterializer with ActorMaterializer to compile

@NikichXP
Copy link

actually, there is 1048576 (1024 * 1024) bytes per megabyte

@mal19992
Copy link

mal19992 commented Oct 30, 2021

I need an easy way to get form processing in scala.

All I need is:

a POST form of multipart/form-data type is submitted. I want

  1. all submitted files (which may be of >200Mb size) to be saved as temp files to disk, and
  2. submitted form fields received as say List[(String,String)].

Basically a Seq of posted files and a Seq of form parameters available simultaneously
I cannot find this functionality in akka. Avaiable solutions such as

akka/akka-http#807

will randomly fain on no-strict entities

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment