Skip to content

Instantly share code, notes, and snippets.

@mtranter
Created September 28, 2018 04:31
Show Gist options
  • Save mtranter/1f71a892e1d0f5218dc52f4c50ef4a58 to your computer and use it in GitHub Desktop.
Save mtranter/1f71a892e1d0f5218dc52f4c50ef4a58 to your computer and use it in GitHub Desktop.
Dynamo Akka Streams
import akka.Done
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic}
import akka.stream.{Attributes, Inlet, SinkShape}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemRequest, PutRequest, WriteRequest}
import com.gu.scanamo.{DynamoFormat, Table}
import scala.collection.JavaConverters._
import scala.concurrent.Future
/**
 * Factory methods for the DynamoDB sink stages defined in this file.
 */
object Dynamo {

  /**
   * Builds a [[DynamoSink]] for single elements of type `T`.
   *
   * @param client DynamoDB client handed to the stage
   * @param table  table name handed to the stage
   * @tparam T element type; a `DynamoFormat[T]` must be in implicit scope
   */
  def simpleSink[T: DynamoFormat](client: AmazonDynamoDBAsync, table: String): DynamoSink[T] = {
    val format = implicitly[DynamoFormat[T]]
    new DynamoSink[T](client, table)(format)
  }

  /**
   * Builds a [[DynamoBatchSink]] for batches (`Seq[T]`) of elements.
   *
   * @param client DynamoDB client handed to the stage
   * @param table  table name handed to the stage
   * @tparam T element type; a `DynamoFormat[T]` must be in implicit scope
   */
  def batchSink[T: DynamoFormat](client: AmazonDynamoDBAsync, table: String): DynamoBatchSink[T] = {
    val format = implicitly[DynamoFormat[T]]
    new DynamoBatchSink[T](client, table)(format)
  }
}
/**
 * Sink stage that stores every incoming element in a DynamoDB table with one
 * `putItem` call per element.
 *
 * The element is converted with the implicit `DynamoFormat[T]` and the map form
 * (`getM`) of the resulting attribute value is used as the item.
 *
 * @param client DynamoDB client used for the writes
 * @param table  name of the table to write to
 * @tparam T element type accepted by the sink
 */
class DynamoSink[T : DynamoFormat](client: AmazonDynamoDBAsync, table: String)
  extends GraphStage[SinkShape[T]] {

  private val format: DynamoFormat[T] = implicitly[DynamoFormat[T]]
  // Not referenced anywhere else in this class.
  private val writeTo = Table[T](table)
  private val in = Inlet.create[T]("DynamoSink.in")

  override def shape: SinkShape[T] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // Request the first element as soon as the stage starts.
      override def preStart(): Unit = pull(in)

      /** Writes a single element to the table; the client's result is discarded. */
      private def store(element: T): Unit = {
        client.putItem(table, format.write(element).getM)
        ()
      }

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          store(grab(in))
          // Ask for the next element only after the write call has returned.
          pull(in)
        }
      })
    }
}
/**
 * Sink stage that writes every incoming batch of elements to a DynamoDB table
 * using `BatchWriteItem`.
 *
 * Each element is converted with the implicit `DynamoFormat[T]` and wrapped in a
 * put request. Compared with a single fire-and-forget call per batch, this stage:
 *
 *  - splits a batch into chunks of at most 25 requests, the documented
 *    `BatchWriteItem` limit, so larger input batches are not rejected;
 *  - sends nothing for an empty batch instead of an invalid empty request;
 *  - re-submits whatever the service reports back as unprocessed, and throws if
 *    items are still unprocessed after the last attempt instead of dropping them.
 *
 * Writes go through the client's synchronous `batchWriteItem`, so `onPush` blocks
 * until the batch has been written (including any retry delay). Exceptions raised
 * here propagate out of `onPush`.
 *
 * @param client DynamoDB client used for the writes
 * @param table  name of the table to write to
 * @tparam T element type contained in the incoming batches
 */
class DynamoBatchSink[T : DynamoFormat](client: AmazonDynamoDBAsync, table: String)
  extends GraphStage[SinkShape[Seq[T]]] {

  // BatchWriteItem accepts at most 25 put/delete requests per call.
  private val MaxBatchSize = 25
  // Total number of calls made for one chunk before giving up.
  private val MaxAttempts = 5
  // Delay before the first retry; doubled on every further retry.
  private val RetryBaseDelayMillis = 50L

  private val format = implicitly[DynamoFormat[T]]
  private val in = Inlet.create[Seq[T]]("DynamoBatchSink.in")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { logic =>
    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val writes = logic.grab(in)
          .map(item => new WriteRequest(new PutRequest().withItem(format.write(item).getM)))
        // `grouped` yields no chunks for an empty batch, so no request is sent at all.
        writes.grouped(MaxBatchSize).foreach { chunk =>
          writeWithRetry(Map(table -> chunk.asJava).asJava, attempt = 1)
        }
        logic.pull(in)
      }
    })
    override def preStart(): Unit = pull(in)
  }

  override def shape: SinkShape[Seq[T]] = SinkShape.of(in)

  /**
   * Sends one `BatchWriteItem` request and re-sends any unprocessed items.
   *
   * @param items   request items keyed by table name (at most 25 requests in total)
   * @param attempt 1-based number of the call about to be made
   * @throws IllegalStateException if items remain unprocessed after `MaxAttempts` calls
   */
  @scala.annotation.tailrec
  private def writeWithRetry(items: java.util.Map[String, java.util.List[WriteRequest]], attempt: Int): Unit = {
    val result = client.batchWriteItem(new BatchWriteItemRequest().withRequestItems(items))
    // Null-safe: treat a missing result or a missing/empty map as "everything was written".
    val leftover = Option(result).flatMap(r => Option(r.getUnprocessedItems)).filterNot(_.isEmpty)
    leftover match {
      case None => ()
      case Some(pending) if attempt < MaxAttempts =>
        // Back off before retrying; an immediate retry is likely to be throttled again.
        Thread.sleep(RetryBaseDelayMillis << (attempt - 1))
        writeWithRetry(pending, attempt + 1)
      case Some(pending) =>
        val remaining = pending.asScala.values.map(_.size).sum
        throw new IllegalStateException(
          s"$remaining write request(s) for table '$table' still unprocessed after $MaxAttempts attempts")
    }
  }
}
import com.amazonaws.services.dynamodbv2.model.{AttributeValue => DynamoValue}
import scala.collection.JavaConverters._
/**
 * Extension syntax for estimating the size in bytes of a DynamoDB attribute value.
 */
object AttributeValue {
  implicit class PimpedAttributeValue(av: DynamoValue) {
    /** Fixed number of bytes counted for every number (an `N` value or one `NS` element). */
    val MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21

    /** Estimated size in bytes of the wrapped value, including nested lists and maps. */
    def sizeOf(): Int = sizeOf(av, 0)

    /**
     * Adds the estimated size of `dv` to `sum`.
     *
     * The first non-null accessor decides how the value is measured:
     * binary values by their readable bytes, strings and map keys by their UTF-8
     * length, numbers by a fixed width, lists and maps recursively.
     *
     * NOTE(review): BOOL is measured as the UTF-8 length of "true"/"false" and NULL
     * (like an empty value) as 0 -- confirm this matches the accounting callers expect.
     */
    //noinspection ScalaStyle
    private def sizeOf(dv: DynamoValue, sum: Int): Int = {
      def utf8Length(text: String): Int =
        text.getBytes(java.nio.charset.StandardCharsets.UTF_8).length

      // `remaining()` counts only the bytes between position and limit. `array().length`
      // would count the whole backing array and throws for read-only or direct buffers.
      def byteCount(buffer: java.nio.ByteBuffer): Int = buffer.remaining()

      val size =
        if (dv.getB != null) byteCount(dv.getB)
        else if (dv.getBOOL != null) utf8Length(dv.getBOOL.toString)
        else if (dv.getBS != null) dv.getBS.asScala.iterator.map(byteCount).sum
        else if (dv.getL != null) dv.getL.asScala.iterator.map(sizeOf(_, 0)).sum
        else if (dv.getM != null)
          dv.getM.asScala.iterator.map { case (name, value) => utf8Length(name) + sizeOf(value, 0) }.sum
        else if (dv.getN != null) MAX_NUMBER_OF_BYTES_FOR_NUMBER
        else if (dv.getNS != null) MAX_NUMBER_OF_BYTES_FOR_NUMBER * dv.getNS.size()
        else if (dv.getS != null) utf8Length(dv.getS)
        else if (dv.getSS != null) dv.getSS.asScala.iterator.map(utf8Length).sum
        else 0 // NULL, or no accessor set at all

      sum + size
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment