A pattern for creating a reliable unique constraint in Cassandra

Unique Constraints in Cassandra

Cassandra is a NoSQL database, and is meant for different applications than traditional relational databases. Nevertheless, developers will want (and try) to implement RDBMS features in Cassandra. Unique constraints are one such example.

Many people have asked about creating unique constraints in Cassandra: 1, 2, 3, 4. There is no definitive answer. The most popular answer seems to be not to try at all: square peg, round hole. If you need unique constraints, Cassandra may not be the right tool for the job. Others have suggested using LWT/CAS (lightweight transactions, Cassandra's compare-and-set). Note, however, that simply using LWT/CAS does not give us the reliable unique constraints we get from a traditional RDBMS. Finally, some have suggested full-blown locking solutions such as ZooKeeper.

The example in the official LWT documentation demonstrates using LWT to prevent two users from creating an account with the same customerID. However, because write failures are ambiguous, this approach is not reliable; or, at least, it cannot reliably indicate success or failure to a caller.

What follows is a pattern for creating a reliable unique constraint in Cassandra using leases and LWTs.

First Attempt (Incomplete)

A first attempt is to simply use LWT. First, we define the table as follows:

    CREATE TABLE customer_account (
        customerID text PRIMARY KEY,
        customer_email text
    );

Now, to create a customer_account row using the DataStax Java driver, we'd do something like:

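import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.UnavailableException;
import com.datastax.driver.core.exceptions.WriteTimeoutException;

public void create (Customer customer) {
    try {
        // IF NOT EXISTS turns the INSERT into an LWT: it is applied only if no row with this customerID exists.
        Statement stmt = new SimpleStatement("INSERT INTO customer_account (customerID, customer_email) VALUES (?, ?) IF NOT EXISTS", customer.getID(), customer.getEmail());
        stmt.setConsistencyLevel(ConsistencyLevel.QUORUM);
        ResultSet rs = session.execute(stmt);
        if (!rs.wasApplied()) {
            throw new DuplicateCustomerIDException("User with the Customer ID {} already exists", customer.getID());
        }
    } catch (WriteTimeoutException wte) {
        // Data may or may not be written
        // TODO: Add awe inspiring code here
    } catch (UnavailableException uae) {
        // Data may or may not be written
        // TODO: Add awe inspiring code here
    } catch (Exception e) {
        // Data will not be written. Fall through.
        throw e;
    }
}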

According to Cassandra error handling done right, write exceptions thrown for CAS operations may or may not result in the data actually being written. Since I want to reliably indicate if the data was written, this does not work for me.
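
To make the ambiguity concrete, here is a minimal, driver-free sketch. Everything in it is hypothetical: `FlakyTable`, `SimulatedTimeoutException`, and `AmbiguousWriteDemo` are illustration-only stand-ins and not part of the DataStax driver. The in-memory table applies the write and then loses the acknowledgement, which is one way a timeout can leave the caller not knowing whether the row exists.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a driver timeout; NOT the DataStax exception class. */
class SimulatedTimeoutException extends RuntimeException {
    SimulatedTimeoutException(String message) { super(message); }
}

/** Hypothetical in-memory table whose acknowledgement can be lost after the write lands. */
class FlakyTable {
    private final Map<String, String> rows = new HashMap<>();
    private boolean loseNextAck = false;

    /** Arrange for the next insert to be applied but reported as a timeout. */
    void dropNextAck() { loseNextAck = true; }

    /** Mimics INSERT ... IF NOT EXISTS: returns true when the row was applied. */
    boolean insertIfNotExists(String customerID, String email) {
        boolean applied = rows.putIfAbsent(customerID, email) == null;
        if (loseNextAck) {
            loseNextAck = false;
            // The write above already happened, but the caller only sees a timeout.
            throw new SimulatedTimeoutException("ack lost for " + customerID);
        }
        return applied;
    }

    boolean contains(String customerID) { return rows.containsKey(customerID); }
}

class AmbiguousWriteDemo {
    /** Returns "applied", "duplicate", or "unknown" from the caller's point of view. */
    static String create(FlakyTable table, String customerID, String email) {
        try {
            return table.insertIfNotExists(customerID, email) ? "applied" : "duplicate";
        } catch (SimulatedTimeoutException e) {
            return "unknown"; // the row may or may not exist at this point
        }
    }
}
```

In this simulation the first caller gets "unknown" even though its row was stored, and a retry with the same customerID then reports "duplicate". The caller cannot tell its own earlier write apart from somebody else's.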

Second Attempt (Close but no cigar)

In our second attempt, due to the ambiguity in Cassandra write exceptions, we move the CAS out of Cassandra and into our code. Further, notice that on any driver error we assume complete failure and try to delete the data (this part of the approach was inspired from here).

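public void create (Customer customer) {
    try {
        // Check-then-write in application code: read first, then do a plain (non-LWT) INSERT.
        if (!isCustomerUnique(customer)) {
            throw new DuplicateCustomerIDException("User with the Customer ID {} already exists", customer.getID());
        }
        
        Statement stmt = new SimpleStatement("INSERT INTO customer_account (customerID, customer_email) VALUES (?, ?)", customer.getID(), customer.getEmail());
        stmt.setConsistencyLevel(ConsistencyLevel.QUORUM);
        ResultSet rs = session.execute(stmt);
        if (!rs.wasApplied()) {
            throw new DuplicateCustomerIDException("User with the Customer ID {} already exists", customer.getID());
        }
    } catch (CustomerCreateException cce) {
        // The uniqueness check itself failed, so nothing was written. Rethrow without deleting.
        throw cce;
    } catch (DuplicateCustomerIDException dcie) {
        // The row belongs to an existing customer. Rethrow without deleting it.
        throw dcie;
    } catch (Exception e) {
        // Any other exception is from the Cassandra driver. We should assume failure, and delete any created rows.
        Statement stmt = new SimpleStatement("DELETE FROM customer_account WHERE customerID=?", customer.getID());
        stmt.setConsistencyLevel(ConsistencyLevel.ANY);
        session.execute(stmt);
        // Report the failure instead of returning as if the customer had been created.
        throw new CustomerCreateException("Error creating customer", e);
    }
}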

The problem with this approach is that two threads writing the same customerID simultaneously could both determine that the customerID is unique and both try to write the data. Two threads writing the data is tolerable by itself; the real issue is that, on failure, one thread might delete the other thread's data, which is very, very bad.
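
The unlucky interleaving can be replayed step by step without any threads or database. The sketch below is only an illustration: `CheckThenWriteRace` is a hypothetical name, a `HashMap` stands in for the customer_account table, and caller B's driver error is simply assumed.

```java
import java.util.HashMap;
import java.util.Map;

class CheckThenWriteRace {
    /** Replays one unlucky interleaving in order; returns the final table contents. */
    static Map<String, String> replay() {
        Map<String, String> customerAccount = new HashMap<>();

        // Both callers run the uniqueness check before either has written.
        boolean uniqueForA = !customerAccount.containsKey("alice");
        boolean uniqueForB = !customerAccount.containsKey("alice");

        // Caller A writes, and its write is acknowledged.
        if (uniqueForA) {
            customerAccount.put("alice", "a@example.com");
        }

        // Caller B also passed the check, writes, hits a driver error, and runs its cleanup DELETE.
        if (uniqueForB) {
            customerAccount.put("alice", "b@example.com");
            boolean driverErrorForB = true; // assumed failure, for the sake of the replay
            if (driverErrorForB) {
                customerAccount.remove("alice"); // also removes the row A believes it created
            }
        }
        return customerAccount;
    }
}
```

After the replay the table is empty, although caller A was told that its customer had been created.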

Third Attempt

In our third attempt, we block two threads from ever working on the same customerID at the same time. We use a lock/lease mechanism inspired from here, with a TTL on the lock/lease that is greater than the time required to create a row.

First, we create a new table as follows:

    CREATE TABLE customer_lock (
        customerID text PRIMARY KEY,
    );

And modify the create() method as follows:

public void create (Customer customer) {
    boolean lock = false;
        
    try {
        // Take the lease first, so only one caller at a time works on this customerID.
        lock = acquire(customer);
        if (!lock) {
            throw new CustomerCreateException("Error creating customer with ID {}. Please try again.", customer.getID());
        }
    
        if (!isCustomerUnique(customer)) {
            throw new DuplicateCustomerIDException("User with the Customer ID {} already exists", customer.getID());
        }
        
        Statement stmt = new SimpleStatement("INSERT INTO customer_account (customerID, customer_email) VALUES (?, ?)", customer.getID(), customer.getEmail());
        stmt.setConsistencyLevel(ConsistencyLevel.QUORUM);
        ResultSet rs = session.execute(stmt);
        if (!rs.wasApplied()) {
            throw new DuplicateCustomerIDException("User with the Customer ID {} already exists", customer.getID());
        }
    } catch (CustomerCreateException cce) {
        // This is an exception from our internal validation. Just rethrow.
        throw cce;
    } catch (DuplicateCustomerIDException dcie) {
        // This indicates that customerID already existed. Just rethrow.
        throw dcie;
    } catch (Exception e) {
        // Any other exception is from the Cassandra driver. We should assume failure, and delete any created rows.
        Statement stmt = new SimpleStatement("DELETE FROM customer_account WHERE customerID=?", customer.getID());
        stmt.setConsistencyLevel(ConsistencyLevel.ANY);
        session.execute(stmt);
        // Report the failure instead of returning as if the customer had been created.
        throw new CustomerCreateException("Error creating customer", e);
    } finally {
        // Release the lease only if this call actually acquired it.
        if (lock) {
            release(customer);
        }
    }
}
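
The lease semantics that create() relies on can be sketched in memory, which makes the role of the TTL easier to see. This is an illustration under stated assumptions, not the Cassandra implementation: `LeaseTable` is a hypothetical class, a map of expiry times stands in for the customer_lock table, and the clock is injected so that expiry can be exercised without waiting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical in-memory lease table mirroring customer_lock with a TTL. */
class LeaseTable {
    private final Map<String, Long> expiryByKey = new HashMap<>();
    private final LongSupplier clockMillis;
    private final long ttlMillis;

    LeaseTable(LongSupplier clockMillis, long ttlMillis) {
        this.clockMillis = clockMillis;
        this.ttlMillis = ttlMillis;
    }

    /** Mimics INSERT ... IF NOT EXISTS USING TTL: succeeds only if no live lease exists. */
    synchronized boolean acquire(String key) {
        long now = clockMillis.getAsLong();
        Long expiry = expiryByKey.get(key);
        if (expiry != null && expiry > now) {
            return false; // someone else holds a lease that has not expired yet
        }
        expiryByKey.put(key, now + ttlMillis);
        return true;
    }

    /** Mimics the DELETE in release(): removes the lease unconditionally. */
    synchronized void release(String key) {
        expiryByKey.remove(key);
    }
}
```

A second acquire for the same key fails until the first lease is released or its TTL elapses. Note that release() here, like the DELETE it mimics, does not check who holds the lease, so a caller that overruns the TTL could remove a lease that another caller has since acquired. That is why the TTL needs to be comfortably longer than the time it takes to create a row.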

Appendix - Helper methods

The helper methods used above are described here.

/*
 * Helper methods must only throw exceptions of type CustomerCreateException
 */

/*
 * This method acquires a lock 
 * Note the TTL should be long enough to allow a row to be created successfully.
 */
public boolean acquire (Customer customer) {
    try {
        // LWT insert with a TTL: only one caller can create the lock row, and it expires on its own after 10 seconds.
        Statement statement = new SimpleStatement("INSERT INTO customer_lock (customerID) VALUES (?) IF NOT EXISTS USING TTL 10", customer.getID());
        statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
        ResultSet rs = session.execute(statement);
        if (rs.wasApplied()) {
            return true;
        }
    } catch (Exception e) {
        // Assume failure. Fall through, return false.
        // If the lock row does end up being created, the TTL removes it after 10 seconds.
    }
    return false;
}

public void release (Customer customer) {
    try {
        Statement statement = new SimpleStatement("DELETE FROM customer_lock WHERE customerID = ?", customer.getID());
        statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
        session.execute(statement);
    } catch (Exception e) {
        // Ignore any exceptions. If the delete fails, the TTL still expires the lock.
    }
}

public boolean isCustomerUnique (Customer customer) {
    try {
        Statement statement = new SimpleStatement("SELECT * FROM customer_account WHERE customerID = ?", customer.getID());
        // Read at QUORUM so the check overlaps with rows written at QUORUM by create().
        statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
        ResultSet rs = session.execute(statement);
        if (rs.all().isEmpty()) {
            return true;
        }
    } catch (Exception e) {
        throw new CustomerCreateException("Error reading customer", e);
    }
    return false;
}