Skip to content

Instantly share code, notes, and snippets.

@zsedem
Created June 30, 2021 10:38
Show Gist options
  • Save zsedem/77f1be98434915dbc8aba7b24173f6e6 to your computer and use it in GitHub Desktop.
Save zsedem/77f1be98434915dbc8aba7b24173f6e6 to your computer and use it in GitHub Desktop.
A pure akka-streams implementation for retrying DynamoDB partial failures
/** A single item to be written through [[DynamoService.writeBatch]]. */
trait DynamoWriteItem {

  /** Identifier of this item.
    *
    * `DynamoWriteStream.batchedFlow` compares this value to match the items a batch write
    * reports as failed against the items that were originally submitted in that batch.
    */
  def id: String
}
/** Abstraction over the service that performs batch writes. */
trait DynamoService {

  /** Writes the given items as one batch.
    *
    * `DynamoWriteStream.batchedFlow` interprets the result as follows:
    *  - `Right(Done)`: every item in the batch is treated as written.
    *  - `Left(items)`: the listed items are treated as not written and are sent again; all other
    *    items of the batch are treated as written.
    *
    * NOTE(review): presumably `Left` carries DynamoDB's unprocessed items — confirm against the
    * implementation.
    *
    * @param dynamoWriteItems items to write in a single request
    * @return the outcome of the batch once the request has finished
    */
  def writeBatch(
      dynamoWriteItems: List[DynamoWriteItem]
  ): Future[Either[List[DynamoWriteItem], Done]]
}
/** Akka Streams flow that writes items in batches through a [[DynamoService]] and feeds items
  * reported as failed back into the same flow until they are written.
  */
object DynamoWriteStream {

  /** Builds a flow that batches incoming writes, sends each batch to `dynamoService` and emits
    * the pass-through value of every item that was written.
    *
    * Items returned in a `Left` by [[DynamoService.writeBatch]] travel through a feedback loop
    * and are merged back in with priority over new upstream elements. There is no retry limit
    * and no back-off: an item that keeps failing keeps circulating. Output order does not follow
    * input order (unordered parallel writes plus retries).
    *
    * Only `Left` results are retried. A failed `Future` is not handled here, so with Akka's
    * default supervision it fails the stream.
    *
    * NOTE(review): the merge uses `eagerComplete = true`, so it completes as soon as upstream
    * completes, which also cancels the retry loop. Items still waiting for a retry at that moment
    * appear to be dropped — confirm this is acceptable for callers.
    *
    * @param dynamoService    service performing the batch writes
    * @param maxBatchSize     upper bound on the number of items per `writeBatch` call
    *                         (default 25; presumably the DynamoDB batch-write limit — confirm)
    * @param maxInflightRetry capacity of the buffer holding items that wait to be retried
    * @param parallelRequests maximum number of concurrent `writeBatch` calls
    * @tparam PassThrough type of the value carried next to each write and emitted once written
    * @return a flow from `(item, passThrough)` pairs to the pass-through values of written items
    */
  def batchedFlow[PassThrough](
      dynamoService: DynamoService,
      maxBatchSize: Int = 25,
      maxInflightRetry: Int = 100,
      parallelRequests: Int = 4
  ): Flow[(DynamoWriteItem, PassThrough), PassThrough, NotUsed] = {
    require(maxBatchSize > 0, "maxBatchSize must be positive")
    require(maxInflightRetry > 0, "maxInflightRetry must be positive")
    require(parallelRequests > 0, "parallelRequests must be positive")

    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      // (items to retry, pass-throughs of items that were written)
      type BatchResult = (List[(DynamoWriteItem, PassThrough)], List[PassThrough])
      import GraphDSL.Implicits._

      // Enough room to keep the other parallel requests fed while one batch is being assembled.
      // Floored at 1 because a buffer must have a positive size (parallelRequests == 1 case).
      val pendingBufferSize = math.max(1, maxBatchSize * (parallelRequests - 1))

      // The stage has to be imported into the builder; only the ports of the returned shape
      // belong to the graph under construction and may be wired below.
      val mergeRetries = builder.add(
        MergePreferred[(DynamoWriteItem, PassThrough)](1, eagerComplete = true)
      )
      val fork = builder.add(Broadcast[BatchResult](2, eagerCancel = true))

      // Feedback branch: unpack the items to retry and hold them until the merge pulls them.
      val collectRetries = Flow[BatchResult]
        .mapConcat(_._1)
        .buffer(maxInflightRetry, OverflowStrategy.backpressure)

      val batchedWrites = Flow[(DynamoWriteItem, PassThrough)]
        .buffer(pendingBufferSize, OverflowStrategy.backpressure)
        .batch(maxBatchSize, item => List(item))(_ :+ _)
        // retries would mess up the ordering anyway
        .mapAsyncUnordered(parallelRequests) { batch =>
          dynamoService
            .writeBatch(batch.map(_._1))
            .map[BatchResult] {
              case Left(failed) =>
                val failedIds = failed.iterator.map(_.id).toSet
                // The service must only report items that were part of this batch.
                if (!failedIds.subsetOf(batch.iterator.map(_._1.id).toSet))
                  sys.error("should not happen")
                // Partitioning keeps every element on exactly one side, so no pass-through is
                // lost even when several items of the batch share an id.
                val (retries, written) =
                  batch.partition { case (write, _) => failedIds(write.id) }
                (retries, written.map(_._2))
              case Right(_) =>
                (List.empty, batch.map(_._2))
            }(ExecutionContext.parasitic)
        }

      mergeRetries.preferred <~ collectRetries <~ fork.out(0)
      mergeRetries.out ~> batchedWrites ~> fork.in
      val passthroughs =
        fork.out(1) ~> Flow[BatchResult].mapConcat { case (_, written) => written }

      FlowShape(mergeRetries.in(0), passthroughs.outlet)
    })
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment