Skip to content

Instantly share code, notes, and snippets.

@charithe
Created December 28, 2012 11:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save charithe/4397133 to your computer and use it in GitHub Desktop.
Save charithe/4397133 to your computer and use it in GitHub Desktop.
Patches to Cassandra Tools
import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Export SSTables to JSON format.
*/
public class SSTableExport
{
// Names of the supported command-line options.
private static final String KEY_OPTION = "k";
private static final String EXCLUDEKEY_OPTION = "x";
private static final String ENUMERATEKEYS_OPTION = "e";

// JSON serializer shared by every write; configured in the static initializer below.
private static ObjectMapper jsonMapper = new ObjectMapper();

// Option definitions and the parsed command line (populated by main).
private static Options options;
private static CommandLine cmd;

static
{
    // The mapper must not close the target stream after each value it writes.
    jsonMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);

    // -k <key>: rows to export; accepts up to 500 values.
    Option includeKeyOption = new Option(KEY_OPTION, true, "Row key");
    includeKeyOption.setArgs(500);

    // -x <key>: rows to leave out; accepts up to 500 values.
    Option excludeKeyOption = new Option(EXCLUDEKEY_OPTION, true, "Excluded row key");
    excludeKeyOption.setArgs(500);

    // -e: flag without argument, list keys instead of exporting rows.
    Option enumerateOption = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");

    options = new Options();
    options.addOption(includeKeyOption);
    options.addOption(excludeKeyOption);
    options.addOption(enumerateOption);
}
/**
 * Writes a JSON object key: the JSON-encoded form of {@code value} immediately
 * followed by the separator {@code ": "}.
 *
 * @param out
 *            the output stream to write to
 * @param value
 *            the text to emit as the key
 */
private static void writeKey(PrintStream out, String value)
{
    // The key itself goes through the JSON mapper so it is quoted/escaped.
    writeJSON(out, value);
    final String separator = ": ";
    out.print(separator);
}
/**
 * Writes every column produced by the iterator as a JSON value, separating
 * consecutive columns with {@code ", "}. Nothing is written for an empty iterator.
 *
 * @param columns
 *            iterator over the columns to write
 * @param out
 *            output stream
 * @param comparator
 *            comparator used to render column names
 * @param cfMetaData
 *            column family metadata (source of the value validators)
 */
private static void serializeColumns(Iterator<IColumn> columns, PrintStream out, AbstractType<?> comparator,
                                     CFMetaData cfMetaData)
{
    if (!columns.hasNext())
        return;

    for (;;)
    {
        List<Object> serialized = serializeColumn(columns.next(), comparator, cfMetaData);
        writeJSON(out, serialized);

        if (!columns.hasNext())
            break;

        // Separator only between two columns, never after the last one.
        out.print(", ");
    }
}
/**
 * Converts a single column into the list that is later written as a JSON array.
 * <p>
 * The list always starts with {@code [name, value, timestamp]}. A deleted column
 * gets its value as hex and the marker {@code "d"}; an expiring column gets
 * {@code "e"}, its TTL and its local deletion time; a counter column gets
 * {@code "c"} and its timestamp of last delete. Any other column gets no marker.
 *
 * @param column
 *            the column to convert
 * @param comparator
 *            comparator used to render the column name
 * @param cfMetaData
 *            column family metadata (source of the value validator)
 * @return the column as a list of JSON-serializable fields
 */
private static List<Object> serializeColumn(IColumn column, AbstractType<?> comparator, CFMetaData cfMetaData)
{
    // Work on copies of the name and value buffers.
    ByteBuffer columnName = ByteBufferUtil.clone(column.name());
    ByteBuffer columnValue = ByteBufferUtil.clone(column.value());
    boolean tombstone = column instanceof DeletedColumn;

    ArrayList<Object> fields = new ArrayList<Object>();
    fields.add(comparator.getString(columnName));
    // Tombstone values are written as raw hex; everything else goes through the validator.
    fields.add(tombstone
               ? ByteBufferUtil.bytesToHex(columnValue)
               : cfMetaData.getValueValidator(columnName).getString(columnValue));
    fields.add(column.timestamp());

    if (tombstone)
    {
        fields.add("d");
    }
    else if (column instanceof ExpiringColumn)
    {
        ExpiringColumn expiring = (ExpiringColumn) column;
        fields.add("e");
        fields.add(expiring.getTimeToLive());
        fields.add(column.getLocalDeletionTime());
    }
    else if (column instanceof CounterColumn)
    {
        CounterColumn counter = (CounterColumn) column;
        fields.add("c");
        fields.add(counter.timestampOfLastDelete());
    }

    return fields;
}
/**
 * Writes one row as a {@code "hexKey": ...} JSON member.
 * <p>
 * A standard column family row is written as an array of columns. A super column
 * family row is written as an object keyed by super column name, each entry holding
 * {@code deletedAt} and the {@code subColumns} array.
 *
 * @param row
 *            iterator over the row's columns, also giving access to its column family
 * @param key
 *            decorated key of the row; its raw bytes are written as hex
 * @param out
 *            output stream
 */
private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
{
    ColumnFamily cf = row.getColumnFamily();
    boolean superCF = cf.isSuper();
    CFMetaData metadata = cf.metadata();
    AbstractType<?> nameComparator = cf.getComparator();

    writeKey(out, bytesToHex(key.key));

    if (!superCF)
    {
        // Standard column family: a flat array of columns.
        out.print("[");
        serializeColumns(row, out, nameComparator, metadata);
        out.print("]");
        return;
    }

    // Super column family: one nested object per super column.
    out.print("{");
    while (row.hasNext())
    {
        IColumn superColumn = row.next();

        writeKey(out, nameComparator.getString(superColumn.name()));
        out.print("{");

        writeKey(out, "deletedAt");
        out.print(superColumn.getMarkedForDeleteAt());
        out.print(", ");

        writeKey(out, "subColumns");
        out.print("[");
        serializeColumns(superColumn.getSubColumns().iterator(), out, cf.getSubComparator(), metadata);
        out.print("]");

        out.print("}");

        if (row.hasNext())
            out.print(", ");
    }
    out.print("}");
}
/**
 * Enumerates the row keys of an SSTable and prints each one, hex-encoded, on its
 * own line.
 * <p>
 * While iterating, the order of the keys is validated: a key that sorts before its
 * predecessor aborts the enumeration.
 *
 * @param desc
 *            the descriptor of the file to read the keys from
 * @param outs
 *            PrintStream to write the output to
 * @throws IOException
 *             on failure to read/write input/output, or if the keys are out of order
 */
public static void enumeratekeys(Descriptor desc, PrintStream outs)
throws IOException
{
    KeyIterator iter = new KeyIterator(desc);
    try
    {
        DecoratedKey lastKey = null;
        while (iter.hasNext())
        {
            DecoratedKey key = iter.next();

            // validate order of the keys in the sstable
            if (lastKey != null && lastKey.compareTo(key) > 0)
                throw new IOException("Key out of order! " + lastKey + " > " + key);

            lastKey = key;
            outs.println(bytesToHex(key.key));
        }
    }
    finally
    {
        // Release the iterator even when the order check (or a read) fails.
        iter.close();
    }
    outs.flush();
}
/**
 * Exports specific rows from an SSTable and writes the resulting JSON to a PrintStream.
 * <p>
 * Keys must be supplied in the partitioner's order; a key that sorts before its
 * predecessor aborts the export. Requested keys that are excluded, or that are not
 * present in the SSTable, are silently skipped. The {@code toExport} collection is
 * only read, never modified, so fixed-size or immutable collections are accepted.
 *
 * @param desc
 *            the descriptor of the sstable table to read from
 * @param outs
 *            PrintStream to write the output to
 * @param toExport
 *            the hex-encoded keys corresponding to the rows to export
 * @param excludes
 *            hex-encoded keys to exclude from export; may be {@code null}
 * @throws IOException
 *             on failure to read/write input/output, or if the keys are out of order
 */
public static void
export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes)
throws IOException
{
    SSTableReader reader = SSTableReader.open(desc);
    SSTableScanner scanner = reader.getDirectScanner();
    try
    {
        IPartitioner<?> partitioner = reader.partitioner;

        // Filter through a private set instead of calling toExport.removeAll(...):
        // removeAll mutates the caller's collection and fails outright on
        // fixed-size lists such as the one produced by Arrays.asList.
        Set<String> excludeSet = new HashSet<String>();
        if (excludes != null)
            excludeSet.addAll(Arrays.asList(excludes));

        outs.println("{");

        int i = 0;

        // last key to compare order
        DecoratedKey lastKey = null;

        for (String key : toExport)
        {
            if (excludeSet.contains(key))
                continue;

            DecoratedKey decoratedKey = partitioner.decorateKey(hexToBytes(key));

            if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
                throw new IOException("Key out of order! " + lastKey + " > " + decoratedKey);

            lastKey = decoratedKey;

            scanner.seekTo(decoratedKey);

            if (!scanner.hasNext())
                continue;

            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
            // seekTo may land on a different row when the key is absent.
            if (!row.getKey().equals(decoratedKey))
                continue;

            // The separator goes BEFORE every row except the first one, so the
            // rows form a valid comma-separated JSON object body.
            if (i != 0)
                outs.println(",");

            serializeRow(row, decoratedKey, outs);

            i++;
        }

        outs.println("\n}");
        outs.flush();
    }
    finally
    {
        // Close the scanner on both the normal and the exceptional path.
        scanner.close();
    }
}
/**
 * Exports every row of an already opened SSTable, except the excluded ones, as JSON.
 * <p>
 * This overload is necessary to accommodate the test suite since you cannot open a
 * Reader more than once from within the same process.
 *
 * @param reader
 *            the opened sstable to read from
 * @param outs
 *            PrintStream to write the output to
 * @param excludes
 *            hex-encoded keys to exclude from export; may be {@code null}
 * @throws IOException
 *             on failure to read/write input/output
 */
static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
{
    Set<String> excludeSet = (excludes == null)
                           ? new HashSet<String>()
                           : new HashSet<String>(Arrays.asList(excludes));

    SSTableScanner scanner = reader.getDirectScanner();
    try
    {
        outs.println("{");

        int i = 0;

        while (scanner.hasNext())
        {
            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();

            String currentKey = bytesToHex(row.getKey().key);

            if (excludeSet.contains(currentKey))
                continue;

            // Separator before every exported row except the first one.
            if (i != 0)
                outs.println(",");

            serializeRow(row, row.getKey(), outs);

            i++;
        }

        outs.println("\n}");
        outs.flush();
    }
    finally
    {
        // Close the scanner even if serialization fails part-way through.
        scanner.close();
    }
}
/**
 * Opens the SSTable identified by the descriptor and writes all of its rows,
 * minus the excluded keys, as JSON to the given stream.
 *
 * @param desc
 *            the descriptor of the sstable table to read from
 * @param outs
 *            PrintStream to write the output to
 * @param excludes
 *            keys to exclude from export
 * @throws IOException
 *             on failure to read/write input/output
 */
public static void export(Descriptor desc, PrintStream outs, String[] excludes) throws IOException
{
    SSTableReader openedReader = SSTableReader.open(desc);
    export(openedReader, outs, excludes);
}
/**
 * Writes all rows of an SSTable, minus the excluded keys, as JSON to standard out.
 *
 * @param desc
 *            the descriptor of the sstable table to read from
 * @param excludes
 *            keys to exclude from export
 * @throws IOException
 *             on failure to read/write SSTable/standard out
 */
public static void export(Descriptor desc, String[] excludes) throws IOException
{
    PrintStream stdout = System.out;
    export(desc, stdout, excludes);
}
/**
 * Command-line entry point: given exactly one SSTable file, either enumerates its
 * keys ({@code -e}) or exports its contents as JSON to standard out, optionally
 * restricted to given keys ({@code -k}) and/or minus excluded keys ({@code -x}).
 * <p>
 * The process exits with status 1 on bad arguments or when the SSTable's column
 * family is unknown to the loaded schema, and with status 0 on success.
 *
 * @param args
 *            command lines arguments
 * @throws IOException
 *             on failure to open/read/write files or output streams
 * @throws ConfigurationException
 *             on configuration failure (wrong params given)
 */
public static void main(String[] args) throws IOException, ConfigurationException
{
    String usage =
        String.format("Usage: %s <sstable> [-k key [-k key [...]] -x key [-x key [...]]]%n",
                      SSTableExport.class.getName());

    CommandLineParser parser = new PosixParser();
    try
    {
        cmd = parser.parse(options, args);
    }
    catch (ParseException e1)
    {
        System.err.println(e1.getMessage());
        System.err.println(usage);
        System.exit(1);
    }

    if (cmd.getArgs().length != 1)
    {
        System.err.println("You must supply exactly one sstable");
        System.err.println(usage);
        System.exit(1);
    }

    String[] keys = cmd.getOptionValues(KEY_OPTION);
    String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
    String ssTableFileName = new File(cmd.getArgs()[0]).getAbsolutePath();

    DatabaseDescriptor.loadSchemas();
    // NOTE(review): a check that at least one non-system table is defined used to
    // sit here as commented-out code; it appears to be disabled on purpose in this
    // patched version of the tool — confirm before re-introducing it.

    Descriptor descriptor = Descriptor.fromFilename(ssTableFileName);
    if (Schema.instance.getCFMetaData(descriptor) == null)
    {
        System.err.println(String.format("The provided column family is not part of this cassandra database: keyspace = %s, column family = %s",
                                         descriptor.ksname, descriptor.cfname));
        System.exit(1);
    }

    if (cmd.hasOption(ENUMERATEKEYS_OPTION))
    {
        enumeratekeys(descriptor, System.out);
    }
    else
    {
        if ((keys != null) && (keys.length > 0))
        {
            // Hand over a growable copy: Arrays.asList alone is fixed-size and
            // rejects element removal, which the keyed export may attempt when
            // -k and -x overlap.
            export(descriptor, System.out, new ArrayList<String>(Arrays.asList(keys)), excludes);
        }
        else
        {
            export(descriptor, excludes);
        }
    }

    System.exit(0);
}
/**
 * Serializes {@code value} as JSON onto {@code out} using the shared mapper.
 * Any failure is rethrown unchecked, keeping the original exception as the cause.
 *
 * @param out
 *            the stream to write to
 * @param value
 *            the object to serialize
 */
private static void writeJSON(PrintStream out, Object value)
{
    try
    {
        jsonMapper.writeValue(out, value);
    }
    catch (Exception failure)
    {
        String reason = failure.getMessage();
        throw new RuntimeException(reason, failure);
    }
}
}
import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.MappingJsonFactory;
import org.codehaus.jackson.type.TypeReference;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Create SSTables from JSON input
*/
public class SSTableImport
{
// Names of the supported command-line options.
private static final String KEYSPACE_OPTION = "K";
private static final String COLUMN_FAMILY_OPTION = "c";
private static final String KEY_COUNT_OPTION = "n";
private static final String IS_SORTED_OPTION = "s";

// Option definitions and the parsed command line (populated by main).
private static Options options;
private static CommandLine cmd;

// Number of keys to import; null means "not specified".
private static Integer keyCountToImport = null;
// Whether the JSON input is assumed to be sorted already.
private static boolean isSorted = false;

// Parser factory with field-name interning switched off.
private static JsonFactory factory = new MappingJsonFactory().configure(JsonParser.Feature.INTERN_FIELD_NAMES,
                                                                         false);

static
{
    // -K <keyspace>: mandatory.
    Option keyspaceOption = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
    keyspaceOption.setRequired(true);

    // -c <column family>: mandatory.
    Option columnFamilyOption = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name.");
    columnFamilyOption.setRequired(true);

    // -n <count> and -s: optional.
    Option keyCountOption = new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional).");
    Option sortedOption = new Option(IS_SORTED_OPTION, false,
                                     "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional).");

    options = new Options();
    options.addOption(keyspaceOption);
    options.addOption(columnFamilyOption);
    options.addOption(keyCountOption);
    options.addOption(sortedOption);
}
/**
 * In-memory form of one column read from the JSON input.
 * <p>
 * A column is a JSON array {@code [name, value, timestamp, ...]}. The optional
 * fourth element is the kind marker: {@code "d"} (deleted), {@code "e"} (expiring,
 * followed by TTL and local expiration time) or {@code "c"} (counter, followed by
 * the timestamp of last delete). An older format that uses a boolean as the fourth
 * element is still accepted.
 * <p>
 * If the JSON value is not a list, no field is populated.
 *
 * @param <T>
 *            type of the raw JSON value the column is built from
 */
private static class JsonColumn<T>
{
    private ByteBuffer name;
    private ByteBuffer value;
    private long timestamp;
    private String kind;
    // Expiring columns
    private int ttl;
    private int localExpirationTime;
    // Counter columns
    private long timestampOfLastDelete;

    /**
     * Parses a column definition.
     *
     * @param json
     *            the raw JSON value, expected to be a list
     * @param meta
     *            column family metadata, source of the comparators and validators
     * @param isSubColumn
     *            whether the column lives inside a super column (selects the comparator)
     */
    public JsonColumn(T json, CFMetaData meta, boolean isSubColumn)
    {
        if (json instanceof List)
        {
            AbstractType<?> comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator;
            List<?> fields = (List<?>) json;

            assert fields.size() >= 3 : "Column definition should have at least 3";

            name = stringAsType((String) fields.get(0), comparator);
            // The JSON mapper produces Integer for small numbers and Long for big
            // ones, so go through Number instead of casting to one boxed type.
            timestamp = ((Number) fields.get(2)).longValue();
            kind = "";

            if (fields.size() > 3)
            {
                if (fields.get(3) instanceof Boolean)
                {
                    // old format, reading this for backward compatibility sake
                    if (fields.size() == 6)
                    {
                        kind = "e";
                        ttl = (Integer) fields.get(4);
                        localExpirationTime = (Integer) fields.get(5);
                    }
                    else
                    {
                        kind = ((Boolean) fields.get(3)) ? "d" : "";
                    }
                }
                else
                {
                    kind = (String) fields.get(3);
                    if (isExpiring())
                    {
                        ttl = (Integer) fields.get(4);
                        localExpirationTime = (Integer) fields.get(5);
                    }
                    else if (isCounter())
                    {
                        // Written by the exporter as a long; may not fit an Integer.
                        timestampOfLastDelete = ((Number) fields.get(4)).longValue();
                    }
                }
            }

            // Deleted columns carry their value as raw hex; all others are parsed
            // with the validator registered for the column name.
            value = isDeleted() ? ByteBufferUtil.hexToBytes((String) fields.get(1))
                                : stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate()));
        }
    }

    /** @return true if the column is marked {@code "d"} (deleted) */
    public boolean isDeleted()
    {
        return kind.equals("d");
    }

    /** @return true if the column is marked {@code "e"} (expiring) */
    public boolean isExpiring()
    {
        return kind.equals("e");
    }

    /** @return true if the column is marked {@code "c"} (counter) */
    public boolean isCounter()
    {
        return kind.equals("c");
    }

    /** @return a duplicate of the name buffer, so callers cannot move its position */
    public ByteBuffer getName()
    {
        return name.duplicate();
    }

    /** @return a duplicate of the value buffer, so callers cannot move its position */
    public ByteBuffer getValue()
    {
        return value.duplicate();
    }
}
/**
 * Adds the columns of a standard (non-super) row to the column family.
 *
 * @param row
 *            the columns associated with a row
 * @param cfamily
 *            the column family to add columns to
 */
private static void addToStandardCF(List<?> row, ColumnFamily cfamily)
{
    // A standard row has no enclosing super column.
    final ByteBuffer noSuperColumn = null;
    addColumnsToCF(row, noSuperColumn, cfamily);
}
/**
 * Adds columns to a column family.
 * <p>
 * Each element of {@code row} is parsed as a {@code JsonColumn} and added according
 * to its kind: expiring, counter, tombstone or regular column.
 *
 * @param row
 *            the columns associated with a row
 * @param superName
 *            name of the super column if any, {@code null} otherwise
 * @param cfamily
 *            the column family to add columns to
 */
private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily)
{
    CFMetaData cfm = cfamily.metadata();
    assert cfm != null;

    boolean isSubColumn = (superName != null);

    for (Object c : row)
    {
        JsonColumn<List<?>> col = new JsonColumn<List<?>>((List<?>) c, cfm, isSubColumn);
        QueryPath path = new QueryPath(cfm.cfName, superName, col.getName());

        if (col.isExpiring())
        {
            // NOTE(review): null is passed here rather than superName, so the
            // enclosing super column (if any) is not forwarded for expiring
            // columns — confirm this is intended for super column families.
            cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl,
                                                       col.localExpirationTime));
        }
        else if (col.isCounter())
        {
            // NOTE(review): same as above — superName is not forwarded for counters.
            cfamily.addColumn(null, new CounterColumn(col.getName(), col.getValue(), col.timestamp,
                                                      col.timestampOfLastDelete));
        }
        else if (col.isDeleted())
        {
            cfamily.addTombstone(path, col.getValue(), col.timestamp);
        }
        else
        {
            cfamily.addColumn(path, col.getValue(), col.timestamp);
        }
    }
}
/**
 * Adds the super columns of a row to a column family.
 * <p>
 * Each map entry is one super column: the key is its name and the value is a map
 * whose {@code "subColumns"} entry lists the sub-columns. The {@code "deletedAt"}
 * entry of the input is not applied.
 *
 * @param row
 *            the super columns associated with a row
 * @param cfamily
 *            the column family to add columns to
 */
private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily)
{
    CFMetaData cfm = cfamily.metadata();
    assert cfm != null;

    AbstractType<?> superComparator = cfm.comparator;

    for (Map.Entry<?, ?> superColumn : row.entrySet())
    {
        Map<?, ?> body = (Map<?, ?>) superColumn.getValue();
        List<?> subColumns = (List<?>) body.get("subColumns");
        ByteBuffer superColumnName = stringAsType((String) superColumn.getKey(), superComparator);

        addColumnsToCF(subColumns, superColumnName, cfamily);

        // *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side,
        // which is why "deletedAt" is read by nobody here.
    }
}
/**
 * Converts a JSON formatted file to an SSTable, using the sorted or the unsorted
 * import path depending on the {@code isSorted} setting, and reports the number of
 * imported keys on standard out unless the import signalled failure with -1.
 *
 * @param jsonFile
 *            the file containing JSON formatted data
 * @param keyspace
 *            keyspace the data belongs to
 * @param cf
 *            column family the data belongs to
 * @param ssTablePath
 *            file to write the SSTable to
 * @throws IOException
 *             for errors reading/writing input/output
 */
public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException
{
    ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf);
    IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();

    int importedKeys;
    if (isSorted)
    {
        importedKeys = importSorted(jsonFile, columnFamily, ssTablePath, partitioner);
    }
    else
    {
        importedKeys = importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner);
    }

    // -1 is the failure marker; the failing path has already reported the problem.
    if (importedKeys == -1)
        return;

    System.out.printf("%d keys imported successfully.%n", importedKeys);
}
/**
 * Imports rows from JSON input that is not known to be sorted.
 * <p>
 * The whole document is read into memory, the row keys are sorted by their
 * decorated form, and the rows are then appended to the SSTable in that order.
 * The parser is closed once the document has been read. The static
 * {@code keyCountToImport} setting is only read; when it is unset, the number of
 * keys in the document is used for this call alone.
 *
 * @param parser
 *            parser positioned at the start of the JSON document; closed by this method
 * @param columnFamily
 *            scratch column family, filled and cleared once per row
 * @param ssTablePath
 *            file to write the SSTable to
 * @param partitioner
 *            partitioner used to decorate the row keys
 * @return the number of imported keys
 * @throws IOException
 *             for errors reading/writing input/output
 */
private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath,
                                  IPartitioner<?> partitioner) throws IOException
{
    int importedKeys = 0;
    long start = System.currentTimeMillis();

    Map<?, ?> data;
    try
    {
        data = parser.readValueAs(new TypeReference<Map<?, ?>>() {
        });
    }
    finally
    {
        // Everything needed is in memory now; release the underlying file.
        parser.close();
    }

    // Keep the effective count local so that a later import in the same JVM does
    // not inherit the size of this document.
    int keysToImport = (keyCountToImport == null) ? data.size() : keyCountToImport;

    SSTableWriter writer = new SSTableWriter(ssTablePath, keysToImport);
    System.out.printf("Importing %s keys...%n", keysToImport);

    // sort by dk representation, but hold onto the hex version
    SortedMap<DecoratedKey, String> decoratedKeys = new TreeMap<DecoratedKey, String>();
    for (Object keyObject : data.keySet())
    {
        String key = (String) keyObject;
        decoratedKeys.put(partitioner.decorateKey(hexToBytes(key)), key);
    }

    for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet())
    {
        if (columnFamily.getType() == ColumnFamilyType.Super)
        {
            addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily);
        }
        else
        {
            addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily);
        }

        writer.append(rowKey.getKey(), columnFamily);
        columnFamily.clear();

        importedKeys++;

        // Progress report at most once every 5 seconds.
        long current = System.currentTimeMillis();
        if (current - start >= 5000) // 5 secs.
        {
            System.out.printf("Currently imported %d keys.%n", importedKeys);
            start = current;
        }

        if (keysToImport == importedKeys)
            break;
    }

    writer.closeAndOpenReader();

    return importedKeys;
}
/**
 * Imports rows from JSON input that is assumed to be sorted by decorated key.
 * <p>
 * Rows are streamed one at a time and appended to the SSTable in file order. When
 * no key count was configured, the file is scanned once up front to count the keys.
 * If a row's key does not sort strictly after the previously stored key, the import
 * is aborted and -1 is returned.
 *
 * @param jsonFile
 *            the file containing JSON formatted data
 * @param columnFamily
 *            scratch column family, filled and cleared once per row
 * @param ssTablePath
 *            file to write the SSTable to
 * @param partitioner
 *            partitioner used to decorate the row keys
 * @return the number of imported keys, or -1 if the input is not properly sorted
 * @throws IOException
 *             for errors reading/writing input/output
 */
public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath,
                               IPartitioner<?> partitioner) throws IOException
{
    int importedKeys = 0; // already imported keys count
    long start = System.currentTimeMillis();

    // Keep the effective count local so that a later import in the same JVM does
    // not inherit the count computed for this file.
    int keysToImport;
    if (keyCountToImport == null)
    {
        System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
        keysToImport = countKeysInSortedJson(jsonFile);
    }
    else
    {
        keysToImport = keyCountToImport;
    }

    System.out.printf("Importing %s keys...%n", keysToImport);

    JsonParser parser = getParser(jsonFile);
    try
    {
        SSTableWriter writer = new SSTableWriter(ssTablePath, keysToImport);

        int lineNumber = 1;
        DecoratedKey prevStoredKey = null;

        while (parser.nextToken() != null)
        {
            String key = parser.getCurrentName();
            if (key == null)
                continue;

            String tokenName = parser.nextToken().name();

            if (tokenName.equals("START_ARRAY"))
            {
                if (columnFamily.getType() == ColumnFamilyType.Super)
                {
                    throw new RuntimeException("Can't write Standard columns to the Super Column Family.");
                }

                List<?> columns = parser.readValueAs(new TypeReference<List<?>>() {
                });
                addToStandardCF(columns, columnFamily);
            }
            else if (tokenName.equals("START_OBJECT"))
            {
                if (columnFamily.getType() == ColumnFamilyType.Standard)
                {
                    throw new RuntimeException("Can't write Super columns to the Standard Column Family.");
                }

                Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() {
                });
                addToSuperCF(columns, columnFamily);
            }
            else
            {
                throw new UnsupportedOperationException("Only Array or Hash allowed as row content.");
            }

            DecoratedKey currentKey = partitioner.decorateKey(hexToBytes(key));

            // compareTo only promises the SIGN of its result, so test ">= 0"
            // rather than "!= -1": a correctly ordered pair may compare as any
            // negative number.
            if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) >= 0)
            {
                System.err.printf("Line %d: Key %s is not greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
                                  lineNumber, key);
                return -1;
            }

            // saving decorated key
            writer.append(currentKey, columnFamily);
            columnFamily.clear();

            prevStoredKey = currentKey;
            importedKeys++;
            lineNumber++;

            // Progress report at most once every 5 seconds.
            long current = System.currentTimeMillis();
            if (current - start >= 5000) // 5 secs.
            {
                System.out.printf("Currently imported %d keys.%n", importedKeys);
                start = current;
            }

            if (keysToImport == importedKeys)
                break;
        }

        writer.closeAndOpenReader();

        return importedKeys;
    }
    finally
    {
        // Release the input file on success, on abort and on failure alike.
        parser.close();
    }
}

/**
 * Counts the row keys of a JSON file by scanning it once with a throw-away parser.
 *
 * @param jsonFile
 *            the file containing JSON formatted data
 * @return the number of row keys found
 * @throws IOException
 *             for errors reading the file
 */
private static int countKeysInSortedJson(String jsonFile) throws IOException
{
    JsonParser parser = getParser(jsonFile);
    try
    {
        int count = 0;

        parser.nextToken(); // START_OBJECT
        while (parser.nextToken() != null)
        {
            parser.nextToken();
            parser.skipChildren();
            if (parser.getCurrentName() == null)
                continue;

            count++;
        }

        return count;
    }
    finally
    {
        parser.close();
    }
}
/**
 * Creates a JSON parser that reads from the named file, using the shared factory.
 *
 * @param fileName
 *            name of the file
 * @return json parser instance for given file
 * @throws IOException
 *             if any I/O error.
 */
private static JsonParser getParser(String fileName) throws IOException
{
    File jsonSource = new File(fileName);
    return factory.createJsonParser(jsonSource);
}
/**
 * Command-line entry point: converts a JSON file to an SSTable file. Exactly two
 * positional arguments are expected, the JSON input file followed by the SSTable
 * output file, together with the mandatory {@code -K} and {@code -c} options.
 * <p>
 * The process exits with status 1 on bad arguments, -1 if the import fails and 0
 * on success.
 *
 * @param args
 *            command line arguments
 * @throws IOException
 *             on failure to open/read/write files or output streams
 * @throws ParseException
 *             declared for compatibility; command-line parse failures are handled
 *             inside this method
 * @throws ConfigurationException
 *             on configuration error.
 */
public static void main(String[] args) throws IOException, ParseException, ConfigurationException
{
    CommandLineParser parser = new PosixParser();

    try
    {
        cmd = parser.parse(options, args);
    }
    catch (org.apache.commons.cli.ParseException e)
    {
        System.err.println(e.getMessage());
        printProgramUsage();
        System.exit(1);
    }

    if (cmd.getArgs().length != 2)
    {
        printProgramUsage();
        System.exit(1);
    }

    String json = cmd.getArgs()[0];
    String ssTable = cmd.getArgs()[1];
    String keyspace = cmd.getOptionValue(KEYSPACE_OPTION);
    String cfamily = cmd.getOptionValue(COLUMN_FAMILY_OPTION);

    if (cmd.hasOption(KEY_COUNT_OPTION))
    {
        String rawKeyCount = cmd.getOptionValue(KEY_COUNT_OPTION);
        try
        {
            keyCountToImport = Integer.valueOf(rawKeyCount);
        }
        catch (NumberFormatException e)
        {
            // Treat a non-numeric count like any other bad argument instead of
            // letting the exception escape as a stack trace.
            System.err.println("Invalid number of keys for -" + KEY_COUNT_OPTION + ": " + rawKeyCount);
            printProgramUsage();
            System.exit(1);
        }
    }

    if (cmd.hasOption(IS_SORTED_OPTION))
    {
        isSorted = true;
    }

    DatabaseDescriptor.loadSchemas();
    // NOTE(review): a check that at least one non-system table is defined used to
    // sit here as commented-out code; it appears to be disabled on purpose in this
    // patched version of the tool — confirm before re-introducing it.

    try
    {
        importJson(json, keyspace, cfamily, ssTable);
    }
    catch (Exception e)
    {
        e.printStackTrace();
        System.err.println("ERROR: " + e.getMessage());
        System.exit(-1);
    }

    System.exit(0);
}
/**
 * Prints the usage line followed by one line per registered option (short name
 * and description) to standard out.
 */
private static void printProgramUsage()
{
    String programName = SSTableImport.class.getName();
    System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
                      programName);

    System.out.println("Options:");
    for (Object registered : options.getOptions())
    {
        Option option = (Option) registered;
        String shortName = option.getOpt();
        String description = option.getDescription();
        System.out.println(" -" + shortName + " - " + description);
    }
}
/**
 * Overrides the number of keys to import; used by the test framework.
 *
 * @param keyCount
 *            number of keys to import
 */
public static void setKeyCountToImport(Integer keyCount)
{
    SSTableImport.keyCountToImport = keyCount;
}
/**
 * Converts a string to bytes (ByteBuffer) according to type.
 * <p>
 * For {@code BytesType} the string is decoded as hex; for every other type the
 * type's own {@code fromString} conversion is used.
 *
 * @param content
 *            string to convert
 * @param type
 *            type to use for conversion
 * @return byte buffer representation of the given string
 * @throws RuntimeException
 *             if the conversion raises a {@code MarshalException}; the original
 *             exception is kept as the cause
 */
private static ByteBuffer stringAsType(String content, AbstractType<?> type)
{
    try
    {
        return (type == BytesType.instance) ? hexToBytes(content) : type.fromString(content);
    }
    catch (MarshalException e)
    {
        // Keep the cause so the original stack trace is not lost.
        throw new RuntimeException(e.getMessage(), e);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment