Skip to content

Instantly share code, notes, and snippets.

@alexeygrigorev
Last active December 16, 2015 01:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save alexeygrigorev/5353822 to your computer and use it in GitHub Desktop.
Save alexeygrigorev/5353822 to your computer and use it in GitHub Desktop.
Cassandra TTL test
import me.prettyprint.cassandra.serializers.ObjectSerializer;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.hector.api.*;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.ddl.*;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
import org.slf4j.*;
import static me.prettyprint.hector.api.factory.HFactory.*;
/**
 * Gives access to one column family inside one keyspace of a cluster.
 * <p>
 * Row keys, column names and values are all handled through {@link ObjectSerializer}.
 * The constructors make sure the keyspace and the column family exist before the
 * instance is handed back to the caller.
 */
public class CassandraService {

    private static final ObjectSerializer SERIALIZER = ObjectSerializer.get();
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraService.class);

    private Cluster cluster;
    private Keyspace keyspace;
    private String columnFamilyName;
    private KeyspaceDefinition keyspaceDefinition;

    /**
     * Shared constructor logic: stores the collaborators, looks up (or adds) the keyspace
     * definition and creates the {@link Keyspace} handle used for all queries and mutations.
     */
    private void init(Cluster cluster, String keySpaceName, String columnFamilyName) {
        this.cluster = cluster;
        this.columnFamilyName = columnFamilyName;
        // Needs this.cluster, so it has to run after the assignment above.
        this.keyspaceDefinition = getOrCreateKeyspaceDefinition(keySpaceName);

        LOGGER.info("We are going to create KeySpace {}", keySpaceName);
        this.keyspace = createKeyspace(keySpaceName, cluster, new GmiPublisherConsistencyLevelPolicy());
    }

    /**
     * Creates the service and adds the column family to the cluster when the keyspace
     * definition does not list it yet.
     *
     * @param cluster          cluster to operate on
     * @param keySpaceName     name of the keyspace, added to the cluster if it cannot be described
     * @param columnFamilyName name of the column family this service works with
     */
    public CassandraService(Cluster cluster, String keySpaceName, String columnFamilyName) {
        init(cluster, keySpaceName, columnFamilyName);
        LOGGER.info("KeyspaceDefinition {} for keyspace {}", keyspaceDefinition, keySpaceName);

        if (columnFamilyExists(columnFamilyName, keyspaceDefinition)) {
            return;
        }
        cluster.addColumnFamily(createColumnFamilyDefinition(keySpaceName, columnFamilyName));
        LOGGER.info("Added column family {} / {}", keySpaceName, columnFamilyName);
    }

    /**
     * Same as the three-argument constructor, but a newly added column family gets the
     * given {@code gcGraceSeconds}. The value is not applied when the column family is
     * already listed in the keyspace definition.
     *
     * @param cluster          cluster to operate on
     * @param keySpaceName     name of the keyspace, added to the cluster if it cannot be described
     * @param columnFamilyName name of the column family this service works with
     * @param gcGraceSeconds   value set on the column family definition before it is added
     */
    public CassandraService(Cluster cluster, String keySpaceName, String columnFamilyName, int gcGraceSeconds) {
        init(cluster, keySpaceName, columnFamilyName);
        LOGGER.info("KeyspaceDefinition {} for keyspace {}", keyspaceDefinition, keySpaceName);

        if (columnFamilyExists(columnFamilyName, keyspaceDefinition)) {
            return;
        }
        addColumnFamilyWithGCGraceSeconds(keySpaceName, columnFamilyName, gcGraceSeconds);
        Object[] logArguments = new Object[] { keySpaceName, columnFamilyName, gcGraceSeconds };
        LOGGER.info("Added column family {} / {} with gcGraceSeconds period {}", logArguments);
    }

    /** Returns the definition the cluster reports for the keyspace, adding the keyspace first if there is none. */
    private KeyspaceDefinition getOrCreateKeyspaceDefinition(String keySpaceName) {
        KeyspaceDefinition existing = cluster.describeKeyspace(keySpaceName);
        return existing != null ? existing : createKeyspaceDefinition(keySpaceName);
    }

    /** Adds the keyspace to the cluster and returns the definition the cluster reports afterwards. */
    private KeyspaceDefinition createKeyspaceDefinition(String keySpaceName) {
        cluster.addKeyspace(HFactory.createKeyspaceDefinition(keySpaceName));
        LOGGER.info("Added keyspace {}", keySpaceName);
        return cluster.describeKeyspace(keySpaceName);
    }

    /** Adds the column family to the cluster with {@code gcGraceSeconds} set on its definition. */
    private void addColumnFamilyWithGCGraceSeconds(String keySpaceName, String columnFamilyName, int gcGraceSeconds) {
        ThriftCfDef definition = new ThriftCfDef(createColumnFamilyDefinition(keySpaceName, columnFamilyName));
        definition.setGcGraceSeconds(gcGraceSeconds);
        cluster.addColumnFamily(definition);
    }

    /** Tells whether the keyspace definition lists a column family with the given name; {@code false} for a null definition. */
    private boolean columnFamilyExists(String columnFamily, KeyspaceDefinition keyspaceDefinition) {
        if (keyspaceDefinition == null) {
            return false;
        }
        for (ColumnFamilyDefinition candidate : keyspaceDefinition.getCfDefs()) {
            if (columnFamily.equals(candidate.getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up a single column of a single row.
     *
     * @param rowKey     key of the row
     * @param columnName name of the column
     * @return the result of executing the column query
     */
    public QueryResult<HColumn<Object, Object>> queryObject(Object rowKey, Object columnName) {
        return createColumnQuery(keyspace, SERIALIZER, SERIALIZER, SERIALIZER)
                .setColumnFamily(this.columnFamilyName)
                .setKey(rowKey)
                .setName(columnName)
                .execute();
    }

    /**
     * Stores {@code obj} in the given column of the given row.
     *
     * @param rowKey     key of the row
     * @param columnName name of the column
     * @param obj        value to store
     */
    public void persistObject(Object rowKey, Object columnName, Object obj) {
        LOGGER.debug("Persist {} / {}", rowKey, columnName);
        executeInsertion(rowKey, createColumn(columnName, obj, SERIALIZER, SERIALIZER));
    }

    /**
     * Stores {@code obj} in the given column of the given row, with a TTL set on the column.
     *
     * @param rowKey     key of the row
     * @param columnName name of the column
     * @param obj        value to store
     * @param ttl        TTL passed to {@code HColumn.setTtl}
     */
    public void persistObjectWithTtl(Object rowKey, Object columnName, Object obj, int ttl) {
        LOGGER.debug("Persist {} / {}", rowKey, columnName);
        HColumn<Object, Object> expiringColumn = createColumn(columnName, obj, SERIALIZER, SERIALIZER);
        expiringColumn.setTtl(ttl);
        executeInsertion(rowKey, expiringColumn);
    }

    /** Inserts the column into this service's column family under {@code rowKey}. */
    private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
        Mutator<Object> insertion = createMutator(keyspace, SERIALIZER);
        insertion.addInsertion(rowKey, this.columnFamilyName, column);
        insertion.execute();
    }

    /**
     * Deletes one column of a row.
     * <p>
     * The API does not currently support removing a row completely; only the contents are
     * deleted. Use {@link #truncateColumnFamily()} to get rid of the empty rows.
     *
     * @param rowKey     key of the row
     * @param columnName name of the column to delete
     */
    public void remove(Object rowKey, Object columnName) {
        Mutator<Object> deletion = createMutator(this.keyspace, SERIALIZER)
                .addDeletion(rowKey, this.columnFamilyName, columnName, SERIALIZER);
        deletion.execute();
    }

    /**
     * Checks whether a range query over the column family returns no rows.
     * <p>
     * Because {@link #remove(Object, Object)} leaves an empty row behind, this only returns
     * {@code true} once removed entries have been truncated.
     *
     * @return {@code true} if the query returned an empty row list
     */
    public boolean isColumnFamilyEmpty() {
        return HFactory.createRangeSlicesQuery(this.keyspace, SERIALIZER, SERIALIZER, SERIALIZER)
                .setColumnFamily(this.columnFamilyName)
                .setRange("", "", false, 1)
                .setRowCount(1)
                .execute()
                .get()
                .getList()
                .isEmpty();
    }

    /** Truncates this service's column family. */
    public void truncateColumnFamily() {
        String keyspaceName = this.keyspace.getKeyspaceName();
        this.cluster.truncate(keyspaceName, this.columnFamilyName);
    }
}
import java.util.Iterator;
import java.util.concurrent.*;
import me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy;
import me.prettyprint.cassandra.serializers.ObjectSerializer;
import me.prettyprint.hector.api.*;
import me.prettyprint.hector.api.beans.*;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.*;
import org.apache.cassandra.service.StorageService;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*;
import org.slf4j.LoggerFactory;
import com.jayway.awaitility.Duration;
import static com.jayway.awaitility.Awaitility.waitAtMost;
import static me.prettyprint.hector.api.factory.HFactory.createKeyspace;
import static org.junit.Assert.*;
/**
* Tests that cassandra actually removes empty rows after compaction.
*
* @author Grigorev Alexey
*/
public class CassandraTtlIntergrationTest {
private static final Logger logger = LoggerFactory.getLogger(CassandraTtlIntergrationTest.class);
private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";
private static final String COLUMN_NAME = "columnName";
private static final int GC_CRACE_SECONDS = 20;
// sut
private CassandraService cassandraService;
// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", "localhost:9160");
private Keyspace keyspace;
@BeforeClass
public static void setupBeforeClass() {
EmbeddedCassandraDaemon daemon = EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
StorageService.instance.registerDaemon(daemon.getCassandraDaemon());
}
@Before
public void setUp() throws Exception {
keyspace = createKeyspace(KEYSPACE, cluster, new QuorumAllConsistencyLevelPolicy());
cassandraService = new CassandraService(cluster, KEYSPACE, COLUMN_FAMILY, GC_CRACE_SECONDS);
}
@Test
public void objectRemovedAfterTtlPasses() throws Exception {
String rowKey = RandomStringUtils.randomAlphanumeric(128);
Object obj = RandomStringUtils.randomAlphanumeric(1000);
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);
logger.info("after persisting rows count is {}", countRows());
Object value = retrieve(rowKey, COLUMN_NAME);
assertNotNull(value);
logger.info("before TTL passes rows count is {}", countRows());
logger.info("sleeping 25 seconds...");
TimeUnit.SECONDS.sleep(25);
Object nullValue = retrieve(rowKey, COLUMN_NAME);
assertNull(nullValue);
}
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
final int expectedAmount = 50000;
logger.info("before persisting rows count is {}", countRows());
for (int i = 0; i < expectedAmount; i++) {
String rowKey = RandomStringUtils.randomAlphanumeric(128);
Object obj = RandomStringUtils.randomAlphanumeric(1000);
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);
if (i % 100 == 0) {
StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
}
}
logger.info("causing major compaction...");
causeCompaction();
logger.info("after major compaction rows count is {}", countRows());
waitAtMost(Duration.TWO_MINUTES)
.pollDelay(Duration.TWO_SECONDS)
.pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int countRows = countRows();
logger.info("the rows count is {}", countRows);
return countRows < expectedAmount;
}
});
}
public void causeCompaction() throws Exception {
StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
}
public int countRows() {
// http://stackoverflow.com/questions/8418448
int rowCount = 100;
ObjectSerializer serializer = ObjectSerializer.get();
RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
HFactory.createRangeSlicesQuery(keyspace, serializer, serializer, serializer)
.setColumnFamily(COLUMN_FAMILY)
.setRange(null, null, false, 10).setRowCount(rowCount);
Object lastKey = null;
int i = 0;
while (true) {
rangeSlicesQuery.setKeys(lastKey, null);
QueryResult<OrderedRows<Object, Object, Object>> result = rangeSlicesQuery.execute();
OrderedRows<Object, Object, Object> rows = result.get();
Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();
// we'll skip this first one, since it is the same as the last one from previous time we executed
if (lastKey != null && rowsIterator != null) {
rowsIterator.next();
}
while (rowsIterator.hasNext()) {
Row<Object, Object, Object> row = rowsIterator.next();
lastKey = row.getKey();
i++;
if (row.getColumnSlice().getColumns().isEmpty()) {
continue;
}
}
if (rows.getCount() < rowCount) {
break;
}
}
return i;
}
private Object retrieve(String rowKey, String columnName) {
HColumn<Object, Object> hColumn = cassandraService.queryObject(rowKey, columnName).get();
if (hColumn == null) {
return null;
}
return hColumn.getValue();
}
@After
public void tearDown() {
cassandraService.truncateColumnFamily();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment