Skip to content

Instantly share code, notes, and snippets.

@suyash
Created March 20, 2016 18:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save suyash/5ee86376ef24a400f93e to your computer and use it in GitHub Desktop.
Save suyash/5ee86376ef24a400f93e to your computer and use it in GitHub Desktop.
Cassandra indexing using Thrift
### Java ###
*.class
# Package Files #
*.jar
*.war
*.ear
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
### Intellij ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
*.iml
## Directory-based project format:
.idea/
## File-based project format:
*.ipr
*.iws
## Plugin-specific files:
# IntelliJ
/out/
### Gradle ###
.gradle
build/
# Ignore Gradle GUI config
gradle-app.setting
# Nothing new/important
gradlew*
gradle/
# logs
wc.out
// Gradle build for the CassandraInput sample: a single Java application
// whose entry point is the top-level class "Main".
group 'in.suyash.CassandraInput'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'application'

// cassandra-all 2.1.x targets Java 7, so compiling this project at an older
// source level is misleading (the result cannot run on a pre-7 JVM anyway),
// and newer JDKs no longer accept source level 1.5.
sourceCompatibility = 1.7

repositories {
    mavenCentral()
}

dependencies {
    // Supplies the Thrift client classes (org.apache.cassandra.thrift.*) and ByteBufferUtil.
    compile group: 'org.apache.cassandra', name: 'cassandra-all', version: '2.1.5'
}

sourceSets {
    main {
        java {
            // Sources live next to the build script rather than under src/main/java.
            srcDir './'
        }
    }
}

mainClassName = "Main"
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Prepares a Cassandra node for a word-count job using the Thrift API.
 *
 * <p>Running {@link #main(String[])} connects to {@code localhost:9160}, makes sure the
 * {@code wordcount} keyspace and its {@code input} / {@code output} tables exist, and
 * inserts sample words into the {@code input} table.
 */
public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    /** Host of the Cassandra node to connect to. */
    private static final String HOST = "localhost";
    /** Port the Thrift client connects to. */
    private static final int PORT = 9160;

    private static final String KEYSPACE = "wordcount";
    private static final String COLUMN_FAMILY = "input";
    private static final String OUTPUT_COLUMN_FAMILY = "output";

    /** Sample words; duplicates are intentional so the words have different frequencies. */
    private static final String[] WORDS = {
        "Foo",
        "Foo",
        "Foo",
        "Bar",
        "Bar",
        "Baz"
    };
    /** Number of rows inserted for every entry of {@link #WORDS}. */
    private static final int COPIES_PER_WORD = 10;

    private Main() {
        // Entry-point holder only; not meant to be instantiated.
    }

    /**
     * Opens a framed Thrift transport to {@link #HOST}:{@link #PORT}.
     *
     * @return an already opened transport; the caller is responsible for closing it
     * @throws TTransportException if the connection cannot be established
     */
    private static TTransport openTransport() throws TTransportException {
        TTransport transport = new TFramedTransport(new TSocket(HOST, PORT));
        transport.open();
        return transport;
    }

    /**
     * Runs a CQL3 statement uncompressed at consistency level ONE.
     *
     * @param client Thrift client to run the statement on
     * @param query  CQL3 statement text
     * @throws TException if the server rejects the statement or the transport fails
     */
    private static void executeCql(Cassandra.Iface client, String query) throws TException {
        client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
    }

    /**
     * Creates the {@link #KEYSPACE} keyspace unless it already exists.
     *
     * @param client Thrift client to use
     * @throws TException if looking up or creating the keyspace fails for any reason
     *                    other than the keyspace being absent
     */
    private static void createKeyspace(Cassandra.Iface client) throws TException {
        try {
            client.describe_keyspace(KEYSPACE);
        } catch (NotFoundException e) {
            // Only "keyspace does not exist" triggers creation. Every other failure
            // (transport error, invalid request, ...) propagates to the caller instead
            // of being mistaken for a missing keyspace.
            String query = "CREATE KEYSPACE " + KEYSPACE
                + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }";
            executeCql(client, query);
            logger.info("created keyspace {}", KEYSPACE);
        }
    }

    /**
     * Makes sure the input and output tables exist in {@link #KEYSPACE}.
     *
     * <p>{@code IF NOT EXISTS} keeps this step re-runnable, matching
     * {@link #createKeyspace(Cassandra.Iface)}.
     *
     * @param client Thrift client to use
     * @throws TException if a statement is rejected or the transport fails
     */
    private static void createTables(Cassandra.Iface client) throws TException {
        executeCql(client, "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + COLUMN_FAMILY
            + " (id uuid PRIMARY KEY, word text)");
        logger.info("table {}.{} is available", KEYSPACE, COLUMN_FAMILY);

        executeCql(client, "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY
            + " (word text PRIMARY KEY, count text)");
        logger.info("table {}.{} is available", KEYSPACE, OUTPUT_COLUMN_FAMILY);
    }

    /**
     * Inserts {@link #COPIES_PER_WORD} rows for every entry of {@link #WORDS} into the
     * input table, each under a fresh random UUID key.
     *
     * @param client Thrift client to use
     * @throws TException if preparing or executing the insert fails
     */
    private static void insertData(Cassandra.Iface client) throws TException {
        String query = "INSERT INTO " + KEYSPACE + "." + COLUMN_FAMILY + " (id, word) VALUES (?, ?)";
        // Prepare once and reuse the statement id for every row.
        CqlPreparedResult prepared = client.prepare_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE);

        for (String word : WORDS) {
            for (int copy = 0; copy < COPIES_PER_WORD; copy++) {
                List<ByteBuffer> values = new ArrayList<ByteBuffer>(2);
                values.add(ByteBufferUtil.bytes(UUID.randomUUID()));
                values.add(ByteBufferUtil.bytes(word));
                client.execute_prepared_cql3_query(prepared.itemId, values, ConsistencyLevel.ONE);
            }
        }
    }

    /**
     * Connects to Cassandra, sets up the schema, loads the sample data, and closes the
     * connection again.
     *
     * @param args ignored
     * @throws TException if connecting fails or any step against the server fails
     */
    public static void main(String[] args) throws TException {
        TTransport transport = openTransport();
        try {
            TProtocol protocol = new TBinaryProtocol(transport);
            Cassandra.Iface client = new Cassandra.Client(protocol);
            createKeyspace(client);
            createTables(client);
            insertData(client);
        } finally {
            // Release the socket even when one of the steps above throws.
            transport.close();
        }
    }
}
// Sets the name of the root Gradle project.
rootProject.name = 'CassandraInput'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment