Skip to content

Instantly share code, notes, and snippets.

@stliu
Created August 20, 2016 09:34
Show Gist options
  • Save stliu/e20e0b74d5195c2cb7d4f875bb4aa48b to your computer and use it in GitHub Desktop.
Save stliu/e20e0b74d5195c2cb7d4f875bb4aa48b to your computer and use it in GitHub Desktop.
Paging with Cassandra
package com.example;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
import java.util.*;
/**
 * Small end-to-end demo of manual ("stateless") paging with the DataStax Java driver.
 *
 * <p>The program creates a keyspace and a table, inserts generated messages, reads the
 * messages of the first generated session back one page at a time by round-tripping the
 * driver's {@link PagingState} through a URL-safe Base64 string, and finally drops the
 * table and keyspace again.
 */
public class TestCassandraQuery {
    /** Keyspace created (and dropped) by this demo. */
    public static final String KEYSPACE = "weichat";
    /** Table created (and dropped) by this demo. */
    public static final String TABLE = "messages";
    /** Number of sessions (one partition each) generated by {@link #buildMessages()}. */
    public static final int sessionCount = 1;
    /** Number of messages generated per session by {@link #buildMessages()}. */
    public static final int messagePerSession = 1000;

    /**
     * Runs the demo against a Cassandra node listening on 127.0.0.1.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the session and then the cluster are closed even
        // when a statement fails part-way through the demo.
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .build();
             Session session = cluster.connect()) {
            createKeyspace(session);
            createTable(session);

            List<Message> messages = buildMessages();
            for (Message message : messages) {
                Statement insert = QueryBuilder.insertInto(KEYSPACE, TABLE)
                        .value("session_id", message.getSessionId())
                        .value("message_id", message.getMessageId())
                        .value("from_user", message.getFromUser())
                        .value("to_user", message.getToUser())
                        .value("body", message.getBody());
                execute(session, insert);
            }

            // Nothing to page through if the constants were configured to generate no data.
            if (!messages.isEmpty()) {
                UUID sessionId = messages.get(0).getSessionId();
                int pageSize = 10;
                // A null token requests the first page; a null result means no pages remain.
                String nextPage = fetchByPage(session, sessionId, pageSize, null);
                while (nextPage != null) {
                    nextPage = fetchByPage(session, sessionId, pageSize, nextPage);
                }
            }

            execute(session, SchemaBuilder.dropTable(KEYSPACE, TABLE));
            execute(session, SchemaBuilder.dropKeyspace(KEYSPACE));
        }
    }

    /**
     * Generates {@link #sessionCount} sessions with {@link #messagePerSession} messages each.
     *
     * <p>Every session gets its own time-based UUID and every message its own time-based
     * UUID; sender, receiver and body are derived from the loop indices.
     *
     * @return the generated messages, grouped by session in generation order
     */
    private static List<Message> buildMessages() {
        List<Message> messages = new ArrayList<>(sessionCount * messagePerSession);
        // Loop bounds come from the same constants used to presize the list, so the
        // capacity and the amount of generated data cannot drift apart.
        for (int i = 0; i < sessionCount; i++) {
            UUID sessionId = UUIDs.timeBased();
            String from = "f" + i;
            String to = "t" + i;
            for (int j = 0; j < messagePerSession; j++) {
                Message message = new Message();
                message.setSessionId(sessionId);
                message.setMessageId(UUIDs.timeBased());
                message.setFromUser(from);
                message.setToUser(to);
                message.setBody("msg-" + i + "-" + j);
                messages.add(message);
            }
        }
        return messages;
    }

    /** One row of the {@code messages} table; accessors are generated by Lombok's {@code @Data}. */
    @Data
    public static class Message {
        private UUID sessionId;
        private UUID messageId;
        private String fromUser;
        private String toUser;
        private String body;
    }

    /**
     * Creates the messages table if it does not exist yet.
     *
     * <p>{@code session_id} is the partition key and {@code message_id} the clustering
     * column, with a descending clustering order on {@code message_id}.
     *
     * @param session session used to run the DDL statement
     */
    private static void createTable(Session session) {
        Statement statement = SchemaBuilder.createTable(KEYSPACE, TABLE)
                .addPartitionKey("session_id", DataType.timeuuid())
                .addColumn("from_user", DataType.varchar())
                .addColumn("to_user", DataType.varchar())
                .addColumn("body", DataType.text())
                .addClusteringColumn("message_id", DataType.timeuuid())
                .ifNotExists()
                .withOptions().clusteringOrder("message_id", SchemaBuilder.Direction.DESC);
        execute(session, statement);
    }

    /**
     * Creates the demo keyspace if it does not exist yet, using
     * {@code NetworkTopologyStrategy} with one replica in {@code datacenter1} and
     * durable writes turned off.
     *
     * @param session session used to run the DDL statement
     */
    private static void createKeyspace(Session session) {
        Map<String, Object> replication = new HashMap<>();
        replication.put("class", "NetworkTopologyStrategy");
        replication.put("datacenter1", 1);
        Statement statement = SchemaBuilder.createKeyspace(KEYSPACE)
                .ifNotExists()
                .with()
                .replication(replication)
                .durableWrites(false);
        execute(session, statement);
    }

    /**
     * Fetches and prints a single page of messages for one session.
     *
     * @param session             session used to run the query
     * @param sessionId           partition ({@code session_id}) to read
     * @param pageSize            fetch size requested from the driver for this page
     * @param pagingStateBase64ed URL-safe Base64 token returned by the previous call, or
     *                            {@code null} to read the first page
     * @return the URL-safe Base64 token for the next page, or {@code null} when the driver
     *         reports no further paging state
     */
    private static String fetchByPage(Session session, UUID sessionId, int pageSize, String pagingStateBase64ed) {
        Statement statement = QueryBuilder.select().all()
                .from(KEYSPACE, TABLE)
                .where(QueryBuilder.eq("session_id", sessionId));
        statement.setFetchSize(pageSize);
        if (pagingStateBase64ed != null) {
            byte[] bytes = Base64.getUrlDecoder().decode(pagingStateBase64ed);
            statement.setPagingState(PagingState.fromBytes(bytes));
        }

        ResultSet resultSet = execute(session, statement);

        // Capture the paging state before consuming any rows, so it describes the page
        // that was just fetched.
        PagingState pagingState = resultSet.getExecutionInfo().getPagingState();
        String base64Url = pagingState != null
                ? Base64.getUrlEncoder().encodeToString(pagingState.toBytes())
                : null;
        System.out.println("paging state with base64 url " + base64Url);

        // Read exactly the rows of the current page. Per the driver's paging documentation
        // the fetch size is a hint, so the page may hold fewer or more rows than pageSize;
        // iterating past the available rows would transparently fetch the next page and
        // desynchronise the rows printed here from the paging state returned to the caller.
        int available = resultSet.getAvailableWithoutFetching();
        for (int rowIndex = 0; rowIndex < available; rowIndex++) {
            Row row = resultSet.one();
            UUID sid = row.get("session_id", UUID.class);
            UUID messageId = row.get("message_id", UUID.class);
            String body = row.getString("body");
            System.out.println(rowIndex + " session " + sid + " message " + messageId + " body " + body);
        }
        return base64Url;
    }

    /**
     * Prints the statement to standard output and then executes it.
     *
     * @param session   session to execute on
     * @param statement statement to run
     * @return the result set returned by {@link Session#execute(Statement)}
     */
    private static ResultSet execute(Session session, Statement statement) {
        System.out.println("Executing:");
        System.out.println("\t" + statement);
        return session.execute(statement);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment