Last active
September 17, 2020 19:59
-
-
Save steinybot/a1f79fe9a67693722164 to your computer and use it in GitHub Desktop.
Akka Stream and HTTP download resource examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import akka.http.javadsl.model.headers.ContentDisposition | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.client.RequestBuilding | |
import akka.http.scaladsl.model.{HttpResponse, Uri} | |
import akka.stream.scaladsl._ | |
import akka.stream.{Graph, Materializer, SinkShape} | |
import akka.util.ByteString | |
import scala.concurrent.Future | |
import scala.util.Try | |
/**
 * Examples of downloading an HTTP resource to a file with Akka HTTP and Akka Streams.
 *
 * Each example comes in two flavours: one built on `Http().singleRequest` and `Future`
 * composition, and one built on the `Http().superPool` flow.
 *
 * NOTE(review): no implicit `ActorSystem`, `Materializer` or `ExecutionContext` is declared in
 * this object; `Http()`, `runWith` and `Future.flatMap` need them, so they presumably have to be
 * brought into scope elsewhere - confirm.
 */
object Downloader extends RequestBuilding {

  /**
   * Issues a GET for `uri` via `Http().singleRequest` and streams the response entity into `file`.
   *
   * @param uri  the resource to fetch
   * @param file the file the entity bytes are written to
   * @return the materialized value of the `FileIO.toFile` sink
   */
  def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
    val request = Get(uri)
    val responseFuture = Http().singleRequest(request)
    responseFuture.flatMap { response =>
      val source = response.entity.dataBytes
      source.runWith(FileIO.toFile(file))
    }
  }

  /**
   * Unwraps the `Try` half of a pool result, keeping the context untouched.
   *
   * A failed `Try` is rethrown by `Try.get`, which fails whatever stream stage calls this.
   */
  def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
    case (responseTry, context) => (responseTry.get, context)
  }

  /** Drops the context and exposes the response entity as a source of bytes. */
  def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
    case (response, _) => response.entity.dataBytes
  }

  /**
   * Same download as [[downloadViaFutures]], but expressed as a single stream: the request is
   * paired with a `Unit` context, sent through `Http().superPool`, and the entity bytes of the
   * response are written to `file`.
   *
   * @param uri  the resource to fetch
   * @param file the file the entity bytes are written to
   * @return the materialized value of the `FileIO.toFile` sink
   */
  def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
    val request = Get(uri)
    val source = Source.single((request, ()))
    val requestResponseFlow = Http().superPool[Unit]()
    source
      .via(requestResponseFlow)
      .map(responseOrFail)
      .flatMapConcat(responseToByteSource)
      .runWith(FileIO.toFile(file))
  }

  /**
   * Determines where a response should be saved, based on the `filename` parameter of its
   * `Content-Disposition` header, and creates that file under `downloadDir` if it does not exist.
   *
   * The file name is chosen by the remote server, so only its last path component is used;
   * this keeps the destination inside `downloadDir`.
   *
   * @param downloadDir directory the file is placed in
   * @param response    response whose `Content-Disposition` header names the file
   * @return the destination file inside `downloadDir`
   * @throws NoSuchElementException if the header or its `filename` parameter is missing, or
   *                                the parameter does not yield a usable file name
   */
  def destinationFile(downloadDir: File, response: HttpResponse): File = {
    val fileName = response
      .header[ContentDisposition]
      // `getParams` is a java.util.Map, so a missing key comes back as null.
      .flatMap(disposition => Option(disposition.getParams.get("filename")))
      .map(safeFileName)
      .getOrElse(
        throw new NoSuchElementException("Response has no Content-Disposition filename parameter")
      )
    val file = new File(downloadDir, fileName)
    // The result is deliberately ignored: `false` only means the file already existed.
    file.createNewFile()
    file
  }

  /** Reduces a server-supplied name to a single path component, rejecting empty, "." and "..". */
  private def safeFileName(rawName: String): String = {
    val baseName = new File(rawName).getName
    if (baseName.isEmpty || baseName == "." || baseName == "..") {
      throw new NoSuchElementException(
        s"Content-Disposition filename is not a usable file name: $rawName"
      )
    }
    baseName
  }

  /**
   * Variant of [[downloadViaFutures]] that derives the target file from the response (see
   * [[destinationFile]]) instead of taking it as a parameter.
   *
   * @param uri         the resource to fetch
   * @param downloadDir directory the downloaded file is placed in
   * @return the materialized value of the `FileIO.toFile` sink
   */
  def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
    val request = Get(uri)
    val responseFuture = Http().singleRequest(request)
    responseFuture.flatMap { response =>
      val file = destinationFile(downloadDir, response)
      val source = response.entity.dataBytes
      source.runWith(FileIO.toFile(file))
    }
  }

  /**
   * Turns a response into a source of its entity bytes, each chunk paired with the destination
   * file resolved by [[destinationFile]]. The file is resolved once per response, not per chunk.
   */
  def responseToByteSourceWithDest[T](
      in: (HttpResponse, T),
      downloadDir: File
  ): Source[(ByteString, File), Any] = in match {
    case (response, _) =>
      val source = responseToByteSource(in)
      val file = destinationFile(downloadDir, response)
      source.map((_, file))
  }

  /**
   * Builds a sink that discards the `File` half of each element and writes the bytes to
   * `destination`, keeping the file sink's materialized value.
   */
  def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
    // NOTE(review): the second argument is presumably the `append` flag of `FileIO.toFile` - confirm.
    val sink = FileIO.toFile(destination, true)
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
  }

  /**
   * Variant of [[downloadViaFlow]] that derives the target file from the response.
   *
   * This relies on `SourceExt.runWithMap`, which is still unimplemented (`???`), so calling this
   * method currently throws `NotImplementedError`.
   *
   * @param uri         the resource to fetch
   * @param downloadDir directory the downloaded file is placed in
   */
  def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
    val request = Get(uri)
    val source = Source.single((request, ()))
    val requestResponseFlow = Http().superPool[Unit]()
    val sourceWithDest: Source[(ByteString, File), Unit] = source
      .via(requestResponseFlow)
      .map(responseOrFail)
      .flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
    sourceWithDest.runWithMap {
      case (_, file) => destToSink(file)
    }
  }

  /** Adds the (not yet implemented) `runWithMap` operation to any `Source`. */
  implicit class SourceExt[Out, Mat](source: Source[Out, Mat]) {
    // TODO: How can we implement this?
    // Intended use (see downloadViaFlow2): pick the sink from an element of the stream and run
    // the source with it. Until implemented, any call throws NotImplementedError.
    def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment