Skip to content

Instantly share code, notes, and snippets.

@ajjain
Last active December 16, 2015 13:39
Show Gist options
  • Save ajjain/5443381 to your computer and use it in GitHub Desktop.
Save ajjain/5443381 to your computer and use it in GitHub Desktop.
Generic Cassandra composite column reader using Hector client library.
/**
 * Reads one row of a composite column family and returns its columns keyed by the
 * last component of each composite column name.
 *
 * <p>The start and end composites are built from the supplied value/comparator maps and
 * used as the bounds of a single slice query. When {@code reverse} is set the bounds are
 * swapped before being handed to the query. The query is issued with a count of 100.
 *
 * <p>Columns are processed in the order the query returns them, and processing stops at
 * the first column whose name (last composite component) has already been seen, so each
 * name appears at most once in the result.
 * NOTE(review): this presumably keeps only the first "group" of columns returned - confirm
 * against the column family layout.
 *
 * @param rowKey unique key which identifies a row in the cassandra column family.
 * @param columnFamilyName the column family which contains the data to be looked up.
 * @param startValue range query start parameter, in composite component order
 * @param endValue range query end parameter, in composite component order
 * @param compositeColumnSerializer mapping from composite component name to {@link Serializer}
 * @param startComparator comparator per component used to build the start of the range
 * @param endComparator comparator per component used to build the end of the range
 * @param columnSerializer mapping from column name to the {@link Serializer} used to decode
 *        its value; may be {@code null}, and any column without an entry is decoded with
 *        {@link StringSerializer}
 * @param reverse passed to the slice query as its reversed flag; also swaps the range bounds
 * @return map from column name to the decoded column value; empty when the query returns
 *         no columns
 */
public Map<String, Object> read (
        String rowKey,
        String columnFamilyName,
        LinkedHashMap<String, Object> startValue,
        LinkedHashMap<String, Object> endValue,
        Map<String, Serializer> compositeColumnSerializer,
        Map<String, Comparator> startComparator,
        Map<String, Comparator> endComparator,
        Map<String, Serializer> columnSerializer,
        boolean reverse
){
    // generate the criteria composites based on value and comparator using appropriate serializers
    Composite startCriteria = getCriteria(startValue, startComparator, compositeColumnSerializer);
    Composite endCriteria = getCriteria(endValue, endComparator, compositeColumnSerializer);

    // create a slice query with row key and range set
    SliceQuery<String, Composite, byte[]> sliceQuery = getHectorTemplate().createSliceQuery(
            stringSerializer,
            CompositeSerializer.get(),
            BytesArraySerializer.get()
    );
    sliceQuery.setColumnFamily(columnFamilyName);
    sliceQuery.setKey(rowKey);
    if (reverse) {
        // a reversed slice is given its bounds in the opposite order
        sliceQuery.setRange(endCriteria, startCriteria, reverse, 100);
    } else {
        sliceQuery.setRange(startCriteria, endCriteria, reverse, 100);
    }

    // run the query and map every column to its data type based on column serializer information
    QueryResult<ColumnSlice<Composite, byte[]>> result = sliceQuery.execute();
    if (logger.isDebugEnabled()) {
        printResults(result);
    }

    List<HColumn<Composite, byte[]>> columns = result.get().getColumns();
    Map<String, Object> values = new HashMap<String, Object>();
    for (HColumn<Composite, byte[]> col : columns) {
        // the last component of the composite name is used as the result key
        int size = col.getName().size();
        String columnName = col.getName().get(size - 1, StringSerializer.get());
        logger.trace("ColumnName {}", columnName);
        if (values.containsKey(columnName)) {
            // a repeated name ends processing; later columns are ignored
            break;
        }

        // a missing map or a missing entry both fall back to the string serializer
        Serializer valueSerializer = columnSerializer == null ? null : columnSerializer.get(columnName);
        if (valueSerializer == null) {
            valueSerializer = StringSerializer.get();
        }

        // decode once and reuse the result for both the trace line and the returned map
        Object value = valueSerializer.fromBytes(col.getValue());
        logger.trace("ColumnName {}, Value {}", columnName, value);
        values.put(columnName, value);
    }
    return values;
}
/**
 * Builds the composite used as one bound of a slice range.
 *
 * <p>One component is added per entry of {@code valueMap}, in the map's iteration order.
 * Each component carries the equality derived from the comparator registered for the
 * entry's key; keys without a registered comparator use {@link ComponentEquality#EQUAL}.
 * When a serializer is registered for the key it is used for that component; otherwise
 * the serializer choice is left to {@link Composite}.
 *
 * @param valueMap component values keyed by component name; {@code null} yields an
 *        empty composite
 * @param comparator comparator per component name; may be {@code null} or lack entries
 * @param compositeColumnSerializer serializer per component name; may be {@code null}
 *        or lack entries
 * @return the composite holding one component per entry of {@code valueMap}
 * @throws RuntimeException if a registered comparator is not supported by
 *         {@code getComponentEquality}
 */
private Composite getCriteria(LinkedHashMap<String, Object> valueMap, Map<String, Comparator> comparator, Map<String, Serializer> compositeColumnSerializer){
    Composite criteria = new Composite();
    if (valueMap == null) {
        return criteria;
    }

    int index = 0;
    for (Entry<String, Object> entry : valueMap.entrySet()) {
        String key = entry.getKey();
        logger.trace("getting criteria for key {}", key);

        // only translate a comparator that is actually present: the translation helper
        // rejects null, which previously made the EQUAL default unreachable
        Comparator requested = comparator == null ? null : comparator.get(key);
        ComponentEquality equality = requested == null
                ? ComponentEquality.EQUAL
                : getComponentEquality(requested);
        logger.trace("looked up equality {}", equality);

        Serializer sz = compositeColumnSerializer == null ? null : compositeColumnSerializer.get(key);
        if (sz == null) {
            // no serializer configured for this component: keep the serializer-less overload
            criteria.addComponent(index++, entry.getValue(), equality);
        } else {
            // honour the configured serializer instead of silently ignoring it
            criteria.addComponent(index++, entry.getValue(), sz, sz.getComparatorType().getTypeName(), equality);
        }
    }
    return criteria;
}
/**
 * Translates a query comparator into the equality marker attached to a composite component.
 *
 * @param comparator comparator to translate; {@code EQ}, {@code LTEQ} and {@code GTEQ}
 *        are recognised
 * @return the {@link ComponentEquality} matching the comparator
 * @throws RuntimeException if the comparator matches none of the recognised values
 */
private ComponentEquality getComponentEquality(Comparator comparator){
    if (Comparator.EQ.equals(comparator)) {
        return ComponentEquality.EQUAL;
    }
    if (Comparator.LTEQ.equals(comparator)) {
        return ComponentEquality.LESS_THAN_EQUAL;
    }
    if (Comparator.GTEQ.equals(comparator)) {
        return ComponentEquality.GREATER_THAN_EQUAL;
    }
    // nothing matched: report the offending value
    throw new RuntimeException("Comparator ["+comparator+"] is not supported");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment