Skip to content

Instantly share code, notes, and snippets.

@tamilmani58
Last active August 29, 2015 14:23
Show Gist options
  • Save tamilmani58/aaa093cbc3855dca5fc5 to your computer and use it in GitHub Desktop.
Save tamilmani58/aaa093cbc3855dca5fc5 to your computer and use it in GitHub Desktop.
Sample SSTable Writer Code
/**
 * Exports every row of the {@code TimeSliceMetric} column family (read through the
 * {@code keyspace} client) into SSTable files for the CQL table
 * {@code KEYSPACE.<CFTimeSliceMetric.name>}.
 *
 * <p>Files are written under {@code DEFAULT_OUTPUT_DIR/KEYSPACE/TABLE}. Every surviving column
 * becomes one counter increment ({@code value = value + ?}) on the target table. A failure while
 * writing a single column is printed and that column is skipped. A failure while reading is
 * printed and ends the export with whatever was written so far. The writer is always closed,
 * and a failure to close it is reported rather than ignored.
 *
 * @param KEYSPACE name of the target keyspace; used in the output path and to qualify the table
 *     name in both the schema and the update statement
 * @throws RuntimeException if the output directory does not exist and cannot be created
 */
private static void writeTSMetric(String KEYSPACE) {
    // Create output directory that has keyspace and table name in the path
    String TABLE = CFTimeSliceMetric.name;
    File outputDir = new File(DEFAULT_OUTPUT_DIR + File.separator + KEYSPACE + File.separator + TABLE);
    if (!outputDir.exists() && !outputDir.mkdirs()) {
        throw new RuntimeException("Cannot create output directory: " + outputDir);
    }

    // Schema of the target table. The table name must be qualified with the keyspace name,
    // otherwise CQLSSTableWriter throws an exception.
    String SCHEMA = String.format("CREATE TABLE %s.%s (" +
        "RTB text, TimeBucket int, Grouptype text, Metric text, Idtype text,GroupId text,Id text, " +
        "Combogroup text,Combination text, value counter, " +
        "PRIMARY KEY ((RTB, TimeBucket,Grouptype, Metric,Idtype), GroupId ,Id, Combogroup, Combination) " +
        ") WITH COMPACT STORAGE AND " +
        " CLUSTERING ORDER BY ( GroupId DESC ,Id DESC, Combogroup DESC,Combination DESC) "
        , KEYSPACE, TABLE);

    // Counter UPDATE applied once per exported column. The '?' placeholders are bound, in order,
    // from the arguments passed to writer.addRow(...) below.
    String UPDATE_STMT = String.format("update %s.%s set value=value+? where RTB=?" +
        " and TimeBucket=? and Grouptype=? and Metric=? and Idtype=? and GroupId=? and Id=? and Combogroup=? and Combination=?"
        , KEYSPACE, TABLE);

    CQLSSTableWriter writer = CQLSSTableWriter.builder()
        .inDirectory(outputDir)
        .forTable(SCHEMA)
        .using(UPDATE_STMT)
        // Murmur3Partitioner is the default; it is set explicitly so the choice is visible.
        .withPartitioner(new Murmur3Partitioner())
        .build();

    final int pageSize = 3000;
    try {
        // Pass 1: collect the row keys. The column limit of 0 is used because only the keys
        // are needed here; each row's columns are fetched page by page in pass 2.
        ArrayList<TimeSliceMetricRowKey> rowKeys = new ArrayList<TimeSliceMetricRowKey>();
        OperationResult<Rows<TimeSliceMetricRowKey, TimeSliceMetricColumn>> result =
            keyspace.prepareQuery(TimeSliceMetric)
                .getAllRows()
                .withColumnRange(new RangeBuilder().setLimit(0).build()) // RangeBuilder will be available in version 1.13
                .execute();
        for (Row<TimeSliceMetricRowKey, TimeSliceMetricColumn> row : result.getResult()) {
            TimeSliceMetricRowKey rk = row.getKey();
            // NOTE(review): keys for which validate() returns TRUE are skipped. Confirm that
            // validate() returns true for *invalid* keys; otherwise this condition is inverted.
            if (rk.validate()) {
                continue;
            }
            rowKeys.add(rk);
        }
        System.out.println("Got " + rowKeys.size() + " rows for " + TABLE);

        // Pass 2: page through each row's columns and write one counter update per column.
        for (TimeSliceMetricRowKey rk : rowKeys) {
            RowQuery<TimeSliceMetricRowKey, TimeSliceMetricColumn> query = keyspace
                .prepareQuery(TimeSliceMetric)
                .getKey(rk)
                .autoPaginate(true)
                .withColumnRange(new RangeBuilder().setLimit(pageSize).build());
            int page = 0;
            ColumnList<TimeSliceMetricColumn> columns;
            // With autoPaginate(true), repeated execute() calls walk through the row's columns;
            // an empty result marks the end of the row.
            while (!(columns = query.execute().getResult()).isEmpty()) {
                page++;
                System.out.println("Going Page " + page + " for " + TABLE);
                for (Column<TimeSliceMetricColumn> c : columns) {
                    TimeSliceMetricColumn colname = c.getName();
                    // NOTE(review): same convention as the row-key check above: columns whose
                    // name validates as TRUE are skipped. Confirm this is intended.
                    if (colname.validate()) {
                        continue;
                    }
                    Long value = c.getLongValue();
                    if (value == null) {
                        continue;
                    }
                    try {
                        // Id is a clustering column, so a missing pid cannot be written.
                        if (colname.pid == null) {
                            continue;
                        }
                        // A missing GroupId is written as the empty string.
                        String groupId = (colname.groupId != null) ? colname.groupId : "";
                        writer.addRow(value,
                            rk.rtb,
                            rk.tb,
                            rk.pg,
                            rk.metric,
                            rk.idtype,
                            groupId,
                            colname.pid,
                            colname.combogroup,
                            colname.combo);
                    } catch (Exception e) {
                        // Best effort: report the bad column and keep exporting the rest.
                        e.printStackTrace();
                    }
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            // close() must be called for the produced SSTables to be complete, so a failure
            // here means the export is likely incomplete. Report it instead of ignoring it.
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment