Last active
December 22, 2017 19:45
-
-
Save aaronanderson/07ec17aabaa633f47b6d3b08c1c3559c to your computer and use it in GitHub Desktop.
Amazon AWS Lambda using Java SDK V2 and DynamoDB conditional updates to implement simple timestamp locking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
aws s3 mb s3://somebucket-dev | |
aws sqs create-queue --queue-name DeadLetter | |
#DynamoDB local: http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html | |
#java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb | |
aws dynamodb create-table --endpoint-url http://localhost:8000 --table-name DEV-SomeTable --attribute-definitions AttributeName=ID,AttributeType=S AttributeName=ReferenceID,AttributeType=S AttributeName=ArtifactType,AttributeType=S AttributeName=RepositoryID,AttributeType=S --key-schema AttributeName=ID,KeyType=HASH --global-secondary-indexes IndexName=ReferenceID,KeySchema=["{AttributeName=ReferenceID,KeyType=HASH}","{AttributeName=ArtifactType,KeyType=RANGE}"],Projection="{ProjectionType=KEYS_ONLY}",ProvisionedThroughput="{ReadCapacityUnits=5,WriteCapacityUnits=5}" IndexName=ArtifactType,KeySchema=["{AttributeName=ArtifactType","KeyType=HASH}"],Projection="{ProjectionType=KEYS_ONLY}",ProvisionedThroughput="{ReadCapacityUnits=5,WriteCapacityUnits=5}" IndexName=RepositoryID,KeySchema=["{AttributeName=RepositoryID,KeyType=HASH}"],Projection="{ProjectionType=KEYS_ONLY}",ProvisionedThroughput="{ReadCapacityUnits=5,WriteCapacityUnits=5}" --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 | |
#aws dynamodb delete-table --endpoint-url http://localhost:8000 --table-name DEV-SomeTable | |
aws iam create-role --role-name repository-manager-lambda-role --assume-role-policy-document file://repository-manager-lambda-create-role.json | |
aws iam put-role-policy --role-name repository-manager-lambda-role --policy-name LambdaServiceRolePolicy --policy-document file://repository-manager-lambda-put-role-policy.json | |
aws lambda create-function --function-name DEV-RepositoryManager --runtime java8 --role arn:aws:iam::XXXXXXXXXXXX:role/repository-manager-lambda-role --handler RepositoryHandler --timeout 300 --memory-size 1024 --dead-letter-config TargetArn=arn:aws:sqs:us-west-1:XXXXXXXXXXX:DeadLetter --environment Variables={project_stage=dev} --zip-file fileb://../repository/target/repository-SNAPSHOT.jar | |
aws lambda update-function-code --function-name DEV-RepositoryManager --zip-file fileb://../repository/target/repository-SNAPSHOT.jar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Effect": "Allow", | |
"Principal": { | |
"Service": "lambda.amazonaws.com" | |
}, | |
"Action": "sts:AssumeRole" | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Effect": "Allow", | |
"Action": [ | |
"logs:CreateLogGroup", | |
"logs:CreateLogStream", | |
"logs:PutLogEvents", | |
"logs:DescribeLogStreams" | |
], | |
"Resource": [ | |
"arn:aws:logs:*:*:*" | |
] | |
}, | |
{ | |
"Action": [ | |
"s3:GetObject", | |
"s3:PutObject", | |
"s3:GetObjectVersion", | |
"s3:ListBucket", | |
"s3:ListBucketVersions", | |
"s3:GetBucketLocation" | |
], | |
"Resource": [ | |
"arn:aws:s3:::somebucket-dev", | |
"arn:aws:s3:::somebucket-dev/*" | |
], | |
"Effect": "Allow" | |
}, | |
{ | |
"Action": [ | |
"sqs:SendMessage", | |
"sqs:ReceiveMessage" | |
], | |
"Resource": [ | |
"arn:aws:sqs:us-west-1:XXXXXXXXX:DeadLetter" | |
], | |
"Effect": "Allow" | |
}, | |
{ | |
"Action": [ | |
"dynamodb:*" | |
], | |
"Resource": [ | |
"arn:aws:dynamodb:us-west-1:XXXXXXXXX:table/DEV-SomeTable" | |
], | |
"Effect": "Allow" | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//This code has been edited to remove sensative information | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.io.StringWriter; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.time.Duration; | |
import java.time.LocalDateTime; | |
import java.time.format.DateTimeFormatter; | |
import java.time.temporal.ChronoUnit; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.UUID; | |
import javax.json.Json; | |
import javax.json.JsonObject; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.lambda.runtime.Context; | |
import com.amazonaws.services.lambda.runtime.RequestStreamHandler; | |
import RepositoryManager.RepositoryType; | |
import software.amazon.awssdk.core.regions.Region; | |
import software.amazon.awssdk.services.cloudsearch.CloudSearchClient; | |
import software.amazon.awssdk.services.dynamodb.DynamoDBClient; | |
import software.amazon.awssdk.services.dynamodb.DynamoDBClientBuilder; | |
import software.amazon.awssdk.services.dynamodb.model.AttributeValue; | |
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; | |
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; | |
import software.amazon.awssdk.services.dynamodb.model.QueryRequest; | |
import software.amazon.awssdk.services.dynamodb.model.QueryResponse; | |
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; | |
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest.Builder; | |
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; | |
import software.amazon.awssdk.services.s3.S3Client; | |
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; | |
import software.amazon.awssdk.services.s3.model.GetObjectRequest; | |
import software.amazon.awssdk.services.s3.model.GetObjectResponse; | |
import software.amazon.awssdk.services.s3.model.PutObjectRequest; | |
import software.amazon.awssdk.services.s3.model.S3Exception; | |
public class RepositoryHandler implements RequestStreamHandler { | |
static final Logger LOG = LoggerFactory.getLogger(RepositoryHandler.class); | |
final public static String PROJECT_STAGE_ENV = "project_stage"; | |
private final String projectStage; | |
private final String s3Bucket; | |
private final String dynamoDBTable; | |
private final String cloudSearchDomain; | |
private final Path tempDirectory; | |
private final S3Client s3Client; | |
private final DynamoDBClient dynamoDBClient; | |
private final CloudSearchClient cloudSearch; | |
private final ArchiveCommand archiveCommand; | |
private final RemoveCommand removeCommand; | |
public RepositoryHandler() throws IOException { | |
projectStage = System.getenv().get(PROJECT_STAGE_ENV); | |
s3Bucket = getS3Bucket(projectStage); | |
dynamoDBTable = getDynamoDBTable(projectStage); | |
cloudSearchDomain = getCloudSearchDomain(projectStage); | |
tempDirectory = Files.createTempDirectory("repository-"); | |
s3Client = S3Client.builder().region(Region.US_WEST_1).build(); | |
DynamoDBClientBuilder dynamoDBClientBuilder = DynamoDBClient.builder(); | |
/* if ("dev".equals(projectStage)) { | |
* try { | |
* dynamoDBClientBuilder.endpointOverride(new URI("http://localhost:8000")); | |
* } catch (URISyntaxException e) { | |
* LOG.error("", e); | |
* } | |
* } */ | |
dynamoDBClient = dynamoDBClientBuilder.region(Region.US_WEST_1).build(); | |
cloudSearch = CloudSearchClient.builder().region(Region.US_WEST_1).build(); | |
archiveCommand = new ArchiveCommand(this); | |
removeCommand = new RemoveCommand(); | |
} | |
public String getProjectStage() { | |
return projectStage; | |
} | |
public String getS3Bucket() { | |
return s3Bucket; | |
} | |
public String getDynamoDBTable() { | |
return dynamoDBTable; | |
} | |
public String getCloudSearchDomain() { | |
return cloudSearchDomain; | |
} | |
public Path getTempDirectory() { | |
return tempDirectory; | |
} | |
public S3Client getS3Client() { | |
return s3Client; | |
} | |
public DynamoDBClient getDynamoDBClient() { | |
return dynamoDBClient; | |
} | |
public CloudSearchClient getCloudSearch() { | |
return cloudSearch; | |
} | |
@Override | |
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { | |
JsonObject repositoryRequest = Json.createReader(input).readObject(); | |
JsonObject repositoryResponse = null; | |
StringWriter buffer = new StringWriter(); | |
Json.createWriter(buffer).writeObject(repositoryRequest); | |
LOG.info(String.format("Begin processing request %s - stage: %s", buffer, projectStage)); | |
RepositoryContext repContext = retrieveRepository(repositoryRequest); | |
String command = repositoryRequest.getString("command"); | |
if ("archive".equals(command)) { | |
repositoryResponse = archiveCommand.process(repositoryRequest, repContext); | |
} else if ("remove".equals(command)) { | |
repositoryResponse = removeCommand.process(repositoryRequest, repContext); | |
} else { | |
LOG.error(String.format("unsupported command %s", command)); | |
} | |
if (repositoryResponse != null) { | |
buffer = new StringWriter(); | |
Json.createWriter(buffer).write(repositoryResponse); | |
LOG.info(String.format("Finished processing result %s", buffer)); | |
Json.createWriter(output).write(repositoryResponse); | |
} | |
} | |
public static String getS3Bucket(String projectStage) { | |
switch (projectStage) { | |
case "dev": | |
return "somebucket-dev"; | |
case "qa": | |
return "somebucket-qa"; | |
} | |
return "somebucke"; | |
} | |
public static String getDynamoDBTable(String projectStage) { | |
switch (projectStage) { | |
case "dev": | |
return "DEV-SomeTable"; | |
case "qa": | |
return "QA-SomeTable"; | |
} | |
return "SomeTable"; | |
} | |
public static String getCloudSearchDomain(String projectStage) { | |
switch (projectStage) { | |
case "dev": | |
return "dev-some-domain"; | |
case "qa": | |
return "qa-some-domain"; | |
} | |
return "some-domain"; | |
} | |
public static interface RepositoryCommand { | |
public JsonObject process(JsonObject repositoryRequest, RepositoryContext context); | |
} | |
public class RepositoryContext { | |
private final AttributeValue dynamoDBID; | |
private final RepositoryType type; | |
private final String s3Key; | |
private final Path repositoryFilePath; | |
private final Duration TIMEOUT = Duration.of(5, ChronoUnit.MINUTES); | |
public RepositoryContext(RepositoryType type, AttributeValue dynamoDBID, String s3Key, Path repositoryFilePath) { | |
this.type = type; | |
this.dynamoDBID = dynamoDBID; | |
this.s3Key = s3Key; | |
this.repositoryFilePath = repositoryFilePath; | |
} | |
public RepositoryType getType() { | |
return type; | |
} | |
public Path getRepositoryFilePath() { | |
return repositoryFilePath; | |
} | |
public String getS3Key() { | |
return s3Key; | |
} | |
public AttributeValue getDynamoDBID() { | |
return dynamoDBID; | |
} | |
public void lock() throws InterruptedException { | |
LocalDateTime start = LocalDateTime.now(); | |
DateTimeFormatter lockTimeFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); | |
while (true) { | |
Map<String, AttributeValue> keyValues = new HashMap<>(); | |
keyValues.put("ID", dynamoDBID); | |
Map<String, AttributeValue> conditionValues = new HashMap<>(); | |
conditionValues.put(":time", AttributeValue.builder().n(lockTimeFormat.format(LocalDateTime.now())).build()); | |
conditionValues.put(":timeout", AttributeValue.builder().n(lockTimeFormat.format(LocalDateTime.now().minus(TIMEOUT))).build()); | |
UpdateItemRequest lockRequest = UpdateItemRequest.builder().tableName(dynamoDBTable).key(keyValues).updateExpression("set RepositoryLock = :time").conditionExpression("attribute_not_exists(RepositoryLock) or RepositoryLock < :timeout").expressionAttributeValues(conditionValues).build(); | |
try { | |
dynamoDBClient.updateItem(lockRequest); | |
return; | |
} catch (ConditionalCheckFailedException ce) { | |
LOG.info(String.format("Unable to obtain lock for repository %s, waiting to retry", dynamoDBID.s())); | |
Thread.sleep(20000); | |
} | |
if (Duration.between(start, LocalDateTime.now()).compareTo(TIMEOUT) > 0) { | |
throw new InterruptedException(String.format("Unable to obtain repository lock for %s", dynamoDBID.s())); | |
} | |
} | |
} | |
public void unlock() { | |
Map<String, AttributeValue> keyValues = new HashMap<>(); | |
keyValues.put("ID", dynamoDBID); | |
UpdateItemRequest lockRequest = UpdateItemRequest.builder().tableName(dynamoDBTable).key(keyValues).updateExpression("REMOVE RepositoryLock").conditionExpression("attribute_exists(RepositoryLock)").build(); | |
try { | |
dynamoDBClient.updateItem(lockRequest); | |
} catch (ConditionalCheckFailedException ce) { | |
LOG.warn(String.format("Lock already removed for repository %s", dynamoDBID.s())); | |
} | |
} | |
public boolean isLocked() { | |
Map<String, AttributeValue> repoQueryValues = new HashMap<>(); | |
repoQueryValues.put(":id", dynamoDBID); | |
QueryRequest lockQuery = QueryRequest.builder().tableName(dynamoDBTable).keyConditionExpression("ID = :id").expressionAttributeValues(repoQueryValues).projectionExpression("RepositoryLock").build(); | |
QueryResponse lockResponse = dynamoDBClient.query(lockQuery); | |
if (lockResponse.count() == 0) { | |
throw new IllegalArgumentException(String.format("Unable to locate repository %s", dynamoDBID.s())); | |
} | |
Map<String, AttributeValue> itemValues = lockResponse.items().get(0); | |
AttributeValue lockTimeValue = itemValues.get("Lock"); | |
if (lockTimeValue != null) { | |
LocalDateTime lockTime = LocalDateTime.parse(lockTimeValue.s(), DateTimeFormatter.ISO_DATE_TIME); | |
if (Duration.between(lockTime, LocalDateTime.now()).compareTo(TIMEOUT) <= 0) { | |
return true; | |
} | |
} | |
return false; | |
} | |
} | |
public RepositoryContext retrieveRepository(JsonObject repositoryRequest) { | |
String repositoryIdentifer = null; | |
AttributeValue repositoryId; | |
RepositoryType repositoryType; | |
String repositoryS3Key; | |
Path repositoryLocalFile; | |
// check dynamoDB first | |
Map<String, AttributeValue> repoItemValues; | |
Map<String, AttributeValue> repoQueryValues = new HashMap<>(); | |
repoQueryValues.put(":repoID", AttributeValue.builder().s(repositoryIdentifer).build()); | |
repoQueryValues.put(":repoType", AttributeValue.builder().s("Repository").build()); | |
QueryResponse repoExistsResponse = dynamoDBClient.query(QueryRequest.builder().tableName(dynamoDBTable).indexName("ReferenceID").keyConditionExpression("ReferenceID = :repoID and ArtifactType = :repoType").expressionAttributeValues(repoQueryValues).build()); | |
if (repoExistsResponse.count() > 0) { | |
LOG.info("Found existing repository!"); | |
repositoryId = repoExistsResponse.items().get(0).get("ID"); | |
// perform a second lookup on the primary index to return the repository values | |
repoQueryValues.clear(); | |
repoQueryValues.put(":id", repositoryId); | |
QueryResponse repoResponse = dynamoDBClient.query(QueryRequest.builder().tableName(dynamoDBTable).keyConditionExpression("ID = :id").expressionAttributeValues(repoQueryValues).projectionExpression("S3Key").build()); | |
repoItemValues = repoResponse.items().get(0); | |
} else { | |
// create | |
LOG.info("Creating new repository!"); | |
repoItemValues = new HashMap<>(); | |
String repositoryFileName = String.format("%s.zip", repositoryIdentifer); | |
repositoryId = AttributeValue.builder().s(UUID.randomUUID().toString().replaceAll("-", "")).build(); | |
repoItemValues.put("ID", repositoryId); | |
repoItemValues.put("RepositoryID", repositoryId); | |
repoItemValues.put("ArtifactType", AttributeValue.builder().s("Repository").build()); | |
repoItemValues.put("ReferenceID", AttributeValue.builder().s(repositoryIdentifer).build()); | |
repoItemValues.put("S3Key", AttributeValue.builder().s(String.format("repository/%s/%s", repositoryType.getDirectoryName(), repositoryFileName)).build()); | |
dynamoDBClient.putItem(PutItemRequest.builder().tableName(dynamoDBTable).conditionExpression("attribute_not_exists(ID)").item(repoItemValues).build()); | |
} | |
repositoryS3Key = repoItemValues.get("S3Key").s(); | |
repositoryLocalFile = tempDirectory.resolve(String.format("repository-%d.zip", System.currentTimeMillis())); | |
// System.exit(-1); | |
try { | |
LOG.info(String.format("Repository: Downloading s3p://%s/%s to %s", s3Bucket, repositoryS3Key, repositoryLocalFile)); | |
GetObjectResponse response = s3Client.getObject(GetObjectRequest.builder().bucket(s3Bucket).key(repositoryS3Key).build(), repositoryLocalFile); | |
} catch (S3Exception se) { | |
LOG.info(String.format("Repository file unavailable, new repository will be created: s3p://%s/%s", s3Bucket, repositoryS3Key)); | |
} | |
return new RepositoryContext(repositoryType, repositoryId, repositoryS3Key, repositoryLocalFile); | |
} | |
public static void uploadRepository(RepositoryContext repositoryFile, S3Client s3Client, String s3Bucket) { | |
LOG.info(String.format("Repository: Uploading %s to s3p://%s/%s", repositoryFile.repositoryFilePath, s3Bucket, repositoryFile.s3Key)); | |
s3Client.putObject(PutObjectRequest.builder().bucket(s3Bucket).key(repositoryFile.s3Key).build(), repositoryFile.repositoryFilePath); | |
} | |
public static void deleteRepository(RepositoryContext repositoryFile, S3Client s3Client, String s3Bucket) { | |
LOG.info(String.format("Repository: Deleting s3p://%s/%s", s3Bucket, repositoryFile.s3Key)); | |
s3Client.deleteObject(DeleteObjectRequest.builder().bucket(s3Bucket).key(repositoryFile.s3Key).build()); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment