@IgorBerman
Created February 10, 2015 13:36
Cassandra multithreaded bulk load
package com.myproject.cassandraloading;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.ColumnToCollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.EmptyType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.TimestampType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.ITransportFactory;
import org.apache.cassandra.thrift.TFramedTransportFactory;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.tools.BulkLoadConnectionFactory;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.ibatis.session.SqlSession;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
import org.joda.time.LocalDate;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.myproject.cassandraloading.dal.MapperFactory;
import com.myproject.cassandraloading.dal.TSLoaderMapper;
import com.myproject.cassandraloading.dal.TSLoaderSamplesForDay;
/*
Target schema (run in cqlsh):

CREATE KEYSPACE myproject_gihon_oper WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE myproject_gihon_oper;
CREATE TABLE sample (
    ts_id bigint,
    yr int,
    t timestamp,
    v double,
    tgs map<varchar,varchar>,
    PRIMARY KEY((ts_id,yr), t)
) WITH CLUSTERING ORDER BY (t DESC) AND compression={'sstable_compression': 'LZ4Compressor'}
  AND compaction = { 'class' : 'LeveledCompactionStrategy' };

TRUNCATE sample;

Example invocation:
sudo ./cassandra-loading prodlikedb.myproject.dev gihon /mnt/data/gihon

Try running with these JVM params, otherwise the JVM enters full GC with 100% CPU load at some point:
-javaagent:jamm-0.2.5.jar -Dorg.postgresql.forcebinary=true
-Xmx2g -Xms2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
-XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10
*/
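// Full argument list as parsed by main() below (the example command above omits the trailing args):
//   args[0]=dbHost, args[1]=tenantId, args[2]=outputDir, args[3]=cassandraSeed,
//   args[4]=fromId, args[5]=toId, args[6]=numOfWriters, args[7]=stage (optional, see the stage checks in main)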
public class TimeSeriesCassandraLoader {
private static final Logger logger = Logger.getLogger(TimeSeriesCassandraLoader.class.getName());
private static final String TABLE_NAME = "sample";
private static final int BUFFER_SIZE = 32;
private static Function<File, Boolean> deleteFile = new Function<File, Boolean>() {
@Override
public Boolean apply(File input) {
return input.delete();
}
};
private static ImmutableListMultimap<Integer, TSLoaderSamplesForDay> splitByYear(List<TSLoaderSamplesForDay> readSamples) {
return Multimaps.index(readSamples, byYear);
}
final static Function<TSLoaderSamplesForDay, Integer> byYear = new Function<TSLoaderSamplesForDay, Integer>() {
@Override
public Integer apply(TSLoaderSamplesForDay input) {
return LocalDate.fromDateFields(input.dayTimestamp).year().get();
}
};
public static final String SCHEMA =
"CREATE TABLE %s.%s (ts_id bigint, yr int, t timestamp, v double, tgs map<varchar,varchar>, PRIMARY KEY((ts_id,yr), t)) "
+ " WITH CLUSTERING ORDER BY (t DESC) AND compression={'sstable_compression': 'LZ4Compressor'}"
+ " and compaction = { 'class' : 'LeveledCompactionStrategy' }";
// note: the tgs map column is never populated by this loader
public static final String INSERT_STMT = "INSERT INTO %s.%s (ts_id, yr, t, v) VALUES (?,?,?,?)";
public static void main(String[] args) throws Exception {
Stopwatch global = Stopwatch.createStarted();
String dbHost = args[0];
String tenantId = args[1];
String outputDirPath = args[2];
String cassandraSeed = args[3];
long fromId = Long.parseLong(args[4]);
long toId = Long.parseLong(args[5]);
outputDirPath = outputDirPath + "/" + args[4] + "-" + args[5];
int numOfWriters = Integer.parseInt(args[6]);
Optional<Integer> stage = Optional.absent();
if (args.length == 8) {
stage = Optional.of(Integer.parseInt(args[7]));
}
logger.info("The output will be placed in " + outputDirPath);
String keySpace = String.format("myproject_%s_oper", tenantId);
logger.info("Reading timeseries from " + keySpace + " for tenant " + tenantId);
SqlSession session = MapperFactory.createSession(dbHost, keySpace, tenantId);
TSLoaderMapper mapper = session.getMapper(TSLoaderMapper.class);
List<Long> tsIds = mapper.readAllTsIds(fromId, toId);
session.rollback();
String name = ManagementFactory.getRuntimeMXBean().getName();
// magic: client mode keeps the cassandra classes from trying to load a full server config (cassandra.yaml)
Config.setClientMode(true);
File outputDir = new File(outputDirPath);
if (!outputDir.exists() && !outputDir.mkdirs()) {
throw new RuntimeException("Cannot create output directory: " + outputDir);
}
if (!stage.isPresent() || stage.get() == 1) {
logger.info(name + ": Cleaning output dir");
initCleanup(outputDir);
}
if (!stage.isPresent() || stage.get() == 2) {
logger.info(name + ": Preparing sstables");
prepareAndLoadSstables(keySpace, tsIds, outputDir, dbHost, tenantId, name, stage, cassandraSeed,
numOfWriters, true);
}
logger.info(name + ": Done in " + global.elapsed(TimeUnit.MINUTES) + " mins");
System.exit(0); // some lingering threads keep the JVM alive
}
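/**
 * Producer side of the pipeline: each reader walks its own partition of timeseries ids,
 * pulls all per-day samples for an id from the relational store and puts them on the
 * bounded queue, then counts down doneSignal so the writers know when input is exhausted.
 */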
private static final class TssReader implements Runnable {
private final ArrayBlockingQueue<SamplesWithId> queue;
private final List<Long> tsIds;
private final String dbHost;
private final String keySpace;
private final String tenantId;
private final CountDownLatch doneSignal;
private int i;
private TssReader(int i,
ArrayBlockingQueue<SamplesWithId> queue,
List<Long> tsIds,
String dbHost,
String keySpace,
String tenantId,
CountDownLatch doneSignal) {
this.i = i;
this.queue = queue;
this.tsIds = tsIds;
this.dbHost = dbHost;
this.keySpace = keySpace;
this.tenantId = tenantId;
this.doneSignal = doneSignal;
}
@Override
public void run() {
Thread.currentThread().setName("DBReader " + i);
try {
String name = ManagementFactory.getRuntimeMXBean().getName();
int c = 1;
for (long tsId : tsIds) {
Connection conn = MapperFactory.createConnection(dbHost, keySpace, tenantId);
PreparedStatement statement =
conn.prepareStatement("select day_timestamp as dayTimestamp, times, values from sample where timeseries_id = ?");
statement.setLong(1, tsId);
ResultSet rs = statement.executeQuery();
List<TSLoaderSamplesForDay> readSamples = Lists.newArrayList();
while (rs.next()) {
Array tArray = rs.getArray(2);
double[] times = ArrayUtils.toPrimitive((Double[]) tArray.getArray(), 0);
Array vArray = rs.getArray(3);
double[] values = ArrayUtils.toPrimitive((Double[]) vArray.getArray(), 0);
TSLoaderSamplesForDay samplesForDay = new TSLoaderSamplesForDay(rs.getDate(1), times, values);
readSamples.add(samplesForDay);
}
rs.close();
statement.close();
SamplesWithId samplesWithId = new SamplesWithId(readSamples, tsId);
queue.put(samplesWithId);
if (c % 10 == 0) {
logger.info(name + i + ": q size: " + queue.size());
}
c++;
conn.close();
}
doneSignal.countDown();
logger.info(name + i + ": reader done");
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Exception ", e);
}
}
}
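/**
 * Consumer variant built on CQLSSTableWriter: drains the queue, splits each timeseries
 * into (ts_id, yr) partitions and appends one CQL row per sample. A null poll followed
 * by a zero-timeout await on doneSignal distinguishes "queue momentarily empty" from
 * "all readers finished", after which the sstables are flushed and streamed.
 */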
private static final class TssCqlWriter implements Callable<File> {
private final String keySpace;
private final ArrayBlockingQueue<SamplesWithId> queue;
private final File baseOutputDir;
private final int size;
private final AtomicInteger c;
private final CountDownLatch doneSignal;
private final int i;
private final Optional<Integer> stage;
private final String cassandraSeed;
private TssCqlWriter(int i,
String keySpace,
ArrayBlockingQueue<SamplesWithId> queue,
File outputDir,
int size,
AtomicInteger c,
CountDownLatch doneSignal,
Optional<Integer> stage,
String cassandraSeed) {
this.i = i;
this.keySpace = keySpace;
this.queue = queue;
this.baseOutputDir = outputDir;
this.size = size;
this.c = c;
this.doneSignal = doneSignal;
this.stage = stage;
this.cassandraSeed = cassandraSeed;
}
@Override
public File call() throws Exception {
Thread.currentThread().setName("SSTableWriter " + i);
File outputDir =
new File(this.baseOutputDir, String.valueOf(i) + File.separator + keySpace + File.separator +
TABLE_NAME);
if (!outputDir.exists() && !outputDir.mkdirs()) {
throw new RuntimeException("Cannot create output directory: " + outputDir);
}
String name = ManagementFactory.getRuntimeMXBean().getName();
try {
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
String schema = String.format(SCHEMA, keySpace, TABLE_NAME);
builder.inDirectory(outputDir)
.forTable(schema)
.using(String.format(INSERT_STMT, keySpace, TABLE_NAME))
.withPartitioner(new Murmur3Partitioner())
.withBufferSizeInMB(BUFFER_SIZE);
final CQLSSTableWriter writer = builder.build();
Stopwatch stopWatch = Stopwatch.createStarted();
while (true) {
SamplesWithId samplesWithId = queue.poll(1000, TimeUnit.MILLISECONDS);
if (samplesWithId == null && doneSignal.await(0, TimeUnit.MILLISECONDS)) {
break;
} else if (samplesWithId == null) {
continue;
}
int c = this.c.getAndIncrement();
if (c % 10 == 0) {
long elapsedSecs = stopWatch.elapsed(TimeUnit.SECONDS);
double tsPerSec = 1.0 * c / elapsedSecs;
logger.info(name + i + " : " + elapsedSecs + " secs / " + c + " tss done / " + tsPerSec +
" #/s / " + size + " / " + (size - c) / (tsPerSec * 60) +
" mins remaining(estimated) ");
}
List<TSLoaderSamplesForDay> samples = samplesWithId.samples;
ImmutableListMultimap<Integer, TSLoaderSamplesForDay> samplesByYear = splitByYear(samples);
for (Entry<Integer, Collection<TSLoaderSamplesForDay>> yearSamples : samplesByYear.asMap().entrySet()) {
int year = yearSamples.getKey();
for (TSLoaderSamplesForDay daySamples : yearSamples.getValue()) {
for (int j = 0; j < daySamples.times.length; j++) {
double timeInDays = daySamples.times[j];
long timestamp = (long) (TimeUnit.DAYS.toMillis(1) * timeInDays);
double value = daySamples.values[j];
Date date = new Date(timestamp);
writer.addRow(samplesWithId.tsId, year,// yr
date,// t
value);
}
}
}
samplesByYear = null;
samplesWithId = null;
}
logger.info(name + i + ": closing writer");
writer.close();
logger.info(name + i + ": writer done");
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Unexpected problem ", e);
}
if (!stage.isPresent() || stage.get() == 3) {
logger.info(name + ": Deleting unneccessary files");
cleanup(outputDir);
}
logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + outputDir);
load(cassandraSeed, outputDir);
return outputDir;
}
}
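/**
 * Consumer variant built on the lower-level SSTableSimpleUnsortedWriter: the same drain
 * loop as TssCqlWriter, but partition keys and column names are composed by hand with
 * CompositeType layouts matching the CFMetaData dump further below.
 */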
private static final class TssUnsortedWriter implements Callable<File> {
private final String keySpace;
private final ArrayBlockingQueue<SamplesWithId> queue;
private final File baseOutputDir;
private final int size;
private final AtomicInteger c;
private final CountDownLatch doneSignal;
private final int i;
private final Optional<Integer> stage;
private final String cassandraSeed;
private TssUnsortedWriter(int i,
String keySpace,
ArrayBlockingQueue<SamplesWithId> queue,
File outputDir,
int size,
AtomicInteger c,
CountDownLatch doneSignal,
Optional<Integer> stage,
String cassandraSeed) {
this.i = i;
this.keySpace = keySpace;
this.queue = queue;
this.baseOutputDir = outputDir;
this.size = size;
this.c = c;
this.doneSignal = doneSignal;
this.stage = stage;
this.cassandraSeed = cassandraSeed;
}
@Override
public File call() throws UnknownHostException {
Thread.currentThread().setName("SSTableWriter " + i);
File outputDir =
new File(this.baseOutputDir, String.valueOf(i) + File.separator + keySpace + File.separator +
TABLE_NAME);
if (!outputDir.exists() && !outputDir.mkdirs()) {
throw new RuntimeException("Cannot create output directory: " + outputDir);
}
String name = ManagementFactory.getRuntimeMXBean().getName();
try {
CollectionType mapType = MapType.getInstance(UTF8Type.instance, UTF8Type.instance);
Map<ByteBuffer, CollectionType> tgsTypeMap =
ImmutableMap.of(UTF8Type.instance.decompose("tgs"), mapType);
AbstractType<?> columnFComparator =
CompositeType.getInstance(ReversedType.getInstance(TimestampType.instance), UTF8Type.instance,
ColumnToCollectionType.getInstance(tgsTypeMap));
SSTableSimpleUnsortedWriter writer =
new SSTableSimpleUnsortedWriter(outputDir, new Murmur3Partitioner(), keySpace, TABLE_NAME,
columnFComparator, null, BUFFER_SIZE, new CompressionParameters(
LZ4Compressor.create(null)));
long insTs = System.currentTimeMillis();
CompositeType rowKeyType = CompositeType.getInstance(LongType.instance, Int32Type.instance);
CompositeType timeColumnType = CompositeType.getInstance(TimestampType.instance, EmptyType.instance);
CompositeType valueColumnType = CompositeType.getInstance(TimestampType.instance, UTF8Type.instance);
Stopwatch stopWatch = Stopwatch.createStarted();
while (true) {
SamplesWithId samplesWithId = queue.poll(1000, TimeUnit.MILLISECONDS);
if (samplesWithId == null && doneSignal.await(0, TimeUnit.MILLISECONDS)) {
break;
} else if (samplesWithId == null) {
continue;
}
int c = this.c.getAndIncrement();
if (c % 10 == 0) {
long elapsedSecs = stopWatch.elapsed(TimeUnit.SECONDS);
double tsPerSec = 1.0 * c / elapsedSecs;
logger.info(name + i + " : " + elapsedSecs + " secs / " + c + " tss done / " + tsPerSec +
" #/s / " + size + " / " + (size - c) / (tsPerSec * 60) +
" mins remaining(estimated) ");
}
List<TSLoaderSamplesForDay> samples = samplesWithId.samples;
long tsId = samplesWithId.tsId;
ImmutableListMultimap<Integer, TSLoaderSamplesForDay> samplesByYear = splitByYear(samples);
for (Entry<Integer, Collection<TSLoaderSamplesForDay>> yearSamples : samplesByYear.asMap().entrySet()) {
int year = yearSamples.getKey();
for (TSLoaderSamplesForDay daySamples : yearSamples.getValue()) {
for (int j = 0; j < daySamples.times.length; j++) {
writer.newRow(rowKeyType.decompose(tsId, year));
double timeInDays = daySamples.times[j];
long timestamp = (long) (TimeUnit.DAYS.toMillis(1) * timeInDays);
double value = daySamples.values[j];
Date date = new Date(timestamp);
writer.addColumn(timeColumnType.decompose(date, null),
ByteBufferUtil.EMPTY_BYTE_BUFFER, insTs);
writer.addColumn(valueColumnType.decompose(date, "v"), ByteBufferUtil.bytes(value),
insTs);
}
}
}
samplesByYear = null;
samplesWithId = null;
}
logger.info(name + i + ": closing writer");
writer.close();
logger.info(name + i + ": writer done");
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Unexpected problem ", e);
}
if (!stage.isPresent() || stage.get() == 3) {
logger.info(name + ": Deleting unneccessary files");
cleanup(outputDir);
}
logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + outputDir);
load(cassandraSeed, outputDir);
return outputDir;
}
}
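/** Value object pairing a timeseries id with all of its per-day samples. */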
static class SamplesWithId {
public SamplesWithId(List<TSLoaderSamplesForDay> readSamples, long tsId) {
this.samples = readSamples;
this.tsId = tsId;
}
final List<TSLoaderSamplesForDay> samples;
final long tsId;
}
/**
 * This is what the CQLSSTableWriter printed, so we use the same definition for the unsorted writer (cassandra.config.Schema):
* DEBUG > Adding org.apache.cassandra.config.CFMetaData@293e2817[cfId=ab47e018-
* 7eeb-30c5-8802-7e7651d788ef,ksName=myproject_gihon_oper, cfName=sample,cfType=Standard,
* comparator=org.apache.cassandra
* .db.marshal.CompositeType(org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra
* .db.marshal.TimestampType),
* org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.ColumnToCollectionType
* (746773:org.apache.
* cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal
* .UTF8Type))),
* comment=,readRepairChance=0.0,dclocalReadRepairChance=0.1,replicateOnWrite=true,gcGraceSeconds=864000
* ,defaultValidator=org.apache.cassandra.db.marshal.BytesType,
* keyValidator=org.apache.cassandra.db.marshal.CompositeType
* (org.apache.cassandra.db.marshal.LongType,org.apache.cassandra.db.marshal.Int32Type),
* minCompactionThreshold=4,maxCompactionThreshold=32, column_metadata={java.nio.HeapByteBuffer[pos=0 lim=3
* cap=3]=ColumnDefinition{name=746773,
* validator=org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal
* .UTF8Type,org.apache.cassandra.db.marshal.UTF8Type), type=REGULAR, componentIndex=1, indexName=null,
* indexType=null}, java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]=ColumnDefinition{name=74735f6964,
* validator=org.apache.cassandra.db.marshal.LongType, type=PARTITION_KEY, componentIndex=0, indexName=null,
* indexType=null}, java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]=ColumnDefinition{name=74,
* validator=org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType),
* type=CLUSTERING_KEY, componentIndex=0, indexName=null, indexType=null}, java.nio.HeapByteBuffer[pos=0 lim=2
* cap=2]=ColumnDefinition{name=7972, validator=org.apache.cassandra.db.marshal.Int32Type, type=PARTITION_KEY,
* componentIndex=1, indexName=null, indexType=null}, java.nio.HeapByteBuffer[pos=0 lim=1
* cap=1]=ColumnDefinition{name=76, validator=org.apache.cassandra.db.marshal.DoubleType, type=REGULAR,
* componentIndex=1, indexName=null, indexType=null}}, compactionStrategyClass=class
* org.apache.cassandra.db.compaction
* .LeveledCompactionStrategy,compactionStrategyOptions={},compressionOptions={sstable_compression
* =org.apache.cassandra.io.compress.LZ4Compressor},
* bloomFilterFpChance=0.1,memtable_flush_period_in_ms=0,caching=KEYS_ONLY,
* defaultTimeToLive=0,speculative_retry=99.0
* PERCENTILE,indexInterval=128,populateIoCacheOnFlush=false,droppedColumns={},triggers={},isDense=false] to cfIdMap
*/
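/**
 * Wires the pipeline together: two readers per writer feed a bounded queue
 * (capacity writers*2, which also caps how many timeseries are held in memory),
 * and the writers drain it into sstables and stream each finished directory into the cluster.
 */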
private static void prepareAndLoadSstables(final String keySpace,
final List<Long> tsIds,
final File outputDir,
String dbHost,
String tenantId,
String name,
Optional<Integer> stage,
String cassandraSeed,
int numOfWriters,
boolean unsorted) throws Exception {
Stopwatch sw = Stopwatch.createStarted();
int writers = numOfWriters;
int readers = numOfWriters * 2;
final ArrayBlockingQueue<SamplesWithId> queue = new ArrayBlockingQueue<SamplesWithId>(writers * 2);
// ceiling division: rounding down here could silently drop a tail partition or make partition() throw on 0
int chunkPerEachReader = Math.max(1, (tsIds.size() + readers - 1) / readers);
List<List<Long>> partitions = Lists.partition(tsIds, chunkPerEachReader);
readers = partitions.size(); // after the ceiling division there are never more partitions than readers
CountDownLatch doneSignal = new CountDownLatch(readers);
ExecutorService pool = Executors.newFixedThreadPool(writers + readers);
List<Future<?>> readerFutures = Lists.newArrayList();
for (int i = 0; i < readers; i++) {
readerFutures.add(pool.submit(new TssReader(i, queue, partitions.get(i), dbHost, keySpace, tenantId,
doneSignal)));
}
AtomicInteger c = new AtomicInteger(1);
List<Future<File>> writerFutures = Lists.newArrayList();
for (int i = 0; i < writers; i++) {
if (unsorted) {
writerFutures.add(pool.submit(new TssUnsortedWriter(i, keySpace, queue, outputDir, tsIds.size(), c,
doneSignal, stage, cassandraSeed)));
} else {
writerFutures.add(pool.submit(new TssCqlWriter(i, keySpace, queue, outputDir, tsIds.size(), c,
doneSignal, stage, cassandraSeed)));
}
}
for (Future<?> rf : readerFutures) {
rf.get();
}
List<File> dirs = Lists.newArrayList();
for (Future<File> wf : writerFutures) {
dirs.add(wf.get());
}
// the sstable writers work with 2.1.2; with 2.0.11 or 2.0.12 I get a ConcurrentModificationException,
// but they produce a different sstable format that can't be streamed into cassandra
// if (!stage.isPresent() || stage.get() == 4) {
// for (File dir : dirs) {
// logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + dir);
// load(cassandraSeed, dir);
// }
// }
logger.info(name + ": Done preparing in " + sw.elapsed(TimeUnit.MINUTES) + " mins");
}
private static void initCleanup(File outputDir) {
// newArrayList materializes the lazy transform so the deletes actually execute
Lists.newArrayList(transform(Files.fileTreeTraverser().children(outputDir), deleteFile));
}
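/**
 * SSTableLoader.Client in the style of cassandra's bulk loader tooling: discovers the
 * token ring via thrift describe_ring and reads the table definition from the system
 * schema so the loader can plan streams per endpoint.
 */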
static class ExternalClient extends SSTableLoader.Client {
private final Map<String, CFMetaData> knownCfs = new HashMap<>();
private final Set<InetAddress> hosts;
private final int rpcPort;
private final String user;
private final String passwd;
private final ITransportFactory transportFactory;
private final int storagePort;
private final int sslStoragePort;
private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
public ExternalClient(Set<InetAddress> hosts,
int port,
String user,
String passwd,
ITransportFactory transportFactory,
int storagePort,
int sslStoragePort,
EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions) {
super();
this.hosts = hosts;
this.rpcPort = port;
this.user = user;
this.passwd = passwd;
this.transportFactory = transportFactory;
this.storagePort = storagePort;
this.sslStoragePort = sslStoragePort;
this.serverEncOptions = serverEncryptionOptions;
}
@Override
public void init(String keyspace) {
Iterator<InetAddress> hostiter = hosts.iterator();
while (hostiter.hasNext()) {
try {
// Query endpoint to ranges map and schemas from thrift
InetAddress host = hostiter.next();
Cassandra.Client client =
createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd,
this.transportFactory);
setPartitioner(client.describe_partitioner());
Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
for (TokenRange tr : client.describe_ring(keyspace)) {
Range<Token> range =
new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token),
getPartitioner());
for (String ep : tr.endpoints) {
addRangeForEndpoint(range, InetAddress.getByName(ep));
}
}
String query =
String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'", Keyspace.SYSTEM_KS,
SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspace);
CqlResult result =
client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE,
ConsistencyLevel.ONE);
for (CqlRow row : result.rows) {
CFMetaData metadata = CFMetaData.fromThriftCqlRow(row);
knownCfs.put(metadata.cfName, metadata);
}
break;
} catch (Exception e) {
if (!hostiter.hasNext()) throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
}
}
}
@Override
public StreamConnectionFactory getConnectionFactory() {
return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
}
@Override
public CFMetaData getCFMetaData(String keyspace, String cfName) {
return knownCfs.get(cfName);
}
private static Cassandra.Client createThriftClient(String host,
int port,
String user,
String passwd,
ITransportFactory transportFactory) throws Exception {
TTransport trans = transportFactory.openTransport(host, port);
TProtocol protocol = new TBinaryProtocol(trans);
Cassandra.Client client = new Cassandra.Client(protocol);
if (user != null && passwd != null) {
Map<String, String> credentials = new HashMap<>();
credentials.put(IAuthenticator.USERNAME_KEY, user);
credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
client.login(authenticationRequest);
}
return client;
}
}
// Tracks per-host streaming progress and prints a one-line summary on each FILE_PROGRESS event
static class ProgressIndicator implements StreamEventHandler {
private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();
private long start;
private long lastProgress;
private long lastTime;
public ProgressIndicator() {
start = lastTime = System.nanoTime();
}
public void onSuccess(StreamState finalState) {}
public void onFailure(Throwable t) {}
public void handleStreamEvent(StreamEvent event) {
if (event.eventType == StreamEvent.Type.STREAM_PREPARED) {
SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
sessionsByHost.put(session.peer, session);
} else if (event.eventType == StreamEvent.Type.FILE_PROGRESS) {
ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
// update progress
Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
if (progresses == null) {
progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
progressByHost.put(progressInfo.peer, progresses);
}
if (progresses.contains(progressInfo)) progresses.remove(progressInfo);
progresses.add(progressInfo);
StringBuilder sb = new StringBuilder();
sb.append("\rprogress: ");
long totalProgress = 0;
long totalSize = 0;
for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet()) {
SessionInfo session = sessionsByHost.get(entry.getKey());
long size = session.getTotalSizeToSend();
long current = 0;
int completed = 0;
for (ProgressInfo progress : entry.getValue()) {
if (progress.currentBytes == progress.totalBytes) completed++;
current += progress.currentBytes;
}
totalProgress += current;
totalSize += size;
sb.append("[").append(entry.getKey());
sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
}
long time = System.nanoTime();
long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
lastTime = time;
long deltaProgress = totalProgress - lastProgress;
lastProgress = totalProgress;
sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
System.out.print(sb.toString());
}
}
private int mbPerSec(long bytes, long timeInMs) {
// bytes/ms * 1000 = bytes/s, divided by 1024*1024 bytes per MB
double bytesPerMs = ((double) bytes) / timeInMs;
return (int) ((bytesPerMs * 1000) / (1024 * 1024));
}
}
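/**
 * Streams the sstables under outputDir into the cluster via SSTableLoader
 * (thrift rpc port 9160, storage ports 7000/7001, outbound stream throttling disabled).
 */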
private static void load(String cassandraSeed, File outputDir) throws UnknownHostException {
// when run against localhost this might be replaced with
// Runtime.getRuntime().exec("/usr/bin/sstableloader -d 127.0.0.1 " + outputDir + "/" + keySpace + "/" +
// TABLE_NAME);
// not sure whether it can be run concurrently within the same jvm
ITransportFactory transportFactory = new TFramedTransportFactory();
EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
OutputHandler handler = new OutputHandler.SystemOutput(true, false);
SSTableLoader loader =
new SSTableLoader(outputDir, new ExternalClient(Sets.newHashSet(InetAddress.getByName(cassandraSeed)),
9160, null, null, transportFactory, 7000, 7001, serverEncOptions), handler);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(0);
StreamResultFuture future;
try {
Set<InetAddress> ignore = Sets.newHashSet();
future = loader.stream(ignore, new ProgressIndicator());
} catch (Exception e) {
logger.error("Problem in streaming", e);
return; // nothing to wait for if the stream never started
}
logger.info(String.format("Streaming session ID: %s", future.planId));
try {
future.get();
} catch (Exception e) {
logger.error("Streaming to the following hosts failed:");
logger.error(loader.getFailedHosts());
logger.error(e);
}
}
private static void cleanup(File outputDir) {
// works around a bug in the cassandra loader, see
// http://mail-archives.apache.org/mod_mbox/incubator-cassandra-user/201412.mbox/%3CCALAMvO_7gU1rOKXO0X33rE3fEnw8bKyY_0mBRU0A-2BVAnesiA@mail.gmail.com%3E
Iterable<File> children = Files.fileTreeTraverser().children(outputDir);
Iterable<File> toBeDeleted = filter(children, new Predicate<File>() {
@Override
public boolean apply(File input) {
String nameWithoutExtension = Files.getNameWithoutExtension(input.getName());
// deletion is currently disabled; the original candidates are kept commented out below
return false;//nameWithoutExtension.contains("Statistics");
/*
* || nameWithoutExtension.contains("Digest") || nameWithoutExtension.contains("TOC") ||
* nameWithoutExtension.contains("CRC") || nameWithoutExtension.contains("CompressionInfo");
*/
}
});
Lists.newArrayList(transform(toBeDeleted, deleteFile));
}
}
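/*
A quick sanity check after loading (hypothetical ids, adjust to your data):
SELECT t, v FROM myproject_gihon_oper.sample WHERE ts_id = 42 AND yr = 2014 LIMIT 10;
*/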