Last active
November 30, 2016 18:25
-
-
Save jasonmartens/ed6fd6f8aeabc5846aa2 to your computer and use it in GitHub Desktop.
Akka Large File Downloader Hangs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "S3Downloader" | |
version := "1.0" | |
scalaVersion := "2.11.6" | |
val akkaStreamV = "1.0-RC3" | |
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.11" | |
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.9.31" | |
libraryDependencies += "com.typesafe.akka" % "akka-stream-experimental_2.11" % akkaStreamV | |
libraryDependencies += "com.typesafe.akka" % "akka-http-core-experimental_2.11" % akkaStreamV | |
libraryDependencies += "com.typesafe.akka" % "akka-http-experimental_2.11" % akkaStreamV | |
libraryDependencies += "com.typesafe.akka" % "akka-http-testkit-experimental_2.11" % akkaStreamV |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jasonmartens.s3downloader | |
import akka.actor.Props | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.headers.{ByteRange, Range} | |
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.actor.ActorPublisher | |
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request} | |
import akka.stream.scaladsl.Source | |
import akka.util.ByteString | |
import com.jasonmartens.s3downloader.ChunkPublisher.{DownloadTimeout, ChunkDownloaded, DownloadFailed, RequestChunk} | |
import scala.collection.mutable | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent.{ExecutionContextExecutor, Future} | |
import scala.util.Try | |
import scala.concurrent.duration._ | |
/** | |
* Created by Jason Martens <me@jasonmartens.com> on 6/12/15. | |
* | |
*/ | |
object ChunkPublisher { | |
case class DownloadedChunk(data: ByteString) | |
case class RequestChunk(number: Int, offset: Long, size: Long) | |
case class ChunkDownloaded(number: Int, response: Try[HttpResponse]) | |
case class DownloadFailed(number: Int) | |
case class DownloadTimeout(number: Int) | |
def props(url: String, chunkList: List[RequestChunk]): Props = | |
Props(new ChunkPublisher(url, chunkList)) | |
} | |
class ChunkPublisher(url: String, chunkList: List[RequestChunk]) extends ActorPublisher[Source[ByteString, Any]] { | |
implicit val system = context.system | |
implicit val materializer = ActorFlowMaterializer() | |
implicit def executor: ExecutionContextExecutor = system.dispatcher | |
sealed trait DownloadState | |
case object Ready extends DownloadState | |
case object Requested extends DownloadState | |
case object Completed extends DownloadState | |
var nextChunkToEmit = 1 | |
var chunkMap: Map[Int, RequestChunk] = chunkList.map(c => c.number -> c).toMap | |
var chunkState: Map[Int, DownloadState] = chunkList.map(c => c.number -> Ready).toMap | |
val completedChunks: mutable.Map[Int, Source[ByteString, Any]] = | |
mutable.Map[Int, Source[ByteString, Any]]() | |
var inFlightDemand = 0 | |
val downloadTimeout: FiniteDuration = 10 seconds | |
def netDemand = totalDemand - inFlightDemand | |
def downloadChunk(chunk: RequestChunk): Unit = { | |
val request = HttpRequest( | |
uri = url.toString, | |
headers = List(Range(ByteRange(chunk.offset, chunk.offset + chunk.size)))) | |
val response: Future[HttpResponse] = Http().singleRequest(request) | |
println(s"Downloading chunk ${chunk.number}") | |
response.onComplete(data => self ! ChunkDownloaded(chunk.number, data)) | |
response.onFailure {case ex: Exception => println(ex); self ! DownloadFailed(chunk.number)} | |
context.system.scheduler.scheduleOnce(downloadTimeout, self, DownloadTimeout(chunk.number)) | |
} | |
def requestChunks(): Unit = { | |
while (netDemand > 0) { | |
println(s"requesting chunks: totalDemand: $totalDemand, inFlightDemand: $inFlightDemand") | |
val (num, _) = chunkState.filter(_._2 == Ready).toList.sortWith((l, r) => l._1 < r._1).head | |
chunkState = chunkState.updated(num, Requested) | |
downloadChunk(chunkMap(num)) | |
inFlightDemand += 1 | |
} | |
} | |
def emitChunks(): Unit = { | |
println(completedChunks) | |
while (completedChunks.contains(nextChunkToEmit)) { | |
println(s"emitting chunk $nextChunkToEmit") | |
onNext(completedChunks(nextChunkToEmit)) | |
completedChunks.remove(nextChunkToEmit) | |
nextChunkToEmit += 1 | |
inFlightDemand -= 1 | |
requestChunks() | |
} | |
if (chunkState.forall(elem => elem._2 == Completed)) onCompleteThenStop() | |
} | |
def chunkComplete(number: Int, response: Try[HttpResponse]): Unit = { | |
println(s"chunkComplete: $number:${response.isSuccess}") | |
if (response.isSuccess) { | |
completedChunks.update(number, response.get.entity.dataBytes) | |
chunkState = chunkState.updated(number, Completed) | |
} | |
emitChunks() | |
} | |
override def receive: Receive = { | |
case Request(_) => requestChunks() | |
case Cancel => context.stop(self) | |
case ChunkDownloaded(n, r) => chunkComplete(n, r) | |
case DownloadFailed(n) => downloadChunk(chunkMap(n)) | |
case DownloadTimeout(n) => downloadChunk(chunkMap(n)) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jasonmartens.s3downloader | |
import java.util.Calendar | |
import akka.actor.ActorSystem | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.scaladsl.{FlattenStrategy, Sink, Source} | |
import akka.util.ByteString | |
import com.amazonaws.HttpMethod | |
import com.amazonaws.auth.PropertiesFileCredentialsProvider | |
import com.amazonaws.services.s3.AmazonS3Client | |
import com.jasonmartens.s3downloader.ChunkPublisher.RequestChunk | |
import scala.collection.immutable.NumericRange | |
/** | |
* Created by Jason Martens <me@jasonmartens.com> on 6/12/15. | |
* | |
*/ | |
case class InitData(url: String, dataLength: Long) | |
object GetS3PresignedURL { | |
def apply: InitData = { | |
val credentials = new PropertiesFileCredentialsProvider("~/.aws/dev.properties") | |
val awsClient = new AmazonS3Client(credentials) | |
val bucketName = "bucket" | |
val keyName = "large_file" | |
val expiration = Calendar.getInstance() | |
expiration.add(Calendar.HOUR, 6) | |
val s3Url = awsClient.generatePresignedUrl(bucketName, keyName, expiration.getTime, HttpMethod.GET) | |
val s3Object = awsClient.getObject(bucketName, keyName) | |
val size = s3Object.getObjectMetadata.getInstanceLength | |
InitData(s3Url.toString, size) | |
} | |
} | |
object GenerateFixedChunks { | |
def apply(size: Long, chunkSize: Long): List[RequestChunk] = { | |
val offsetList = NumericRange[Long](0, size, chunkSize) | |
val sizeList = offsetList.init.map(_ => chunkSize) :+ size % chunkSize | |
val chunkNumbers = Range(1, offsetList.end.toInt) | |
val zipped = offsetList zip sizeList zip chunkNumbers | |
zipped.map {case ((o, s), n) => RequestChunk(n, o, s)}.toList | |
} | |
} | |
object S3Downloader extends App { | |
implicit val system = ActorSystem("s3download-system") | |
implicit val materializer = ActorFlowMaterializer() | |
val initData = GetS3PresignedURL.apply | |
val chunkList = GenerateFixedChunks(initData.dataLength, 1024 * 1024 * 5) | |
val source = Source.actorPublisher[Source[ByteString, Any]](ChunkPublisher.props(initData.url, chunkList)) | |
source.flatten(FlattenStrategy.concat).runWith(Sink.ignore) | |
// system.shutdown() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
requesting chunks: totalDemand: 4, inFlightDemand: 0 | |
Downloading chunk 1 | |
requesting chunks: totalDemand: 4, inFlightDemand: 1 | |
Downloading chunk 2 | |
requesting chunks: totalDemand: 4, inFlightDemand: 2 | |
Downloading chunk 3 | |
requesting chunks: totalDemand: 4, inFlightDemand: 3 | |
Downloading chunk 4 | |
chunkComplete: 4:true | |
Map(4 -> akka.stream.scaladsl.Source@7fba5c6c) | |
chunkComplete: 3:true | |
Map(4 -> akka.stream.scaladsl.Source@7fba5c6c, 3 -> akka.stream.scaladsl.Source@59766734) | |
chunkComplete: 2:true | |
Map(2 -> akka.stream.scaladsl.Source@1fc45691, 4 -> akka.stream.scaladsl.Source@7fba5c6c, 3 -> akka.stream.scaladsl.Source@59766734) | |
chunkComplete: 1:true | |
Map(2 -> akka.stream.scaladsl.Source@1fc45691, 4 -> akka.stream.scaladsl.Source@7fba5c6c, 1 -> akka.stream.scaladsl.Source@3e29208a, 3 -> akka.stream.scaladsl.Source@59766734) | |
emitting chunk 1 | |
emitting chunk 2 | |
emitting chunk 3 | |
emitting chunk 4 | |
requesting chunks: totalDemand: 2, inFlightDemand: 0 | |
Downloading chunk 5 | |
requesting chunks: totalDemand: 2, inFlightDemand: 1 | |
Downloading chunk 6 | |
chunkComplete: 5:true | |
Map(5 -> akka.stream.scaladsl.Source@15526293) | |
emitting chunk 5 | |
chunkComplete: 6:true | |
Map(6 -> akka.stream.scaladsl.Source@6caa771f) | |
emitting chunk 6 | |
requesting chunks: totalDemand: 2, inFlightDemand: 0 | |
Downloading chunk 7 | |
requesting chunks: totalDemand: 2, inFlightDemand: 1 | |
Downloading chunk 8 | |
requesting chunks: totalDemand: 4, inFlightDemand: 2 | |
Downloading chunk 9 | |
requesting chunks: totalDemand: 4, inFlightDemand: 3 | |
Downloading chunk 10 | |
chunkComplete: 7:true | |
Map(7 -> akka.stream.scaladsl.Source@15860405) | |
emitting chunk 7 | |
chunkComplete: 10:true | |
Map(10 -> akka.stream.scaladsl.Source@136b0cb) | |
chunkComplete: 8:true | |
Map(8 -> akka.stream.scaladsl.Source@7b6af9ae, 10 -> akka.stream.scaladsl.Source@136b0cb) | |
emitting chunk 8 | |
Downloading chunk 1 | |
Downloading chunk 2 | |
Downloading chunk 3 | |
Downloading chunk 4 | |
chunkComplete: 3:true | |
Map(10 -> akka.stream.scaladsl.Source@136b0cb, 3 -> akka.stream.scaladsl.Source@50a8be20) | |
requesting chunks: totalDemand: 4, inFlightDemand: 2 | |
Downloading chunk 11 | |
requesting chunks: totalDemand: 4, inFlightDemand: 3 | |
Downloading chunk 12 | |
chunkComplete: 9:true | |
Map(10 -> akka.stream.scaladsl.Source@136b0cb, 9 -> akka.stream.scaladsl.Source@5b03f61e, 3 -> akka.stream.scaladsl.Source@50a8be20) | |
emitting chunk 9 | |
emitting chunk 10 | |
Downloading chunk 5 | |
Downloading chunk 6 | |
chunkComplete: 1:true | |
Map(1 -> akka.stream.scaladsl.Source@489d2321, 3 -> akka.stream.scaladsl.Source@50a8be20) | |
requesting chunks: totalDemand: 4, inFlightDemand: 2 | |
Downloading chunk 13 | |
requesting chunks: totalDemand: 4, inFlightDemand: 3 | |
Downloading chunk 14 | |
chunkComplete: 11:true | |
Map(11 -> akka.stream.scaladsl.Source@36b198fb, 1 -> akka.stream.scaladsl.Source@489d2321, 3 -> akka.stream.scaladsl.Source@50a8be20) | |
emitting chunk 11 | |
[INFO] [06/15/2015 08:57:08.336] [s3download-system-akka.actor.default-dispatcher-13] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:08.830] [s3download-system-akka.actor.default-dispatcher-13] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:08.830] [s3download-system-akka.actor.default-dispatcher-13] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:12.031] [s3download-system-akka.actor.default-dispatcher-16] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:12.031] [s3download-system-akka.actor.default-dispatcher-16] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:13.516] [s3download-system-akka.actor.default-dispatcher-9] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:13.868] [s3download-system-akka.actor.default-dispatcher-9] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:14.269] [s3download-system-akka.actor.default-dispatcher-26] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:14.412] [s3download-system-akka.actor.default-dispatcher-2] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. | |
[INFO] [06/15/2015 08:57:14.570] [s3download-system-akka.actor.default-dispatcher-7] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment