Skip to content

Instantly share code, notes, and snippets.

@aaronanderson
Last active December 22, 2017 19:45
Show Gist options
  • Save aaronanderson/07ec17aabaa633f47b6d3b08c1c3559c to your computer and use it in GitHub Desktop.
Save aaronanderson/07ec17aabaa633f47b6d3b08c1c3559c to your computer and use it in GitHub Desktop.
Amazon AWS Lambda using Java SDK V2 and DynamoDB conditional updates to implement simple timestamp locking
aws s3 mb s3://somebucket-dev
aws sqs create-queue --queue-name DeadLetter
#DynamoDB local: http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html
#java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb
aws dynamodb create-table --endpoint-url http://localhost:8000 --table-name DEV-SomeTable --attribute-definitions AttributeName=ID,AttributeType=S AttributeName=ReferenceID,AttributeType=S AttributeName=ArtifactType,AttributeType=S AttributeName=RepositoryID,AttributeType=S --key-schema AttributeName=ID,KeyType=HASH --global-secondary-indexes IndexName=ReferenceID,KeySchema=["{AttributeName=ReferenceID,KeyType=HASH}","{AttributeName=ArtifactType,KeyType=RANGE}"],Projection="{ProjectionType=KEYS_ONLY}",ProvisionedThroughput="{ReadCapacityUnits=5,WriteCapacityUnits=5}" IndexName=ArtifactType,KeySchema=["{AttributeName=ArtifactType","KeyType=HASH}"],Projection="{ProjectionType=KEYS_ONLY}",ProvisionedThroughput="{ReadCapacityUnits=5,WriteCapacityUnits=5}" IndexName=RepositoryID,KeySchema=["{AttributeName=RepositoryID,KeyType=HASH}"],Projection="{ProjectionType=KEYS_ONLY}",ProvisionedThroughput="{ReadCapacityUnits=5,WriteCapacityUnits=5}" --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
#aws dynamodb delete-table --endpoint-url http://localhost:8000 --table-name DEV-SomeTable
aws iam create-role --role-name repository-manager-lambda-role --assume-role-policy-document file://repository-manager-lambda-create-role.json
aws iam put-role-policy --role-name repository-manager-lambda-role --policy-name LambdaServiceRolePolicy --policy-document file://repository-manager-lambda-put-role-policy.json
aws lambda create-function --function-name DEV-RepositoryManager --runtime java8 --role arn:aws:iam::XXXXXXXXXXXX:role/repository-manager-lambda-role --handler RepositoryHandler --timeout 300 --memory-size 1024 --dead-letter-config TargetArn=arn:aws:sqs:us-west-1:XXXXXXXXXXX:DeadLetter --environment Variables={project_stage=dev} --zip-file fileb://../repository/target/repository-SNAPSHOT.jar
aws lambda update-function-code --function-name DEV-RepositoryManager --zip-file fileb://../repository/target/repository-SNAPSHOT.jar
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogStreams"
],
"Resource": [
"arn:aws:logs:*:*:*"
]
},
{
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:GetObjectVersion",
"s3:ListBucket",
"s3:ListBucketVersions",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::somebucket-dev",
"arn:aws:s3:::somebucket-dev/*"
],
"Effect": "Allow"
},
{
"Action": [
"sqs:SendMessage",
"sqs:ReceiveMessage"
],
"Resource": [
"arn:aws:sqs:us-west-1:XXXXXXXXX:DeadLetter"
],
"Effect": "Allow"
},
{
"Action": [
"dynamodb:*"
],
"Resource": [
"arn:aws:dynamodb:us-west-1:XXXXXXXXX:table/DEV-SomeTable"
],
"Effect": "Allow"
}
]
}
//This code has been edited to remove sensative information
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import RepositoryManager.RepositoryType;
import software.amazon.awssdk.core.regions.Region;
import software.amazon.awssdk.services.cloudsearch.CloudSearchClient;
import software.amazon.awssdk.services.dynamodb.DynamoDBClient;
import software.amazon.awssdk.services.dynamodb.DynamoDBClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest.Builder;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
public class RepositoryHandler implements RequestStreamHandler {
static final Logger LOG = LoggerFactory.getLogger(RepositoryHandler.class);
final public static String PROJECT_STAGE_ENV = "project_stage";
private final String projectStage;
private final String s3Bucket;
private final String dynamoDBTable;
private final String cloudSearchDomain;
private final Path tempDirectory;
private final S3Client s3Client;
private final DynamoDBClient dynamoDBClient;
private final CloudSearchClient cloudSearch;
private final ArchiveCommand archiveCommand;
private final RemoveCommand removeCommand;
public RepositoryHandler() throws IOException {
projectStage = System.getenv().get(PROJECT_STAGE_ENV);
s3Bucket = getS3Bucket(projectStage);
dynamoDBTable = getDynamoDBTable(projectStage);
cloudSearchDomain = getCloudSearchDomain(projectStage);
tempDirectory = Files.createTempDirectory("repository-");
s3Client = S3Client.builder().region(Region.US_WEST_1).build();
DynamoDBClientBuilder dynamoDBClientBuilder = DynamoDBClient.builder();
/* if ("dev".equals(projectStage)) {
* try {
* dynamoDBClientBuilder.endpointOverride(new URI("http://localhost:8000"));
* } catch (URISyntaxException e) {
* LOG.error("", e);
* }
* } */
dynamoDBClient = dynamoDBClientBuilder.region(Region.US_WEST_1).build();
cloudSearch = CloudSearchClient.builder().region(Region.US_WEST_1).build();
archiveCommand = new ArchiveCommand(this);
removeCommand = new RemoveCommand();
}
public String getProjectStage() {
return projectStage;
}
public String getS3Bucket() {
return s3Bucket;
}
public String getDynamoDBTable() {
return dynamoDBTable;
}
public String getCloudSearchDomain() {
return cloudSearchDomain;
}
public Path getTempDirectory() {
return tempDirectory;
}
public S3Client getS3Client() {
return s3Client;
}
public DynamoDBClient getDynamoDBClient() {
return dynamoDBClient;
}
public CloudSearchClient getCloudSearch() {
return cloudSearch;
}
@Override
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
JsonObject repositoryRequest = Json.createReader(input).readObject();
JsonObject repositoryResponse = null;
StringWriter buffer = new StringWriter();
Json.createWriter(buffer).writeObject(repositoryRequest);
LOG.info(String.format("Begin processing request %s - stage: %s", buffer, projectStage));
RepositoryContext repContext = retrieveRepository(repositoryRequest);
String command = repositoryRequest.getString("command");
if ("archive".equals(command)) {
repositoryResponse = archiveCommand.process(repositoryRequest, repContext);
} else if ("remove".equals(command)) {
repositoryResponse = removeCommand.process(repositoryRequest, repContext);
} else {
LOG.error(String.format("unsupported command %s", command));
}
if (repositoryResponse != null) {
buffer = new StringWriter();
Json.createWriter(buffer).write(repositoryResponse);
LOG.info(String.format("Finished processing result %s", buffer));
Json.createWriter(output).write(repositoryResponse);
}
}
public static String getS3Bucket(String projectStage) {
switch (projectStage) {
case "dev":
return "somebucket-dev";
case "qa":
return "somebucket-qa";
}
return "somebucke";
}
public static String getDynamoDBTable(String projectStage) {
switch (projectStage) {
case "dev":
return "DEV-SomeTable";
case "qa":
return "QA-SomeTable";
}
return "SomeTable";
}
public static String getCloudSearchDomain(String projectStage) {
switch (projectStage) {
case "dev":
return "dev-some-domain";
case "qa":
return "qa-some-domain";
}
return "some-domain";
}
public static interface RepositoryCommand {
public JsonObject process(JsonObject repositoryRequest, RepositoryContext context);
}
public class RepositoryContext {
private final AttributeValue dynamoDBID;
private final RepositoryType type;
private final String s3Key;
private final Path repositoryFilePath;
private final Duration TIMEOUT = Duration.of(5, ChronoUnit.MINUTES);
public RepositoryContext(RepositoryType type, AttributeValue dynamoDBID, String s3Key, Path repositoryFilePath) {
this.type = type;
this.dynamoDBID = dynamoDBID;
this.s3Key = s3Key;
this.repositoryFilePath = repositoryFilePath;
}
public RepositoryType getType() {
return type;
}
public Path getRepositoryFilePath() {
return repositoryFilePath;
}
public String getS3Key() {
return s3Key;
}
public AttributeValue getDynamoDBID() {
return dynamoDBID;
}
public void lock() throws InterruptedException {
LocalDateTime start = LocalDateTime.now();
DateTimeFormatter lockTimeFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
while (true) {
Map<String, AttributeValue> keyValues = new HashMap<>();
keyValues.put("ID", dynamoDBID);
Map<String, AttributeValue> conditionValues = new HashMap<>();
conditionValues.put(":time", AttributeValue.builder().n(lockTimeFormat.format(LocalDateTime.now())).build());
conditionValues.put(":timeout", AttributeValue.builder().n(lockTimeFormat.format(LocalDateTime.now().minus(TIMEOUT))).build());
UpdateItemRequest lockRequest = UpdateItemRequest.builder().tableName(dynamoDBTable).key(keyValues).updateExpression("set RepositoryLock = :time").conditionExpression("attribute_not_exists(RepositoryLock) or RepositoryLock < :timeout").expressionAttributeValues(conditionValues).build();
try {
dynamoDBClient.updateItem(lockRequest);
return;
} catch (ConditionalCheckFailedException ce) {
LOG.info(String.format("Unable to obtain lock for repository %s, waiting to retry", dynamoDBID.s()));
Thread.sleep(20000);
}
if (Duration.between(start, LocalDateTime.now()).compareTo(TIMEOUT) > 0) {
throw new InterruptedException(String.format("Unable to obtain repository lock for %s", dynamoDBID.s()));
}
}
}
public void unlock() {
Map<String, AttributeValue> keyValues = new HashMap<>();
keyValues.put("ID", dynamoDBID);
UpdateItemRequest lockRequest = UpdateItemRequest.builder().tableName(dynamoDBTable).key(keyValues).updateExpression("REMOVE RepositoryLock").conditionExpression("attribute_exists(RepositoryLock)").build();
try {
dynamoDBClient.updateItem(lockRequest);
} catch (ConditionalCheckFailedException ce) {
LOG.warn(String.format("Lock already removed for repository %s", dynamoDBID.s()));
}
}
public boolean isLocked() {
Map<String, AttributeValue> repoQueryValues = new HashMap<>();
repoQueryValues.put(":id", dynamoDBID);
QueryRequest lockQuery = QueryRequest.builder().tableName(dynamoDBTable).keyConditionExpression("ID = :id").expressionAttributeValues(repoQueryValues).projectionExpression("RepositoryLock").build();
QueryResponse lockResponse = dynamoDBClient.query(lockQuery);
if (lockResponse.count() == 0) {
throw new IllegalArgumentException(String.format("Unable to locate repository %s", dynamoDBID.s()));
}
Map<String, AttributeValue> itemValues = lockResponse.items().get(0);
AttributeValue lockTimeValue = itemValues.get("Lock");
if (lockTimeValue != null) {
LocalDateTime lockTime = LocalDateTime.parse(lockTimeValue.s(), DateTimeFormatter.ISO_DATE_TIME);
if (Duration.between(lockTime, LocalDateTime.now()).compareTo(TIMEOUT) <= 0) {
return true;
}
}
return false;
}
}
public RepositoryContext retrieveRepository(JsonObject repositoryRequest) {
String repositoryIdentifer = null;
AttributeValue repositoryId;
RepositoryType repositoryType;
String repositoryS3Key;
Path repositoryLocalFile;
// check dynamoDB first
Map<String, AttributeValue> repoItemValues;
Map<String, AttributeValue> repoQueryValues = new HashMap<>();
repoQueryValues.put(":repoID", AttributeValue.builder().s(repositoryIdentifer).build());
repoQueryValues.put(":repoType", AttributeValue.builder().s("Repository").build());
QueryResponse repoExistsResponse = dynamoDBClient.query(QueryRequest.builder().tableName(dynamoDBTable).indexName("ReferenceID").keyConditionExpression("ReferenceID = :repoID and ArtifactType = :repoType").expressionAttributeValues(repoQueryValues).build());
if (repoExistsResponse.count() > 0) {
LOG.info("Found existing repository!");
repositoryId = repoExistsResponse.items().get(0).get("ID");
// perform a second lookup on the primary index to return the repository values
repoQueryValues.clear();
repoQueryValues.put(":id", repositoryId);
QueryResponse repoResponse = dynamoDBClient.query(QueryRequest.builder().tableName(dynamoDBTable).keyConditionExpression("ID = :id").expressionAttributeValues(repoQueryValues).projectionExpression("S3Key").build());
repoItemValues = repoResponse.items().get(0);
} else {
// create
LOG.info("Creating new repository!");
repoItemValues = new HashMap<>();
String repositoryFileName = String.format("%s.zip", repositoryIdentifer);
repositoryId = AttributeValue.builder().s(UUID.randomUUID().toString().replaceAll("-", "")).build();
repoItemValues.put("ID", repositoryId);
repoItemValues.put("RepositoryID", repositoryId);
repoItemValues.put("ArtifactType", AttributeValue.builder().s("Repository").build());
repoItemValues.put("ReferenceID", AttributeValue.builder().s(repositoryIdentifer).build());
repoItemValues.put("S3Key", AttributeValue.builder().s(String.format("repository/%s/%s", repositoryType.getDirectoryName(), repositoryFileName)).build());
dynamoDBClient.putItem(PutItemRequest.builder().tableName(dynamoDBTable).conditionExpression("attribute_not_exists(ID)").item(repoItemValues).build());
}
repositoryS3Key = repoItemValues.get("S3Key").s();
repositoryLocalFile = tempDirectory.resolve(String.format("repository-%d.zip", System.currentTimeMillis()));
// System.exit(-1);
try {
LOG.info(String.format("Repository: Downloading s3p://%s/%s to %s", s3Bucket, repositoryS3Key, repositoryLocalFile));
GetObjectResponse response = s3Client.getObject(GetObjectRequest.builder().bucket(s3Bucket).key(repositoryS3Key).build(), repositoryLocalFile);
} catch (S3Exception se) {
LOG.info(String.format("Repository file unavailable, new repository will be created: s3p://%s/%s", s3Bucket, repositoryS3Key));
}
return new RepositoryContext(repositoryType, repositoryId, repositoryS3Key, repositoryLocalFile);
}
public static void uploadRepository(RepositoryContext repositoryFile, S3Client s3Client, String s3Bucket) {
LOG.info(String.format("Repository: Uploading %s to s3p://%s/%s", repositoryFile.repositoryFilePath, s3Bucket, repositoryFile.s3Key));
s3Client.putObject(PutObjectRequest.builder().bucket(s3Bucket).key(repositoryFile.s3Key).build(), repositoryFile.repositoryFilePath);
}
public static void deleteRepository(RepositoryContext repositoryFile, S3Client s3Client, String s3Bucket) {
LOG.info(String.format("Repository: Deleting s3p://%s/%s", s3Bucket, repositoryFile.s3Key));
s3Client.deleteObject(DeleteObjectRequest.builder().bucket(s3Bucket).key(repositoryFile.s3Key).build());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment