Skip to content

Instantly share code, notes, and snippets.

@stliu
Created August 20, 2016 10:33
Show Gist options
  • Save stliu/12becc2e229d73f24a3cd8bf1b5e7c6f to your computer and use it in GitHub Desktop.
Save stliu/12becc2e229d73f24a3cd8bf1b5e7c6f to your computer and use it in GitHub Desktop.
counter
package com.example;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.Assignment;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Update;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
import java.util.*;
/**
* 使用cassandra的counter表来记录一个坐席的未读消息数
*/
public class UnreadMessageCount {
public static final String KEYSPACE = "weichat";
public static final String TABLE = "unread_message_count";
public static final String UNREAD_MESSAGE_COUNT = "unread_message_count";
public static final String AGENT_ID = "agent_id";
public static final String SESSION_ID = "session_id";
public static void main(String[] args) {
// At initialization:
Cluster cluster = Cluster.builder()
.addContactPoint("127.0.0.1")
.build();
Session session = cluster.connect();
createKeyspace(session);
createTable(session);
UUID agentId = UUIDs.timeBased();
UUID sessionId = UUIDs.timeBased();
for (int i = 0; i < 100; i++) {
addNewMessage(session, sessionId, agentId);
}
long count = getUnreadMessageCount(session, sessionId, agentId);
System.out.println("current unread message count is " + count + " which should be " + 100);
for (int i = 0; i < 50; i++) {
markReadMessage(session, sessionId, agentId);
}
count = getUnreadMessageCount(session, sessionId, agentId);
System.out.println("current unread message count is " + count + " which should be " + 50);
execute(session, SchemaBuilder.dropTable(KEYSPACE, TABLE));
execute(session, SchemaBuilder.dropKeyspace(KEYSPACE));
session.close();
cluster.close();
}
private static void addNewMessage(Session session, UUID sessionId, UUID agentId) {
changeCounterValue(session, sessionId, agentId, QueryBuilder.incr(UNREAD_MESSAGE_COUNT));
}
private static void markReadMessage(Session session, UUID sessionId, UUID agentId) {
changeCounterValue(session, sessionId, agentId, QueryBuilder.decr(UNREAD_MESSAGE_COUNT));
}
private static long getUnreadMessageCount(Session session, UUID sessionId, UUID agentId) {
Statement update = QueryBuilder.select(UNREAD_MESSAGE_COUNT)
.from(KEYSPACE, TABLE)
.where(QueryBuilder.eq(AGENT_ID, agentId))
.and(QueryBuilder.eq(SESSION_ID, sessionId));
ResultSet resultSet = execute(session, update);
Row counterRow = resultSet.one();
return counterRow == null ? 0 : counterRow.getLong(UNREAD_MESSAGE_COUNT);
}
private static void changeCounterValue(Session session, UUID sessionId, UUID agentId, Assignment assignment) {
Statement update = QueryBuilder.update(KEYSPACE, TABLE)
.with(assignment)
.where(QueryBuilder.eq(AGENT_ID, agentId))
.and(QueryBuilder.eq(SESSION_ID, sessionId));
execute(session, update);
}
private static void createTable(Session session) {
Statement statement = SchemaBuilder.createTable(KEYSPACE, TABLE)
.addPartitionKey(AGENT_ID, DataType.timeuuid())
.addClusteringColumn(SESSION_ID, DataType.timeuuid())
.addColumn(UNREAD_MESSAGE_COUNT, DataType.counter())
.ifNotExists();
execute(session, statement);
}
private static void createKeyspace(Session session) {
Map<String, Object> replication = new HashMap<>();
replication.put("class", "NetworkTopologyStrategy");
replication.put("datacenter1", 1);
Statement statement = SchemaBuilder.createKeyspace(KEYSPACE).ifNotExists().with().replication(replication).durableWrites(false);
execute(session, statement);
}
private static ResultSet execute(Session session, Statement statement) {
// System.out.println("Executing:");
// System.out.println("\t" + statement);
return session.execute(statement);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment