Last active
December 16, 2015 13:39
-
-
Save ajjain/5443381 to your computer and use it in GitHub Desktop.
Generic Cassandra composite column reader using Hector client library.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* reads the data from the composite column family. | |
* | |
* @param rowKey unique key which identifies a row in cassandra column family. | |
* @param columnFamilyName represents the column family which contains the data to be looked up. | |
* @param startValue range query start parameter | |
* @param endValue range query end parameter | |
* @param compositeColumnSerializer represents mapping from column name to {@link Serializer} | |
* @param startComparator holds the comparator value to start the range query | |
* @param endComparator holds the comparator value to end the range query | |
* @param columnSerializer output column serializer mapping, defaulted to {@link StringSerializer} | |
* @param reverse fetches latest columns first if set to true otherwise vice versa | |
* @return | |
*/ | |
public Map<String, Object> read ( | |
String rowKey, | |
String columnFamilyName, | |
LinkedHashMap<String, Object> startValue, | |
LinkedHashMap<String, Object> endValue, | |
Map<String, Serializer> compositeColumnSerializer, | |
Map<String, Comparator> startComparator, | |
Map<String, Comparator> endComparator, | |
Map<String, Serializer> columnSerializer, | |
boolean reverse | |
){ | |
// generate the criteria composites based on value and comparator using appropriate serializers | |
Composite startCriteria = getCriteria(startValue, startComparator, compositeColumnSerializer); | |
Composite endCriteria = getCriteria(endValue, endComparator, compositeColumnSerializer); | |
// create a slice query with row key and range set | |
SliceQuery<String, Composite, byte[]> sliceQuery = getHectorTemplate().createSliceQuery( | |
stringSerializer, | |
CompositeSerializer.get(), | |
BytesArraySerializer.get() | |
); | |
sliceQuery.setColumnFamily(columnFamilyName); | |
sliceQuery.setKey(rowKey); | |
if(reverse){ | |
sliceQuery.setRange(endCriteria, startCriteria, reverse, 100); | |
} | |
else{ | |
sliceQuery.setRange(startCriteria, endCriteria, reverse, 100); | |
} | |
// generate the result set and map it to respective data type based on column serializer information. | |
QueryResult<ColumnSlice<Composite, byte[]>> result = sliceQuery.execute(); | |
if(logger.isDebugEnabled()){ | |
printResults(result); | |
} | |
List<HColumn<Composite, byte[]>> columns = result.get().getColumns(); | |
Map<String, Object> values = new HashMap<String, Object>(); | |
for(HColumn<Composite, byte[]> col: columns){ | |
int size = col.getName().size(); | |
String columnName = col.getName().get(size - 1, StringSerializer.get()); | |
logger.trace("ColumnName {}", columnName); | |
if(values.containsKey(columnName)){ | |
break; | |
} | |
else{ | |
Serializer valueSerializer = columnSerializer.get(columnName); | |
if(valueSerializer == null){ | |
valueSerializer = StringSerializer.get(); | |
} | |
logger.trace("ColumnName {}, Value {}", columnName, valueSerializer.fromBytes(col.getValue())); | |
values.put(columnName, valueSerializer.fromBytes(col.getValue())); | |
} | |
} | |
return values; | |
} | |
/** | |
* @param valueMap | |
* @param comparator | |
* @param compositeColumnSerializer | |
* @return | |
*/ | |
private Composite getCriteria(LinkedHashMap<String, Object> valueMap, Map<String, Comparator> comparator, Map<String, Serializer> compositeColumnSerializer){ | |
ComponentEquality defaultCE = ComponentEquality.EQUAL; | |
Composite startComposite = new Composite(); | |
int index = 0; | |
for(Entry<String, Object> entry : valueMap.entrySet()){ | |
logger.trace("getting criteria for key {}", entry.getKey()); | |
ComponentEquality equality = getComponentEquality(comparator.get(entry.getKey())); | |
logger.trace("looked up equality {}", equality); | |
if(equality == null){ | |
equality = defaultCE; | |
} | |
Serializer sz = compositeColumnSerializer==null ? stringSerializer : compositeColumnSerializer.get(entry.getKey()); | |
if(sz == null){ | |
sz = stringSerializer; | |
} | |
startComposite.addComponent(index++, valueMap.get(entry.getKey()), equality); | |
} | |
return startComposite; | |
} | |
/** | |
* @param comparator | |
* @return | |
*/ | |
private ComponentEquality getComponentEquality(Comparator comparator){ | |
if(Comparator.EQ.equals(comparator)){ | |
return ComponentEquality.EQUAL; | |
} | |
else if(Comparator.LTEQ.equals(comparator)){ | |
return ComponentEquality.LESS_THAN_EQUAL; | |
} | |
else if(Comparator.GTEQ.equals(comparator)){ | |
return ComponentEquality.GREATER_THAN_EQUAL; | |
} | |
else{ | |
throw new RuntimeException("Comparator ["+comparator+"] is not supported"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment