Skip to content

Instantly share code, notes, and snippets.

@jasonmartens
Last active November 30, 2016 18:25
Show Gist options
  • Save jasonmartens/ed6fd6f8aeabc5846aa2 to your computer and use it in GitHub Desktop.
Save jasonmartens/ed6fd6f8aeabc5846aa2 to your computer and use it in GitHub Desktop.
Akka Large File Downloader Hangs
name := "S3Downloader"
version := "1.0"
scalaVersion := "2.11.6"
val akkaStreamV = "1.0-RC3"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.11"
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.9.31"
libraryDependencies += "com.typesafe.akka" % "akka-stream-experimental_2.11" % akkaStreamV
libraryDependencies += "com.typesafe.akka" % "akka-http-core-experimental_2.11" % akkaStreamV
libraryDependencies += "com.typesafe.akka" % "akka-http-experimental_2.11" % akkaStreamV
libraryDependencies += "com.typesafe.akka" % "akka-http-testkit-experimental_2.11" % akkaStreamV
package com.jasonmartens.s3downloader
import akka.actor.Props
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.{ByteRange, Range}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.jasonmartens.s3downloader.ChunkPublisher.{DownloadTimeout, ChunkDownloaded, DownloadFailed, RequestChunk}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.Try
import scala.concurrent.duration._
/**
* Created by Jason Martens <me@jasonmartens.com> on 6/12/15.
*
*/
object ChunkPublisher {
case class DownloadedChunk(data: ByteString)
case class RequestChunk(number: Int, offset: Long, size: Long)
case class ChunkDownloaded(number: Int, response: Try[HttpResponse])
case class DownloadFailed(number: Int)
case class DownloadTimeout(number: Int)
def props(url: String, chunkList: List[RequestChunk]): Props =
Props(new ChunkPublisher(url, chunkList))
}
class ChunkPublisher(url: String, chunkList: List[RequestChunk]) extends ActorPublisher[Source[ByteString, Any]] {
implicit val system = context.system
implicit val materializer = ActorFlowMaterializer()
implicit def executor: ExecutionContextExecutor = system.dispatcher
sealed trait DownloadState
case object Ready extends DownloadState
case object Requested extends DownloadState
case object Completed extends DownloadState
var nextChunkToEmit = 1
var chunkMap: Map[Int, RequestChunk] = chunkList.map(c => c.number -> c).toMap
var chunkState: Map[Int, DownloadState] = chunkList.map(c => c.number -> Ready).toMap
val completedChunks: mutable.Map[Int, Source[ByteString, Any]] =
mutable.Map[Int, Source[ByteString, Any]]()
var inFlightDemand = 0
val downloadTimeout: FiniteDuration = 10 seconds
def netDemand = totalDemand - inFlightDemand
def downloadChunk(chunk: RequestChunk): Unit = {
val request = HttpRequest(
uri = url.toString,
headers = List(Range(ByteRange(chunk.offset, chunk.offset + chunk.size))))
val response: Future[HttpResponse] = Http().singleRequest(request)
println(s"Downloading chunk ${chunk.number}")
response.onComplete(data => self ! ChunkDownloaded(chunk.number, data))
response.onFailure {case ex: Exception => println(ex); self ! DownloadFailed(chunk.number)}
context.system.scheduler.scheduleOnce(downloadTimeout, self, DownloadTimeout(chunk.number))
}
def requestChunks(): Unit = {
while (netDemand > 0) {
println(s"requesting chunks: totalDemand: $totalDemand, inFlightDemand: $inFlightDemand")
val (num, _) = chunkState.filter(_._2 == Ready).toList.sortWith((l, r) => l._1 < r._1).head
chunkState = chunkState.updated(num, Requested)
downloadChunk(chunkMap(num))
inFlightDemand += 1
}
}
def emitChunks(): Unit = {
println(completedChunks)
while (completedChunks.contains(nextChunkToEmit)) {
println(s"emitting chunk $nextChunkToEmit")
onNext(completedChunks(nextChunkToEmit))
completedChunks.remove(nextChunkToEmit)
nextChunkToEmit += 1
inFlightDemand -= 1
requestChunks()
}
if (chunkState.forall(elem => elem._2 == Completed)) onCompleteThenStop()
}
def chunkComplete(number: Int, response: Try[HttpResponse]): Unit = {
println(s"chunkComplete: $number:${response.isSuccess}")
if (response.isSuccess) {
completedChunks.update(number, response.get.entity.dataBytes)
chunkState = chunkState.updated(number, Completed)
}
emitChunks()
}
override def receive: Receive = {
case Request(_) => requestChunks()
case Cancel => context.stop(self)
case ChunkDownloaded(n, r) => chunkComplete(n, r)
case DownloadFailed(n) => downloadChunk(chunkMap(n))
case DownloadTimeout(n) => downloadChunk(chunkMap(n))
}
}
package com.jasonmartens.s3downloader
import java.util.Calendar
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{FlattenStrategy, Sink, Source}
import akka.util.ByteString
import com.amazonaws.HttpMethod
import com.amazonaws.auth.PropertiesFileCredentialsProvider
import com.amazonaws.services.s3.AmazonS3Client
import com.jasonmartens.s3downloader.ChunkPublisher.RequestChunk
import scala.collection.immutable.NumericRange
/**
* Created by Jason Martens <me@jasonmartens.com> on 6/12/15.
*
*/
case class InitData(url: String, dataLength: Long)
object GetS3PresignedURL {
def apply: InitData = {
val credentials = new PropertiesFileCredentialsProvider("~/.aws/dev.properties")
val awsClient = new AmazonS3Client(credentials)
val bucketName = "bucket"
val keyName = "large_file"
val expiration = Calendar.getInstance()
expiration.add(Calendar.HOUR, 6)
val s3Url = awsClient.generatePresignedUrl(bucketName, keyName, expiration.getTime, HttpMethod.GET)
val s3Object = awsClient.getObject(bucketName, keyName)
val size = s3Object.getObjectMetadata.getInstanceLength
InitData(s3Url.toString, size)
}
}
object GenerateFixedChunks {
def apply(size: Long, chunkSize: Long): List[RequestChunk] = {
val offsetList = NumericRange[Long](0, size, chunkSize)
val sizeList = offsetList.init.map(_ => chunkSize) :+ size % chunkSize
val chunkNumbers = Range(1, offsetList.end.toInt)
val zipped = offsetList zip sizeList zip chunkNumbers
zipped.map {case ((o, s), n) => RequestChunk(n, o, s)}.toList
}
}
object S3Downloader extends App {
implicit val system = ActorSystem("s3download-system")
implicit val materializer = ActorFlowMaterializer()
val initData = GetS3PresignedURL.apply
val chunkList = GenerateFixedChunks(initData.dataLength, 1024 * 1024 * 5)
val source = Source.actorPublisher[Source[ByteString, Any]](ChunkPublisher.props(initData.url, chunkList))
source.flatten(FlattenStrategy.concat).runWith(Sink.ignore)
// system.shutdown()
}
requesting chunks: totalDemand: 4, inFlightDemand: 0
Downloading chunk 1
requesting chunks: totalDemand: 4, inFlightDemand: 1
Downloading chunk 2
requesting chunks: totalDemand: 4, inFlightDemand: 2
Downloading chunk 3
requesting chunks: totalDemand: 4, inFlightDemand: 3
Downloading chunk 4
chunkComplete: 4:true
Map(4 -> akka.stream.scaladsl.Source@7fba5c6c)
chunkComplete: 3:true
Map(4 -> akka.stream.scaladsl.Source@7fba5c6c, 3 -> akka.stream.scaladsl.Source@59766734)
chunkComplete: 2:true
Map(2 -> akka.stream.scaladsl.Source@1fc45691, 4 -> akka.stream.scaladsl.Source@7fba5c6c, 3 -> akka.stream.scaladsl.Source@59766734)
chunkComplete: 1:true
Map(2 -> akka.stream.scaladsl.Source@1fc45691, 4 -> akka.stream.scaladsl.Source@7fba5c6c, 1 -> akka.stream.scaladsl.Source@3e29208a, 3 -> akka.stream.scaladsl.Source@59766734)
emitting chunk 1
emitting chunk 2
emitting chunk 3
emitting chunk 4
requesting chunks: totalDemand: 2, inFlightDemand: 0
Downloading chunk 5
requesting chunks: totalDemand: 2, inFlightDemand: 1
Downloading chunk 6
chunkComplete: 5:true
Map(5 -> akka.stream.scaladsl.Source@15526293)
emitting chunk 5
chunkComplete: 6:true
Map(6 -> akka.stream.scaladsl.Source@6caa771f)
emitting chunk 6
requesting chunks: totalDemand: 2, inFlightDemand: 0
Downloading chunk 7
requesting chunks: totalDemand: 2, inFlightDemand: 1
Downloading chunk 8
requesting chunks: totalDemand: 4, inFlightDemand: 2
Downloading chunk 9
requesting chunks: totalDemand: 4, inFlightDemand: 3
Downloading chunk 10
chunkComplete: 7:true
Map(7 -> akka.stream.scaladsl.Source@15860405)
emitting chunk 7
chunkComplete: 10:true
Map(10 -> akka.stream.scaladsl.Source@136b0cb)
chunkComplete: 8:true
Map(8 -> akka.stream.scaladsl.Source@7b6af9ae, 10 -> akka.stream.scaladsl.Source@136b0cb)
emitting chunk 8
Downloading chunk 1
Downloading chunk 2
Downloading chunk 3
Downloading chunk 4
chunkComplete: 3:true
Map(10 -> akka.stream.scaladsl.Source@136b0cb, 3 -> akka.stream.scaladsl.Source@50a8be20)
requesting chunks: totalDemand: 4, inFlightDemand: 2
Downloading chunk 11
requesting chunks: totalDemand: 4, inFlightDemand: 3
Downloading chunk 12
chunkComplete: 9:true
Map(10 -> akka.stream.scaladsl.Source@136b0cb, 9 -> akka.stream.scaladsl.Source@5b03f61e, 3 -> akka.stream.scaladsl.Source@50a8be20)
emitting chunk 9
emitting chunk 10
Downloading chunk 5
Downloading chunk 6
chunkComplete: 1:true
Map(1 -> akka.stream.scaladsl.Source@489d2321, 3 -> akka.stream.scaladsl.Source@50a8be20)
requesting chunks: totalDemand: 4, inFlightDemand: 2
Downloading chunk 13
requesting chunks: totalDemand: 4, inFlightDemand: 3
Downloading chunk 14
chunkComplete: 11:true
Map(11 -> akka.stream.scaladsl.Source@36b198fb, 1 -> akka.stream.scaladsl.Source@489d2321, 3 -> akka.stream.scaladsl.Source@50a8be20)
emitting chunk 11
[INFO] [06/15/2015 08:57:08.336] [s3download-system-akka.actor.default-dispatcher-13] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:08.830] [s3download-system-akka.actor.default-dispatcher-13] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:08.830] [s3download-system-akka.actor.default-dispatcher-13] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:12.031] [s3download-system-akka.actor.default-dispatcher-16] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:12.031] [s3download-system-akka.actor.default-dispatcher-16] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:13.516] [s3download-system-akka.actor.default-dispatcher-9] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:13.868] [s3download-system-akka.actor.default-dispatcher-9] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:14.269] [s3download-system-akka.actor.default-dispatcher-26] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:14.412] [s3download-system-akka.actor.default-dispatcher-2] [akka://s3download-system/deadLetters] Message [com.jasonmartens.s3downloader.ChunkPublisher$ChunkDownloaded] from Actor[akka://s3download-system/deadLetters] to Actor[akka://s3download-system/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/15/2015 08:57:14.570] [s3download-system-akka.actor.default-dispatcher-7] [akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource] Message [com.jasonmartens.s3downloader.ChunkPublisher$DownloadTimeout] from Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] to Actor[akka://s3download-system/user/$a/flow-1-0-actorPublisherSource-actorPublisherSource#-1059692399] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment