Skip to content

Instantly share code, notes, and snippets.

@trevorhreed
Last active August 22, 2023 21:40
Show Gist options
  • Save trevorhreed/9b3855a52502b61c3a7333759580b788 to your computer and use it in GitHub Desktop.
Save trevorhreed/9b3855a52502b61c3a7333759580b788 to your computer and use it in GitHub Desktop.
A distributed mutex service that uses S3 to manage locks.
/*
CRITIQUE: It's missing fences
I need a distributed lock system for a service I'm writing.
(There can be multiple instances of this service running at
a time, and I need to make sure that only one instance does
a particular cron job.)
According to [this page](https://aws.amazon.com/s3/consistency/),
S3 has strong consistency, so I thought I'd make an attempt
at implementing a distributed lock using S3 (the cron job
does a bunch of processing of S3 objects, so it seemed a
convenient option).
I realize distributed locks are complicated to implement.
I've tested a version of this code out, and it seems to work
okay. I've since added a timeout/heartbeat. Now, I want
someone to rip it apart and explain why it shouldn't be used
in production because of X, Y, or Z failure in the code.
By way of a quick outline of the algorithm:
- We attempt to get a lock on the resources by putting an
object at a particular place in s3
(see `s3InstanceLockObjectKey`).
- We then use the leader election pattern to determine
who the leader is and, in particular, whether the current
instance is the leader.
- We make a request to get all of the lock objects in S3
and ignore any that are older than the timeout period
(e.g. with the default timeout, any lock objects older
than 5 minutes are considered stale).
- We sort the lock objects first by last modified date and
then lexicographically, and choose the first one as the
leader. So, we should get whoever put the lock object
into S3 first, and if several did at the same time, the
lexicographical sorting makes the final determination.
- If the current instance is the leader, we set up a scheduled
task to maintain leadership by continually updating the
instance's lock object in S3 so that it doesn't become
stale. The `requestLock` method then returns true; if the
current instance is not the leader, it returns false.
- The `releaseLock` cancels the scheduled task and deletes
any lock objects in S3.
*/
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
 * Best-effort distributed lock (leader election) backed by objects in an S3 bucket.
 *
 * <p>Each JVM writes an empty "lock object" whose key is {@code <prefix>_<instanceId>}. The
 * leader is the instance whose non-stale lock object sorts first by last-modified time, then by
 * key. While an instance is the leader, a background task periodically rewrites its lock object
 * so that it does not become stale.
 *
 * <p>Known limitations (not solved by this class):
 * <ul>
 *   <li>No fencing tokens are issued, so a paused or partitioned former leader cannot be
 *       prevented from continuing its work after another instance has taken over.</li>
 *   <li>NOTE(review): each heartbeat rewrites the lock object, which presumably advances the
 *       same last-modified value that the election sorts on. A contender that writes its object
 *       just before a heartbeat and lists just after it may also consider itself the leader.
 *       Confirm against S3 behaviour before relying on mutual exclusion.</li>
 *   <li>Staleness is judged by comparing S3's last-modified time with the local clock, so clock
 *       skew shortens or lengthens the effective timeout.</li>
 *   <li>The instance id is a JVM-wide constant, so all instances of this class in one JVM that
 *       share a bucket and prefix use the same lock object.</li>
 * </ul>
 *
 * <p>This class performs no synchronization of its own; call {@link #requestLock()} and
 * {@link #releaseLock()} from a single thread.
 */
public class DistributedS3LockService {

    /** Fluent builder; {@code s3Client} and {@code bucketName} are mandatory. */
    public static class Builder {
        private S3Client s3Client;
        private String bucketName;
        private String s3LockObjectPrefix = "__distributedS3Lock";
        private long timeoutPeriod = 5;
        private TimeUnit timeoutUnit = TimeUnit.MINUTES;

        /** Sets the S3 client used for all lock operations (required). */
        public Builder s3Client(S3Client s3Client) {
            this.s3Client = s3Client;
            return this;
        }

        /** Sets the bucket that holds the lock objects (required). */
        public Builder bucketName(String bucketName) {
            this.bucketName = bucketName;
            return this;
        }

        /** Sets the key prefix shared by all contenders for the same lock. */
        public Builder s3LockObjectPrefix(String s3LockObjectPrefix) {
            this.s3LockObjectPrefix = s3LockObjectPrefix;
            return this;
        }

        /** Sets how long a lock object stays valid without being refreshed (default 5). */
        public Builder timeoutPeriod(long timeoutPeriod) {
            this.timeoutPeriod = timeoutPeriod;
            return this;
        }

        /** Sets the unit of {@link #timeoutPeriod(long)} (default minutes). */
        public Builder timeoutUnit(TimeUnit timeoutUnit) {
            this.timeoutUnit = timeoutUnit;
            return this;
        }

        /**
         * Builds the service.
         *
         * @throws IllegalArgumentException if a required value is missing or the timeout is
         *     invalid
         */
        public DistributedS3LockService build() {
            if (s3Client == null) {
                throw new IllegalArgumentException("s3Client is required");
            }
            if (bucketName == null) {
                throw new IllegalArgumentException("bucketName is required");
            }
            return new DistributedS3LockService(
                s3Client, bucketName, s3LockObjectPrefix, timeoutPeriod, timeoutUnit);
        }
    }

    private static final System.Logger LOGGER =
        System.getLogger(DistributedS3LockService.class.getName());

    /** Random id generated once per JVM; forms the unique suffix of this instance's lock key. */
    private static final String INSTANCE_ID = UUID
        .randomUUID()
        .toString()
        .replaceAll("-", "");

    /** Separator between the shared prefix and the instance id in a lock object key. */
    private static final String KEY_SEPARATOR = "_";

    /** Number of heartbeats attempted within one timeout period. */
    private static final long HEARTBEATS_PER_TIMEOUT = 3;

    /** How long {@link #releaseLock()} waits for an in-flight heartbeat to finish. */
    private static final long SHUTDOWN_WAIT_SECONDS = 60;

    private final S3Client s3Client;
    private final String bucketName;
    private final String s3LockObjectPrefix;
    private final String s3InstanceLockObjectKey;
    private final long timeoutPeriod;
    private final TimeUnit timeoutUnit;

    /** Heartbeat executor; non-null only while this instance believes it is the leader. */
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * Creates a lock service.
     *
     * @param s3Client client used for all S3 calls
     * @param bucketName bucket that holds the lock objects
     * @param s3LockObjectPrefix key prefix shared by all contenders for this lock
     * @param timeoutPeriod how long a lock object stays valid without a refresh; must be positive
     * @param timeoutUnit unit of {@code timeoutPeriod}; must not be null
     * @throws IllegalArgumentException if the timeout is not positive or its unit is null
     */
    public DistributedS3LockService(
            S3Client s3Client,
            String bucketName,
            String s3LockObjectPrefix,
            long timeoutPeriod,
            TimeUnit timeoutUnit) {
        if (timeoutUnit == null) {
            throw new IllegalArgumentException("timeoutUnit is required");
        }
        if (timeoutPeriod <= 0) {
            throw new IllegalArgumentException(
                "timeoutPeriod must be positive, got " + timeoutPeriod);
        }
        this.s3Client = s3Client;
        this.bucketName = bucketName;
        this.s3LockObjectPrefix = s3LockObjectPrefix;
        this.s3InstanceLockObjectKey =
            "%s%s%s".formatted(s3LockObjectPrefix, KEY_SEPARATOR, INSTANCE_ID);
        this.timeoutPeriod = timeoutPeriod;
        this.timeoutUnit = timeoutUnit;
    }

    /**
     * Tries to become the leader.
     *
     * <p>Writes (or refreshes) this instance's lock object and then runs the election. On
     * success a heartbeat is started; on failure the lock object is removed again so that it
     * cannot block other contenders until it times out.
     *
     * @return {@code true} if this instance is the leader, {@code false} otherwise
     */
    public boolean requestLock() {
        putLockObject();
        if (isLeader()) {
            scheduleLockMaintenance();
            return true;
        }
        // Lost the election: stop any heartbeat left over from an earlier win and clean up.
        releaseLock();
        return false;
    }

    /**
     * Stops the heartbeat (if one is running) and deletes this instance's lock object.
     *
     * <p>Safe to call when the lock was never acquired.
     *
     * @return {@code true} if no heartbeat was running or it stopped within the wait period;
     *     {@code false} if the wait timed out or the calling thread was interrupted
     */
    public boolean releaseLock() {
        ScheduledExecutorService executor = scheduledExecutorService;
        scheduledExecutorService = null;
        boolean terminated = true;
        try {
            if (executor != null) {
                executor.shutdown();
                terminated = executor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
                if (!terminated) {
                    // Stop a stuck heartbeat so it cannot re-create the object deleted below.
                    executor.shutdownNow();
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            terminated = false;
        } finally {
            deleteByPrefix(s3InstanceLockObjectKey);
        }
        return terminated;
    }

    /**
     * Starts the heartbeat that keeps this instance's lock object fresh. Does nothing if a
     * heartbeat is already running, so repeated successful {@link #requestLock()} calls do not
     * stack up duplicate tasks.
     */
    private void scheduleLockMaintenance() {
        if (scheduledExecutorService != null) {
            return;
        }
        // Refresh several times per timeout period; refreshing only once per period would let
        // the object go stale at the very moment it is being rewritten.
        long heartbeatMillis =
            Math.max(1L, timeoutUnit.toMillis(timeoutPeriod) / HEARTBEATS_PER_TIMEOUT);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // requestLock() has just written the object, so the first refresh waits one interval.
        executor.scheduleAtFixedRate(
            this::refreshLockObject, heartbeatMillis, heartbeatMillis, TimeUnit.MILLISECONDS);
        scheduledExecutorService = executor;
    }

    /**
     * Heartbeat body. Failures are logged rather than propagated, because an exception escaping
     * a {@code scheduleAtFixedRate} task suppresses all subsequent executions.
     */
    private void refreshLockObject() {
        try {
            putLockObject();
        } catch (RuntimeException e) {
            LOGGER.log(
                System.Logger.Level.WARNING,
                "Failed to refresh S3 lock object " + s3InstanceLockObjectKey,
                e);
        }
    }

    /**
     * Runs the election: among lock objects modified within the configured timeout, the one
     * with the earliest last-modified time (ties broken by key) is the leader.
     *
     * @return {@code true} if the winning key is this instance's key
     */
    private boolean isLeader() {
        Instant staleBefore = Instant.now().minusMillis(timeoutUnit.toMillis(timeoutPeriod));
        // Include the separator so a prefix such as "jobA" does not match "jobAB_..." keys.
        return getObjectsByPrefix(s3LockObjectPrefix + KEY_SEPARATOR)
            .filter(lockObject -> lockObject.lastModified().isAfter(staleBefore))
            .min(Comparator.comparing(S3Object::lastModified).thenComparing(S3Object::key))
            .map(S3Object::key)
            .map(s3InstanceLockObjectKey::equals)
            .orElse(false);
    }

    /** Lazily streams every object in the bucket whose key starts with {@code prefix}. */
    private Stream<S3Object> getObjectsByPrefix(String prefix) {
        return s3Client.listObjectsV2Paginator(ListObjectsV2Request.builder()
                .bucket(bucketName)
                .prefix(prefix)
                .build())
            .stream()
            .flatMap(response -> response.contents().stream());
    }

    /** Writes an empty object at this instance's lock key, creating or overwriting it. */
    private void putLockObject() {
        s3Client.putObject(
            PutObjectRequest.builder()
                .bucket(bucketName)
                .key(s3InstanceLockObjectKey)
                .build(),
            RequestBody.fromString(""));
    }

    /** Deletes every object whose key starts with {@code prefix}; no-op if none exist. */
    private void deleteByPrefix(String prefix) {
        List<ObjectIdentifier> objects = getObjectsByPrefix(prefix)
            .map(o -> ObjectIdentifier.builder()
                .key(o.key())
                .build())
            .toList();
        if (objects.isEmpty()) {
            return;
        }
        s3Client.deleteObjects(DeleteObjectsRequest.builder()
            .bucket(bucketName)
            .delete(Delete.builder()
                .objects(objects)
                .build())
            .build());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment