Skip to content

Instantly share code, notes, and snippets.

@Bryji
Created July 12, 2022 15:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bryji/315ae3feced9ecb463ff1136044fc902 to your computer and use it in GitHub Desktop.
Save Bryji/315ae3feced9ecb463ff1136044fc902 to your computer and use it in GitHub Desktop.
Recursively fetch dynamo items chaining CompleteableFutures and wrap in Async[F]
type DynamoItems = util.List[util.Map[String, AttributeValue]]
val client: DynamoDbAsyncClient = ???
override def getByDeviceGuid(tableName: String, deviceGuid: String, consistentRead: Boolean)(implicit
traceContext: TraceContext
): F[DynamoItems] = {
val attributeValues = Map(":deviceGuidValue" -> AttributeValue.builder().s(deviceGuid).build()).asJava
val getAllRequest = QueryRequest
.builder()
.tableName(tableName)
.keyConditionExpression("%s = :deviceGuidValue".format(DeviceGuidKey))
.expressionAttributeValues(attributeValues)
.consistentRead(true)
.build()
def getPage(request: QueryRequest, prevResponse: Option[QueryResponse]): CompletableFuture[DynamoItems] = {
val (nextReq: QueryRequest, prevResults: DynamoItems) = prevResponse match {
case Some(lr) => (request.copy(rq => rq.exclusiveStartKey(lr.lastEvaluatedKey())), lr.items())
case None => (request, new util.ArrayList[util.Map[String, AttributeValue]]())
}
client
.query(nextReq)
.thenCompose { r =>
if (r.hasLastEvaluatedKey) getPage(request, Some(r))
else CompletableFuture.completedFuture(r.items())
}
.thenApply { r =>
util.stream.Stream
.concat(prevResults.stream(), r.stream())
.collect(util.stream.Collectors.toList());
}
}
Async[F]
.fromFuture(Async[F].delay(getPage(getAllRequest, None).asScala))
.recoverWith(dynamoErrorHandler(tableName, deviceGuid, "retrieving", None))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment