-
-
Save jrudolph/08d0d28e1eddcd64dbd0 to your computer and use it in GitHub Desktop.
package akka.http.scaladsl

import java.io.File

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorMaterializer
import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.{ FileIO, Source }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
/**
 * End-to-end multipart upload test: starts a local server that counts the bytes
 * of each uploaded part, then POSTs the file given as `args(0)` to it and prints
 * the server's response. Updated for renamed/removed Akka APIs (see comments below).
 */
object TestMultipartFileUpload extends App {
  // Later Akka versions renamed/removed APIs used by the original gist:
  //   ActorFlowMaterializer -> ActorMaterializer
  //   SynchronousFileSource -> FileIO.fromFile (later FileIO.fromPath)
  //   system.shutdown()     -> system.terminate()
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.FileIO
  import scala.util.control.NonFatal

  val testConf: Config = ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.log-dead-letters = off""")
  implicit val system = ActorSystem("ServerTest", testConf)
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  // The file to upload is given as the first command-line argument.
  val testFile = new File(args(0))

  /** Binds an HTTP server on a random free port whose /upload route counts the bytes of each part. */
  def startTestServer(): Future[ServerBinding] = {
    import akka.http.scaladsl.server.Directives._
    val route: Route =
      path("upload") {
        entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) ⇒
          // parallelism = 1: a part's byte stream must be fully consumed before the
          // next part can be read off the wire.
          val fileNamesFuture = formdata.parts.mapAsync(1) { p ⇒
            println(s"Got part. name: ${p.name} filename: ${p.filename}")
            println("Counting size...")
            @volatile var lastReport = System.currentTimeMillis()
            @volatile var lastSize = 0L
            // Fold step: accumulate (totalBytes, totalChunks) while printing a
            // throughput report at most once per second.
            def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = {
              val (oldSize, oldChunks) = counter
              val newSize = oldSize + chunk.size
              val newChunks = oldChunks + 1
              val now = System.currentTimeMillis()
              if (now > lastReport + 1000) {
                val elapsedMillis = now - lastReport
                val bytesSinceLast = newSize - lastSize
                // Decimal megabytes (1e6 bytes) per second; use 1024 * 1024 instead
                // if you want MiB/s (as pointed out in the comments below).
                val speedMBPS = bytesSinceLast.toDouble / 1000000 / elapsedMillis * 1000
                println(f"Already got $newChunks%7d chunks with total size $newSize%11d bytes avg chunksize ${newSize / newChunks}%7d bytes/chunk speed: $speedMBPS%6.2f MB/s")
                lastReport = now
                lastSize = newSize
              }
              (newSize, newChunks)
            }
            p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map {
              case (size, numChunks) ⇒
                println(s"Size is $size")
                (p.name, p.filename, size)
            }
          }.runFold(Seq.empty[(String, Option[String], Long)])(_ :+ _).map(_.mkString(", "))
          complete {
            fileNamesFuture
          }
        }
      }
    // Port 0 lets the OS pick a free port; it is reported back via the ServerBinding.
    Http().bindAndHandle(route, interface = "localhost", port = 0)
  }

  /** Builds a multipart/form-data request entity that streams `file` from disk. */
  def createEntity(file: File): Future[RequestEntity] = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            // The chunk size here is currently critical for performance.
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromFile(file, chunkSize = 100000)),
            Map("filename" -> file.getName))))
    Marshal(formData).to[RequestEntity]
  }

  /** Builds the POST request that uploads `file` to `target`. */
  def createRequest(target: Uri, file: File): Future[HttpRequest] =
    for {
      e ← createEntity(file)
    } yield HttpRequest(HttpMethods.POST, uri = target, entity = e)

  try {
    val result =
      for {
        ServerBinding(address) ← startTestServer()
        _ = println(s"Server up at $address")
        port = address.getPort
        target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload"))
        req ← createRequest(target, testFile)
        _ = println(s"Running request, uploading test file of size ${testFile.length} bytes")
        response ← Http().singleRequest(req)
        responseBodyAsString ← Unmarshal(response).to[String]
      } yield responseBodyAsString
    result.onComplete { res ⇒
      println(s"The result was $res")
      system.terminate() // system.shutdown() was removed; terminate() is the replacement
    }
    // Safety net: shut the system down even if the round-trip never completes.
    system.scheduler.scheduleOnce(60.seconds) {
      println("Shutting down after timeout...")
      system.terminate()
    }
  } catch {
    // Only synchronous setup failures land here; Future failures surface via onComplete.
    // NonFatal (not Throwable) so OutOfMemoryError, InterruptedException, etc. propagate.
    case NonFatal(_) ⇒ system.terminate()
  }
}
I have the following server
/**
 * Minimal upload server: POST a raw request body to /upload and it is streamed
 * to a temp file; the response reports the file's path and byte count.
 *
 * NOTE(review): the config key to raise the upload limit is
 * `akka.http.server.parsing.max-content-length` (the `server` segment matters —
 * `akka.http.parsing.max-content-length` alone is not the server setting).
 * Also, `curl -d @file` strips CR/LF bytes from the payload, which truncates
 * binary uploads; use `curl --data-binary @file` to send the file unmodified.
 */
object FileUploadServer {
  implicit val system = ActorSystem("fileUploadServer")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val route =
    path("upload") {
      post {
        extractRequest { request =>
          println(request)
          val file = File.createTempFile("debug", ".zip")
          // Stream the request body straight to disk; the materialized value is the byte count.
          val futureDone: Future[Long] = request.entity.dataBytes.runWith(Sink.file(file))
          onComplete(futureDone) { result =>
            // result.get rethrows a write failure, which the route turns into a 500.
            complete(s"path:${file.getAbsolutePath}, size:${result.get}")
          }
        }
      }
    }

  // Explicit `: Unit =` — procedure syntax (`def main(...) { }`) is deprecated.
  def main(args: Array[String]): Unit = {
    val serverFuture: Future[ServerBinding] = Http().bindAndHandle(route, "localhost", 8080)
    println("FileUpload server is running at http://localhost:8080\nPress RETURN to stop ...")
    scala.io.StdIn.readLine() // Predef.readLine() is deprecated; StdIn is the replacement
    serverFuture
      .flatMap(_.unbind())
      .onComplete(_ => system.terminate())
  }
}
and I have a following property in application.conf
akka.http.parsing.max-content-length = 1000m
But when I try to upload a 300MB zip file,
it returns after only 159MB has been received:
$ time curl -X POST -H 'Content-Type: application/octet-stream' -d @bluecoat_proxy_big.zip http://localhost:8080/upload
path:/var/folders/_2/q9xz_8ks73d0h6hq80c7439s8_x7qx/T/debug3960545903381659311.zip, size:155858257
real 0m1.426s
user 0m0.375s
sys 0m0.324s
Any ideas?
Thanks. Has `akka.stream.io.SynchronousFileSource` been moved somewhere else, or was it deprecated?
To make this compile, replace `SynchronousFileSource(file, chunkSize = 100000)`
with `FileIO.fromFile(file, chunkSize = 100000)`,
and `ActorFlowMaterializer`
with `ActorMaterializer`.
Actually, there are 1048576 (1024 * 1024) bytes per megabyte (strictly speaking, per mebibyte).
I need an easy way to get form processing in scala.
All I need is:
a POST form of multipart/form-data type is submitted. I want
- all submitted files (which may be of >200Mb size) to be saved as temp files to disk, and
- submitted form fields received as say List[(String,String)].
Basically a Seq of posted files and a Seq of form parameters available simultaneously
I cannot find this functionality in akka. Available solutions (such as the ones linked above)
will randomly fail on non-strict entities.
Thanks. Note that
`ActorFlowMaterializer`
was renamed to `ActorMaterializer`.
To increase the maximum size of file uploads, set this option in
`application.conf`:
`akka.http.server.parsing.max-content-length = 512m`