Last active
August 29, 2015 14:23
-
-
Save tamilmani58/aaa093cbc3855dca5fc5 to your computer and use it in GitHub Desktop.
Sample SSTable Writer Code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static void writeTSMetric(String KEYSPACE) { | |
// Create output directory that has keyspace and table name in the path | |
String TABLE = CFTimeSliceMetric.name; | |
File outputDir = new File(DEFAULT_OUTPUT_DIR + File.separator + KEYSPACE + File.separator + TABLE); | |
if (!outputDir.exists() && !outputDir.mkdirs()) { | |
throw new RuntimeException("Cannot create output directory: " + outputDir); | |
} | |
/** | |
* Schema for bulk loading table. | |
* It is important not to forget adding keyspace name before table name, | |
* otherwise CQLSSTableWriter throws exception. | |
*/ | |
String SCHEMA = String.format("CREATE TABLE %s.%s (" + | |
"RTB text, TimeBucket int, Grouptype text, Metric text, Idtype text,GroupId text,Id text, " + | |
"Combogroup text,Combination text, value counter, " + | |
"PRIMARY KEY ((RTB, TimeBucket,Grouptype, Metric,Idtype), GroupId ,Id, Combogroup, Combination) " + | |
") WITH COMPACT STORAGE AND " + | |
" CLUSTERING ORDER BY ( GroupId DESC ,Id DESC, Combogroup DESC,Combination DESC) " | |
, KEYSPACE, TABLE); | |
/** | |
* INSERT statement to bulk load. | |
* It is like prepared statement. You fill in place holder for each data. | |
*/ | |
String UPDATE_STMT = String.format("update %s.%s set value=value+? where RTB=?" + | |
" and TimeBucket=? and Grouptype=? and Metric=? and Idtype=? and GroupId=? and Id=? and Combogroup=? and Combination=?" | |
, KEYSPACE, TABLE); | |
// Prepare SSTable writer | |
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder(); | |
// set output directory | |
builder.inDirectory(outputDir) | |
// set target schema | |
.forTable(SCHEMA) | |
// set CQL statement to put data | |
.using(UPDATE_STMT) | |
// set partitioner if needed | |
// default is Murmur3Partitioner so set if you use different one. | |
.withPartitioner(new Murmur3Partitioner()); | |
CQLSSTableWriter writer = builder.build(); | |
ColumnList<TimeSliceMetricColumn> columns; | |
int pageize = 3000; | |
try { | |
ArrayList<Row> rows = new ArrayList<Row>(); | |
OperationResult<Rows<TimeSliceMetricRowKey, TimeSliceMetricColumn>> result = | |
keyspace.prepareQuery(TimeSliceMetric) | |
.getAllRows() | |
.withColumnRange(new RangeBuilder().setLimit(0).build()) // RangeBuilder will be available in version 1.13 | |
.execute(); | |
for (Row<TimeSliceMetricRowKey, TimeSliceMetricColumn> row : result.getResult()) { | |
TimeSliceMetricRowKey rk = row.getKey(); | |
if(rk.validate()){ | |
continue; | |
} | |
rows.add(row); | |
} | |
System.out.println("Got " + rows.size() + " rows for " + TABLE); | |
for (Row<TimeSliceMetricRowKey, TimeSliceMetricColumn> row : rows) { | |
TimeSliceMetricRowKey rk = row.getKey(); | |
RowQuery<TimeSliceMetricRowKey, TimeSliceMetricColumn> query = keyspace | |
.prepareQuery(TimeSliceMetric) | |
.getKey(rk) | |
.autoPaginate(true) | |
.withColumnRange(new RangeBuilder().setLimit(pageize).build()); | |
int page = 0; | |
while (!(columns = query.execute().getResult()).isEmpty()) { | |
page++; | |
System.out.println("Going Page " + page + " for " + TABLE); | |
for (Column<TimeSliceMetricColumn> c : columns) { | |
//Got the bugger!! | |
TimeSliceMetricColumn colname = c.getName(); | |
if(colname.validate()) { | |
continue; | |
} | |
Long value = c.getLongValue(); | |
if (value == null) continue; | |
try { | |
String groupId = ""; | |
if (colname.groupId != null) | |
groupId = colname.groupId; | |
if (colname.pid == null) | |
continue; | |
writer.addRow(value, | |
rk.rtb, | |
rk.tb, | |
rk.pg, | |
rk.metric, | |
rk.idtype, | |
groupId, | |
colname.pid, | |
colname.combogroup, | |
colname.combo); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
try { | |
writer.close(); | |
} catch (IOException ignore) { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment