Created
September 28, 2018 04:31
-
-
Save mtranter/1f71a892e1d0f5218dc52f4c50ef4a58 to your computer and use it in GitHub Desktop.
Dynamo Akka Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.Done | |
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic} | |
import akka.stream.{Attributes, Inlet, SinkShape} | |
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync | |
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemRequest, PutRequest, WriteRequest} | |
import com.gu.scanamo.{DynamoFormat, Table} | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.Future | |
object Dynamo { | |
def simpleSink[T: DynamoFormat](client: AmazonDynamoDBAsync, table: String): DynamoSink[T] = | |
new DynamoSink[T](client, table) | |
def batchSink[T: DynamoFormat](client: AmazonDynamoDBAsync, table: String): DynamoBatchSink[T] = | |
new DynamoBatchSink[T](client, table) | |
} | |
class DynamoSink[T : DynamoFormat](client: AmazonDynamoDBAsync, table: String) | |
extends GraphStage[SinkShape[T]]{ stage => | |
private val format = implicitly[DynamoFormat[T]] | |
private val writeTo = Table[T](table) | |
private val in = Inlet.create[T]("DynamoSink.in") | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { logic => | |
setHandler(in, new AbstractInHandler { | |
override def onPush(): Unit = { | |
val toSave = logic.grab(in) | |
client.putItem(table, format.write(toSave).getM) | |
logic.pull(in) | |
} | |
}) | |
override def preStart(): Unit = pull(in) | |
} | |
override def shape: SinkShape[T] = SinkShape.of(in) | |
} | |
class DynamoBatchSink[T : DynamoFormat](client: AmazonDynamoDBAsync, table: String) | |
extends GraphStage[SinkShape[Seq[T]]]{ stage => | |
private val format = implicitly[DynamoFormat[T]] | |
private val writeTo = Table[T](table) | |
private val in = Inlet.create[Seq[T]]("DynamoBatchSink.in") | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { logic => | |
setHandler(in, new AbstractInHandler { | |
override def onPush(): Unit = { | |
val toSave = logic.grab(in) | |
val reqItems = toSave | |
.map(format.write(_).getM) | |
.map(r => new WriteRequest(new PutRequest().withItem(r))) | |
.asJava | |
val request = new BatchWriteItemRequest().withRequestItems(Map(table -> reqItems).asJava) | |
client.batchWriteItem(request) | |
logic.pull(in) | |
} | |
}) | |
override def preStart(): Unit = pull(in) | |
} | |
override def shape: SinkShape[Seq[T]] = SinkShape.of(in) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.services.dynamodbv2.model.{AttributeValue => DynamoValue} | |
import scala.collection.JavaConverters._ | |
object AttributeValue { | |
implicit class PimpedAttributeValue(av: DynamoValue) { | |
val MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21 | |
def sizeOf(): Int = sizeOf(av, 0) | |
//noinspection ScalaStyle | |
private def sizeOf(dv: DynamoValue, sum: Int): Int = { | |
sum + (if(dv.getB != null) dv.getB.array().length | |
else if(dv.getBOOL != null) dv.getBOOL.toString.getBytes("utf-8").length | |
else if(dv.getBS != null) dv.getBS.asScala.foldLeft(0){(s,n) => s + n.array().length} | |
else if(dv.getL != null) dv.getL.asScala.foldLeft(0){(s,n) => s + sizeOf(n,0)} | |
else if(dv.getM != null) dv.getM.asScala.foldLeft(0){(s, p) => s + p._1.getBytes("utf-8").length + sizeOf(p._2, 0)} | |
else if(dv.getN != null) MAX_NUMBER_OF_BYTES_FOR_NUMBER | |
else if(dv.getNS != null) MAX_NUMBER_OF_BYTES_FOR_NUMBER * dv.getNS.size() | |
else if(dv.getS != null) dv.getS.getBytes("utf-8").length | |
else if(dv.getSS != null) dv.getSS.asScala.foldLeft(0){(s,n) => s + n.getBytes("utf-8").length} | |
else if(dv.getNULL != null) 0 | |
else 0) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment