Nate McCall (zznate)

View GitHub Profile
@Test
public void testBatchMutate() throws IllegalArgumentException, NoSuchElementException,
    IllegalStateException, NotFoundException, TException, Exception {
  Map<String, Map<String, List<Mutation>>> outerMutationMap =
      new HashMap<String, Map<String, List<Mutation>>>();
  for (int i = 0; i < 10; i++) {
    Map<String, List<Mutation>> mutationMap = new HashMap<String, List<Mutation>>();
    ArrayList<Mutation> mutations = new ArrayList<Mutation>(10);
    for (int j = 0; j < 10; j++) {
Given a Keyspace ks and an optional startOnKey string, the following snippet returns a Set of 100 keys without deserializing the underlying columns. The tradeoff is that you have no local check on tombstones. To page through results, pass the last key retrieved as startOnKey.
ColumnParent clp = new ColumnParent(columnFamilyName);
SlicePredicate sp = new SlicePredicate();
// An empty column-name list means only keys come back - no columns are deserialized
sp.setColumn_names(new ArrayList<byte[]>());
KeyRange keyRange = new KeyRange();
keyRange.setCount(100);
if (StringUtils.isBlank(startOnKey)) {
  keyRange.setStart_key("");
} else {
  keyRange.setStart_key(startOnKey);
}
keyRange.setEnd_key("");
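The paging behavior is worth seeing end to end: a key range's start key is inclusive, so every page after the first repeats the key you passed in, and the caller has to skip it. Here is a self-contained sketch of that loop, with an in-memory TreeMap standing in for the column family; KeyPager, fetchPage, and allKeys are made-up names, not Hector or Thrift API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class KeyPager {
  // Stand-in for the column family: keys kept in their sorted order.
  static final TreeMap<String, String> rows = new TreeMap<String, String>();

  // Mimics a range slice: up to 'count' keys starting at 'startOnKey'
  // (inclusive), or from the beginning when it is blank.
  static List<String> fetchPage(String startOnKey, int count) {
    List<String> page = new ArrayList<String>();
    Iterable<String> range = (startOnKey == null || startOnKey.isEmpty())
        ? rows.keySet()
        : rows.tailMap(startOnKey).keySet(); // tailMap is inclusive, like the range slice
    for (String key : range) {
      if (page.size() == count) break;
      page.add(key);
    }
    return page;
  }

  // Walks all keys, skipping the duplicated start key on pages after the first.
  static List<String> allKeys(int pageSize) {
    List<String> result = new ArrayList<String>();
    String start = "";
    while (true) {
      List<String> page = fetchPage(start, pageSize);
      int from = start.isEmpty() ? 0 : 1; // drop the repeated start key
      for (int i = from; i < page.size(); i++) {
        result.add(page.get(i));
      }
      if (page.size() < pageSize) break; // short page: nothing left
      start = page.get(page.size() - 1); // last key becomes next start
    }
    return result;
  }

  public static void main(String[] args) {
    for (int i = 0; i < 250; i++) {
      rows.put(String.format("key%04d", i), "value" + i);
    }
    System.out.println(allKeys(100).size()); // 250 - every key once, no duplicates
  }
}
```

The same skip-the-first-key bookkeeping applies when paging the real get_range_slices.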
The following would work with ExampleDao if you make execute public. It's just an example of how you can get really tight encapsulation with a command pattern. Given the following method on my "DAO" class:
public void batchInsert(final String key, final Map<String, String> columnMap) throws Exception {
  BatchMutationCallback batchInsertCallback =
      new BatchMutationCallback(columnFamilyName, buildTimestamp());
  for (String colName : columnMap.keySet()) {
    batchInsertCallback.addInsertion(key, colName, columnMap.get(colName));
  }
  execute(batchInsertCallback);
}
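The encapsulation idea can be sketched without Hector at all: the command object captures everything it needs up front, and the executor only ever sees execute(). Everything below — Command, BatchInsertCommand, the in-memory store — is a hypothetical stand-in for BatchMutationCallback and the DAO's execute method, not the real API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CommandPatternSketch {
  // The command: one unit of work, fully assembled before execution.
  interface Command<T> {
    T execute();
  }

  // Stand-in for BatchMutationCallback: accumulates insertions, then
  // applies them all at once when executed.
  static class BatchInsertCommand implements Command<Integer> {
    private final Map<String, String> store;
    private final List<String[]> insertions = new ArrayList<String[]>();

    BatchInsertCommand(Map<String, String> store) {
      this.store = store;
    }

    void addInsertion(String key, String colName, String value) {
      insertions.add(new String[] { key + ":" + colName, value });
    }

    public Integer execute() {
      for (String[] ins : insertions) {
        store.put(ins[0], ins[1]);
      }
      return insertions.size();
    }
  }

  // The DAO exposes only batchInsert; callers never see the command type.
  static int batchInsert(Map<String, String> store, String key,
      Map<String, String> columnMap) {
    BatchInsertCommand cmd = new BatchInsertCommand(store);
    for (Map.Entry<String, String> col : columnMap.entrySet()) {
      cmd.addInsertion(key, col.getKey(), col.getValue());
    }
    return cmd.execute();
  }

  public static void main(String[] args) {
    Map<String, String> store = new LinkedHashMap<String, String>();
    Map<String, String> cols = new LinkedHashMap<String, String>();
    cols.put("first", "Nate");
    cols.put("last", "McCall");
    System.out.println(batchInsert(store, "user1", cols)); // 2
    System.out.println(store.get("user1:first"));          // Nate
  }
}
```

The payoff is the same as in the DAO above: callers hand over plain data, and all mutation-building stays private to the command.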
Notes:
- From within the main() method, I pass in threadCount to control the number of threads.
- DumpFileReader is a thin wrapper around a BufferedReader that controls paging and CSV splitting on the MySQL dump file.
- The number of lines DumpFileReader grabs per page is also passed in as a main argument.
The big takeaway: by varying the thread count and line count, I got an excellent picture of what my test environment was capable of.
....
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
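The overall shape of that loader — a fixed pool fed one page of lines at a time — can be sketched self-contained. The list below stands in for the dump file, and BatchLoaderSketch/loadBatches are made-up names; the real version reads pages through DumpFileReader and does an insert where the CSV split is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchLoaderSketch {

  // Submits one task per page of lines and returns the total rows processed.
  static int loadBatches(List<String> dump, int threadCount, int linesPerBatch)
      throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    for (int start = 0; start < dump.size(); start += linesPerBatch) {
      final List<String> batch =
          dump.subList(start, Math.min(start + linesPerBatch, dump.size()));
      futures.add(executorService.submit(new Callable<Integer>() {
        public Integer call() {
          int rows = 0;
          for (String line : batch) {
            line.split(",");  // CSV split; the real insert would happen here
            rows++;
          }
          return rows;
        }
      }));
    }
    int total = 0;
    for (Future<Integer> f : futures) {
      total += f.get();  // blocks for completion; also surfaces worker exceptions
    }
    executorService.shutdown();
    return total;
  }

  public static void main(String[] args) throws Exception {
    // threadCount and linesPerBatch would come in as main() arguments
    List<String> dump = new ArrayList<String>();
    for (int i = 0; i < 500; i++) {
      dump.add(i + ",col1,col2");
    }
    System.out.println(loadBatches(dump, 4, 50)); // 500
  }
}
```

Tuning is then just a matter of re-running with different threadCount and linesPerBatch values and watching throughput.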
diff --git a/pom.xml b/pom.xml
index 4a2d807..0e9517e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,22 @@
   <build>
     <finalName>cassandra-webconsole</finalName>
+    <plugins>
+
Assuming I have three hosts I want to configure identically:
CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("localhost:9170,localhost:9171,localhost:9172");
cassandraHostConfigurator.setMaxActive(20);
cassandraHostConfigurator.setMaxIdle(5);
cassandraHostConfigurator.setCassandraThriftSocketTimeout(3000);
cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(4000);
cassandraHostConfigurator.setExhaustedPolicy(ExhaustedPolicy.WHEN_EXHAUSTED_GROW);
CassandraClientPool cassandraClientPool = CassandraClientPoolFactory.INSTANCE.createNew(cassandraHostConfigurator);
Forcing a compaction on a CF with indexed columns occasionally causes:
-------------
ERROR 14:33:31,430 Uncaught exception in thread Thread[CompactionExecutor:1,5,main]
java.util.concurrent.ExecutionException: java.io.IOException: Keys must be written in ascending order.
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor.afterExecute(DebuggableThreadPoolExecutor.java:87)
at org.apache.cassandra.db.CompactionManager$CompactionExecutor.afterExecute(CompactionManager.java:659)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:888)
nate:cassandra-TRUNK$ python=test nosetests --tests=system.test_thrift_server:TestMutations.test_index_scan
/usr/local/lib/python2.6/dist-packages/thrift/Thrift.py:58: DeprecationWarning: BaseException.message has been deprecated as of Python 2.6
self.message = message
E
======================================================================
ERROR: system.test_thrift_server.TestMutations.test_index_scan
----------------------------------------------------------------------
Traceback (most recent call last):
File "/var/lib/python-support/python2.6/nose/case.py", line 182, in runTest
self.test(*self.arg)
11:45:56,075 ERROR JMXEnabledThreadPoolExecutor:102 - Error in ThreadPoolExecutor
java.lang.RuntimeException: java.lang.ClassCastException: org.apache.cassandra.db.Column cannot be cast to org.apache.cassandra.db.SuperColumn
at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:34)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.lang.ClassCastException: org.apache.cassandra.db.Column cannot be cast to org.apache.cassandra.db.SuperColumn
at org.apache.cassandra.db.SuperColumnSerializer.serialize(SuperColumn.java:318)
at org.apache.cassandra.db.SuperColumnSerializer.serialize(SuperColumn.java:298)
at org.apache.cassandra.db.ColumnFamilySerializer.serializeForSSTable(ColumnFamilySerializer.java:84)
EF.....E...E........................
======================================================================
ERROR: Failure: ImportError (No module named avro.ipc)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/var/lib/python-support/python2.6/nose/loader.py", line 364, in loadTestsFromName
addr.filename, addr.module)
File "/var/lib/python-support/python2.6/nose/importer.py", line 39, in importFromPath
return self.importFromDir(dir_path, fqname)
File "/var/lib/python-support/python2.6/nose/importer.py", line 84, in importFromDir