Last active
August 22, 2023 21:40
-
-
Save trevorhreed/9b3855a52502b61c3a7333759580b788 to your computer and use it in GitHub Desktop.
A distributed mutex service that uses S3 to manage locks.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
CRITIQUE: It's missing fences | |
I need a distributed lock system for a service I'm writing. | |
(There can be multiple instances of this service running at | |
a time, and I need to make sure that only one instance does | |
a particular cron job.) | |
According to [this page](https://aws.amazon.com/s3/consistency/), | |
S3 has strong consistency, so I thought I'd make an attempt | |
at implementing a distributed lock using S3 (the cron job | |
does a bunch of processing of S3 objects, so it seemed a | |
convenient option). | |
I realize distributed locks are complicated to implement. | |
I've tested a version of this code out, and it seems to work | |
okay. I've since added a timeout/heartbeat. Now, I want | |
someone to rip it apart and explain why it shouldn't be used | |
in production because of X, Y, or Z failure in the code. | |
By way of a quick outline of the algorithm: | |
- We attempt to get a lock on the resources by putting an | |
object at a particular place in s3 | |
(see `s3InstanceLockObjectKey`). | |
- We then use the leader election pattern to determine | |
who the leader is and, in particular, whether the current | |
instance is the leader. | |
- We make a request to get all of the lock objects in S3 | |
and ignore any that are prior to the timeout period | |
(e.g. any lock objects older than 10 minutes are | |
considered stale). | |
- We sort the lock objects first by last modified date and | |
then lexicographically, and choose the first one as the | |
leader. So, we should get whoever put the lock object | |
into S3 first, and if several did at the same time, the | |
lexicographical sorting makes the final determination. | |
- If the current instance is the leader, we set up a scheduled | |
task to maintain leadership by continually updating the | |
instance's lock object in S3 so that it doesn't become | |
stale. In that case the `requestLock` method returns true; | |
otherwise it returns false. | |
- The `releaseLock` cancels the scheduled task and deletes | |
any lock objects in S3. | |
*/ | |
import software.amazon.awssdk.core.sync.RequestBody; | |
import software.amazon.awssdk.services.s3.S3Client; | |
import software.amazon.awssdk.services.s3.model.Delete; | |
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; | |
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; | |
import software.amazon.awssdk.services.s3.model.ObjectIdentifier; | |
import software.amazon.awssdk.services.s3.model.PutObjectRequest; | |
import software.amazon.awssdk.services.s3.model.S3Object; | |
import java.time.Instant; | |
import java.time.temporal.ChronoUnit; | |
import java.util.Comparator; | |
import java.util.List; | |
import java.util.UUID; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import java.util.stream.Stream; | |
public class DistributedS3LockService { | |
public static class Builder { | |
private S3Client s3Client; | |
private String bucketName; | |
private String s3LockObjectPrefix = "__distributedS3Lock"; | |
private long timeoutPeriod = 5; | |
private TimeUnit timeoutUnit = TimeUnit.MINUTES; | |
public Builder s3Client(S3Client s3Client) { | |
this.s3Client = s3Client; | |
return this; | |
} | |
public Builder bucketName(String bucketName) { | |
this.bucketName = bucketName; | |
return this; | |
} | |
public Builder s3LockObjectPrefix(String s3LockObjectPrefix) { | |
this.s3LockObjectPrefix = s3LockObjectPrefix; | |
return this; | |
} | |
public Builder timeoutPeriod(long timeoutPeriod) { | |
this.timeoutPeriod = timeoutPeriod; | |
return this; | |
} | |
public Builder timeoutUnit(TimeUnit timeoutUnit) { | |
this.timeoutUnit = timeoutUnit; | |
return this; | |
} | |
public DistributedS3LockService build() { | |
if (s3Client == null) { | |
throw new IllegalArgumentException("s3Client is required"); | |
} | |
if (bucketName == null) { | |
throw new IllegalArgumentException("bucketName is required"); | |
} | |
return new DistributedS3LockService( | |
s3Client, bucketName, s3LockObjectPrefix, timeoutPeriod, timeoutUnit); | |
} | |
} | |
private static final String INSTANCE_ID = UUID | |
.randomUUID() | |
.toString() | |
.replaceAll("-", ""); | |
private final S3Client s3Client; | |
private final String bucketName; | |
private final String s3LockObjectPrefix; | |
private final String s3InstanceLockObjectKey; | |
private final long timeoutPeriod; | |
private final TimeUnit timeoutUnit; | |
private ScheduledExecutorService scheduledExecutorService; | |
public DistributedS3LockService( | |
S3Client s3Client, | |
String bucketName, | |
String s3LockObjectPrefix, | |
long timeoutPeriod, | |
TimeUnit timeoutUnit) { | |
this.s3Client = s3Client; | |
this.bucketName = bucketName; | |
this.s3LockObjectPrefix = s3LockObjectPrefix; | |
this.s3InstanceLockObjectKey = "%s_%s".formatted(s3LockObjectPrefix, INSTANCE_ID); | |
this.timeoutPeriod = timeoutPeriod; | |
this.timeoutUnit = timeoutUnit; | |
} | |
public boolean requestLock() { | |
putLockObject(); | |
if (isLeader()) { | |
scheduleLockMaintenance(); | |
return true; | |
} | |
return false; | |
} | |
public boolean releaseLock() { | |
scheduledExecutorService.shutdown(); | |
try { | |
return scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS); | |
} | |
catch (InterruptedException e) { | |
return false; | |
} | |
finally { | |
scheduledExecutorService = null; | |
deleteByPrefix(s3InstanceLockObjectKey); | |
} | |
} | |
private void scheduleLockMaintenance() { | |
if (scheduledExecutorService == null) { | |
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); | |
} | |
scheduledExecutorService.scheduleAtFixedRate(this::putLockObject, 0, timeoutPeriod, timeoutUnit); | |
} | |
private boolean isLeader() { | |
return s3InstanceLockObjectKey.equals( | |
getObjectsByPrefix(s3LockObjectPrefix) | |
.filter(o -> o.lastModified().isAfter(Instant.now().minus(5, ChronoUnit.MINUTES))) | |
.min(Comparator.comparing(S3Object::lastModified).thenComparing(S3Object::key)) | |
.map(S3Object::key) | |
.orElse(null)); | |
} | |
private Stream<S3Object> getObjectsByPrefix(String prefix) { | |
return s3Client.listObjectsV2Paginator(ListObjectsV2Request.builder() | |
.bucket(bucketName) | |
.prefix(prefix) | |
.build()) | |
.stream() | |
.flatMap(response -> response.contents().stream()); | |
} | |
private void putLockObject() { | |
s3Client.putObject( | |
PutObjectRequest.builder() | |
.bucket(bucketName) | |
.key(s3InstanceLockObjectKey) | |
.build(), | |
RequestBody.fromString("")); | |
} | |
private void deleteByPrefix(String prefix) { | |
List<ObjectIdentifier> objects = getObjectsByPrefix(prefix) | |
.map(o -> ObjectIdentifier.builder() | |
.key(o.key()) | |
.build()) | |
.toList(); | |
if (objects.isEmpty()) { | |
return; | |
} | |
s3Client.deleteObjects(DeleteObjectsRequest.builder() | |
.bucket(bucketName) | |
.delete(Delete.builder() | |
.objects(objects) | |
.build()) | |
.build()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment