Skip to content

Instantly share code, notes, and snippets.

@mwmitchell
Created June 22, 2021 00:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwmitchell/9958d53d6b60481a2d401d658373921b to your computer and use it in GitHub Desktop.
Save mwmitchell/9958d53d6b60481a2d401d658373921b to your computer and use it in GitHub Desktop.
package com.lucidworks.connectors.jobs.leader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.redisson.api.RLockReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
public class RedisLeaderElection implements LeaderElection {
private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);
private static final int WAIT_SECONDS = 5;
private final String resourceId;
private final RedissonReactiveClient redissonClient;
private final List<Listener> listeners = new ArrayList<>();
private final AtomicBoolean isLeader = new AtomicBoolean();
private final AtomicBoolean stopping = new AtomicBoolean();
private Disposable subscription;
public RedisLeaderElection(RedissonClient redissonClient, String resourceId) {
this.resourceId = resourceId;
this.redissonClient = redissonClient.reactive();
}
public boolean isLeader() {
return isLeader.get();
}
@PostConstruct
@Override
public void start() {
logger.info("Starting leadership election for resourceId={}", resourceId);
RSemaphoreReactive sem = redissonClient.getSemaphore(resourceId);
subscription = sem
.trySetPermits(1)
.flatMap(__ -> sem.tryAcquire(1, WAIT_SECONDS, TimeUnit.SECONDS))
.flatMap(result -> {
if (result) {
return Mono.just(true);
}
return Mono.error(new RuntimeException());
})
.retry()
.repeat()
.doFinally(signalType -> {
try {
if(isLeader.get()){
logger.info("Unlocking resourceId={}", resourceId);
sem.release().flatMap(__ -> sem.drainPermits()).block();
logger.info("Unlocked resourceId={}", resourceId);
}
} catch (Exception e) {
logger.info("Error while unlocking resourceId={}, error={}", resourceId, e.getMessage());
} finally {
logger.info("Leader election subscription for resourceId={} has been disposed", resourceId);
}
})
.subscribe(lockObtained -> {
if (lockObtained) {
if (isLeader.compareAndSet(false, true)) {
listeners.forEach(Listener::onElected);
}
} else {
if (isLeader.compareAndSet(true, false)) {
listeners.forEach(Listener::onUnElected);
}
}
});
}
@PreDestroy
@Override
public void stop() {
if (subscription != null) {
subscription.dispose();
}
isLeader.set(false);
logger.info("Leadership instance for resourceId={} shutdown and released", resourceId);
}
@Override
public void addListener(Listener electionListener) {
if (listeners.contains(electionListener)) {
return;
}
listeners.add(electionListener);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment