Skip to content

Instantly share code, notes, and snippets.

@craigleonard
Created November 1, 2017 17:39
Show Gist options
  • Save craigleonard/a783cf3c9979a5df2b1a19090a1d08f9 to your computer and use it in GitHub Desktop.
Save craigleonard/a783cf3c9979a5df2b1a19090a1d08f9 to your computer and use it in GitHub Desktop.
Cassandra Leadership Election
package com.github.craigleonard.leadershipelection;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import java.net.InetSocketAddress;
public class Client {
static final long LEASE_TIME = 10;
static final long HEARTBEAT_TIME = 7;
static final String ELECTION_NAME = "test-election";
public static void main(String[] args) throws InterruptedException {
String clientName = args[0];
Session session = Cluster.builder().addContactPointsWithPorts(new InetSocketAddress("127.0.0.1", 9042)).build().connect();
PreparedStatement tryAcquire = session.prepare("INSERT INTO LE.ELECTIONS (ELECTION, WINNER) VALUES (?,?) IF NOT EXISTS USING TTL " + LEASE_TIME);
PreparedStatement heartbeat = session.prepare("UPDATE LE.ELECTIONS USING TTL " + LEASE_TIME + " SET WINNER = ? WHERE ELECTION = ? IF WINNER = ?");
while (true) {
// try acquire lock
boolean isLeader = session.execute(tryAcquire.bind(ELECTION_NAME, clientName)).wasApplied();
while (isLeader) {
System.out.println("Client " + clientName + " is the leader");
Thread.sleep(HEARTBEAT_TIME * 1000);
isLeader = session.execute(heartbeat.bind(clientName, ELECTION_NAME, clientName)).wasApplied();
}
System.out.println("Client " + clientName + " is not the leader");
Thread.sleep(LEASE_TIME * 1000);
}
}
}
}
-- Keyspace for the leader-election table. IF NOT EXISTS makes the script
-- re-runnable, matching the guarded CREATE TABLE statement in this script.
CREATE KEYSPACE IF NOT EXISTS LE WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };
-- One row per election: ELECTION is the election name (partition key) and
-- WINNER is the name of the client currently recorded as leader.
CREATE TABLE IF NOT EXISTS LE.ELECTIONS (
    ELECTION TEXT,
    WINNER TEXT,
    PRIMARY KEY (ELECTION)
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment