@andlaz
Created July 30, 2012 15:39
Cassandra perf
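# Cassandra node configuration for the embedded test instance; data, commit log and saved caches live under /tmp/profilefront-cassandra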
cluster_name: 'TestCluster'
initial_token: 0
hinted_handoff_enabled: true
max_hint_window_in_ms: 3600000 # one hour
hinted_handoff_throttle_delay_in_ms: 1
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
authority: org.apache.cassandra.auth.AllowAllAuthority
partitioner: org.apache.cassandra.dht.RandomPartitioner
data_file_directories:
    - /tmp/profilefront-cassandra/data
commitlog_directory: /tmp/profilefront-cassandra/commitlog
saved_caches_directory: /tmp/profilefront-cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 60000
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"
flush_largest_memtables_at: 0.85
reduce_cache_sizes_at: 0.85
reduce_cache_capacity_to: 0.6
concurrent_reads: 2
concurrent_writes: 2
memtable_total_space_in_mb: 1024
memtable_flush_writers: 1
memtable_flush_queue_size: 4
storage_port: 7000
ssl_storage_port: 7001
listen_address: 127.0.0.1
broadcast_address: 127.0.0.1
rpc_address: 0.0.0.0
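# non-default Thrift RPC port; the test configurations below point nodes.node.port at 10160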
rpc_port: 10160
rpc_keepalive: true
rpc_server_type: hsha
thrift_framed_transport_size_in_mb: 15
thrift_max_message_length_in_mb: 16
incremental_backups: false
snapshot_before_compaction: false
column_index_size_in_kb: 64
in_memory_compaction_limit_in_mb: 64
multithreaded_compaction: false
compaction_throughput_mb_per_sec: 16
compaction_preheat_key_cache: true
rpc_timeout_in_ms: 10000
endpoint_snitch: PropertyFileSnitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
index_interval: 128
encryption_options:
    internode_encryption: none
    keystore: conf/.keystore
    keystore_password: cassandra
    truststore: conf/.truststore
    truststore_password: cassandra
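// CassandraAstyanaxStoreKeeper.java: StoreKeeper implementation backed by Cassandra through the Astyanax client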
package com.ignitionone.profilefront.store.keeper;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ignitionone.Profile;
import com.ignitionone.ProfileAttribute;
import com.ignitionone.ProfileBucket;
import com.ignitionone.profilefront.exception.AttributeKeyTypeCollisionException;
import com.ignitionone.profilefront.exception.AttributeValueTypeCollisionException;
import com.ignitionone.profilefront.exception.StoreException;
import com.ignitionone.profilefront.store.StoreKeeper;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.AstyanaxContext.Builder;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.annotations.Component;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.serializers.AnnotatedCompositeSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
public class CassandraAstyanaxStoreKeeper extends StoreKeeper {
/* These are now read from configuration
------------------------------------------ */
protected String columnFamilyName;
protected String keyspaceName;
private int cfKeyCacheSize = 0;
private int cfRowCacheSize = 0;
private String ksReplicationStrategy;
private Map<String, String> ksReplicationStrategyOptions = new HashMap<String,String>();
/* Content-encoding flags
-------------------------- */
public static final int ENCODING_INTEGER = 1;
public static final int ENCODING_STRING = 2;
private static final Logger LOG = LoggerFactory.getLogger(CassandraAstyanaxStoreKeeper.class);
/* These represent Astyanax state
------------------------------- */
protected AstyanaxContext<Keyspace> keyspaceContext;
protected AstyanaxContext<Cluster> clusterContext;
protected Keyspace keyspace;
protected Cluster cluster;
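// Composite column-name types: bucket name + attribute key (String or Integer) + value-encoding flag (ENCODING_INTEGER / ENCODING_STRING)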
public static class StrAttribute {
@Component(ordinal=0) String bucketName;
@Component(ordinal=1) String attributeKey;
@Component(ordinal=2) Integer valueType;
public StrAttribute() {
}
public String toString() {
return "bucketName: "+bucketName+", attributeKey: "+attributeKey+", valueType: "+valueType;
}
}
public static class IntAttribute {
@Component(ordinal=0) String bucketName;
@Component(ordinal=1) Integer attributeKey;
@Component(ordinal=2) Integer valueType;
public IntAttribute() {
}
public String toString() {
return "bucketName: "+bucketName+", attributeKey: "+attributeKey+", valueType: "+valueType;
}
}
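// Both ColumnFamily handles target the same physical column family; they differ only in the composite serializer used to read column names with string vs. integer attribute keys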
protected AnnotatedCompositeSerializer<StrAttribute> strAttributeSerializer;
protected ColumnFamily<String, StrAttribute> cfProfileStrAttributes;
protected AnnotatedCompositeSerializer<IntAttribute> intAttributeSerializer;
protected ColumnFamily<String, IntAttribute> cfProfileIntAttributes;
/* Metrics
----------- */
protected final Timer readLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "read-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer writeLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "write-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer pageLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "page-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer pagePrepareLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "page-prepare-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer deserializationLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "deserialization-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
public CassandraAstyanaxStoreKeeper(Configuration configuration) throws Exception {
super(configuration);
// set schema-related properties
if (!configuration.containsKey("settings.keyspaceName")) throw new ConfigurationException("Keyspace name not defined");
if (!configuration.containsKey("settings.columnFamilyName")) throw new ConfigurationException("Column family name not defined");
setColumnFamilyName(configuration.getString("settings.columnFamilyName"));
strAttributeSerializer = new AnnotatedCompositeSerializer<StrAttribute>(StrAttribute.class);
cfProfileStrAttributes = new ColumnFamily<String, StrAttribute>(getColumnFamilyName(), StringSerializer.get(), strAttributeSerializer);
intAttributeSerializer = new AnnotatedCompositeSerializer<IntAttribute>(IntAttribute.class);
cfProfileIntAttributes = new ColumnFamily<String, IntAttribute>(getColumnFamilyName(), StringSerializer.get(), intAttributeSerializer);
setKeyspaceName(configuration.getString("settings.keyspaceName"));
setCfKeyCacheSize(configuration.getInt("settings.columnFamilyKeyCacheSize", 200000));
setCfRowCacheSize(configuration.getInt("settings.columnFamilyRowCacheSize", 25000));
setKsReplicationStrategy(configuration.getString("settings.replicationStrategy", "SimpleStrategy"));
// setKsReplicationStrategyOptions(configuration.getString("settings.replicationStrategyOptions", "replication_factor:2"));
String[] rSOpts = configuration.getString("settings.replicationStrategyOptions", "replication_factor:2").split(",");
for(String rsOpt:Arrays.asList(rSOpts)) {
String[] rSOptKV = rsOpt.split(":");
if (rSOptKV.length == 2) getKsReplicationStrategyOptions().put(rSOptKV[0], rSOptKV[1]);
}
LOG.trace("Connecting to Cassandra cluster..");
// load hosts
Object property = configuration.getProperty("nodes.node.host");
int nodes = 1;
if (null == property) {
// TODO maybe give more details about the name
throw new ConfigurationException("No nodes defined for the store keeper.");
} else if (property instanceof List) {
nodes = ((List) property).size();
}
LOG.debug("Seed: "+configuration.getString("nodes.node(0).host")+":"+configuration.getInt("nodes.node(0).port"));
ConnectionPoolConfiguration pool = new ConnectionPoolConfigurationImpl(configuration.getString("name")+"-Pool")
.setInitConnsPerHost(configuration.getInt("settings.initConnsPerHost", 10))
.setMaxConnsPerHost(configuration.getInt("settings.maxConnsPerHost", 25))
.setSeeds(configuration.getString("nodes.node(0).host")+":"+configuration.getInt("nodes.node(0).port"));
try {
Builder builder = new AstyanaxContext.Builder()
.forCluster(configuration.getString("name"))
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE).setDefaultReadConsistencyLevel(ConsistencyLevel.CL_ONE).setDefaultWriteConsistencyLevel(ConsistencyLevel.CL_ANY).setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE))
.withConnectionPoolConfiguration(pool)
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor());
clusterContext = builder.buildCluster(ThriftFamilyFactory.getInstance());
clusterContext.start();
cluster = clusterContext.getEntity();
try {
KeyspaceDefinition ksDef = cluster.makeKeyspaceDefinition();
ksDef.setName(getKeyspaceName())
.setStrategyClass(getKsReplicationStrategy())
.setStrategyOptions(getKsReplicationStrategyOptions())
.addColumnFamily(
cluster.makeColumnFamilyDefinition()
.setName(getColumnFamilyName())
);
cluster.addKeyspace(ksDef);
} catch(ConnectionException e) {
// the keyspace most likely exists already; ignore and fall through to getKeyspace()
}
keyspace = cluster.getKeyspace(getKeyspaceName());
} catch(Exception e) {
LOG.error("Caught exception while initializing store {}", e);
}
}
@Override
public Profile get(String id) throws StoreException {
final TimerContext readLatencyContext = readLatency.time();
// create the new Profile instance
Profile profile = new Profile(id);
try {
// get int keys..
OperationResult<ColumnList<IntAttribute>> intRead = keyspace.prepareQuery(cfProfileIntAttributes).getKey(id).execute();
for (Column<IntAttribute> result : intRead.getResult()) {
IntAttribute iAttribute = result.getName();
LOG.debug("Found IntAttribute {}", iAttribute);
switch(iAttribute.valueType) {
case CassandraAstyanaxStoreKeeper.ENCODING_INTEGER:
if (iAttribute.attributeKey != null) profile.setAttribute(iAttribute.bucketName, iAttribute.attributeKey, result.getIntegerValue(), result.getTimestamp());
break;
case CassandraAstyanaxStoreKeeper.ENCODING_STRING:
if (iAttribute.attributeKey != null) profile.setAttribute(iAttribute.bucketName, iAttribute.attributeKey, result.getStringValue(), result.getTimestamp());
break;
}
}
// get str keys..
OperationResult<ColumnList<StrAttribute>> strRead = keyspace.prepareQuery(cfProfileStrAttributes).getKey(id).execute();
for (Column<StrAttribute> result : strRead.getResult()) {
StrAttribute sAttribute = result.getName();
LOG.debug("Found StrAttribute {}", sAttribute);
switch(sAttribute.valueType) {
case CassandraAstyanaxStoreKeeper.ENCODING_INTEGER:
if (sAttribute.attributeKey != null) profile.setAttribute(sAttribute.bucketName, sAttribute.attributeKey, result.getIntegerValue(), result.getTimestamp());
break;
case CassandraAstyanaxStoreKeeper.ENCODING_STRING:
if (sAttribute.attributeKey != null) profile.setAttribute(sAttribute.bucketName, sAttribute.attributeKey, result.getStringValue(), result.getTimestamp());
break;
}
}
} catch (ConnectionException e) {
throw new StoreException("Caught Astyanax exception while trying to get profile", e);
} catch (AttributeKeyTypeCollisionException e) {
// TODO Auto-generated catch block
throw new StoreException("Caught AttributeKeyTypeCollisionException while trying to get profile", e);
} catch (AttributeValueTypeCollisionException e) {
// TODO Auto-generated catch block
throw new StoreException("Caught AttributeValueTypeCollisionException while trying to get profile", e);
} catch (Exception e) {
throw new StoreException("Caught Exception while trying to get profile", e);
}
readLatencyContext.stop();
return profile;
}
}
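// CassandraAstyanaxStoreKeeperTest.java: boots an embedded CassandraDaemon against the configuration above and registers a "TestCassandra" store through StoreManager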
package com.ignitionone.profilefront.storekeepers;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.LoggerFactory;
import com.ignitionone.profilefront.store.StoreManager;
public class CassandraAstyanaxStoreKeeperTest extends StoreKeeperTest {
private static CassandraDaemon cassandra;
public CassandraAstyanaxStoreKeeperTest() {
LOG = LoggerFactory.getLogger(CassandraAstyanaxStoreKeeperTest.class);
}
@BeforeClass
public static void setup() throws IOException, SecurityException,
IllegalArgumentException, NoSuchMethodException,
ClassNotFoundException, IllegalAccessException,
InvocationTargetException, InstantiationException, ConfigurationException, InterruptedException {
(new File("/tmp/profilefront-cassandra")).mkdir();
CassandraAstyanaxStoreKeeperTest.cassandra = new CassandraDaemon();
CassandraAstyanaxStoreKeeperTest.cassandra.init(null);
CassandraAstyanaxStoreKeeperTest.cassandra.start();
// Thread.sleep(60000);
XMLConfiguration configuration = new XMLConfiguration();
configuration.addProperty("configuration.stores.store.name", "TestCassandra");
configuration.addProperty("configuration.stores.store.keeper", "CassandraAstyanaxStoreKeeper");
configuration.addProperty("configuration.stores.store.settings.keyspaceName", "ProfilefrontTest");
configuration.addProperty("configuration.stores.store.settings.columnFamilyName", "Profiles");
configuration.addProperty("configuration.stores.store.settings.columnFamilyKeyCacheSize", "10000");
configuration.addProperty("configuration.stores.store.settings.columnFamilyRowCacheSize", "0");
configuration.addProperty("configuration.stores.store.settings.replicationStrategy", "SimpleStrategy");
configuration.addProperty("configuration.stores.store.settings.replicationStrategyOptions", "replication_factor:1");
configuration.addProperty("configuration.stores.store.nodes.node.host", "127.0.0.1");
configuration.addProperty("configuration.stores.store.nodes.node.port", "10160");
configuration.addProperty("configuration.stores.store.cassandraThriftSocketTimeout", "10000");
setManager(new StoreManager());
getManager().registerAndInitializeStore(configuration.subset("configuration.stores"));
}
@Before
public void setUp() throws Exception {
storekeeper = getManager().getStoreKeeper("TestCassandra");
}
@AfterClass
public static void destroy() {
// getManager().shutdown();
cassandra.stop();
try {
FileUtils.deleteDirectory(new File("/tmp/profilefront-cassandra"));
} catch (IOException e) {
}
}
}
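// CassandraStoreKeeper.java: alternative StoreKeeper implementation backed by Cassandra through the Hector client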
package com.ignitionone.profilefront.store.keeper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import me.prettyprint.cassandra.connection.DynamicLoadBalancingPolicy;
import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
import me.prettyprint.cassandra.model.BasicKeyspaceDefinition;
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.cassandra.serializers.AbstractSerializer;
import me.prettyprint.cassandra.serializers.CompositeSerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ColumnSliceIterator;
import me.prettyprint.cassandra.service.ThriftCfDef;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.Composite;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.SliceQuery;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ignitionone.Profile;
import com.ignitionone.ProfileAttribute;
import com.ignitionone.ProfileBucket;
import com.ignitionone.profilefront.exception.StoreException;
import com.ignitionone.profilefront.store.StoreKeeper;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
public class CassandraStoreKeeper extends StoreKeeper {
/* These are now read from configuration
------------------------------------------ */
protected String columnFamilyName;
protected String keyspaceName;
private int cfKeyCacheSize = 0;
private int cfRowCacheSize = 0;
private String ksReplicationStrategy;
private Map<String, String> ksReplicationStrategyOptions = new HashMap<String,String>();
public static HConsistencyLevel READ_CONSISTENCY = HConsistencyLevel.ONE; // TODO expose in configuration
public static HConsistencyLevel WRITE_CONSISTENCY = HConsistencyLevel.ANY; // TODO expose in configuration
/* Content-encoding flags
-------------------------- */
public static final int ENCODING_INTEGER = 1;
public static final int ENCODING_STRING = 2;
private static final Logger LOG = LoggerFactory.getLogger(CassandraStoreKeeper.class);
/* These represent Hector state
------------------------------- */
protected Cluster cluster;
protected Keyspace profileKeyspace;
/* Metrics
----------- */
protected final Timer readLatency = Metrics.newTimer(CassandraStoreKeeper.class, "read-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer writeLatency = Metrics.newTimer(CassandraStoreKeeper.class, "write-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer pageLatency = Metrics.newTimer(CassandraStoreKeeper.class, "page-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer pagePrepareLatency = Metrics.newTimer(CassandraStoreKeeper.class, "page-prepare-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
protected final Timer deserializationLatency = Metrics.newTimer(CassandraStoreKeeper.class, "deserialization-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
public CassandraStoreKeeper(Configuration configuration) throws Exception {
super(configuration);
try {
// set schema-related properties
if (!configuration.containsKey("settings.keyspaceName")) throw new ConfigurationException("Keyspace name not defined");
if (!configuration.containsKey("settings.columnFamilyName")) throw new ConfigurationException("Column family name not defined");
setColumnFamilyName(configuration.getString("settings.columnFamilyName"));
setKeyspaceName(configuration.getString("settings.keyspaceName"));
//setCfKeyCacheSize(configuration.getInt("settings.columnFamilyKeyCacheSize", 200000));
//setCfRowCacheSize(configuration.getInt("settings.columnFamilyRowCacheSize", 25000));
setKsReplicationStrategy(configuration.getString("settings.replicationStrategy", "SimpleStrategy"));
// setKsReplicationStrategyOptions(configuration.getString("settings.replicationStrategyOptions", "replication_factor:2"));
String[] rSOpts = configuration.getString("settings.replicationStrategyOptions", "replication_factor:2").split(",");
for(String rsOpt:Arrays.asList(rSOpts)) {
String[] rSOptKV = rsOpt.split(":");
if (rSOptKV.length == 2) getKsReplicationStrategyOptions().put(rSOptKV[0], rSOptKV[1]);
}
// try to connect to the Cassandra nodes & set Hector-related (connection) properties
LOG.trace("Connecting to Cassandra cluster..");
// load hosts
Object property = configuration.getProperty("nodes.node.host");
int nodes = 1;
if (null == property) {
// TODO maybe give more details about the name
throw new ConfigurationException("No nodes defined for the store keeper.");
} else if (property instanceof List) {
nodes = ((List) property).size();
}
String hosts = "";
for(int nodesIterator=0;nodesIterator<nodes;nodesIterator++) {
hosts = hosts.concat(configuration.getString("nodes.node("+nodesIterator+").host")+":"+configuration.getInt("nodes.node("+nodesIterator+").port")+",");
}
LOG.debug("Hosts {}",hosts);
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator();
cassandraHostConfigurator.setHosts(hosts);
cassandraHostConfigurator.setLoadBalancingPolicy(new DynamicLoadBalancingPolicy());
// cassandraHostConfigurator.setCassandraThriftSocketTimeout(configuration.getInt("settings.cassandraThriftSocketTimeout", 100));
// cassandraHostConfigurator.setMaxActive(configuration.getInt("settings.maxActive", 50));
// cassandraHostConfigurator.setUseSocketKeepalive(configuration.getBoolean("settings.useSocketKeepalive", true));
cassandraHostConfigurator.setAutoDiscoverHosts(configuration.getBoolean("settings.runAutoDiscoveryAtStartup", true));
cassandraHostConfigurator.setUseThriftFramedTransport(true);
setCluster(HFactory.getOrCreateCluster(configuration.getString("name"),cassandraHostConfigurator));
ThriftKsDef pfKeySpace = (ThriftKsDef) getCluster()
.describeKeyspace(getKeyspaceName());
if (pfKeySpace == null) {
LOG.warn("Keyspace {} was not found, creating it", getKeyspaceName());
pfKeySpace =
(ThriftKsDef) HFactory.createKeyspaceDefinition(getKeyspaceName());
pfKeySpace.setStrategyClass(getKsReplicationStrategy());
pfKeySpace.setStrategyOptions(getKsReplicationStrategyOptions());
getCluster().addKeyspace(pfKeySpace, true);
}
List<String> cfNames = new ArrayList<String>();
for (ColumnFamilyDefinition cfFound : pfKeySpace.getCfDefs()) {
cfNames.add(cfFound.getName());
}
if (!cfNames.contains(getColumnFamilyName())) {
LOG.warn("ColumnFamily {} was not found, creating it", getColumnFamilyName());
BasicColumnFamilyDefinition pfProfilesDef = new BasicColumnFamilyDefinition();
pfProfilesDef.setKeyspaceName(pfKeySpace.getName());
pfProfilesDef.setName(getColumnFamilyName());
ColumnFamilyDefinition cfDefStandard = new ThriftCfDef(pfProfilesDef);
cfDefStandard.setReadRepairChance(0.3);
getCluster().addColumnFamily(cfDefStandard, true);
}
ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
ccl.setDefaultWriteConsistencyLevel(WRITE_CONSISTENCY);
ccl.setDefaultReadConsistencyLevel(READ_CONSISTENCY);
LOG.trace("Loading keyspace");
setProfileKeyspace(HFactory.createKeyspace(getKeyspaceName(), getCluster(), ccl));
markAsAvailable();
} catch (Exception e) {
LOG.error("Caught exception while initializing store {}", e);
}
}
/**
* Read an entire profile row
*/
@Override
public Profile get(String id) throws StoreException {
final TimerContext readLatencyContext = readLatency.time();
// create the new Profile instance
Profile profile = new Profile(id);
try {
// load columns
List<HColumn<Composite, String>> strColumns = _getStringAttributes(id);
LOG.debug("Found "+strColumns.size()+" string columns..");
for (HColumn<Composite, String> column:strColumns) {
LOG.debug("Profile#"+id+" unserializing string attribute "+column.getName());
final TimerContext deserializationLatencyContext = deserializationLatency.time();
// check the attribute name-encoding flag
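// composite layout: component 1 = bucket name, 2 = key-encoding flag, 3 = attribute key (read back as Integer or String depending on the flag)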
switch(column.getName().get(2, new IntegerSerializer())) {
case CassandraStoreKeeper.ENCODING_INTEGER:
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new IntegerSerializer()), column.getValue(), column.getClock());
// LOG.debug("Found a string column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new IntegerSerializer())+", value: "+column.getValue() );
break;
case CassandraStoreKeeper.ENCODING_STRING:
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new StringSerializer()), column.getValue(), column.getClock());
// LOG.debug("Found a string column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new StringSerializer())+", value: "+column.getValue() );
break;
}
deserializationLatencyContext.stop();
}
List<HColumn<Composite, Integer>> intColumns = _getIntegerAttributes(id);
LOG.debug("Found "+intColumns.size()+" integer columns..");
for (HColumn<Composite, Integer> column:intColumns) {
LOG.debug("Profile#"+id+" unserializing integer attribute "+column.getName());
final TimerContext deserializationLatencyContext = deserializationLatency.time();
// check the attribute name-encoding flag
switch(column.getName().get(2, new IntegerSerializer())) {
case CassandraStoreKeeper.ENCODING_INTEGER:
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new IntegerSerializer()), column.getValue(), column.getClock());
// LOG.debug("Found an integer column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new IntegerSerializer())+", value: "+column.getValue() );
break;
case CassandraStoreKeeper.ENCODING_STRING:
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new StringSerializer()), column.getValue(), column.getClock());
// LOG.debug("Found an integer column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new StringSerializer())+", value: "+column.getValue() );
break;
}
deserializationLatencyContext.stop();
}
} catch(Exception e) {
throw new StoreException("Caught exception while trying to get profile", e);
}
readLatencyContext.stop();
return profile;
}
/**
*
* @author andras
*
* @param <T>
*/
private class ColumnLoader<T> {
private T t;
/**
* @param id
* @param start
* @param limit
* @param serializer
* @return
*/
public List<HColumn<Composite, T>> pageColumnsFor(String id, Composite start, Composite end, int limit, AbstractSerializer<T> serializer) {
final TimerContext pagePrepareLatencyContext = pagePrepareLatency.time();
List<HColumn<Composite, T>> results = new ArrayList<HColumn<Composite, T>>();
SliceQuery<String, Composite, T> query = HFactory.createSliceQuery(_getProfileKeyspace(), new StringSerializer(), new CompositeSerializer(), serializer);
query.setColumnFamily(getColumnFamilyName())
.setKey(id);
ColumnSliceIterator<String, Composite, T> sliceIterator = new ColumnSliceIterator<String, Composite, T>(query, start, end, false, limit);
pagePrepareLatencyContext.stop();
// get the next column from the iterator, if there is a next one, and if we are not over our limit
while(sliceIterator.hasNext()) {
final TimerContext pageLatencyContext = pageLatency.time();
results.add(sliceIterator.next());
pageLatencyContext.stop();
}
return results;
}
}
}
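// CassandraStoreKeeperTest.java: embedded-CassandraDaemon test for the Hector-based CassandraStoreKeeper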
package com.ignitionone.profilefront.storekeepers;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import me.prettyprint.hector.api.HConsistencyLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.io.FileUtils;
import com.ignitionone.Profile;
import com.ignitionone.ProfileAttribute;
import com.ignitionone.ProfileTest;
import com.ignitionone.profilefront.exception.AttributeKeyTypeCollisionException;
import com.ignitionone.profilefront.exception.AttributeValueTypeCollisionException;
import com.ignitionone.profilefront.exception.StoreException;
import com.ignitionone.profilefront.store.StoreKeeper;
import com.ignitionone.profilefront.store.StoreManager;
import com.ignitionone.profilefront.store.keeper.CassandraStoreKeeper;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
/**
* @author andras
*
*/
public class CassandraStoreKeeperTest extends StoreKeeperTest {
private static CassandraDaemon cassandra;
public CassandraStoreKeeperTest() {
LOG = LoggerFactory.getLogger(CassandraStoreKeeperTest.class);
}
@BeforeClass
public static void setup() throws IOException, SecurityException,
IllegalArgumentException, NoSuchMethodException,
ClassNotFoundException, IllegalAccessException,
InvocationTargetException, InstantiationException, ConfigurationException, InterruptedException {
CassandraStoreKeeperTest.cassandra = new CassandraDaemon();
CassandraStoreKeeperTest.cassandra.init(null);
CassandraStoreKeeperTest.cassandra.start();
// Thread.sleep(60000);
XMLConfiguration configuration = new XMLConfiguration();
configuration.addProperty("configuration.stores.store.name", "TestCassandra");
configuration.addProperty("configuration.stores.store.keeper", "CassandraStoreKeeper");
configuration.addProperty("configuration.stores.store.settings.keyspaceName", "ProfilefrontTest");
configuration.addProperty("configuration.stores.store.settings.columnFamilyName", "Profiles");
configuration.addProperty("configuration.stores.store.settings.columnFamilyKeyCacheSize", "10000");
configuration.addProperty("configuration.stores.store.settings.columnFamilyRowCacheSize", "0");
configuration.addProperty("configuration.stores.store.settings.replicationStrategy", "SimpleStrategy");
configuration.addProperty("configuration.stores.store.settings.replicationStrategyOptions", "replication_factor:1");
configuration.addProperty("configuration.stores.store.nodes.node.host", "127.0.0.1");
configuration.addProperty("configuration.stores.store.nodes.node.port", "10160");
configuration.addProperty("configuration.stores.store.cassandraThriftSocketTimeout", "10000");
CassandraStoreKeeper.READ_CONSISTENCY = HConsistencyLevel.ONE;
CassandraStoreKeeper.WRITE_CONSISTENCY = HConsistencyLevel.ONE;
setManager(new StoreManager());
getManager().registerAndInitializeStore(configuration.subset("configuration.stores"));
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
storekeeper = getManager().getStoreKeeper("TestCassandra");
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
}
@AfterClass
public static void destroy() {
// getManager().shutdown();
cassandra.stop();
try {
FileUtils.deleteDirectory(new File("/tmp/profilefront-cassandra"));
} catch (IOException e) {
}
}
}
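// StoreKeeperTest.java: shared read/write throughput harness; generates test profiles, warms up the store, then times writes and reads with Yammer Metrics timers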
package com.ignitionone.profilefront.storekeepers;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ignitionone.Profile;
import com.ignitionone.ProfileAttribute;
import com.ignitionone.ProfileBucket;
import com.ignitionone.ProfileTest;
import com.ignitionone.profilefront.exception.AttributeKeyTypeCollisionException;
import com.ignitionone.profilefront.exception.AttributeValueTypeCollisionException;
import com.ignitionone.profilefront.exception.StoreException;
import com.ignitionone.profilefront.store.StoreKeeper;
import com.ignitionone.profilefront.store.StoreManager;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import com.yammer.metrics.stats.Snapshot;
abstract public class StoreKeeperTest {
private static StoreManager manager;
private List<Profile> profiles;
protected StoreKeeper storekeeper;
protected static Logger LOG = LoggerFactory.getLogger(StoreKeeperTest.class);
@Test
public void testThroughput() throws AttributeKeyTypeCollisionException, AttributeValueTypeCollisionException, StoreException {
int overMaxLatency = 0;
final Timer writeTimer = Metrics.newTimer(this.getClass(), "write-performance", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
final Timer readTimer = Metrics.newTimer(this.getClass(), "read-performance", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
setProfiles(ProfileTest.generateProfiles(1000, (ProfileTest.FEATURE_INT_ATTRIBUTE | ProfileTest.FEATURE_STRING_ATTRIBUTE | ProfileTest.FEATURE_PHP_VISITOR_OBJECT | ProfileTest.FEATURE_INTERESTS)));
LOG.trace("Write throughput test with {} profiles", getProfiles().size());
// warmup..
for(Profile profile:getProfiles()) {
storekeeper.createUpdate(profile);
storekeeper.get(profile.getId());
}
for(Profile profile:getProfiles()) {
// write
TimerContext writeContext = writeTimer.time();
storekeeper.createUpdate(profile);
writeContext.stop();
// read
TimerContext readContext = readTimer.time();
storekeeper.get(profile.getId());
readContext.stop();
}
Snapshot writeSnapshot = writeTimer.getSnapshot();
LOG.debug("Write Throughput | Mean: "+writeTimer.mean()+", Max: "+writeTimer.max()+" stDev: "+writeTimer.stdDev()+", 75th:"+writeSnapshot.get75thPercentile()+", 95th: "+writeSnapshot.get95thPercentile()+", 98th: "+writeSnapshot.get98thPercentile()+", 99th: "+writeSnapshot.get99thPercentile()+", 999th: "+writeSnapshot.get999thPercentile());
Snapshot readSnapshot = readTimer.getSnapshot();
LOG.debug("Read Throughput | Mean: "+readTimer.mean()+", Max: "+readTimer.max()+" stDev: "+readTimer.stdDev()+", 75th:"+readSnapshot.get75thPercentile()+", 95th: "+readSnapshot.get95thPercentile()+", 98th: "+readSnapshot.get98thPercentile()+", 99th: "+readSnapshot.get99thPercentile()+", 999th: "+readSnapshot.get999thPercentile());
}
protected static StoreManager getManager() {
return manager;
}
protected static void setManager(StoreManager manager) {
StoreKeeperTest.manager = manager;
}
protected List<Profile> getProfiles() {
return profiles;
}
protected void setProfiles(List<Profile> profiles) {
this.profiles = profiles;
}
}