Created
July 30, 2012 15:39
-
-
Save andlaz/3207894 to your computer and use it in GitHub Desktop.
Cassandra perf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cluster_name: 'TestCluster' | |
initial_token: 0 | |
hinted_handoff_enabled: true | |
max_hint_window_in_ms: 3600000 # one hour | |
hinted_handoff_throttle_delay_in_ms: 1 | |
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator | |
authority: org.apache.cassandra.auth.AllowAllAuthority | |
partitioner: org.apache.cassandra.dht.RandomPartitioner | |
data_file_directories: | |
- /tmp/profilefront-cassandra/data | |
commitlog_directory: /tmp/profilefront-cassandra/commitlog | |
saved_caches_directory: /tmp/profilefront-cassandra/saved_caches | |
commitlog_sync: periodic | |
commitlog_sync_period_in_ms: 60000 | |
seed_provider: | |
- class_name: org.apache.cassandra.locator.SimpleSeedProvider | |
parameters: | |
- seeds: "127.0.0.1" | |
flush_largest_memtables_at: 0.85 | |
reduce_cache_sizes_at: 0.85 | |
reduce_cache_capacity_to: 0.6 | |
concurrent_reads: 2 | |
concurrent_writes: 2 | |
memtable_total_space_in_mb: 1024 | |
memtable_flush_writers: 1 | |
memtable_flush_queue_size: 4 | |
storage_port: 7000 | |
ssl_storage_port: 7001 | |
listen_address: 127.0.0.1 | |
broadcast_address: 127.0.0.1 | |
rpc_address: 0.0.0.0 | |
rpc_port: 10160 | |
rpc_keepalive: true | |
rpc_server_type: hsha | |
thrift_framed_transport_size_in_mb: 15 | |
thrift_max_message_length_in_mb: 16 | |
incremental_backups: false | |
snapshot_before_compaction: false | |
column_index_size_in_kb: 64 | |
in_memory_compaction_limit_in_mb: 64 | |
multithreaded_compaction: false | |
compaction_throughput_mb_per_sec: 16 | |
compaction_preheat_key_cache: true | |
rpc_timeout_in_ms: 10000 | |
endpoint_snitch: PropertyFileSnitch | |
dynamic_snitch_update_interval_in_ms: 100 | |
dynamic_snitch_reset_interval_in_ms: 600000 | |
dynamic_snitch_badness_threshold: 0.1 | |
request_scheduler: org.apache.cassandra.scheduler.NoScheduler | |
index_interval: 128 | |
encryption_options: | |
internode_encryption: none | |
keystore: conf/.keystore | |
keystore_password: cassandra | |
truststore: conf/.truststore | |
truststore_password: cassandra |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ignitionone.profilefront.store.keeper; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.commons.configuration.Configuration; | |
import org.apache.commons.configuration.ConfigurationException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.ignitionone.Profile; | |
import com.ignitionone.ProfileAttribute; | |
import com.ignitionone.ProfileBucket; | |
import com.ignitionone.profilefront.exception.AttributeKeyTypeCollisionException; | |
import com.ignitionone.profilefront.exception.AttributeValueTypeCollisionException; | |
import com.ignitionone.profilefront.exception.StoreException; | |
import com.ignitionone.profilefront.store.StoreKeeper; | |
import com.netflix.astyanax.AstyanaxContext; | |
import com.netflix.astyanax.AstyanaxContext.Builder; | |
import com.netflix.astyanax.Cluster; | |
import com.netflix.astyanax.Keyspace; | |
import com.netflix.astyanax.MutationBatch; | |
import com.netflix.astyanax.annotations.Component; | |
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration; | |
import com.netflix.astyanax.connectionpool.NodeDiscoveryType; | |
import com.netflix.astyanax.connectionpool.OperationResult; | |
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; | |
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; | |
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType; | |
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; | |
import com.netflix.astyanax.ddl.KeyspaceDefinition; | |
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; | |
import com.netflix.astyanax.model.Column; | |
import com.netflix.astyanax.model.ColumnFamily; | |
import com.netflix.astyanax.model.ColumnList; | |
import com.netflix.astyanax.model.ConsistencyLevel; | |
import com.netflix.astyanax.serializers.AnnotatedCompositeSerializer; | |
import com.netflix.astyanax.serializers.StringSerializer; | |
import com.netflix.astyanax.thrift.ThriftFamilyFactory; | |
import com.yammer.metrics.Metrics; | |
import com.yammer.metrics.core.Timer; | |
import com.yammer.metrics.core.TimerContext; | |
public class CassandraAstyanaxStoreKeeper extends StoreKeeper { | |
/* These are now read from configuration | |
------------------------------------------ */ | |
protected String columnFamilyName; | |
protected String keyspaceName; | |
private int cfKeyCacheSize = 0; | |
private int cfRowCacheSize = 0; | |
private String ksReplicationStrategy; | |
private Map<String, String> ksReplicationStrategyOptions = new HashMap<String,String>(); | |
/* Content-encoding flags | |
-------------------------- */ | |
public static final int ENCODING_INTEGER = 1; | |
public static final int ENCODING_STRING = 2; | |
private static final Logger LOG = LoggerFactory.getLogger(CassandraAstyanaxStoreKeeper.class); | |
/* These represent Astyanax state | |
------------------------------- */ | |
protected AstyanaxContext<Keyspace> keyspaceContext; | |
protected AstyanaxContext<Cluster> clusterContext; | |
protected Keyspace keyspace; | |
protected Cluster cluster; | |
public static class StrAttribute { | |
@Component(ordinal=0) String bucketName; | |
@Component(ordinal=1) String attributeKey; | |
@Component(ordinal=2) Integer valueType; | |
public StrAttribute() { | |
} | |
public String toString() { | |
return "bucketName: "+bucketName+", attributeKey: "+attributeKey+", valueType: "+valueType; | |
} | |
} | |
public static class IntAttribute { | |
@Component(ordinal=0) String bucketName; | |
@Component(ordinal=1) Integer attributeKey; | |
@Component(ordinal=2) Integer valueType; | |
public IntAttribute() { | |
} | |
public String toString() { | |
return "bucketName: "+bucketName+", attributeKey: "+attributeKey+", valueType: "+valueType; | |
} | |
} | |
protected AnnotatedCompositeSerializer<StrAttribute> strAttributeSerializer; | |
protected ColumnFamily<String, StrAttribute> cfProfileStrAttributes; | |
protected AnnotatedCompositeSerializer<IntAttribute> intAttributeSerializer; | |
protected ColumnFamily<String, IntAttribute> cfProfileIntAttributes; | |
/* Metrics | |
----------- */ | |
protected final Timer readLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "read-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer writeLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "write-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer pageLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "page-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer pagePrepareLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "page-prepare-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer deserializationLatency = Metrics.newTimer(CassandraAstyanaxStoreKeeper.class, "deserialization-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
public CassandraAstyanaxStoreKeeper(Configuration configuration) throws Exception { | |
super(configuration); | |
// set schema-related properties | |
if (!configuration.containsKey("settings.keyspaceName")) throw new ConfigurationException("Keyspace name not defined"); | |
if (!configuration.containsKey("settings.columnFamilyName")) throw new ConfigurationException("Column family name not defined"); | |
setColumnFamilyName(configuration.getString("settings.columnFamilyName")); | |
strAttributeSerializer = new AnnotatedCompositeSerializer<StrAttribute>(StrAttribute.class); | |
cfProfileStrAttributes = new ColumnFamily<String, StrAttribute>(getColumnFamilyName(), StringSerializer.get(), strAttributeSerializer); | |
intAttributeSerializer = new AnnotatedCompositeSerializer<IntAttribute>(IntAttribute.class); | |
cfProfileIntAttributes = new ColumnFamily<String, IntAttribute>(getColumnFamilyName(), StringSerializer.get(), intAttributeSerializer); | |
setKeyspaceName(configuration.getString("settings.keyspaceName")); | |
setCfKeyCacheSize(configuration.getInt("settings.columnFamilyKeyCacheSize", 200000)); | |
setCfRowCacheSize(configuration.getInt("settings.columnFamilyRowCacheSize", 25000)); | |
setKsReplicationStrategy(configuration.getString("settings.replicationStrategy", "SimpleStrategy")); | |
// setKsReplicationStrategyOptions(configuration.getString("settings.replicationStrategyOptions", "replication_factor:2")); | |
String[] rSOpts = configuration.getString("settings.replicationStrategyOptions", "replication_factor:2").split(","); | |
for(String rsOpt:Arrays.asList(rSOpts)) { | |
String[] rSOptKV = rsOpt.split(":"); | |
if (rSOptKV.length == 2) getKsReplicationStrategyOptions().put(rSOptKV[0], rSOptKV[1]); | |
} | |
LOG.trace("Connecting to Cassandra cluster.."); | |
// load hosts | |
Object property = configuration.getProperty("nodes.node.host"); | |
int nodes = 1; | |
if (null == property) { | |
// TODO maybe give more details about the name | |
throw new ConfigurationException("No nodes defined for the store keeper."); | |
} else if (property instanceof List) { | |
nodes = ((List) property).size(); | |
} | |
LOG.debug("Seed: "+configuration.getString("nodes.node(0).host")+":"+configuration.getInt("nodes.node(0).port")); | |
ConnectionPoolConfiguration pool = new ConnectionPoolConfigurationImpl(configuration.getString("name")+"-Pool") | |
.setInitConnsPerHost(configuration.getInt("settings.initConnsPerHost", 10)) | |
.setMaxConnsPerHost(configuration.getInt("settings.maxConnsPerHost", 25)) | |
.setSeeds(configuration.getString("nodes.node(0).host")+":"+configuration.getInt("nodes.node(0).port")); | |
try { | |
Builder builder = new AstyanaxContext.Builder() | |
.forCluster(configuration.getString("name")) | |
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl().setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE).setDefaultReadConsistencyLevel(ConsistencyLevel.CL_ONE).setDefaultWriteConsistencyLevel(ConsistencyLevel.CL_ANY).setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE)) | |
.withConnectionPoolConfiguration(pool) | |
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor()); | |
clusterContext = builder.buildCluster(ThriftFamilyFactory.getInstance()); | |
clusterContext.start(); | |
cluster = clusterContext.getEntity(); | |
try { | |
KeyspaceDefinition ksDef = cluster.makeKeyspaceDefinition(); | |
ksDef.setName(getKeyspaceName()) | |
.setStrategyClass(getKsReplicationStrategy()) | |
.setStrategyOptions(getKsReplicationStrategyOptions()) | |
.addColumnFamily( | |
cluster.makeColumnFamilyDefinition() | |
.setName(getColumnFamilyName()) | |
); | |
cluster.addKeyspace(ksDef); | |
} catch(ConnectionException e) { | |
//System.out.println(e); | |
} | |
keyspace = cluster.getKeyspace(getKeyspaceName()); | |
} catch(Exception e) { | |
LOG.error("Caught exception while initializing store {}", e); | |
} | |
} | |
@Override | |
public Profile get(String id) throws StoreException { | |
// TODO Auto-generated method stub | |
final TimerContext readLatencyContext = readLatency.time(); | |
// create the new Profile instance | |
Profile profile = new Profile(id); | |
try { | |
// get int keys.. | |
OperationResult<ColumnList<IntAttribute>> intRead = keyspace.prepareQuery(cfProfileIntAttributes).getKey(id).execute(); | |
for (Column<IntAttribute> result : intRead.getResult()) { | |
IntAttribute iAttribute = result.getName(); | |
LOG.debug("Found IntAttribute {}", iAttribute); | |
switch(iAttribute.valueType) { | |
case CassandraAstyanaxStoreKeeper.ENCODING_INTEGER: | |
if (iAttribute.attributeKey != null) profile.setAttribute(iAttribute.bucketName, iAttribute.attributeKey, result.getIntegerValue(), result.getTimestamp()); | |
break; | |
case CassandraAstyanaxStoreKeeper.ENCODING_STRING: | |
if (iAttribute.attributeKey != null) profile.setAttribute(iAttribute.bucketName, iAttribute.attributeKey, result.getStringValue(), result.getTimestamp()); | |
break; | |
} | |
} | |
// get str keys.. | |
OperationResult<ColumnList<StrAttribute>> strRead = keyspace.prepareQuery(cfProfileStrAttributes).getKey(id).execute(); | |
for (Column<StrAttribute> result : strRead.getResult()) { | |
StrAttribute sAttribute = result.getName(); | |
LOG.debug("Found StrAttribute {}", sAttribute); | |
switch(sAttribute.valueType) { | |
case CassandraAstyanaxStoreKeeper.ENCODING_INTEGER: | |
if (sAttribute.attributeKey != null) profile.setAttribute(sAttribute.bucketName, sAttribute.attributeKey, result.getIntegerValue(), result.getTimestamp()); | |
break; | |
case CassandraAstyanaxStoreKeeper.ENCODING_STRING: | |
if (sAttribute.attributeKey != null) profile.setAttribute(sAttribute.bucketName, sAttribute.attributeKey, result.getStringValue(), result.getTimestamp()); | |
break; | |
} | |
} | |
} catch (ConnectionException e) { | |
throw new StoreException("Caught Astyanax exception while trying to get profile", e); | |
} catch (AttributeKeyTypeCollisionException e) { | |
// TODO Auto-generated catch block | |
throw new StoreException("Caught AttributeKeyTypeCollisionException while trying to get profile", e); | |
} catch (AttributeValueTypeCollisionException e) { | |
// TODO Auto-generated catch block | |
throw new StoreException("Caught AttributeValueTypeCollisionException while trying to get profile", e); | |
} catch (Exception e) { | |
throw new StoreException("Caught Exception while trying to get profile", e); | |
} | |
readLatencyContext.stop(); | |
return profile; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ignitionone.profilefront.storekeepers; | |
import java.io.File; | |
import java.io.IOException; | |
import java.lang.reflect.InvocationTargetException; | |
import org.apache.cassandra.thrift.CassandraDaemon; | |
import org.apache.commons.configuration.ConfigurationException; | |
import org.apache.commons.configuration.XMLConfiguration; | |
import org.apache.commons.io.FileUtils; | |
import org.junit.AfterClass; | |
import org.junit.Before; | |
import org.junit.BeforeClass; | |
import org.slf4j.LoggerFactory; | |
import com.ignitionone.profilefront.store.StoreManager; | |
public class CassandraAstyanaxStoreKeeperTest extends StoreKeeperTest { | |
private static CassandraDaemon cassandra; | |
public CassandraAstyanaxStoreKeeperTest() { | |
LOG = LoggerFactory.getLogger(CassandraAstyanaxStoreKeeperTest.class); | |
} | |
@BeforeClass | |
public static void setup() throws IOException, SecurityException, | |
IllegalArgumentException, NoSuchMethodException, | |
ClassNotFoundException, IllegalAccessException, | |
InvocationTargetException, InstantiationException, ConfigurationException, InterruptedException { | |
(new File("/tmp/profilefront-cassandra")).mkdir(); | |
CassandraAstyanaxStoreKeeperTest.cassandra = new CassandraDaemon(); | |
CassandraAstyanaxStoreKeeperTest.cassandra.init(null); | |
CassandraAstyanaxStoreKeeperTest.cassandra.start(); | |
// Thread.sleep(60000); | |
XMLConfiguration configuration = new XMLConfiguration(); | |
configuration.addProperty("configuration.stores.store.name", "TestCassandra"); | |
configuration.addProperty("configuration.stores.store.keeper", "CassandraAstyanaxStoreKeeper"); | |
configuration.addProperty("configuration.stores.store.settings.keyspaceName", "ProfilefrontTest"); | |
configuration.addProperty("configuration.stores.store.settings.columnFamilyName", "Profiles"); | |
configuration.addProperty("configuration.stores.store.settings.columnFamilyKeyCacheSize", "10000"); | |
configuration.addProperty("configuration.stores.store.settings.columnFamilyRowCacheSize", "0"); | |
configuration.addProperty("configuration.stores.store.settings.replicationStrategy", "SimpleStrategy"); | |
configuration.addProperty("configuration.stores.store.settings.replicationStrategyOptions", "replication_factor:1"); | |
configuration.addProperty("configuration.stores.store.nodes.node.host", "127.0.0.1"); | |
configuration.addProperty("configuration.stores.store.nodes.node.port", "10160"); | |
configuration.addProperty("configuration.stores.store.cassandraThriftSocketTimeout", "10000"); | |
setManager(new StoreManager()); | |
getManager().registerAndInitializeStore(configuration.subset("configuration.stores")); | |
} | |
@Before | |
public void setUp() throws Exception { | |
storekeeper = getManager().getStoreKeeper("TestCassandra"); | |
} | |
@AfterClass | |
public static void destroy() { | |
// getManager().shutdown(); | |
cassandra.stop(); | |
try { | |
FileUtils.deleteDirectory(new File("/tmp/profilefront-cassandra")); | |
} catch (IOException e) { | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ignitionone.profilefront.store.keeper; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.concurrent.TimeUnit; | |
import me.prettyprint.cassandra.connection.DynamicLoadBalancingPolicy; | |
import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition; | |
import me.prettyprint.cassandra.model.BasicKeyspaceDefinition; | |
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel; | |
import me.prettyprint.cassandra.serializers.AbstractSerializer; | |
import me.prettyprint.cassandra.serializers.CompositeSerializer; | |
import me.prettyprint.cassandra.serializers.IntegerSerializer; | |
import me.prettyprint.cassandra.serializers.StringSerializer; | |
import me.prettyprint.cassandra.service.CassandraHostConfigurator; | |
import me.prettyprint.cassandra.service.ColumnSliceIterator; | |
import me.prettyprint.cassandra.service.ThriftCfDef; | |
import me.prettyprint.cassandra.service.ThriftKsDef; | |
import me.prettyprint.hector.api.Cluster; | |
import me.prettyprint.hector.api.HConsistencyLevel; | |
import me.prettyprint.hector.api.Keyspace; | |
import me.prettyprint.hector.api.beans.Composite; | |
import me.prettyprint.hector.api.beans.DynamicComposite; | |
import me.prettyprint.hector.api.beans.HColumn; | |
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition; | |
import me.prettyprint.hector.api.ddl.KeyspaceDefinition; | |
import me.prettyprint.hector.api.factory.HFactory; | |
import me.prettyprint.hector.api.mutation.Mutator; | |
import me.prettyprint.hector.api.query.SliceQuery; | |
import org.apache.commons.configuration.Configuration; | |
import org.apache.commons.configuration.ConfigurationException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.ignitionone.Profile; | |
import com.ignitionone.ProfileAttribute; | |
import com.ignitionone.ProfileBucket; | |
import com.ignitionone.profilefront.exception.StoreException; | |
import com.ignitionone.profilefront.store.StoreKeeper; | |
import com.yammer.metrics.Metrics; | |
import com.yammer.metrics.core.Timer; | |
import com.yammer.metrics.core.TimerContext; | |
public class CassandraStoreKeeper extends StoreKeeper { | |
/* These are now read from configuration | |
------------------------------------------ */ | |
protected String columnFamilyName; | |
protected String keyspaceName; | |
private int cfKeyCacheSize = 0; | |
private int cfRowCacheSize = 0; | |
private String ksReplicationStrategy; | |
private Map<String, String> ksReplicationStrategyOptions = new HashMap<String,String>(); | |
public static HConsistencyLevel READ_CONSISTENCY = HConsistencyLevel.ONE; // TODO expose in configuration | |
public static HConsistencyLevel WRITE_CONSISTENCY = HConsistencyLevel.ANY; // TODO expose in configuration | |
/* Content-encoding flags | |
-------------------------- */ | |
public static final int ENCODING_INTEGER = 1; | |
public static final int ENCODING_STRING = 2; | |
private static final Logger LOG = LoggerFactory.getLogger(CassandraStoreKeeper.class); | |
/* These represent Hector state | |
------------------------------- */ | |
protected Cluster cluster; | |
protected Keyspace profileKeyspace; | |
/* Metrics | |
----------- */ | |
protected final Timer readLatency = Metrics.newTimer(CassandraStoreKeeper.class, "read-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer writeLatency = Metrics.newTimer(CassandraStoreKeeper.class, "write-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer pageLatency = Metrics.newTimer(CassandraStoreKeeper.class, "page-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer pagePrepareLatency = Metrics.newTimer(CassandraStoreKeeper.class, "page-prepare-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
protected final Timer deserializationLatency = Metrics.newTimer(CassandraStoreKeeper.class, "deserialization-latency", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
public CassandraStoreKeeper(Configuration configuration) throws Exception { | |
super(configuration); | |
try { | |
// set schema-related properties | |
if (!configuration.containsKey("settings.keyspaceName")) throw new ConfigurationException("Keyspace name not defined"); | |
if (!configuration.containsKey("settings.columnFamilyName")) throw new ConfigurationException("Column family name not defined"); | |
setColumnFamilyName(configuration.getString("settings.columnFamilyName")); | |
setKeyspaceName(configuration.getString("settings.keyspaceName")); | |
//setCfKeyCacheSize(configuration.getInt("settings.columnFamilyKeyCacheSize", 200000)); | |
//setCfRowCacheSize(configuration.getInt("settings.columnFamilyRowCacheSize", 25000)); | |
setKsReplicationStrategy(configuration.getString("settings.replicationStrategy", "SimpleStrategy")); | |
// setKsReplicationStrategyOptions(configuration.getString("settings.replicationStrategyOptions", "replication_factor:2")); | |
String[] rSOpts = configuration.getString("settings.replicationStrategyOptions", "replication_factor:2").split(","); | |
for(String rsOpt:Arrays.asList(rSOpts)) { | |
String[] rSOptKV = rsOpt.split(":"); | |
if (rSOptKV.length == 2) getKsReplicationStrategyOptions().put(rSOptKV[0], rSOptKV[1]); | |
} | |
// try to connect to the Cassandra nodes & set Hector-related (connection) properties | |
LOG.trace("Connecting to Cassandra cluster.."); | |
// load hosts | |
Object property = configuration.getProperty("nodes.node.host"); | |
int nodes = 1; | |
if (null == property) { | |
// TODO maybe give more details about the name | |
throw new ConfigurationException("No nodes defined for the store keeper."); | |
} else if (property instanceof List) { | |
nodes = ((List) property).size(); | |
} | |
String hosts = ""; | |
for(int nodesIterator=0;nodesIterator<nodes;nodesIterator++) { | |
hosts = hosts.concat(configuration.getString("nodes.node("+nodesIterator+").host")+":"+configuration.getInt("nodes.node("+nodesIterator+").port")+","); | |
} | |
LOG.debug("Hosts {}",hosts); | |
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator(); | |
cassandraHostConfigurator.setHosts(hosts); | |
cassandraHostConfigurator.setLoadBalancingPolicy(new DynamicLoadBalancingPolicy()); | |
// cassandraHostConfigurator.setCassandraThriftSocketTimeout(configuration.getInt("settings.cassandraThriftSocketTimeout", 100)); | |
// cassandraHostConfigurator.setMaxActive(configuration.getInt("settings.maxActive", 50)); | |
// cassandraHostConfigurator.setUseSocketKeepalive(configuration.getBoolean("settings.useSocketKeepalive", true)); | |
cassandraHostConfigurator.setAutoDiscoverHosts(configuration.getBoolean("settings.runAutoDiscoveryAtStartup", true)); | |
cassandraHostConfigurator.setUseThriftFramedTransport(true); | |
setCluster(HFactory.getOrCreateCluster(configuration.getString("name"),cassandraHostConfigurator)); | |
ThriftKsDef pfKeySpace = (ThriftKsDef) getCluster() | |
.describeKeyspace(getKeyspaceName()); | |
if (pfKeySpace == null) { | |
LOG.warn("Keyspace {} was not found, creating it", getKeyspaceName()); | |
pfKeySpace = | |
(ThriftKsDef) HFactory.createKeyspaceDefinition(getKeyspaceName()); | |
pfKeySpace.setStrategyClass(getKsReplicationStrategy()); | |
pfKeySpace.setStrategyOptions(getKsReplicationStrategyOptions()); | |
getCluster().addKeyspace(pfKeySpace, true); | |
} | |
List<String> cfNames = new ArrayList<String>(); | |
for (ColumnFamilyDefinition cfFound : pfKeySpace.getCfDefs()) { | |
cfNames.add(cfFound.getName()); | |
} | |
if (!cfNames.contains(new String(getColumnFamilyName()))) { | |
LOG.warn("ColumnFamily {} was not found, creating it", getColumnFamilyName()); | |
BasicColumnFamilyDefinition pfProfilesDef = new BasicColumnFamilyDefinition(); | |
pfProfilesDef.setKeyspaceName(pfKeySpace.getName()); | |
pfProfilesDef.setName(getColumnFamilyName()); | |
ColumnFamilyDefinition cfDefStandard = new ThriftCfDef(pfProfilesDef); | |
cfDefStandard.setReadRepairChance(0.3); | |
getCluster().addColumnFamily(cfDefStandard, true); | |
} | |
ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel(); | |
ccl.setDefaultWriteConsistencyLevel(WRITE_CONSISTENCY); | |
ccl.setDefaultReadConsistencyLevel(READ_CONSISTENCY); | |
LOG.trace("Loading keyspace"); | |
setProfileKeyspace(HFactory.createKeyspace(getKeyspaceName(), getCluster(), ccl)); | |
markAsAvailable(); | |
} catch (Exception e) { | |
LOG.error("Caught exception while initializing store {}", e); | |
} | |
} | |
/** | |
* Read an entire profile row | |
*/ | |
@Override | |
public Profile get(String id) throws StoreException { | |
final TimerContext readLatencyContext = readLatency.time(); | |
// create the new Profile instance | |
Profile profile = new Profile(id); | |
try { | |
// load columns | |
List<HColumn<Composite, String>> strColumns = _getStringAttributes(id); | |
LOG.debug("Found "+strColumns.size()+" string columns.."); | |
for (HColumn<Composite, String> column:strColumns) { | |
LOG.debug("Profile#"+id+" unserializing string attribute "+column.getName()); | |
final TimerContext deserializationLatencyContext = deserializationLatency.time(); | |
// check the attribute name-encoding flag | |
switch(column.getName().get(2, new IntegerSerializer())) { | |
case CassandraStoreKeeper.ENCODING_INTEGER: | |
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new IntegerSerializer()), column.getValue(), column.getClock()); | |
// LOG.debug("Found a string column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new IntegerSerializer())+", value: "+column.getValue() ); | |
break; | |
case CassandraStoreKeeper.ENCODING_STRING: | |
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new StringSerializer()), column.getValue(), column.getClock()); | |
// LOG.debug("Found a string column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new StringSerializer())+", value: "+column.getValue() ); | |
break; | |
} | |
deserializationLatencyContext.stop(); | |
} | |
List<HColumn<Composite, Integer>> intColumns = _getIntegerAttributes(id); | |
LOG.debug("Found "+intColumns.size()+" integer columns.."); | |
for (HColumn<Composite, Integer> column:intColumns) { | |
LOG.debug("Profile#"+id+" unserializing integer attribute "+column.getName()); | |
final TimerContext deserializationLatencyContext = deserializationLatency.time(); | |
// check the attribute name-encoding flag | |
switch(column.getName().get(2, new IntegerSerializer())) { | |
case CassandraStoreKeeper.ENCODING_INTEGER: | |
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new IntegerSerializer()), column.getValue(), column.getClock()); | |
// LOG.debug("Found an integer column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new IntegerSerializer())+", value: "+column.getValue() ); | |
break; | |
case CassandraStoreKeeper.ENCODING_STRING: | |
profile.setAttribute(column.getName().get(1, new StringSerializer()), column.getName().get(3, new StringSerializer()), column.getValue(), column.getClock()); | |
// LOG.debug("Found an integer column under "+column.getName().get(1, new StringSerializer())+", attribute: "+column.getName().get(3, new StringSerializer())+", value: "+column.getValue() ); | |
break; | |
} | |
deserializationLatencyContext.stop(); | |
} | |
} catch(Exception e) { | |
// TODO Auto-generated catch block | |
} | |
readLatencyContext.stop(); | |
return profile; | |
} | |
/** | |
* | |
* @author andras | |
* | |
* @param <T> | |
*/ | |
private class ColumnLoader<T> { | |
private T t; | |
/** | |
* @param id | |
* @param start | |
* @param limit | |
* @param serializer | |
* @return | |
*/ | |
public List<HColumn<Composite, T>> pageColumnsFor(String id, Composite start, Composite end, int limit, AbstractSerializer<T> serializer) { | |
final TimerContext pagePrepareLatencyContext = pagePrepareLatency.time(); | |
List<HColumn<Composite, T>> results = new ArrayList<HColumn<Composite, T>>(); | |
SliceQuery<String, Composite, T> query = HFactory.createSliceQuery(_getProfileKeyspace(), new StringSerializer(), new CompositeSerializer(), serializer); | |
query.setColumnFamily(getColumnFamilyName()) | |
.setKey(id); | |
ColumnSliceIterator<String, Composite, T> sliceIterator = new ColumnSliceIterator<String, Composite, T>(query, start, end, false, limit); | |
pagePrepareLatencyContext.stop(); | |
// get the next column from the iterator, if there is a next one, and if we are not over our | |
while(sliceIterator.hasNext()) { | |
final TimerContext pageLatencyContext = pageLatency.time(); | |
results.add(sliceIterator.next()); | |
pageLatencyContext.stop(); | |
} | |
return results; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* | |
*/ | |
package com.ignitionone.profilefront.storekeepers; | |
import java.io.ByteArrayInputStream; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.StringWriter; | |
import java.lang.reflect.InvocationTargetException; | |
import java.nio.ByteBuffer; | |
import java.nio.charset.CharacterCodingException; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import me.prettyprint.hector.api.HConsistencyLevel; | |
import org.junit.After; | |
import org.junit.AfterClass; | |
import org.junit.Before; | |
import org.junit.BeforeClass; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.apache.cassandra.thrift.CassandraDaemon; | |
import org.apache.commons.configuration.Configuration; | |
import org.apache.commons.configuration.ConfigurationException; | |
import org.apache.commons.configuration.PropertiesConfiguration; | |
import org.apache.commons.configuration.XMLConfiguration; | |
import org.apache.commons.io.FileUtils; | |
import com.ignitionone.Profile; | |
import com.ignitionone.ProfileAttribute; | |
import com.ignitionone.ProfileTest; | |
import com.ignitionone.profilefront.exception.AttributeKeyTypeCollisionException; | |
import com.ignitionone.profilefront.exception.AttributeValueTypeCollisionException; | |
import com.ignitionone.profilefront.exception.StoreException; | |
import com.ignitionone.profilefront.store.StoreKeeper; | |
import com.ignitionone.profilefront.store.StoreManager; | |
import com.ignitionone.profilefront.store.keeper.CassandraStoreKeeper; | |
import static org.junit.Assert.assertTrue; | |
import static org.junit.Assert.assertEquals; | |
/** | |
* @author andras | |
* | |
*/ | |
public class CassandraStoreKeeperTest extends StoreKeeperTest { | |
private static CassandraDaemon cassandra; | |
public CassandraStoreKeeperTest() { | |
LOG = LoggerFactory.getLogger(CassandraStoreKeeperTest.class); | |
} | |
@BeforeClass | |
public static void setup() throws IOException, SecurityException, | |
IllegalArgumentException, NoSuchMethodException, | |
ClassNotFoundException, IllegalAccessException, | |
InvocationTargetException, InstantiationException, ConfigurationException, InterruptedException { | |
CassandraStoreKeeperTest.cassandra = new CassandraDaemon(); | |
CassandraStoreKeeperTest.cassandra.init(null); | |
CassandraStoreKeeperTest.cassandra.start(); | |
// Thread.sleep(60000); | |
XMLConfiguration configuration = new XMLConfiguration(); | |
configuration.addProperty("configuration.stores.store.name", "TestCassandra"); | |
configuration.addProperty("configuration.stores.store.keeper", "CassandraStoreKeeper"); | |
configuration.addProperty("configuration.stores.store.settings.keyspaceName", "ProfilefrontTest"); | |
configuration.addProperty("configuration.stores.store.settings.columnFamilyName", "Profiles"); | |
configuration.addProperty("configuration.stores.store.settings.columnFamilyKeyCacheSize", "10000"); | |
configuration.addProperty("configuration.stores.store.settings.columnFamilyRowCacheSize", "0"); | |
configuration.addProperty("configuration.stores.store.settings.replicationStrategy", "SimpleStrategy"); | |
configuration.addProperty("configuration.stores.store.settings.replicationStrategyOptions", "replication_factor:1"); | |
configuration.addProperty("configuration.stores.store.nodes.node.host", "127.0.0.1"); | |
configuration.addProperty("configuration.stores.store.nodes.node.port", "10160"); | |
configuration.addProperty("configuration.stores.store.cassandraThriftSocketTimeout", "10000"); | |
CassandraStoreKeeper.READ_CONSISTENCY = HConsistencyLevel.ONE; | |
CassandraStoreKeeper.WRITE_CONSISTENCY = HConsistencyLevel.ONE; | |
setManager(new StoreManager()); | |
getManager().registerAndInitializeStore(configuration.subset("configuration.stores")); | |
} | |
/** | |
* @throws java.lang.Exception | |
*/ | |
@Before | |
public void setUp() throws Exception { | |
storekeeper = getManager().getStoreKeeper("TestCassandra"); | |
} | |
/** | |
* @throws java.lang.Exception | |
*/ | |
@After | |
public void tearDown() throws Exception { | |
} | |
@AfterClass | |
public static void destroy() { | |
// getManager().shutdown(); | |
cassandra.stop(); | |
try { | |
FileUtils.deleteDirectory(new File("/tmp/profilefront-cassandra")); | |
} catch (IOException e) { | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ignitionone.profilefront.storekeepers; | |
import static org.junit.Assert.*; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.UUID; | |
import java.util.concurrent.TimeUnit; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.ignitionone.Profile; | |
import com.ignitionone.ProfileAttribute; | |
import com.ignitionone.ProfileBucket; | |
import com.ignitionone.ProfileTest; | |
import com.ignitionone.profilefront.exception.AttributeKeyTypeCollisionException; | |
import com.ignitionone.profilefront.exception.AttributeValueTypeCollisionException; | |
import com.ignitionone.profilefront.exception.StoreException; | |
import com.ignitionone.profilefront.store.StoreKeeper; | |
import com.ignitionone.profilefront.store.StoreManager; | |
import com.yammer.metrics.Metrics; | |
import com.yammer.metrics.core.Timer; | |
import com.yammer.metrics.core.TimerContext; | |
import com.yammer.metrics.stats.Snapshot; | |
abstract public class StoreKeeperTest { | |
private static StoreManager manager; | |
private List<Profile> profiles; | |
protected StoreKeeper storekeeper; | |
protected static Logger LOG = LoggerFactory.getLogger(StoreKeeperTest.class); | |
@Test | |
public void testThroughput() throws AttributeKeyTypeCollisionException, AttributeValueTypeCollisionException, StoreException { | |
int overMaxLatency = 0; | |
final Timer writeTimer = Metrics.newTimer(this.getClass(), "write-performance", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
final Timer readTimer = Metrics.newTimer(this.getClass(), "read-performance", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
setProfiles(ProfileTest.generateProfiles(1000, (ProfileTest.FEATURE_INT_ATTRIBUTE | ProfileTest.FEATURE_STRING_ATTRIBUTE | ProfileTest.FEATURE_PHP_VISITOR_OBJECT | ProfileTest.FEATURE_INTERESTS))); | |
LOG.trace("Write throughput test with {} profiles", getProfiles().size()); | |
// warmup.. | |
for(Profile profile:getProfiles()) { | |
storekeeper.createUpdate(profile); | |
storekeeper.get(profile.getId()); | |
} | |
for(Profile profile:getProfiles()) { | |
// write | |
TimerContext writeContext = writeTimer.time(); | |
storekeeper.createUpdate(profile); | |
writeContext.stop(); | |
// read | |
TimerContext readContext = readTimer.time(); | |
storekeeper.get(profile.getId()); | |
readContext.stop(); | |
} | |
Snapshot writeSnapshot = writeTimer.getSnapshot(); | |
LOG.debug("Write Throughput | Mean: "+writeTimer.mean()+", Max: "+writeTimer.max()+" stDev: "+writeTimer.stdDev()+", 75th:"+writeSnapshot.get75thPercentile()+", 95th: "+writeSnapshot.get95thPercentile()+", 98th: "+writeSnapshot.get98thPercentile()+", 99th: "+writeSnapshot.get99thPercentile()+", 999th: "+writeSnapshot.get999thPercentile()); | |
Snapshot readSnapshot = readTimer.getSnapshot(); | |
LOG.debug("Read Throughput | Mean: "+readTimer.mean()+", Max: "+readTimer.max()+" stDev: "+readTimer.stdDev()+", 75th:"+readSnapshot.get75thPercentile()+", 95th: "+readSnapshot.get95thPercentile()+", 98th: "+readSnapshot.get98thPercentile()+", 99th: "+readSnapshot.get99thPercentile()+", 999th: "+readSnapshot.get999thPercentile()); | |
} | |
protected static StoreManager getManager() { | |
return manager; | |
} | |
protected static void setManager(StoreManager manager) { | |
StoreKeeperTest.manager = manager; | |
} | |
protected List<Profile> getProfiles() { | |
return profiles; | |
} | |
protected void setProfiles(List<Profile> profiles) { | |
this.profiles = profiles; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment