Patches to Cassandra Tools
import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
 * Export SSTables to JSON format.
 */
public class SSTableExport
{
    private static ObjectMapper jsonMapper = new ObjectMapper();

    private static final String KEY_OPTION = "k";
    private static final String EXCLUDEKEY_OPTION = "x";
    private static final String ENUMERATEKEYS_OPTION = "e";

    private static Options options;
    private static CommandLine cmd;

    static
    {
        options = new Options();

        Option optKey = new Option(KEY_OPTION, true, "Row key");
        // Number of times -k <key> can be passed on the command line.
        optKey.setArgs(500);
        options.addOption(optKey);

        Option excludeKey = new Option(EXCLUDEKEY_OPTION, true, "Excluded row key");
        // Number of times -x <key> can be passed on the command line.
        excludeKey.setArgs(500);
        options.addOption(excludeKey);

        Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
        options.addOption(optEnumerate);

        // disabling auto close of the stream
        jsonMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
    }
    /**
     * JSON Hash Key serializer
     *
     * @param out
     *            The output stream to write data to
     * @param value
     *            value to set as a key
     */
    private static void writeKey(PrintStream out, String value)
    {
        writeJSON(out, value);
        out.print(": ");
    }
    /**
     * Serialize columns using the given column iterator
     *
     * @param columns
     *            column iterator
     * @param out
     *            output stream
     * @param comparator
     *            columns comparator
     * @param cfMetaData
     *            Column Family metadata (to get validator)
     */
    private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType<?> comparator,
                                         CFMetaData cfMetaData)
    {
        while (columns.hasNext())
        {
            writeJSON(out, serializeColumn(columns.next(), comparator, cfMetaData));

            if (columns.hasNext())
                out.print(", ");
        }
    }
    /**
     * Serialize a given column to the JSON format
     *
     * @param column
     *            column representation
     * @param comparator
     *            columns comparator
     * @param cfMetaData
     *            Column Family metadata (to get validator)
     * @return column as serialized list
     */
    private static List<Object> serializeColumn(IColumn column, AbstractType<?> comparator, CFMetaData cfMetaData)
    {
        ArrayList<Object> serializedColumn = new ArrayList<Object>();

        ByteBuffer name = ByteBufferUtil.clone(column.name());
        ByteBuffer value = ByteBufferUtil.clone(column.value());

        serializedColumn.add(comparator.getString(name));
        if (column instanceof DeletedColumn)
        {
            serializedColumn.add(ByteBufferUtil.bytesToHex(value));
        }
        else
        {
            AbstractType<?> validator = cfMetaData.getValueValidator(name);
            serializedColumn.add(validator.getString(value));
        }
        serializedColumn.add(column.timestamp());

        if (column instanceof DeletedColumn)
        {
            serializedColumn.add("d");
        }
        else if (column instanceof ExpiringColumn)
        {
            serializedColumn.add("e");
            serializedColumn.add(((ExpiringColumn) column).getTimeToLive());
            serializedColumn.add(column.getLocalDeletionTime());
        }
        else if (column instanceof CounterColumn)
        {
            serializedColumn.add("c");
            serializedColumn.add(((CounterColumn) column).timestampOfLastDelete());
        }

        return serializedColumn;
    }
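    // For illustration only: the shapes below follow serializeColumn() above, but the
    // column names, values and timestamps are made-up examples (hex names assume BytesType).
    //   live column:     ["6e616d65", "value", 1311161111111]
    //   deleted column:  ["6e616d65", "00000003", 1311161111111, "d"]
    //   expiring column: ["6e616d65", "value", 1311161111111, "e", 3600, 1311164711]
    //   counter column:  ["6e616d65", "1", 1311161111111, "c", 1311160000000]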
    /**
     * Serialize a row's columns, iterating until no columns are left in the row
     *
     * @param row
     *            SSTableIdentityIterator row representation with Column Family
     * @param key
     *            Decorated Key for the required row
     * @param out
     *            output stream
     */
    private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
    {
        ColumnFamily columnFamily = row.getColumnFamily();
        boolean isSuperCF = columnFamily.isSuper();
        CFMetaData cfMetaData = columnFamily.metadata();
        AbstractType<?> comparator = columnFamily.getComparator();

        writeKey(out, bytesToHex(key.key));
        out.print(isSuperCF ? "{" : "[");

        if (isSuperCF)
        {
            while (row.hasNext())
            {
                IColumn column = row.next();

                writeKey(out, comparator.getString(column.name()));
                out.print("{");
                writeKey(out, "deletedAt");
                out.print(column.getMarkedForDeleteAt());
                out.print(", ");
                writeKey(out, "subColumns");
                out.print("[");
                serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData);
                out.print("]");
                out.print("}");

                if (row.hasNext())
                    out.print(", ");
            }
        }
        else
        {
            serializeColumns(row, out, comparator, cfMetaData);
        }

        out.print(isSuperCF ? "}" : "]");
    }
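    // For illustration only (made-up keys, names and values), one serialized row looks like:
    //   standard CF:  "6b6579": [["6e616d65", "value", 1311161111111], ...]
    //   super CF:     "6b6579": {"7363": {"deletedAt": -9223372036854775808,
    //                                     "subColumns": [["6e616d65", "value", 1311161111111]]}}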
    /**
     * Enumerate row keys from an SSTableReader and write the result to a PrintStream.
     *
     * @param desc
     *            the descriptor of the file to export the rows from
     * @param outs
     *            PrintStream to write the output to
     * @throws IOException
     *             on failure to read/write input/output
     */
    public static void enumeratekeys(Descriptor desc, PrintStream outs)
    throws IOException
    {
        KeyIterator iter = new KeyIterator(desc);
        DecoratedKey lastKey = null;
        while (iter.hasNext())
        {
            DecoratedKey key = iter.next();

            // validate order of the keys in the sstable
            if (lastKey != null && lastKey.compareTo(key) > 0)
                throw new IOException("Key out of order! " + lastKey + " > " + key);
            lastKey = key;

            outs.println(bytesToHex(key.key));
        }
        iter.close();
        outs.flush();
    }
    /**
     * Export specific rows from an SSTable and write the resulting JSON to a PrintStream.
     *
     * @param desc
     *            the descriptor of the sstable to read from
     * @param outs
     *            PrintStream to write the output to
     * @param toExport
     *            the keys corresponding to the rows to export
     * @param excludes
     *            keys to exclude from export
     * @throws IOException
     *             on failure to read/write input/output
     */
    public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes)
    throws IOException
    {
        SSTableReader reader = SSTableReader.open(desc);
        SSTableScanner scanner = reader.getDirectScanner();

        IPartitioner<?> partitioner = reader.partitioner;

        if (excludes != null)
            toExport.removeAll(Arrays.asList(excludes));

        outs.println("{");

        int i = 0;

        // last key to compare order
        DecoratedKey lastKey = null;

        for (String key : toExport)
        {
            DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));

            if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
                throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);

            lastKey = decoratedKey;

            scanner.seekTo(decoratedKey);

            if (!scanner.hasNext())
                continue;

            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
            if (!row.getKey().equals(decoratedKey))
                continue;

            // print the row separator before every row except the first,
            // otherwise the generated JSON is malformed
            if (i != 0)
                outs.println(",");

            serializeRow(row, decoratedKey, outs);

            i++;
        }

        outs.println("\n}");
        outs.flush();

        scanner.close();
    }
    // This is necessary to accommodate the test suite since you cannot open a Reader more
    // than once from within the same process.
    static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
    {
        Set<String> excludeSet = new HashSet<String>();

        if (excludes != null)
            excludeSet = new HashSet<String>(Arrays.asList(excludes));

        SSTableIdentityIterator row;
        SSTableScanner scanner = reader.getDirectScanner();

        outs.println("{");

        int i = 0;

        // collecting keys to export
        while (scanner.hasNext())
        {
            row = (SSTableIdentityIterator) scanner.next();

            String currentKey = bytesToHex(row.getKey().key);

            if (excludeSet.contains(currentKey))
                continue;
            else if (i != 0)
                outs.println(",");

            serializeRow(row, row.getKey(), outs);

            i++;
        }

        outs.println("\n}");
        outs.flush();

        scanner.close();
    }
    /**
     * Export an SSTable and write the resulting JSON to a PrintStream.
     *
     * @param desc
     *            the descriptor of the sstable to read from
     * @param outs
     *            PrintStream to write the output to
     * @param excludes
     *            keys to exclude from export
     * @throws IOException
     *             on failure to read/write input/output
     */
    public static void export(Descriptor desc, PrintStream outs, String[] excludes) throws IOException
    {
        export(SSTableReader.open(desc), outs, excludes);
    }

    /**
     * Export an SSTable and write the resulting JSON to standard out.
     *
     * @param desc
     *            the descriptor of the sstable to read from
     * @param excludes
     *            keys to exclude from export
     * @throws IOException
     *             on failure to read/write SSTable/standard out
     */
    public static void export(Descriptor desc, String[] excludes) throws IOException
    {
        export(desc, System.out, excludes);
    }
    /**
     * Given arguments specifying an SSTable, and optionally row keys to include or exclude,
     * export the contents of the SSTable (or only its keys) to JSON on standard out.
     *
     * @param args
     *            command line arguments
     * @throws IOException
     *             on failure to open/read/write files or output streams
     * @throws ConfigurationException
     *             on configuration failure (wrong params given)
     */
    public static void main(String[] args) throws IOException, ConfigurationException
    {
        String usage = String.format("Usage: %s <sstable> [-k key [-k key [...]] -x key [-x key [...]]]%n",
                                     SSTableExport.class.getName());

        CommandLineParser parser = new PosixParser();
        try
        {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e1)
        {
            System.err.println(e1.getMessage());
            System.err.println(usage);
            System.exit(1);
        }

        if (cmd.getArgs().length != 1)
        {
            System.err.println("You must supply exactly one sstable");
            System.err.println(usage);
            System.exit(1);
        }

        String[] keys = cmd.getOptionValues(KEY_OPTION);
        String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
        String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath();

        DatabaseDescriptor.loadSchemas();
        //if (Schema.instance.getNonSystemTables().size() < 1)
        //{
        //    String msg = "no non-system tables are defined";
        //    System.err.println(msg);
        //    throw new ConfigurationException(msg);
        //}

        Descriptor descriptor = Descriptor.fromFilename(ssTableFileName);
        if (Schema.instance.getCFMetaData(descriptor) == null)
        {
            System.err.println(String.format("The provided column family is not part of this cassandra database: keyspace = %s, column family = %s",
                                             descriptor.ksname, descriptor.cfname));
            System.exit(1);
        }

        if (cmd.hasOption(ENUMERATEKEYS_OPTION))
        {
            enumeratekeys(descriptor, System.out);
        }
        else
        {
            if ((keys != null) && (keys.length > 0))
                export(descriptor, System.out, Arrays.asList(keys), excludes);
            else
                export(descriptor, excludes);
        }

        System.exit(0);
    }
    private static void writeJSON(PrintStream out, Object value)
    {
        try
        {
            jsonMapper.writeValue(out, value);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
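For completeness, a minimal sketch of driving the exporter above from Java rather than from the command line. The driver class name and the sstable path are made-up examples, and the sketch assumes the Cassandra configuration and locally stored schema are available, just as SSTableExport.main() does.

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.Descriptor;

public class ExportExample
{
    public static void main(String[] args) throws Exception
    {
        // Load the locally stored schema, as SSTableExport.main() does.
        DatabaseDescriptor.loadSchemas();

        // Hypothetical sstable path; substitute a real -Data.db file.
        Descriptor desc = Descriptor.fromFilename("/var/lib/cassandra/data/Keyspace1/Standard1-g-1-Data.db");

        // Export every row of the sstable as JSON to standard out, excluding no keys.
        SSTableExport.export(desc, System.out, new String[0]);
    }
}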
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.MappingJsonFactory;
import org.codehaus.jackson.type.TypeReference;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
/**
 * Create SSTables from JSON input
 */
public class SSTableImport
{
    private static final String KEYSPACE_OPTION = "K";
    private static final String COLUMN_FAMILY_OPTION = "c";
    private static final String KEY_COUNT_OPTION = "n";
    private static final String IS_SORTED_OPTION = "s";

    private static Options options;
    private static CommandLine cmd;

    private static Integer keyCountToImport = null;
    private static boolean isSorted = false;

    private static JsonFactory factory = new MappingJsonFactory().configure(JsonParser.Feature.INTERN_FIELD_NAMES,
                                                                            false);

    static
    {
        options = new Options();

        Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
        optKeyspace.setRequired(true);
        options.addOption(optKeyspace);

        Option optColfamily = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name.");
        optColfamily.setRequired(true);
        options.addOption(optColfamily);

        options.addOption(new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional)."));
        options.addOption(new Option(IS_SORTED_OPTION, false,
                                     "Assume the JSON file is already sorted (e.g. created by the sstable2json tool) (Optional)."));
    }
    private static class JsonColumn<T>
    {
        private ByteBuffer name;
        private ByteBuffer value;
        private long timestamp;

        private String kind;

        // Expiring columns
        private int ttl;
        private int localExpirationTime;

        // Counter columns
        private long timestampOfLastDelete;

        public JsonColumn(T json, CFMetaData meta, boolean isSubColumn)
        {
            if (json instanceof List)
            {
                AbstractType<?> comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator;
                List fields = (List<?>) json;

                assert fields.size() >= 3 : "Column definition should have at least 3 fields (name, value, timestamp)";

                name = stringAsType((String) fields.get(0), comparator);
                timestamp = (Long) fields.get(2);
                kind = "";

                if (fields.size() > 3)
                {
                    if (fields.get(3) instanceof Boolean)
                    {
                        // old format, reading this for backward compatibility sake
                        if (fields.size() == 6)
                        {
                            kind = "e";
                            ttl = (Integer) fields.get(4);
                            localExpirationTime = (Integer) fields.get(5);
                        }
                        else
                        {
                            kind = ((Boolean) fields.get(3)) ? "d" : "";
                        }
                    }
                    else
                    {
                        kind = (String) fields.get(3);
                        if (isExpiring())
                        {
                            ttl = (Integer) fields.get(4);
                            localExpirationTime = (Integer) fields.get(5);
                        }
                        else if (isCounter())
                        {
                            timestampOfLastDelete = ((Integer) fields.get(4));
                        }
                    }
                }

                value = isDeleted() ? ByteBufferUtil.hexToBytes((String) fields.get(1))
                                    : stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate()));
            }
        }

        public boolean isDeleted()
        {
            return kind.equals("d");
        }

        public boolean isExpiring()
        {
            return kind.equals("e");
        }

        public boolean isCounter()
        {
            return kind.equals("c");
        }

        public ByteBuffer getName()
        {
            return name.duplicate();
        }

        public ByteBuffer getValue()
        {
            return value.duplicate();
        }
    }
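    // For illustration only (made-up names/values): JsonColumn accepts column arrays in the
    // shapes produced by SSTableExport above, e.g.
    //   ["6e616d65", "value", 1311161111111]                          -- live column
    //   ["6e616d65", "00000003", 1311161111111, "d"]                  -- deleted column
    //   ["6e616d65", "value", 1311161111111, "e", 3600, 1311164711]   -- expiring column
    //   ["6e616d65", "1", 1311161111111, "c", 1311160000000]          -- counter column
    // plus the legacy boolean forms handled above for backward compatibility.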
    private static void addToStandardCF(List<?> row, ColumnFamily cfamily)
    {
        addColumnsToCF(row, null, cfamily);
    }

    /**
     * Add columns to a column family.
     *
     * @param row
     *            the columns associated with a row
     * @param superName
     *            name of the super column if any
     * @param cfamily
     *            the column family to add columns to
     */
    private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily)
    {
        CFMetaData cfm = cfamily.metadata();
        assert cfm != null;

        for (Object c : row)
        {
            JsonColumn col = new JsonColumn<List>((List) c, cfm, (superName != null));
            QueryPath path = new QueryPath(cfm.cfName, superName, col.getName());

            if (col.isExpiring())
            {
                cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl,
                                                           col.localExpirationTime));
            }
            else if (col.isCounter())
            {
                cfamily.addColumn(null, new CounterColumn(col.getName(), col.getValue(), col.timestamp,
                                                          col.timestampOfLastDelete));
            }
            else if (col.isDeleted())
            {
                cfamily.addTombstone(path, col.getValue(), col.timestamp);
            }
            else
            {
                cfamily.addColumn(path, col.getValue(), col.timestamp);
            }
        }
    }

    /**
     * Add super columns to a column family.
     *
     * @param row
     *            the super columns associated with a row
     * @param cfamily
     *            the column family to add columns to
     */
    private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily)
    {
        CFMetaData metaData = cfamily.metadata();
        assert metaData != null;

        AbstractType<?> comparator = metaData.comparator;

        // Super columns
        for (Map.Entry<?, ?> entry : row.entrySet())
        {
            Map<?, ?> data = (Map<?, ?>) entry.getValue();

            addColumnsToCF((List<?>) data.get("subColumns"), stringAsType((String) entry.getKey(), comparator), cfamily);

            // *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side
            //BigInteger deletedAt = (BigInteger) data.get("deletedAt");
            //SuperColumn superColumn = (SuperColumn) cfamily.getColumn(superName);
            //superColumn.markForDeleteAt((int) (System.currentTimeMillis()/1000), deletedAt);
        }
    }
    /**
     * Convert a JSON formatted file to an SSTable.
     *
     * @param jsonFile
     *            the file containing JSON formatted data
     * @param keyspace
     *            keyspace the data belongs to
     * @param cf
     *            column family the data belongs to
     * @param ssTablePath
     *            file to write the SSTable to
     * @throws IOException
     *             for errors reading/writing input/output
     */
    public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException
    {
        ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf);
        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();

        int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner)
                                      : importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner);

        if (importedKeys != -1)
            System.out.printf("%d keys imported successfully.%n", importedKeys);
    }
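    // For illustration only (made-up keys): the expected top-level JSON shape mirrors what
    // SSTableExport emits, i.e.
    //   standard CF:  { "6b657931": [ <column>, ... ], "6b657932": [ <column>, ... ] }
    //   super CF:     { "6b657931": { "7363": { "deletedAt": <long>, "subColumns": [ <column>, ... ] } } }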
    private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath,
                                      IPartitioner<?> partitioner) throws IOException
    {
        int importedKeys = 0;
        long start = System.currentTimeMillis();
        Map<?, ?> data = parser.readValueAs(new TypeReference<Map<?, ?>>() {});

        keyCountToImport = (keyCountToImport == null) ? data.size() : keyCountToImport;
        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);

        System.out.printf("Importing %s keys...%n", keyCountToImport);

        // sort by dk representation, but hold onto the hex version
        SortedMap<DecoratedKey, String> decoratedKeys = new TreeMap<DecoratedKey, String>();

        for (Object keyObject : data.keySet())
        {
            String key = (String) keyObject;
            decoratedKeys.put(partitioner.decorateKey(hexToBytes(key)), key);
        }

        for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet())
        {
            if (columnFamily.getType() == ColumnFamilyType.Super)
            {
                addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily);
            }
            else
            {
                addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily);
            }

            writer.append(rowKey.getKey(), columnFamily);
            columnFamily.clear();

            importedKeys++;

            long current = System.currentTimeMillis();

            if (current - start >= 5000) // 5 secs.
            {
                System.out.printf("Currently imported %d keys.%n", importedKeys);
                start = current;
            }

            if (keyCountToImport == importedKeys)
                break;
        }

        writer.closeAndOpenReader();

        return importedKeys;
    }
    public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath,
                                   IPartitioner<?> partitioner) throws IOException
    {
        int importedKeys = 0; // already imported keys count
        long start = System.currentTimeMillis();

        JsonParser parser = getParser(jsonFile);

        if (keyCountToImport == null)
        {
            keyCountToImport = 0;
            System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");

            parser.nextToken(); // START_OBJECT
            while (parser.nextToken() != null)
            {
                parser.nextToken();
                parser.skipChildren();
                if (parser.getCurrentName() == null)
                    continue;

                keyCountToImport++;
            }
        }

        System.out.printf("Importing %s keys...%n", keyCountToImport);

        parser = getParser(jsonFile); // renewing parser
        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);

        int lineNumber = 1;
        DecoratedKey prevStoredKey = null;

        while (parser.nextToken() != null)
        {
            String key = parser.getCurrentName();

            if (key != null)
            {
                String tokenName = parser.nextToken().name();

                if (tokenName.equals("START_ARRAY"))
                {
                    if (columnFamily.getType() == ColumnFamilyType.Super)
                    {
                        throw new RuntimeException("Can't write Standard columns to the Super Column Family.");
                    }

                    List<?> columns = parser.readValueAs(new TypeReference<List<?>>() {});
                    addToStandardCF(columns, columnFamily);
                }
                else if (tokenName.equals("START_OBJECT"))
                {
                    if (columnFamily.getType() == ColumnFamilyType.Standard)
                    {
                        throw new RuntimeException("Can't write Super columns to the Standard Column Family.");
                    }

                    Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
                    addToSuperCF(columns, columnFamily);
                }
                else
                {
                    throw new UnsupportedOperationException("Only Array or Hash allowed as row content.");
                }

                DecoratedKey currentKey = partitioner.decorateKey(hexToBytes(key));

                if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
                {
                    System.err.printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
                                      lineNumber, key);
                    return -1;
                }

                // saving decorated key
                writer.append(currentKey, columnFamily);
                columnFamily.clear();

                prevStoredKey = currentKey;
                importedKeys++;
                lineNumber++;

                long current = System.currentTimeMillis();

                if (current - start >= 5000) // 5 secs.
                {
                    System.out.printf("Currently imported %d keys.%n", importedKeys);
                    start = current;
                }

                if (keyCountToImport == importedKeys)
                    break;
            }
        }

        writer.closeAndOpenReader();

        return importedKeys;
    }
    /**
     * Get a JsonParser object for a file
     *
     * @param fileName
     *            name of the file
     * @return json parser instance for the given file
     * @throws IOException
     *             on any I/O error
     */
    private static JsonParser getParser(String fileName) throws IOException
    {
        return factory.createJsonParser(new File(fileName));
    }
    /**
     * Converts a JSON file to an SSTable file. The JSON input file and the output SSTable path
     * are supplied as command line arguments.
     *
     * @param args
     *            command line arguments
     * @throws IOException
     *             on failure to open/read/write files or output streams
     * @throws ParseException
     *             on failure to parse the command line arguments
     * @throws ConfigurationException
     *             on configuration error
     */
    public static void main(String[] args) throws IOException, ParseException, ConfigurationException
    {
        CommandLineParser parser = new PosixParser();

        try
        {
            cmd = parser.parse(options, args);
        }
        catch (org.apache.commons.cli.ParseException e)
        {
            System.err.println(e.getMessage());
            printProgramUsage();
            System.exit(1);
        }

        if (cmd.getArgs().length != 2)
        {
            printProgramUsage();
            System.exit(1);
        }

        String json = cmd.getArgs()[0];
        String ssTable = cmd.getArgs()[1];
        String keyspace = cmd.getOptionValue(KEYSPACE_OPTION);
        String cfamily = cmd.getOptionValue(COLUMN_FAMILY_OPTION);

        if (cmd.hasOption(KEY_COUNT_OPTION))
        {
            keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION));
        }

        if (cmd.hasOption(IS_SORTED_OPTION))
        {
            isSorted = true;
        }

        DatabaseDescriptor.loadSchemas();
        //if (Schema.instance.getNonSystemTables().size() < 1)
        //{
        //    String msg = "no non-system tables are defined";
        //    System.err.println(msg);
        //    throw new ConfigurationException(msg);
        //}

        try
        {
            importJson(json, keyspace, cfamily, ssTable);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("ERROR: " + e.getMessage());
            System.exit(-1);
        }

        System.exit(0);
    }

    private static void printProgramUsage()
    {
        System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
                          SSTableImport.class.getName());

        System.out.println("Options:");
        for (Object o : options.getOptions())
        {
            Option opt = (Option) o;
            System.out.println(" -" + opt.getOpt() + " - " + opt.getDescription());
        }
    }
    /**
     * Used by test framework to set key count
     *
     * @param keyCount
     *            number of keys to import
     */
    public static void setKeyCountToImport(Integer keyCount)
    {
        keyCountToImport = keyCount;
    }

    /**
     * Convert a string to bytes (ByteBuffer) according to type
     *
     * @param content
     *            string to convert
     * @param type
     *            type to use for conversion
     * @return byte buffer representation of the given string
     */
    private static ByteBuffer stringAsType(String content, AbstractType<?> type)
    {
        try
        {
            return (type == BytesType.instance) ? hexToBytes(content) : type.fromString(content);
        }
        catch (MarshalException e)
        {
            throw new RuntimeException(e.getMessage());
        }
    }
}
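Finally, a minimal sketch of invoking the importer above from Java. The keyspace, column family and file paths are made-up examples, and the sketch assumes the locally stored schema is available, which the importer loads itself via DatabaseDescriptor.loadSchemas().

public class ImportExample
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical arguments: import an already-sorted JSON dump (e.g. produced by
        // SSTableExport/sstable2json) into a new sstable for Keyspace1/Standard1.
        SSTableImport.main(new String[] {
            "-K", "Keyspace1",
            "-c", "Standard1",
            "-s",
            "/tmp/Standard1.json",
            "/var/lib/cassandra/data/Keyspace1/Standard1-g-1-Data.db"
        });
    }
}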