Skip to content

Instantly share code, notes, and snippets.

@dsebban
Last active August 26, 2018 13:20
Show Gist options
  • Save dsebban/dcfe284961135a975bf46f5e4aec7d55 to your computer and use it in GitHub Desktop.
Save dsebban/dcfe284961135a975bf46f5e4aec7d55 to your computer and use it in GitHub Desktop.
create dynamo db table using akka-streams
import $ivy.`com.lightbend.akka::akka-stream-alpakka-dynamodb:0.20`
import akka.actor.ActorSystem
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.stream.ActorMaterializer
import akka.stream.alpakka.dynamodb.scaladsl._
import akka.stream.alpakka.dynamodb.impl.DynamoSettings
import akka.stream.scaladsl._
import scala.concurrent.{ Await , Future }
import scala.concurrent.duration._
import DynamoImplicits._
import com.amazonaws.services.dynamodbv2.model._
import scala.collection.JavaConverters._
val config = """
| akka.stream.alpakka.dynamodb {
| region = "us-west-1"
| host = "localhost"
| port: 4569
| parallelism = 32
| credentials {
| access-key-id = "dummy-access-key"
| secret-key-id = "dummy-secret-key"
| }
|}""".stripMargin
implicit val system = ActorSystem("system", ConfigFactory.parseString(config))
implicit val actorMaterializer = ActorMaterializer()
import system.dispatcher
val settings = DynamoSettings(system)
val client = DynamoClient(settings)
val listTablesResult: Future[ListTablesResult] = client.single(new ListTablesRequest())
val res = Await.result(listTablesResult, 1000 seconds)
println(s"tables before : $res")
val keySchemaType = (name: String, keyType: KeyType) => new KeySchemaElement().withAttributeName(name).withKeyType(keyType)
val keySchema = (name: String) => keySchemaType(name, KeyType.HASH)
val tr = new CreateTableRequest().withTableName("testTabl1e").withKeySchema(keySchema("my_key"))
val tableName = "table_2"
val keyCol = "kkey"
val sortCol = "sort"
val createTableRequest = new CreateTableRequest().withTableName(tableName).withKeySchema(new KeySchemaElement().withAttributeName(keyCol).withKeyType(KeyType.HASH),new KeySchemaElement().withAttributeName(sortCol).withKeyType(KeyType.RANGE)).withAttributeDefinitions(new AttributeDefinition().withAttributeName(keyCol).withAttributeType("S"),new AttributeDefinition().withAttributeName(sortCol).withAttributeType("N")).withProvisionedThroughput(new ProvisionedThroughput().withReadCapacityUnits(10L).withWriteCapacityUnits(10L))
val s = Source.single(createTableRequest.toOp).via(client.flow).map(_.getTableDescription.getTableArn).toMat(Sink.seq)(Keep.right).run()
println(Await.result(s, 1000 seconds))
val listTablesResult: Future[ListTablesResult] = client.single(new ListTablesRequest())
val res = Await.result(listTablesResult, 1000 seconds)
println(s"tables before : $res")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment