Importing data into DynamoDB - code for blog post
ngram year match_count page_count volume_count
# 1574 1 1 1
# 1584 6 6 1
# 1614 1 1 1
# 1631 115 100 1
# 1632 3 3 1
# 1635 1 1 1
# 1640 1 1 1
# 1641 1 1 1
1935 2 2 1
1936 1 1 1
1938 6 6 4
1940 8 8 3
1941 3 3 2
firm 2006 153 108 21
firm 2007 417 264 31
firm 2008 255 215 28
five 1899 3 3 2
five 1993 15 13 1
five 1997 4 4 1
five 1998 39 35 4
five 1999 84 63 7
five 2000 10 7 2
five 2002 5 5 1
five 2003 6 6 1
five 2004 41 36 3
five 2005 335 284 13
five 2006 378 346 27
five 2007 649 552 46
five 2008 1217 982 45
flat 1997 1 1 1
flat 1998 6 6 4
flat 1999 5 5 4
flat 2005 36 30 9
flat 2006 38 38 15
flat 2007 86 80 26
flat 2008 116 101 21
flexible 1997 12 12 1
flexible 1998 14 12 4
flexible 1999 20 18 4
flexible 2003 4 4 1
flexible 2004 4 4 2
flexible 2005 46 44 6
flexible 2006 36 36 13
flexible 2007 125 114 29
flexible 2008 190 171 28
flow 1899 3 3 3
flow 1993 1 1 1
flow 1997 1 1 1
flow 1998 9 8 3
flow 1999 13 12 5
flow 2000 1 1 1
flow 2002 1 1 1
flow 2003 6 6 1
flow 2004 2 2 2
flow 2005 79 72 11
flow 2006 112 97 19
flow 2007 219 184 33
flow 2008 756 462 32
aws dynamodb create-table \
--table-name ddbimport \
--attribute-definitions AttributeName=ngram,AttributeType=S AttributeName=year,AttributeType=N \
--key-schema AttributeName=ngram,KeyType=HASH AttributeName=year,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST
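The table uses ngram as the hash key (a string) and year as the range key (a number), which is why the import commands below pass year in numericFields. As a rough sketch of what that flag implies (my assumption about the importer, not its actual code), each tab-separated row can be mapped to a DynamoDB item like this:

package ddbimport

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

// toItem maps one CSV row onto a DynamoDB item. Columns named in
// numericFields become N (number) attributes; everything else is stored as
// S (string). The function name and signature are illustrative only.
func toItem(cols, values []string, numericFields map[string]bool) map[string]*dynamodb.AttributeValue {
    item := make(map[string]*dynamodb.AttributeValue, len(cols))
    for i, col := range cols {
        if numericFields[col] {
            item[col] = &dynamodb.AttributeValue{N: aws.String(values[i])}
            continue
        }
        item[col] = &dynamodb.AttributeValue{S: aws.String(values[i])}
    }
    return item
}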
cat data.csv|wc -l|xargs printf "%'d\n"
# 26,266,996
$ ./01-nodeimport/index.js --region=eu-west-2 --table ddbimport --csv data.csv --delimiter=tab --numericFields=year
Inserted 2500 records in 8.979666334998913 - 278.4067811357384 records per second
Inserted 5000 records in 17.551751922001133 - 284.87184767763824 records per second
Inserted 7500 records in 25.7213841009943285 - 291.58617477782104 records per second
Inserted 10000 records in 34.162606240002795 - 292.7177139164071 records per second
Inserted 12500 records in 43.178351169000965 - 289.49692754766716 records per second
$ ./01-nodeimport/index.js --region=eu-west-2 --table ddbimport --csv data.csv --delimiter=tab --numericFields=year
Inserted 2500 records in 3.14841040800092745 - 794.0514977484675 records per second
Inserted 5000 records in 6.04728225000144455 - 826.8176998020566 records per second
Inserted 7500 records in 8.9385348590003563 - 839.0636853027572 records per second
Inserted 10000 records in 11.7898299570078965 - 848.1886538198952 records per second
Inserted 12500 records in 15.0327680839982355 - 831.5168523956501 records per second
Inserted 15000 records in 17.839218031003845 - 840.8440310517315 records per second
Inserted 17500 records in 20.6496771810052455 - 847.4708755300786 records per second
make 02-goimport
go run 02-goimport/main.go -region=eu-west-2 -table=ddbimport -csv=data.csv -delimiter=tab -numericFields=year
2020/05/07 19:29:45 importing "data.csv" to ddbimport in region eu-west-2
2020/05/07 19:29:48 804 records per second
2020/05/07 19:29:51 850 records per second
2020/05/07 19:29:53 866 records per second
2020/05/07 19:29:56 875 records per second
2020/05/07 19:29:59 882 records per second
2020/05/07 19:30:02 884 records per second
2020/05/07 19:30:05 882 records per second
2020/05/07 19:30:07 882 records per second
2020/05/07 19:30:10 883 records per second
2020/05/07 19:30:13 882 records per second
// Push data into the job queue.
for {
    batch, _, err := reader.ReadBatch()
    if err != nil && err != io.EOF {
        logger.Fatal("failed to read batch from input",
            zap.Int64("batchCount", batchCount),
            zap.Error(err))
    }
    batches <- batch
    if err == io.EOF {
        break
    }
}
close(batches)
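On the other side of the batches channel, a pool of workers writes each batch to DynamoDB. The sketch below is an assumed shape of that consumer, using BatchWriteItem from the AWS SDK for Go and resubmitting unprocessed items; the importer's real code differs in detail:

package ddbimport

import (
    "log"
    "sync"
    "sync/atomic"

    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

// writeBatches drains the batches channel with the requested number of
// concurrent workers, resubmitting anything DynamoDB reports as unprocessed.
// Representing a batch as []*dynamodb.WriteRequest is an assumption made for
// this sketch.
func writeBatches(tableName string, concurrency int, batches <-chan []*dynamodb.WriteRequest) int64 {
    client := dynamodb.New(session.Must(session.NewSession()))
    var written int64
    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for batch := range batches {
                items := batch
                for len(items) > 0 {
                    out, err := client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
                        RequestItems: map[string][]*dynamodb.WriteRequest{tableName: items},
                    })
                    if err != nil {
                        log.Fatalf("failed to write batch: %v", err)
                    }
                    // Put any unprocessed items straight back into the next request.
                    items = out.UnprocessedItems[tableName]
                }
                atomic.AddInt64(&written, int64(len(batch)))
            }
        }()
    }
    wg.Wait()
    return written
}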
$ ./ddbimport_macos -region=eu-west-2 -table=ddbimport -csv=../data.csv -delimiter=tab -numericFields=year -concurrency=8
2020/05/08 10:31:07 importing "../data.csv" to ddbimport in region eu-west-2
2020/05/08 10:31:08 3361 records per second
2020/05/08 10:31:08 4240 records per second
2020/05/08 10:31:09 4622 records per second
2020/05/08 10:31:09 4859 records per second
2020/05/08 10:31:10 4998 records per second
2020/05/08 10:31:10 5173 records per second
2020/05/08 10:31:11 5240 records per second
2020/05/08 10:31:11 5275 records per second
2020/05/08 10:31:11 5338 records per second
2020/05/08 10:31:12 5351 records per second
2020/05/08 10:31:12 5393 records per second
2020/05/08 10:31:13 5429 records per second
2020/05/08 10:31:13 5465 records per second
2020/05/08 10:31:14 5490 records per second
ssh-keygen -f ddbimport.key -N ""
aws ec2 import-key-pair \
--key-name=ddbimport \
--public-key-material=file://ddbimport.key.pub
VPCID=`aws ec2 describe-vpcs \
--filters Name=isDefault,Values=true \
--query 'Vpcs[*].VpcId' \
--output text`
VPCSG=`aws ec2 create-security-group \
--group-name ddbimport \
--description "ddbimport" \
--vpc-id $VPCID \
--output=text`
aws ec2 authorize-security-group-ingress \
--group-id $VPCSG \
--ip-permissions '[{"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "SSH access from anywhere."}]}]' \
--output=text
{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Principal": {"Service": "ec2.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }
}
aws iam create-role --role-name ddbimport --assume-role-policy-document=file://ddbimport_role.json
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess --role-name=ddbimport
aws iam create-instance-profile --instance-profile-name ddbimport-instance-profile
aws iam add-role-to-instance-profile --role-name ddbimport --instance-profile-name ddbimport-instance-profile
AMI=`aws ec2 describe-images --owners amazon --filters 'Name=name,Values=amzn-ami-hvm-????.??.?.????????-x86_64-gp2' 'Name=state,Values=available' --query 'reverse(sort_by(Images, &CreationDate))[:1].ImageId' --output text`
INSTANCEID=`aws ec2 run-instances --image-id=$AMI --instance-type m5.4xlarge --key-name ddbimport --security-groups ddbimport --tag-specification 'ResourceType=instance,Tags=[{Key=Name,Value=ddbimport}]' --output=text --query 'Reservations[*].Instances[*].InstanceId'`
aws ec2 associate-iam-instance-profile --instance-id $INSTANCEID --iam-instance-profile Name=ddbimport-instance-profile
scp -i ddbimport.key ../data.csv ec2-user@ec2-35-178-249-154.eu-west-2.compute.amazonaws.com:/home/ec2-user
GOOS=linux go build -o ddbimport ../03-go-import-gr/main.go
scp -i ddbimport.key -r ddbimport ec2-user@ec2-35-178-249-154.eu-west-2.compute.amazonaws.com:/home/ec2-user
[ec2-user@ip-172-31-15-99 ~]$ ./ddbimport_linux -region=eu-west-2 -table=ddbimport -csv=data500k.csv -delimiter=tab -numericFields=year -concurrency=16
2020/05/08 10:19:46 importing "data500k.csv" to ddbimport in region eu-west-2
2020/05/08 10:19:46 25004 records per second
2020/05/08 10:19:46 31351 records per second
2020/05/08 10:19:46 35392 records per second
2020/05/08 10:19:47 37158 records per second
2020/05/08 10:19:47 38886 records per second
2020/05/08 10:19:47 40106 records per second
2020/05/08 10:19:47 41436 records per second
2020/05/08 10:19:47 42471 records per second
2020/05/08 10:19:47 43337 records per second
2020/05/08 10:19:47 43956 records per second
2020/05/08 10:19:53 29370 records per second
2020/05/08 10:19:53 29252 records per second
2020/05/08 10:19:53 28998 records per second
2020/05/08 10:19:54 28847 records per second
2020/05/08 10:19:54 28788 records per second
2020/05/08 10:19:54 28579 records per second
2020/05/08 10:19:54 28524 records per second
2020/05/08 10:19:54 28512 records per second
2020/05/08 10:19:54 28523 records per second
aws s3 cp ../data.csv s3://infinityworks-ddbimport/data.csv
service: ddbimport-lambda
provider:
  name: aws
  memorySize: 2048 # optional, in MB, default is 1024
  timeout: 900 # optional, in seconds, default is 6
  versionFunctions: false # optional, default is true
  runtime: go1.x
  stage: dev
  region: eu-west-2
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:BatchWriteItem
      Resource: "*"
    - Effect: "Allow"
      Action:
        - "s3:ListBucket"
      Resource: "*"
    - Effect: "Allow"
      Action:
        - "s3:GetObject"
      Resource: "*"
package:
  exclude:
    - ./**
  include:
    - ./bin/**
functions:
  ddbimport:
    handler: bin/ddbimport
package main

import (
    "context"

    "github.com/aws/aws-lambda-go/lambda"
)

// Request to the ddbimport Lambda.
type Request struct {
    Source Source `json:"source"`
    Worker Worker `json:"worker"`
    Target Target `json:"target"`
}

// Source of the CSV data to import.
type Source struct {
    Region        string   `json:"region"`
    Bucket        string   `json:"bucket"`
    Key           string   `json:"key"`
    NumericFields []string `json:"numericFields"`
    BooleanFields []string `json:"booleanFields"`
    Delimiter     string   `json:"delimiter"`
}

// Target DynamoDB table.
type Target struct {
    Region    string `json:"region"`
    TableName string `json:"tableName"`
}

// Worker details.
type Worker struct {
    // Index of the worker in the set, e.g. 0, 1, 2, 3 out of a 4 worker set.
    Index int `json:"index"`
    // Count of total workers, e.g. 4.
    Count int `json:"count"`
    // Concurrency of each worker. Defaults to 1.
    Concurrency int `json:"concurrency"`
}

// Response from the Lambda.
type Response struct {
    Worker         Worker `json:"worker"`
    Error          error  `json:"error"`
    ProcessedCount int64  `json:"processedCount"`
    DurationMS     int64  `json:"durationMs"`
}

func Handler(ctx context.Context, req Request) (resp Response, err error) {
    // The stuff.
    return
}

func main() {
    lambda.Start(Handler)
}
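The Worker fields drive the parallelism: each invocation handles only its own slice of the file. A minimal sketch of that idea follows, assuming hypothetical readNext and write callbacks standing in for the real CSV reader and DynamoDB writer; this is not the handler's actual body.

// processShare shows how Worker.Index and Worker.Count could split one CSV
// file across several Lambda invocations without coordination: every worker
// scans the file, but only writes every Count-th batch, offset by its Index.
func processShare(
    req Request,
    readNext func() (batch [][]string, done bool, err error),
    write func(batch [][]string) error,
) (processed int64, err error) {
    for batchNumber := 0; ; batchNumber++ {
        batch, done, err := readNext()
        if err != nil {
            return processed, err
        }
        // Claim only this worker's share of the batches.
        if batchNumber%req.Worker.Count == req.Worker.Index {
            if err := write(batch); err != nil {
                return processed, err
            }
            processed += int64(len(batch))
        }
        if done {
            return processed, nil
        }
    }
}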
$ sls deploy
Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
{
  "source": {
    "region": "eu-west-2",
    "bucket": "infinityworks-ddbimport",
    "key": "data500k.csv",
    "numericFields": ["year"],
    "delimiter": "\t"
  },
  "worker": {
    "concurrency": 8
  },
  "target": {
    "region": "eu-west-2",
    "tableName": "ddbimport"
  }
}
aws lambda invoke \
--function-name ddbimport-lambda-dev-ddbimport \
--payload "`cat payload.json`" \
--log-type Tail output.json
sls logs -f ddbimport|tail
2020/05/08 10:44:51 inserted 166700 batches (4167475 records) in 4m31.539588387s - 15347 records per second
2020/05/08 10:44:51 inserted 166800 batches (4169975 records) in 4m31.633527803s - 15351 records per second
2020/05/08 10:44:51 inserted 166900 batches (4172475 records) in 4m31.762724752s - 15353 records per second
2020/05/08 10:44:51 inserted 167000 batches (4174975 records) in 4m31.930057284s - 15353 records per second
REPORT RequestId: a196ab78-9bb1-4feb-9c38-57775f154a09 Duration: 900099.79 ms Billed Duration: 900000 ms Memory Size: 2048 MB Max Memory Used: 64 MB Init Duration: 102.32 ms
2020-05-08T10:55:20.009Z a196ab78-9bb1-4feb-9c38-57775f154a09 Task timed out after 900.10 seconds
{
  "src": {
    "region": "eu-west-2",
    "bucket": "infinityworks-ddbimport",
    "key": "data4M.csv",
    "numFlds": ["year"],
    "delim": "\t"
  },
  "cnf": {
    "lambdaConcur": 8,
    "lambdaDurSecs": 300
  },
  "tgt": {
    "region": "eu-west-2",
    "table": "ddbimport"
  }
}
ddbimport:
  name: ddbimport
  definition:
    Comment: "Imports data into DynamoDB in parallel."
    StartAt: preflight
    States:
      preflight:
        Type: Task
        Resource:
          Fn::GetAtt: [preflight, Arn]
        Next: continue
      continue:
        Type: Choice
        Choices:
          - Variable: "$.prefl.cnt"
            BooleanEquals: false
            Next: process
        Default: preflight
      process:
        Type: Map
        InputPath: "$"
        ItemsPath: "$.batches"
        MaxConcurrency: 50
        Parameters:
          "src.$": "$.src"
          "cnf.$": "$.cnf"
          "tgt.$": "$.tgt"
          "cols.$": "$.prefl.cols"
          "range.$": "$$.Map.Item.Value"
        Iterator:
          StartAt: import
          States:
            import:
              Type: Task
              Resource:
                Fn::GetAtt: [import, Arn]
              End: true
        End: true
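For context, the process Map state expects the preflight output alongside the original request: a prefl object carrying the detected columns and a cnt flag, plus a batches array whose elements become the range parameter of each import invocation. The shape below is an illustration, not captured output; in particular, representing each batch as a [firstLine, lastLine] pair is an assumption.

{
  "src": { "region": "eu-west-2", "bucket": "infinityworks-ddbimport", "key": "data4M.csv", "numFlds": ["year"], "delim": "\t" },
  "cnf": { "lambdaConcur": 8, "lambdaDurSecs": 300 },
  "tgt": { "region": "eu-west-2", "table": "ddbimport" },
  "prefl": {
    "cnt": false,
    "cols": ["ngram", "year", "match_count", "page_count", "volume_count"]
  },
  "batches": [
    [1, 500000],
    [500001, 1000000]
  ]
}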