Patches to Cassandra Tools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex; | |
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes; | |
import org.apache.cassandra.config.CFMetaData; | |
import org.apache.cassandra.config.ConfigurationException; | |
import org.apache.cassandra.config.DatabaseDescriptor; | |
import org.apache.cassandra.config.Schema; | |
import org.apache.cassandra.db.ColumnFamily; | |
import org.apache.cassandra.db.CounterColumn; | |
import org.apache.cassandra.db.DecoratedKey; | |
import org.apache.cassandra.db.DeletedColumn; | |
import org.apache.cassandra.db.ExpiringColumn; | |
import org.apache.cassandra.db.IColumn; | |
import org.apache.cassandra.db.marshal.AbstractType; | |
import org.apache.cassandra.dht.IPartitioner; | |
import org.apache.cassandra.io.sstable.Descriptor; | |
import org.apache.cassandra.io.sstable.KeyIterator; | |
import org.apache.cassandra.io.sstable.SSTableIdentityIterator; | |
import org.apache.cassandra.io.sstable.SSTableReader; | |
import org.apache.cassandra.io.sstable.SSTableScanner; | |
import org.apache.cassandra.utils.ByteBufferUtil; | |
import org.apache.commons.cli.CommandLine; | |
import org.apache.commons.cli.CommandLineParser; | |
import org.apache.commons.cli.Option; | |
import org.apache.commons.cli.Options; | |
import org.apache.commons.cli.ParseException; | |
import org.apache.commons.cli.PosixParser; | |
import org.codehaus.jackson.JsonGenerator; | |
import org.codehaus.jackson.map.ObjectMapper; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.PrintStream; | |
import java.nio.ByteBuffer; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.HashSet; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Set; | |
/** | |
* Export SSTables to JSON format. | |
*/ | |
public class SSTableExport | |
{ | |
// Short command-line option names.
private static final String KEY_OPTION = "k";
private static final String EXCLUDEKEY_OPTION = "x";
private static final String ENUMERATEKEYS_OPTION = "e";

// Mapper used for every JSON value this tool writes.
private static ObjectMapper jsonMapper = new ObjectMapper();

// Accepted options and the most recently parsed command line.
private static Options options;
private static CommandLine cmd;

static
{
    // The mapper must not close the target stream after writing a value.
    jsonMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);

    options = new Options();

    // -k <key>: row key to export; up to 500 values are accepted.
    Option includeKeys = new Option(KEY_OPTION, true, "Row key");
    includeKeys.setArgs(500);
    options.addOption(includeKeys);

    // -x <key>: row key to leave out; up to 500 values are accepted.
    Option excludeKeys = new Option(EXCLUDEKEY_OPTION, true, "Excluded row key");
    excludeKeys.setArgs(500);
    options.addOption(excludeKeys);

    // -e: flag without a value, list the row keys only.
    Option enumerateOnly = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
    options.addOption(enumerateOnly);
}
/** | |
* JSON Hash Key serializer | |
* | |
* @param out | |
* The output steam to write data | |
* @param value | |
* value to set as a key | |
*/ | |
private static void writeKey(PrintStream out, String value) | |
{ | |
writeJSON(out, value); | |
out.print(": "); | |
} | |
/** | |
* Serialize columns using given column iterator | |
* | |
* @param columns | |
* column iterator | |
* @param out | |
* output stream | |
* @param comparator | |
* columns comparator | |
* @param cfMetaData | |
* Column Family metadata (to get validator) | |
*/ | |
private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType<?> comparator, | |
CFMetaData cfMetaData) | |
{ | |
while (columns.hasNext()) | |
{ | |
writeJSON(out, serializeColumn(columns.next(), comparator, cfMetaData)); | |
if (columns.hasNext()) | |
out.print(", "); | |
} | |
} | |
/** | |
* Serialize a given column to the JSON format | |
* | |
* @param column | |
* column presentation | |
* @param comparator | |
* columns comparator | |
* @param cfMetaData | |
* Column Family metadata (to get validator) | |
* @return column as serialized list | |
*/ | |
private static List<Object> serializeColumn(IColumn column, AbstractType<?> comparator, CFMetaData cfMetaData) | |
{ | |
ArrayList<Object> serializedColumn = new ArrayList<Object>(); | |
ByteBuffer name = ByteBufferUtil.clone(column.name()); | |
ByteBuffer value = ByteBufferUtil.clone(column.value()); | |
serializedColumn.add(comparator.getString(name)); | |
if (column instanceof DeletedColumn) | |
{ | |
serializedColumn.add(ByteBufferUtil.bytesToHex(value)); | |
} | |
else | |
{ | |
AbstractType<?> validator = cfMetaData.getValueValidator(name); | |
serializedColumn.add(validator.getString(value)); | |
} | |
serializedColumn.add(column.timestamp()); | |
if (column instanceof DeletedColumn) | |
{ | |
serializedColumn.add("d"); | |
} | |
else if (column instanceof ExpiringColumn) | |
{ | |
serializedColumn.add("e"); | |
serializedColumn.add(((ExpiringColumn) column).getTimeToLive()); | |
serializedColumn.add(column.getLocalDeletionTime()); | |
} | |
else if (column instanceof CounterColumn) | |
{ | |
serializedColumn.add("c"); | |
serializedColumn.add(((CounterColumn) column).timestampOfLastDelete()); | |
} | |
return serializedColumn; | |
} | |
/** | |
* Get portion of the columns and serialize in loop while not more columns left in the row | |
* | |
* @param row | |
* SSTableIdentityIterator row representation with Column Family | |
* @param key | |
* Decorated Key for the required row | |
* @param out | |
* output stream | |
*/ | |
private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out) | |
{ | |
ColumnFamily columnFamily = row.getColumnFamily(); | |
boolean isSuperCF = columnFamily.isSuper(); | |
CFMetaData cfMetaData = columnFamily.metadata(); | |
AbstractType<?> comparator = columnFamily.getComparator(); | |
writeKey(out, bytesToHex(key.key)); | |
out.print(isSuperCF ? "{" : "["); | |
if (isSuperCF) | |
{ | |
while (row.hasNext()) | |
{ | |
IColumn column = row.next(); | |
writeKey(out, comparator.getString(column.name())); | |
out.print("{"); | |
writeKey(out, "deletedAt"); | |
out.print(column.getMarkedForDeleteAt()); | |
out.print(", "); | |
writeKey(out, "subColumns"); | |
out.print("["); | |
serializeColumns(column.getSubColumns().iterator(), out, columnFamily.getSubComparator(), cfMetaData); | |
out.print("]"); | |
out.print("}"); | |
if (row.hasNext()) | |
out.print(", "); | |
} | |
} | |
else | |
{ | |
serializeColumns(row, out, comparator, cfMetaData); | |
} | |
out.print(isSuperCF ? "}" : "]"); | |
} | |
/** | |
* Enumerate row keys from an SSTableReader and write the result to a PrintStream. | |
* | |
* @param desc | |
* the descriptor of the file to export the rows from | |
* @param outs | |
* PrintStream to write the output to | |
* @throws IOException | |
* on failure to read/write input/output | |
*/ | |
public static void enumeratekeys(Descriptor desc, PrintStream outs) | |
throws IOException | |
{ | |
KeyIterator iter = new KeyIterator(desc); | |
DecoratedKey lastKey = null; | |
while (iter.hasNext()) | |
{ | |
DecoratedKey key = iter.next(); | |
// validate order of the keys in the sstable | |
if (lastKey != null && lastKey.compareTo(key) > 0) | |
throw new IOException("Key out of order! " + lastKey + " > " + key); | |
lastKey = key; | |
outs.println(bytesToHex(key.key)); | |
} | |
iter.close(); | |
outs.flush(); | |
} | |
/** | |
* Export specific rows from an SSTable and write the resulting JSON to a PrintStream. | |
* | |
* @param desc | |
* the descriptor of the sstable table to read from | |
* @param outs | |
* PrintStream to write the output to | |
* @param toExport | |
* the keys corresponding to the rows to export | |
* @param excludes | |
* keys to exclude from export | |
* @throws IOException | |
* on failure to read/write input/output | |
*/ | |
public static void | |
export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) | |
throws IOException | |
{ | |
SSTableReader reader = SSTableReader.open(desc); | |
SSTableScanner scanner = reader.getDirectScanner(); | |
IPartitioner<?> partitioner = reader.partitioner; | |
if (excludes != null) | |
toExport.removeAll(Arrays.asList(excludes)); | |
outs.println("{"); | |
int i = 0; | |
// last key to compare order | |
DecoratedKey lastKey = null; | |
for (String key : toExport) | |
{ | |
DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key)); | |
if (lastKey != null && lastKey.compareTo(decoratedKey) > 0) | |
throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey); | |
lastKey = decoratedKey; | |
scanner.seekTo(decoratedKey); | |
if (!scanner.hasNext()) | |
continue; | |
SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next(); | |
if (!row.getKey().equals(decoratedKey)) | |
continue; | |
serializeRow(row, decoratedKey, outs); | |
if (i != 0) | |
outs.println(","); | |
i++; | |
} | |
outs.println("\n}"); | |
outs.flush(); | |
scanner.close(); | |
} | |
// This is necessary to accommodate the test suite since you cannot open a Reader more | |
// than once from within the same process. | |
static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException | |
{ | |
Set<String> excludeSet = new HashSet<String>(); | |
if (excludes != null) | |
excludeSet = new HashSet<String>(Arrays.asList(excludes)); | |
SSTableIdentityIterator row; | |
SSTableScanner scanner = reader.getDirectScanner(); | |
outs.println("{"); | |
int i = 0; | |
// collecting keys to export | |
while (scanner.hasNext()) | |
{ | |
row = (SSTableIdentityIterator) scanner.next(); | |
String currentKey = bytesToHex(row.getKey().key); | |
if (excludeSet.contains(currentKey)) | |
continue; | |
else if (i != 0) | |
outs.println(","); | |
serializeRow(row, row.getKey(), outs); | |
i++; | |
} | |
outs.println("\n}"); | |
outs.flush(); | |
scanner.close(); | |
} | |
/** | |
* Export an SSTable and write the resulting JSON to a PrintStream. | |
* | |
* @param desc | |
* the descriptor of the sstable table to read from | |
* @param outs | |
* PrintStream to write the output to | |
* @param excludes | |
* keys to exclude from export | |
* @throws IOException | |
* on failure to read/write input/output | |
*/ | |
public static void export(Descriptor desc, PrintStream outs, String[] excludes) throws IOException | |
{ | |
export(SSTableReader.open(desc), outs, excludes); | |
} | |
/** | |
* Export an SSTable and write the resulting JSON to standard out. | |
* | |
* @param desc | |
* the descriptor of the sstable table to read from | |
* @param excludes | |
* keys to exclude from export | |
* @throws IOException | |
* on failure to read/write SSTable/standard out | |
*/ | |
public static void export(Descriptor desc, String[] excludes) throws IOException | |
{ | |
export(desc, System.out, excludes); | |
} | |
/** | |
* Given arguments specifying an SSTable, and optionally an output file, export the contents of the SSTable to JSON. | |
* | |
* @param args | |
* command lines arguments | |
* @throws IOException | |
* on failure to open/read/write files or output streams | |
* @throws ConfigurationException | |
* on configuration failure (wrong params given) | |
*/ | |
public static void main(String[] args) throws IOException, ConfigurationException | |
{ | |
String usage = | |
String.format("Usage: %s <sstable> [-k key [-k key [...]] -x key [-x key [...]]]%n", | |
SSTableExport.class.getName()); | |
CommandLineParser parser = new PosixParser(); | |
try | |
{ | |
cmd = parser.parse(options, args); | |
} | |
catch (ParseException e1) | |
{ | |
System.err.println(e1.getMessage()); | |
System.err.println(usage); | |
System.exit(1); | |
} | |
if (cmd.getArgs().length != 1) | |
{ | |
System.err.println("You must supply exactly one sstable"); | |
System.err.println(usage); | |
System.exit(1); | |
} | |
String[] keys = cmd.getOptionValues(KEY_OPTION); | |
String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION); | |
String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath(); | |
DatabaseDescriptor.loadSchemas(); | |
//if (Schema.instance.getNonSystemTables().size() < 1) | |
//{ | |
// String msg = "no non-system tables are defined"; | |
// System.err.println(msg); | |
// throw new ConfigurationException(msg); | |
//} | |
Descriptor descriptor = Descriptor.fromFilename(ssTableFileName); | |
if (Schema.instance.getCFMetaData(descriptor) == null) | |
{ | |
System.err.println(String.format("The provided column family is not part of this cassandra database: keysapce = %s, column family = %s", | |
descriptor.ksname, descriptor.cfname)); | |
System.exit(1); | |
} | |
if (cmd.hasOption(ENUMERATEKEYS_OPTION)) | |
{ | |
enumeratekeys(descriptor, System.out); | |
} | |
else | |
{ | |
if ((keys != null) && (keys.length > 0)) | |
export(descriptor, System.out, Arrays.asList(keys), excludes); | |
else | |
export(descriptor, excludes); | |
} | |
System.exit(0); | |
} | |
private static void writeJSON(PrintStream out, Object value) | |
{ | |
try | |
{ | |
jsonMapper.writeValue(out, value); | |
} | |
catch (Exception e) | |
{ | |
throw new RuntimeException(e.getMessage(), e); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes; | |
import org.apache.cassandra.config.CFMetaData; | |
import org.apache.cassandra.config.ConfigurationException; | |
import org.apache.cassandra.config.DatabaseDescriptor; | |
import org.apache.cassandra.db.ColumnFamily; | |
import org.apache.cassandra.db.ColumnFamilyType; | |
import org.apache.cassandra.db.CounterColumn; | |
import org.apache.cassandra.db.DecoratedKey; | |
import org.apache.cassandra.db.ExpiringColumn; | |
import org.apache.cassandra.db.filter.QueryPath; | |
import org.apache.cassandra.db.marshal.AbstractType; | |
import org.apache.cassandra.db.marshal.BytesType; | |
import org.apache.cassandra.db.marshal.MarshalException; | |
import org.apache.cassandra.dht.IPartitioner; | |
import org.apache.cassandra.io.sstable.SSTableWriter; | |
import org.apache.cassandra.utils.ByteBufferUtil; | |
import org.apache.commons.cli.CommandLine; | |
import org.apache.commons.cli.CommandLineParser; | |
import org.apache.commons.cli.Option; | |
import org.apache.commons.cli.Options; | |
import org.apache.commons.cli.ParseException; | |
import org.apache.commons.cli.PosixParser; | |
import org.codehaus.jackson.JsonFactory; | |
import org.codehaus.jackson.JsonParser; | |
import org.codehaus.jackson.map.MappingJsonFactory; | |
import org.codehaus.jackson.type.TypeReference; | |
import java.io.File; | |
import java.io.IOException; | |
import java.nio.ByteBuffer; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.SortedMap; | |
import java.util.TreeMap; | |
/** | |
* Create SSTables from JSON input | |
*/ | |
public class SSTableImport | |
{ | |
// Short command-line option names.
private static final String KEYSPACE_OPTION = "K";
private static final String COLUMN_FAMILY_OPTION = "c";
private static final String KEY_COUNT_OPTION = "n";
private static final String IS_SORTED_OPTION = "s";

// Accepted options and the most recently parsed command line.
private static Options options;
private static CommandLine cmd;

// Number of keys to import; null until given with -n or determined from the input.
private static Integer keyCountToImport = null;

// Whether the JSON input is assumed to be sorted already (-s).
private static boolean isSorted = false;

// Parser factory with field-name interning switched off.
private static JsonFactory factory = new MappingJsonFactory().configure(JsonParser.Feature.INTERN_FIELD_NAMES,
                                                                        false);

static
{
    options = new Options();

    // -K <keyspace>: mandatory
    Option keyspaceOption = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
    keyspaceOption.setRequired(true);
    options.addOption(keyspaceOption);

    // -c <column family>: mandatory
    Option columnFamilyOption = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name.");
    columnFamilyOption.setRequired(true);
    options.addOption(columnFamilyOption);

    // -n <count>: optional
    Option keyCountOption = new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional).");
    options.addOption(keyCountOption);

    // -s: optional flag
    Option sortedOption = new Option(IS_SORTED_OPTION, false,
                                     "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional).");
    options.addOption(sortedOption);
}
private static class JsonColumn<T> | |
{ | |
private ByteBuffer name; | |
private ByteBuffer value; | |
private long timestamp; | |
private String kind; | |
// Expiring columns | |
private int ttl; | |
private int localExpirationTime; | |
// Counter columns | |
private long timestampOfLastDelete; | |
public JsonColumn(T json, CFMetaData meta, boolean isSubColumn) | |
{ | |
if (json instanceof List) | |
{ | |
AbstractType<?> comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator; | |
List fields = (List<?>) json; | |
assert fields.size() >= 3 : "Column definition should have at least 3"; | |
name = stringAsType((String) fields.get(0), comparator); | |
timestamp = (Long) fields.get(2); | |
kind = ""; | |
if (fields.size() > 3) | |
{ | |
if (fields.get(3) instanceof Boolean) | |
{ | |
// old format, reading this for backward compatibility sake | |
if (fields.size() == 6) | |
{ | |
kind = "e"; | |
ttl = (Integer) fields.get(4); | |
localExpirationTime = (Integer) fields.get(5); | |
} | |
else | |
{ | |
kind = ((Boolean) fields.get(3)) ? "d" : ""; | |
} | |
} | |
else | |
{ | |
kind = (String) fields.get(3); | |
if (isExpiring()) | |
{ | |
ttl = (Integer) fields.get(4); | |
localExpirationTime = (Integer) fields.get(5); | |
} | |
else if (isCounter()) | |
{ | |
timestampOfLastDelete = ((Integer) fields.get(4)); | |
} | |
} | |
} | |
value = isDeleted() ? ByteBufferUtil.hexToBytes((String) fields.get(1)) | |
: stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate())); | |
} | |
} | |
public boolean isDeleted() | |
{ | |
return kind.equals("d"); | |
} | |
public boolean isExpiring() | |
{ | |
return kind.equals("e"); | |
} | |
public boolean isCounter() | |
{ | |
return kind.equals("c"); | |
} | |
public ByteBuffer getName() | |
{ | |
return name.duplicate(); | |
} | |
public ByteBuffer getValue() | |
{ | |
return value.duplicate(); | |
} | |
} | |
private static void addToStandardCF(List<?> row, ColumnFamily cfamily) | |
{ | |
addColumnsToCF(row, null, cfamily); | |
} | |
/** | |
* Add columns to a column family. | |
* | |
* @param row | |
* the columns associated with a row | |
* @param superName | |
* name of the super column if any | |
* @param cfamily | |
* the column family to add columns to | |
*/ | |
private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily) | |
{ | |
CFMetaData cfm = cfamily.metadata(); | |
assert cfm != null; | |
for (Object c : row) | |
{ | |
JsonColumn col = new JsonColumn<List>((List) c, cfm, (superName != null)); | |
QueryPath path = new QueryPath(cfm.cfName, superName, col.getName()); | |
if (col.isExpiring()) | |
{ | |
cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl, | |
col.localExpirationTime)); | |
} | |
else if (col.isCounter()) | |
{ | |
cfamily.addColumn(null, new CounterColumn(col.getName(), col.getValue(), col.timestamp, | |
col.timestampOfLastDelete)); | |
} | |
else if (col.isDeleted()) | |
{ | |
cfamily.addTombstone(path, col.getValue(), col.timestamp); | |
} | |
else | |
{ | |
cfamily.addColumn(path, col.getValue(), col.timestamp); | |
} | |
} | |
} | |
/** | |
* Add super columns to a column family. | |
* | |
* @param row | |
* the super columns associated with a row | |
* @param cfamily | |
* the column family to add columns to | |
*/ | |
private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily) | |
{ | |
CFMetaData metaData = cfamily.metadata(); | |
assert metaData != null; | |
AbstractType<?> comparator = metaData.comparator; | |
// Super columns | |
for (Map.Entry<?, ?> entry : row.entrySet()) | |
{ | |
Map<?, ?> data = (Map<?, ?>) entry.getValue(); | |
addColumnsToCF((List<?>) data.get("subColumns"), stringAsType((String) entry.getKey(), comparator), cfamily); | |
// *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side | |
//BigInteger deletedAt = (BigInteger) data.get("deletedAt"); | |
//SuperColumn superColumn = (SuperColumn) cfamily.getColumn(superName); | |
//superColumn.markForDeleteAt((int) (System.currentTimeMillis()/1000), deletedAt); | |
} | |
} | |
/** | |
* Convert a JSON formatted file to an SSTable. | |
* | |
* @param jsonFile | |
* the file containing JSON formatted data | |
* @param keyspace | |
* keyspace the data belongs to | |
* @param cf | |
* column family the data belongs to | |
* @param ssTablePath | |
* file to write the SSTable to | |
* @throws IOException | |
* for errors reading/writing input/output | |
*/ | |
public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException | |
{ | |
ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf); | |
IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner(); | |
int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner) | |
: importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner); | |
if (importedKeys != -1) | |
System.out.printf("%d keys imported successfully.%n", importedKeys); | |
} | |
private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath, | |
IPartitioner<?> partitioner) throws IOException | |
{ | |
int importedKeys = 0; | |
long start = System.currentTimeMillis(); | |
Map<?, ?> data = parser.readValueAs(new TypeReference<Map<?, ?>>() { | |
}); | |
keyCountToImport = (keyCountToImport == null) ? data.size() : keyCountToImport; | |
SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); | |
System.out.printf("Importing %s keys...%n", keyCountToImport); | |
// sort by dk representation, but hold onto the hex version | |
SortedMap<DecoratedKey, String> decoratedKeys = new TreeMap<DecoratedKey, String>(); | |
for (Object keyObject : data.keySet()) | |
{ | |
String key = (String) keyObject; | |
decoratedKeys.put(partitioner.decorateKey(hexToBytes(key)), key); | |
} | |
for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet()) | |
{ | |
if (columnFamily.getType() == ColumnFamilyType.Super) | |
{ | |
addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily); | |
} | |
else | |
{ | |
addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily); | |
} | |
writer.append(rowKey.getKey(), columnFamily); | |
columnFamily.clear(); | |
importedKeys++; | |
long current = System.currentTimeMillis(); | |
if (current - start >= 5000) // 5 secs. | |
{ | |
System.out.printf("Currently imported %d keys.%n", importedKeys); | |
start = current; | |
} | |
if (keyCountToImport == importedKeys) | |
break; | |
} | |
writer.closeAndOpenReader(); | |
return importedKeys; | |
} | |
public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, | |
IPartitioner<?> partitioner) throws IOException | |
{ | |
int importedKeys = 0; // already imported keys count | |
long start = System.currentTimeMillis(); | |
JsonParser parser = getParser(jsonFile); | |
if (keyCountToImport == null) | |
{ | |
keyCountToImport = 0; | |
System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)"); | |
parser.nextToken(); // START_OBJECT | |
while (parser.nextToken() != null) | |
{ | |
parser.nextToken(); | |
parser.skipChildren(); | |
if (parser.getCurrentName() == null) | |
continue; | |
keyCountToImport++; | |
} | |
} | |
System.out.printf("Importing %s keys...%n", keyCountToImport); | |
parser = getParser(jsonFile); // renewing parser | |
SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); | |
int lineNumber = 1; | |
DecoratedKey prevStoredKey = null; | |
while (parser.nextToken() != null) | |
{ | |
String key = parser.getCurrentName(); | |
if (key != null) | |
{ | |
String tokenName = parser.nextToken().name(); | |
if (tokenName.equals("START_ARRAY")) | |
{ | |
if (columnFamily.getType() == ColumnFamilyType.Super) | |
{ | |
throw new RuntimeException("Can't write Standard columns to the Super Column Family."); | |
} | |
List<?> columns = parser.readValueAs(new TypeReference<List<?>>() { | |
}); | |
addToStandardCF(columns, columnFamily); | |
} | |
else if (tokenName.equals("START_OBJECT")) | |
{ | |
if (columnFamily.getType() == ColumnFamilyType.Standard) | |
{ | |
throw new RuntimeException("Can't write Super columns to the Standard Column Family."); | |
} | |
Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() { | |
}); | |
addToSuperCF(columns, columnFamily); | |
} | |
else | |
{ | |
throw new UnsupportedOperationException("Only Array or Hash allowed as row content."); | |
} | |
DecoratedKey currentKey = partitioner.decorateKey(hexToBytes(key)); | |
if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1) | |
{ | |
System.err.printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n", | |
lineNumber, key); | |
return -1; | |
} | |
// saving decorated key | |
writer.append(currentKey, columnFamily); | |
columnFamily.clear(); | |
prevStoredKey = currentKey; | |
importedKeys++; | |
lineNumber++; | |
long current = System.currentTimeMillis(); | |
if (current - start >= 5000) // 5 secs. | |
{ | |
System.out.printf("Currently imported %d keys.%n", importedKeys); | |
start = current; | |
} | |
if (keyCountToImport == importedKeys) | |
break; | |
} | |
} | |
writer.closeAndOpenReader(); | |
return importedKeys; | |
} | |
/** | |
* Get JsonParser object for file | |
* | |
* @param fileName | |
* name of the file | |
* @return json parser instance for given file | |
* @throws IOException | |
* if any I/O error. | |
*/ | |
private static JsonParser getParser(String fileName) throws IOException | |
{ | |
return factory.createJsonParser(new File(fileName)); | |
} | |
/** | |
* Converts JSON to an SSTable file. JSON input can either be a file specified using an optional command line | |
* argument, or supplied on standard in. | |
* | |
* @param args | |
* command line arguments | |
* @throws IOException | |
* on failure to open/read/write files or output streams | |
* @throws ParseException | |
* on failure to parse JSON input | |
* @throws ConfigurationException | |
* on configuration error. | |
*/ | |
public static void main(String[] args) throws IOException, ParseException, ConfigurationException | |
{ | |
CommandLineParser parser = new PosixParser(); | |
try | |
{ | |
cmd = parser.parse(options, args); | |
} | |
catch (org.apache.commons.cli.ParseException e) | |
{ | |
System.err.println(e.getMessage()); | |
printProgramUsage(); | |
System.exit(1); | |
} | |
if (cmd.getArgs().length != 2) | |
{ | |
printProgramUsage(); | |
System.exit(1); | |
} | |
String json = cmd.getArgs()[0]; | |
String ssTable = cmd.getArgs()[1]; | |
String keyspace = cmd.getOptionValue(KEYSPACE_OPTION); | |
String cfamily = cmd.getOptionValue(COLUMN_FAMILY_OPTION); | |
if (cmd.hasOption(KEY_COUNT_OPTION)) | |
{ | |
keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION)); | |
} | |
if (cmd.hasOption(IS_SORTED_OPTION)) | |
{ | |
isSorted = true; | |
} | |
DatabaseDescriptor.loadSchemas(); | |
//if (Schema.instance.getNonSystemTables().size() < 1) | |
//{ | |
// String msg = "no non-system tables are defined"; | |
// System.err.println(msg); | |
// throw new ConfigurationException(msg); | |
//} | |
try | |
{ | |
importJson(json, keyspace, cfamily, ssTable); | |
} | |
catch (Exception e) | |
{ | |
e.printStackTrace(); | |
System.err.println("ERROR: " + e.getMessage()); | |
System.exit(-1); | |
} | |
System.exit(0); | |
} | |
private static void printProgramUsage() | |
{ | |
System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n", | |
SSTableImport.class.getName()); | |
System.out.println("Options:"); | |
for (Object o : options.getOptions()) | |
{ | |
Option opt = (Option) o; | |
System.out.println(" -" + opt.getOpt() + " - " + opt.getDescription()); | |
} | |
} | |
/** | |
* Used by test framework to set key count | |
* | |
* @param keyCount | |
* numbers of keys to import | |
*/ | |
public static void setKeyCountToImport(Integer keyCount) | |
{ | |
keyCountToImport = keyCount; | |
} | |
/** | |
* Convert a string to bytes (ByteBuffer) according to type | |
* | |
* @param content | |
* string to convert | |
* @param type | |
* type to use for conversion | |
* @return byte buffer representation of the given string | |
*/ | |
private static ByteBuffer stringAsType(String content, AbstractType<?> type) | |
{ | |
try | |
{ | |
return (type == BytesType.instance) ? hexToBytes(content) : type.fromString(content); | |
} | |
catch (MarshalException e) | |
{ | |
throw new RuntimeException(e.getMessage()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment