cassandra multithreaded bulk load
package com.myproject.cassandraloading;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.ColumnToCollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.EmptyType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.TimestampType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.ITransportFactory;
import org.apache.cassandra.thrift.TFramedTransportFactory;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.tools.BulkLoadConnectionFactory;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.ibatis.session.SqlSession;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
import org.joda.time.LocalDate;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.myproject.cassandraloading.dal.MapperFactory;
import com.myproject.cassandraloading.dal.TSLoaderMapper;
import com.myproject.cassandraloading.dal.TSLoaderSamplesForDay;
/*
CREATE KEYSPACE myproject_gihon_oper WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE myproject_gihon_oper;
CREATE TABLE sample (
    ts_id bigint,
    yr int,
    t timestamp,
    v double,
    tgs map<varchar,varchar>,
    PRIMARY KEY((ts_id,yr), t)
) WITH CLUSTERING ORDER BY (t DESC) AND compression={'sstable_compression': 'LZ4Compressor'}
  AND compaction = { 'class' : 'LeveledCompactionStrategy' };

TRUNCATE sample;

sudo ./cassandra-loading prodlikedb.myproject.dev gihon /mnt/data/gihon
(main() below expects: dbHost tenantId outputDir cassandraSeed fromId toId numOfWriters [stage])

Try running with these JVM params, otherwise the JVM enters full GC with 100% CPU load at some point:
-javaagent:jamm-0.2.5.jar -Dorg.postgresql.forcebinary=true
-Xmx2g -Xms2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
-XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10
*/
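/**
 * Pipeline overview (a summary of the code below): a pool of TssReader producers pulls each
 * timeseries' per-day sample arrays from the relational store into a bounded queue; a pool of
 * writer consumers (TssCqlWriter or TssUnsortedWriter, chosen by the {@code unsorted} flag)
 * drains the queue into local sstables partitioned by (ts_id, yr); each writer then streams
 * its own output directory into the cluster via SSTableLoader.
 */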
public class TimeSeriesCassandraLoader {
    private static final Logger logger = Logger.getLogger(TimeSeriesCassandraLoader.class.getName());
    private static final String TABLE_NAME = "sample";
    private static final int BUFFER_SIZE = 32;
    private static Function<File, Boolean> deleteFile = new Function<File, Boolean>() {
        @Override
        public Boolean apply(File input) {
            return input.delete();
        }
    };
    private static ImmutableListMultimap<Integer, TSLoaderSamplesForDay> splitByYear(List<TSLoaderSamplesForDay> readSamples) {
        return Multimaps.index(readSamples, byYear);
    }
    final static Function<TSLoaderSamplesForDay, Integer> byYear = new Function<TSLoaderSamplesForDay, Integer>() {
        @Override
        public Integer apply(TSLoaderSamplesForDay input) {
            return LocalDate.fromDateFields(input.dayTimestamp).year().get();
        }
    };
    public static final String SCHEMA =
        "CREATE TABLE %s.%s (ts_id bigint, yr int, t timestamp, v double, tgs map<varchar,varchar>, PRIMARY KEY((ts_id,yr), t)) "
            + " WITH CLUSTERING ORDER BY (t DESC) AND compression={'sstable_compression': 'LZ4Compressor'}"
            + " and compaction = { 'class' : 'LeveledCompactionStrategy' }";
    public static final String INSERT_STMT = "INSERT INTO %s.%s (ts_id, yr, t, v) VALUES (?,?,?,?)";
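    // For example, for tenant "gihon" the formats above expand to statements against
    // myproject_gihon_oper.sample, e.g.:
    //   INSERT INTO myproject_gihon_oper.sample (ts_id, yr, t, v) VALUES (?,?,?,?)
    // Note that the tgs map column is declared in the schema but never populated by this loader.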
    public static void main(String[] args) throws Exception {
        Stopwatch global = Stopwatch.createStarted();
        String dbHost = args[0];
        String tenantId = args[1];
        String outputDirPath = args[2];
        String cassandraSeed = args[3];
        long fromId = Long.parseLong(args[4]);
        long toId = Long.parseLong(args[5]);
        outputDirPath = outputDirPath + "/" + args[4] + "-" + args[5];
        int numOfWriters = Integer.parseInt(args[6]);
        Optional<Integer> stage = Optional.absent();
        if (args.length == 8) {
            stage = Optional.of(Integer.parseInt(args[7]));
        }
        logger.info("The output will be placed in " + outputDirPath);
        String keySpace = String.format("myproject_%s_oper", tenantId);
        logger.info("Reading timeseries from " + keySpace + " for tenant " + tenantId);
        SqlSession session = MapperFactory.createSession(dbHost, keySpace, tenantId);
        TSLoaderMapper mapper = session.getMapper(TSLoaderMapper.class);
        List<Long> tsIds = mapper.readAllTsIds(fromId, toId);
        session.rollback();
        String name = ManagementFactory.getRuntimeMXBean().getName();
        // magic! client mode lets the embedded Cassandra classes run without acting as a server
        Config.setClientMode(true);
        File outputDir = new File(outputDirPath);
        if (!outputDir.exists() && !outputDir.mkdirs()) {
            throw new RuntimeException("Cannot create output directory: " + outputDir);
        }
        if (!stage.isPresent() || stage.get() == 1) {
            logger.info(name + ": Cleaning output dir");
            initCleanup(outputDir);
        }
        if (!stage.isPresent() || stage.get() == 2) {
            logger.info(name + ": Preparing sstables");
            prepareAndLoadSstables(keySpace, tsIds, outputDir, dbHost, tenantId, name, stage, cassandraSeed,
                numOfWriters, true);
        }
        logger.info(name + ": Done in " + global.elapsed(TimeUnit.MINUTES) + " mins");
        System.exit(0); // some daemon threads hold the jvm
    }
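    /**
     * Producer: owns one slice of the ts_id list, reads that timeseries' per-day time/value
     * arrays over JDBC and publishes them to the bounded queue. The queue's capacity
     * (writers * 2) is what throttles the readers when the sstable writers fall behind.
     */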
    private static final class TssReader implements Runnable {
        private final ArrayBlockingQueue<SamplesWithId> queue;
        private final List<Long> tsIds;
        private final String dbHost;
        private final String keySpace;
        private final String tenantId;
        private final CountDownLatch doneSignal;
        private int i;
        private TssReader(int i,
            ArrayBlockingQueue<SamplesWithId> queue,
            List<Long> tsIds,
            String dbHost,
            String keySpace,
            String tenantId,
            CountDownLatch doneSignal) {
            this.i = i;
            this.queue = queue;
            this.tsIds = tsIds;
            this.dbHost = dbHost;
            this.keySpace = keySpace;
            this.tenantId = tenantId;
            this.doneSignal = doneSignal;
        }
        @Override
        public void run() {
            Thread.currentThread().setName("DBReader " + i);
            try {
                String name = ManagementFactory.getRuntimeMXBean().getName();
                int c = 1;
                for (long tsId : tsIds) {
                    // try-with-resources so the connection is released even when a read fails
                    try (Connection conn = MapperFactory.createConnection(dbHost, keySpace, tenantId);
                         PreparedStatement statement =
                             conn.prepareStatement("select day_timestamp as dayTimestamp, times, values from sample where timeseries_id = ?")) {
                        statement.setLong(1, tsId);
                        List<TSLoaderSamplesForDay> readSamples = Lists.newArrayList();
                        try (ResultSet rs = statement.executeQuery()) {
                            while (rs.next()) {
                                Array tArray = rs.getArray(2);
                                double[] times = ArrayUtils.toPrimitive((Double[]) tArray.getArray(), 0);
                                Array vArray = rs.getArray(3);
                                double[] values = ArrayUtils.toPrimitive((Double[]) vArray.getArray(), 0);
                                readSamples.add(new TSLoaderSamplesForDay(rs.getDate(1), times, values));
                            }
                        }
                        queue.put(new SamplesWithId(readSamples, tsId));
                    }
                    if (c % 10 == 0) {
                        logger.info(name + i + ": q size: " + queue.size());
                    }
                    c++;
                }
                logger.info(name + i + ": reader done");
            } catch (Exception e) {
                logger.error("Reader " + i + " failed", e);
                throw new RuntimeException("Exception ", e);
            } finally {
                // always count down, otherwise the writers would wait forever on a failed reader
                doneSignal.countDown();
            }
        }
    }
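    /**
     * Consumer built on the public CQLSSTableWriter API: drains the queue, splits each
     * timeseries' samples by year (the partition key is (ts_id, yr)) and appends one CQL row
     * per sample. Once all readers have signalled completion and the queue is empty, it closes
     * the writer, optionally prunes side files, and streams its directory into cassandra.
     */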
    private static final class TssCqlWriter implements Callable<File> {
        private final String keySpace;
        private final ArrayBlockingQueue<SamplesWithId> queue;
        private final File baseOutputDir;
        private final int size;
        private final AtomicInteger c;
        private final CountDownLatch doneSignal;
        private final int i;
        private final Optional<Integer> stage;
        private final String cassandraSeed;
        private TssCqlWriter(int i,
            String keySpace,
            ArrayBlockingQueue<SamplesWithId> queue,
            File outputDir,
            int size,
            AtomicInteger c,
            CountDownLatch doneSignal,
            Optional<Integer> stage,
            String cassandraSeed) {
            this.i = i;
            this.keySpace = keySpace;
            this.queue = queue;
            this.baseOutputDir = outputDir;
            this.size = size;
            this.c = c;
            this.doneSignal = doneSignal;
            this.stage = stage;
            this.cassandraSeed = cassandraSeed;
        }
        @Override
        public File call() throws Exception {
            Thread.currentThread().setName("SSTableWriter " + i);
            File outputDir = new File(this.baseOutputDir,
                String.valueOf(i) + File.separator + keySpace + File.separator + TABLE_NAME);
            if (!outputDir.exists() && !outputDir.mkdirs()) {
                throw new RuntimeException("Cannot create output directory: " + outputDir);
            }
            String name = ManagementFactory.getRuntimeMXBean().getName();
            try {
                CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
                String schema = String.format(SCHEMA, keySpace, TABLE_NAME);
                builder.inDirectory(outputDir)
                    .forTable(schema)
                    .using(String.format(INSERT_STMT, keySpace, TABLE_NAME))
                    .withPartitioner(new Murmur3Partitioner())
                    .withBufferSizeInMB(BUFFER_SIZE);
                final CQLSSTableWriter writer = builder.build();
                Stopwatch stopWatch = Stopwatch.createStarted();
                while (true) {
                    SamplesWithId samplesWithId = queue.poll(1000, TimeUnit.MILLISECONDS);
                    if (samplesWithId == null && doneSignal.await(0, TimeUnit.MILLISECONDS)) {
                        break; // queue drained and all readers are done
                    } else if (samplesWithId == null) {
                        continue;
                    }
                    int c = this.c.getAndIncrement();
                    if (c % 10 == 0) {
                        long elapsedSecs = Math.max(1, stopWatch.elapsed(TimeUnit.SECONDS)); // avoid div by zero in the first second
                        double tsPerSec = 1.0 * c / elapsedSecs;
                        logger.info(name + i + " : " + elapsedSecs + " secs / " + c + " tss done / " + tsPerSec
                            + " #/s / " + size + " / " + (size - c) / (tsPerSec * 60)
                            + " mins remaining (estimated)");
                    }
                    List<TSLoaderSamplesForDay> samples = samplesWithId.samples;
                    ImmutableListMultimap<Integer, TSLoaderSamplesForDay> samplesByYear = splitByYear(samples);
                    for (Entry<Integer, Collection<TSLoaderSamplesForDay>> yearSamples : samplesByYear.asMap().entrySet()) {
                        int year = yearSamples.getKey();
                        for (TSLoaderSamplesForDay daySamples : yearSamples.getValue()) {
                            for (int s = 0; s < daySamples.times.length; s++) { // 's' to avoid shadowing the writer index 'i'
                                double timeInDays = daySamples.times[s];
                                long timestamp = (long) (TimeUnit.DAYS.toMillis(1) * timeInDays);
                                double value = daySamples.values[s];
                                Date date = new Date(timestamp);
                                writer.addRow(samplesWithId.tsId, year, // yr
                                    date, // t
                                    value);
                            }
                        }
                    }
                    samplesByYear = null; // drop references early; batches are large and GC pressure was an issue
                    samplesWithId = null;
                }
                logger.info(name + i + ": closing writer");
                writer.close();
                logger.info(name + i + ": writer done");
            } catch (Exception e) {
                logger.error("Writer " + i + " failed", e);
                throw new RuntimeException("Unexpected problem ", e);
            }
            if (!stage.isPresent() || stage.get() == 3) {
                logger.info(name + ": Deleting unnecessary files");
                cleanup(outputDir);
            }
            logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + outputDir);
            load(cassandraSeed, outputDir);
            return outputDir;
        }
    }
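    /**
     * Consumer built on the low-level SSTableSimpleUnsortedWriter: it reconstructs by hand the
     * physical layout of the CQL3 table (composite comparator with reversed timestamp, an empty
     * row-marker cell and a "v" cell per sample), matching the CFMetaData dump reproduced in
     * the comment before prepareAndLoadSstables().
     */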
    private static final class TssUnsortedWriter implements Callable<File> {
        private final String keySpace;
        private final ArrayBlockingQueue<SamplesWithId> queue;
        private final File baseOutputDir;
        private final int size;
        private final AtomicInteger c;
        private final CountDownLatch doneSignal;
        private final int i;
        private final Optional<Integer> stage;
        private final String cassandraSeed;
        private TssUnsortedWriter(int i,
            String keySpace,
            ArrayBlockingQueue<SamplesWithId> queue,
            File outputDir,
            int size,
            AtomicInteger c,
            CountDownLatch doneSignal,
            Optional<Integer> stage,
            String cassandraSeed) {
            this.i = i;
            this.keySpace = keySpace;
            this.queue = queue;
            this.baseOutputDir = outputDir;
            this.size = size;
            this.c = c;
            this.doneSignal = doneSignal;
            this.stage = stage;
            this.cassandraSeed = cassandraSeed;
        }
        @Override
        public File call() throws UnknownHostException {
            Thread.currentThread().setName("SSTableWriter " + i);
            File outputDir = new File(this.baseOutputDir,
                String.valueOf(i) + File.separator + keySpace + File.separator + TABLE_NAME);
            if (!outputDir.exists() && !outputDir.mkdirs()) {
                throw new RuntimeException("Cannot create output directory: " + outputDir);
            }
            String name = ManagementFactory.getRuntimeMXBean().getName();
            try {
                CollectionType mapType = MapType.getInstance(UTF8Type.instance, UTF8Type.instance);
                Map<ByteBuffer, CollectionType> tgsTypeMap =
                    ImmutableMap.of(UTF8Type.instance.decompose("tgs"), mapType);
                // comparator must match the CFMetaData dump below: reversed timestamp clustering,
                // CQL3 column-name component, and the tgs map collection
                AbstractType<?> columnFComparator =
                    CompositeType.getInstance(ReversedType.getInstance(TimestampType.instance), UTF8Type.instance,
                        ColumnToCollectionType.getInstance(tgsTypeMap));
                SSTableSimpleUnsortedWriter writer =
                    new SSTableSimpleUnsortedWriter(outputDir, new Murmur3Partitioner(), keySpace, TABLE_NAME,
                        columnFComparator, null, BUFFER_SIZE,
                        new CompressionParameters(LZ4Compressor.create(null)));
                long insTs = System.currentTimeMillis();
                CompositeType rowKeyType = CompositeType.getInstance(LongType.instance, Int32Type.instance);
                CompositeType timeColumnType = CompositeType.getInstance(TimestampType.instance, EmptyType.instance);
                CompositeType valueColumnType = CompositeType.getInstance(TimestampType.instance, UTF8Type.instance);
                Stopwatch stopWatch = Stopwatch.createStarted();
                while (true) {
                    SamplesWithId samplesWithId = queue.poll(1000, TimeUnit.MILLISECONDS);
                    if (samplesWithId == null && doneSignal.await(0, TimeUnit.MILLISECONDS)) {
                        break; // queue drained and all readers are done
                    } else if (samplesWithId == null) {
                        continue;
                    }
                    int c = this.c.getAndIncrement();
                    if (c % 10 == 0) {
                        long elapsedSecs = Math.max(1, stopWatch.elapsed(TimeUnit.SECONDS)); // avoid div by zero in the first second
                        double tsPerSec = 1.0 * c / elapsedSecs;
                        logger.info(name + i + " : " + elapsedSecs + " secs / " + c + " tss done / " + tsPerSec
                            + " #/s / " + size + " / " + (size - c) / (tsPerSec * 60)
                            + " mins remaining (estimated)");
                    }
                    List<TSLoaderSamplesForDay> samples = samplesWithId.samples;
                    long tsId = samplesWithId.tsId;
                    ImmutableListMultimap<Integer, TSLoaderSamplesForDay> samplesByYear = splitByYear(samples);
                    for (Entry<Integer, Collection<TSLoaderSamplesForDay>> yearSamples : samplesByYear.asMap().entrySet()) {
                        int year = yearSamples.getKey();
                        for (TSLoaderSamplesForDay daySamples : yearSamples.getValue()) {
                            for (int s = 0; s < daySamples.times.length; s++) { // 's' to avoid shadowing the writer index 'i'
                                writer.newRow(rowKeyType.decompose(tsId, year));
                                double timeInDays = daySamples.times[s];
                                long timestamp = (long) (TimeUnit.DAYS.toMillis(1) * timeInDays);
                                double value = daySamples.values[s];
                                Date date = new Date(timestamp);
                                // CQL3 row marker: (t, "") with an empty value
                                writer.addColumn(timeColumnType.decompose(date, null),
                                    ByteBufferUtil.EMPTY_BYTE_BUFFER, insTs);
                                // the v column: (t, "v") with the double's bits
                                writer.addColumn(valueColumnType.decompose(date, "v"), ByteBufferUtil.bytes(value),
                                    insTs);
                            }
                        }
                    }
                    samplesByYear = null; // drop references early; batches are large and GC pressure was an issue
                    samplesWithId = null;
                }
                logger.info(name + i + ": closing writer");
                writer.close();
                logger.info(name + i + ": writer done");
            } catch (Exception e) {
                logger.error("Writer " + i + " failed", e);
                throw new RuntimeException("Unexpected problem ", e);
            }
            if (!stage.isPresent() || stage.get() == 3) {
                logger.info(name + ": Deleting unnecessary files");
                cleanup(outputDir);
            }
            logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + outputDir);
            load(cassandraSeed, outputDir);
            return outputDir;
        }
    }
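    /** Queue element: one timeseries' complete list of per-day samples plus its id. */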
    static class SamplesWithId {
        public SamplesWithId(List<TSLoaderSamplesForDay> readSamples, long tsId) {
            this.samples = readSamples;
            this.tsId = tsId;
        }
        final List<TSLoaderSamplesForDay> samples;
        final long tsId;
    }
    /*
     * This is what the CQLSSTableWriter printed (org.apache.cassandra.config.Schema, DEBUG),
     * so we use the same definition with the unsorted writer:
     * Adding org.apache.cassandra.config.CFMetaData@293e2817[cfId=ab47e018-7eeb-30c5-8802-7e7651d788ef,
     * ksName=myproject_gihon_oper, cfName=sample, cfType=Standard,
     * comparator=org.apache.cassandra.db.marshal.CompositeType(
     *   org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType),
     *   org.apache.cassandra.db.marshal.UTF8Type,
     *   org.apache.cassandra.db.marshal.ColumnToCollectionType(746773:
     *     org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,
     *     org.apache.cassandra.db.marshal.UTF8Type))),
     * comment=,readRepairChance=0.0,dclocalReadRepairChance=0.1,replicateOnWrite=true,gcGraceSeconds=864000,
     * defaultValidator=org.apache.cassandra.db.marshal.BytesType,
     * keyValidator=org.apache.cassandra.db.marshal.CompositeType(
     *   org.apache.cassandra.db.marshal.LongType,org.apache.cassandra.db.marshal.Int32Type),
     * minCompactionThreshold=4,maxCompactionThreshold=32,
     * column_metadata={
     *   java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]=ColumnDefinition{name=746773,
     *     validator=org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,
     *     org.apache.cassandra.db.marshal.UTF8Type), type=REGULAR, componentIndex=1, indexName=null, indexType=null},
     *   java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]=ColumnDefinition{name=74735f6964,
     *     validator=org.apache.cassandra.db.marshal.LongType, type=PARTITION_KEY, componentIndex=0, indexName=null, indexType=null},
     *   java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]=ColumnDefinition{name=74,
     *     validator=org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimestampType),
     *     type=CLUSTERING_KEY, componentIndex=0, indexName=null, indexType=null},
     *   java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]=ColumnDefinition{name=7972,
     *     validator=org.apache.cassandra.db.marshal.Int32Type, type=PARTITION_KEY, componentIndex=1, indexName=null, indexType=null},
     *   java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]=ColumnDefinition{name=76,
     *     validator=org.apache.cassandra.db.marshal.DoubleType, type=REGULAR, componentIndex=1, indexName=null, indexType=null}},
     * compactionStrategyClass=class org.apache.cassandra.db.compaction.LeveledCompactionStrategy,
     * compactionStrategyOptions={},
     * compressionOptions={sstable_compression=org.apache.cassandra.io.compress.LZ4Compressor},
     * bloomFilterFpChance=0.1,memtable_flush_period_in_ms=0,caching=KEYS_ONLY,
     * defaultTimeToLive=0,speculative_retry=99.0PERCENTILE,indexInterval=128,
     * populateIoCacheOnFlush=false,droppedColumns={},triggers={},isDense=false] to cfIdMap
     */
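    /**
     * Fans out reader and writer threads over a shared bounded queue and waits on all of their
     * futures; streaming happens inside each writer. The commented-out stage-4 block below is
     * the earlier variant where loading ran as a separate pass over the finished directories.
     */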
    private static void prepareAndLoadSstables(final String keySpace,
        final List<Long> tsIds,
        final File outputDir,
        String dbHost,
        String tenantId,
        String name,
        Optional<Integer> stage,
        String cassandraSeed,
        int numOfWriters,
        boolean unsorted) throws Exception {
        Stopwatch sw = Stopwatch.createStarted();
        int writers = numOfWriters;
        int readers = numOfWriters * 2;
        // ceiling division so no ids fall off the end: nearest-rounding here could yield more
        // partitions than readers and silently drop the tail of the id list
        int chunkPerEachReader = Math.max(1, (tsIds.size() + readers - 1) / readers);
        List<List<Long>> partitions = Lists.partition(tsIds, chunkPerEachReader);
        readers = partitions.size(); // may be fewer than requested when there are few ids
        CountDownLatch doneSignal = new CountDownLatch(readers);
        final ArrayBlockingQueue<SamplesWithId> queue = new ArrayBlockingQueue<SamplesWithId>(writers * 2);
        ExecutorService pool = Executors.newFixedThreadPool(writers + readers);
        List<Future<?>> readerFutures = Lists.newArrayList();
        for (int i = 0; i < readers; i++) {
            readerFutures.add(pool.submit(new TssReader(i, queue, partitions.get(i), dbHost, keySpace, tenantId,
                doneSignal)));
        }
        AtomicInteger c = new AtomicInteger(1);
        List<Future<File>> writerFutures = Lists.newArrayList();
        for (int i = 0; i < writers; i++) {
            if (unsorted) {
                writerFutures.add(pool.submit(new TssUnsortedWriter(i, keySpace, queue, outputDir, tsIds.size(), c,
                    doneSignal, stage, cassandraSeed)));
            } else {
                writerFutures.add(pool.submit(new TssCqlWriter(i, keySpace, queue, outputDir, tsIds.size(), c,
                    doneSignal, stage, cassandraSeed)));
            }
        }
        for (Future<?> rf : readerFutures) {
            rf.get();
        }
        List<File> dirs = Lists.newArrayList();
        for (Future<File> wf : writerFutures) {
            dirs.add(wf.get());
        }
        // sstable writers work with 2.1.2; with 2.0.11 or 2.0.12 I get ConcurrentModificationException,
        // but they produce a different sstable format that can't be streamed into cassandra
        // if (!stage.isPresent() || stage.get() == 4) {
        //     for (File dir : dirs) {
        //         logger.info(name + ": Loading into cassandra " + cassandraSeed + " " + dir);
        //         load(cassandraSeed, dir);
        //     }
        // }
        logger.info(name + ": Done preparing in " + sw.elapsed(TimeUnit.MINUTES) + " mins");
    }
    private static void initCleanup(File outputDir) {
        // Guava's transform is lazy; materializing into a list forces deleteFile to run on every child
        Lists.newArrayList(transform(Files.fileTreeTraverser().children(outputDir), deleteFile));
    }
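    /**
     * Thrift-backed SSTableLoader.Client, essentially the ExternalClient from
     * org.apache.cassandra.tools.BulkLoader: queries a live node for the token ring and the
     * table's CFMetaData so that streaming can target the right endpoints.
     */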
    static class ExternalClient extends SSTableLoader.Client {
        private final Map<String, CFMetaData> knownCfs = new HashMap<>();
        private final Set<InetAddress> hosts;
        private final int rpcPort;
        private final String user;
        private final String passwd;
        private final ITransportFactory transportFactory;
        private final int storagePort;
        private final int sslStoragePort;
        private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
        public ExternalClient(Set<InetAddress> hosts,
            int port,
            String user,
            String passwd,
            ITransportFactory transportFactory,
            int storagePort,
            int sslStoragePort,
            EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions) {
            super();
            this.hosts = hosts;
            this.rpcPort = port;
            this.user = user;
            this.passwd = passwd;
            this.transportFactory = transportFactory;
            this.storagePort = storagePort;
            this.sslStoragePort = sslStoragePort;
            this.serverEncOptions = serverEncryptionOptions;
        }
        @Override
        public void init(String keyspace) {
            Iterator<InetAddress> hostiter = hosts.iterator();
            while (hostiter.hasNext()) {
                try {
                    // Query endpoint-to-ranges map and schemas from thrift
                    InetAddress host = hostiter.next();
                    Cassandra.Client client =
                        createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd,
                            this.transportFactory);
                    setPartitioner(client.describe_partitioner());
                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
                    for (TokenRange tr : client.describe_ring(keyspace)) {
                        Range<Token> range =
                            new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token),
                                getPartitioner());
                        for (String ep : tr.endpoints) {
                            addRangeForEndpoint(range, InetAddress.getByName(ep));
                        }
                    }
                    String query =
                        String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'", Keyspace.SYSTEM_KS,
                            SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspace);
                    CqlResult result =
                        client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE,
                            ConsistencyLevel.ONE);
                    for (CqlRow row : result.rows) {
                        CFMetaData metadata = CFMetaData.fromThriftCqlRow(row);
                        knownCfs.put(metadata.cfName, metadata);
                    }
                    break;
                } catch (Exception e) {
                    if (!hostiter.hasNext()) throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
                }
            }
        }
        @Override
        public StreamConnectionFactory getConnectionFactory() {
            return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
        }
        @Override
        public CFMetaData getCFMetaData(String keyspace, String cfName) {
            return knownCfs.get(cfName);
        }
        private static Cassandra.Client createThriftClient(String host,
            int port,
            String user,
            String passwd,
            ITransportFactory transportFactory) throws Exception {
            TTransport trans = transportFactory.openTransport(host, port);
            TProtocol protocol = new TBinaryProtocol(trans);
            Cassandra.Client client = new Cassandra.Client(protocol);
            if (user != null && passwd != null) {
                Map<String, String> credentials = new HashMap<>();
                credentials.put(IAuthenticator.USERNAME_KEY, user);
                credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
                AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
                client.login(authenticationRequest);
            }
            return client;
        }
    }
    /**
     * Stream progress printer (essentially the ProgressIndicator from the sstableloader tool):
     * tracks per-host session and file progress and prints a one-line summary on every
     * FILE_PROGRESS event, including instantaneous and average MB/s.
     */
    static class ProgressIndicator implements StreamEventHandler {
        private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
        private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();
        private long start;
        private long lastProgress;
        private long lastTime;
        public ProgressIndicator() {
            start = lastTime = System.nanoTime();
        }
        public void onSuccess(StreamState finalState) {}
        public void onFailure(Throwable t) {}
        public void handleStreamEvent(StreamEvent event) {
            if (event.eventType == StreamEvent.Type.STREAM_PREPARED) {
                SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
                sessionsByHost.put(session.peer, session);
            } else if (event.eventType == StreamEvent.Type.FILE_PROGRESS) {
                ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
                // update progress: replace any stale entry for the same file
                Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
                if (progresses == null) {
                    progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
                    progressByHost.put(progressInfo.peer, progresses);
                }
                if (progresses.contains(progressInfo)) progresses.remove(progressInfo);
                progresses.add(progressInfo);
                StringBuilder sb = new StringBuilder();
                sb.append("\rprogress: ");
                long totalProgress = 0;
                long totalSize = 0;
                for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet()) {
                    SessionInfo session = sessionsByHost.get(entry.getKey());
                    long size = session.getTotalSizeToSend();
                    long current = 0;
                    int completed = 0;
                    for (ProgressInfo progress : entry.getValue()) {
                        if (progress.currentBytes == progress.totalBytes) completed++;
                        current += progress.currentBytes;
                    }
                    totalProgress += current;
                    totalSize += size;
                    sb.append("[").append(entry.getKey());
                    sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
                    sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
                }
                long time = System.nanoTime();
                long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
                lastTime = time;
                long deltaProgress = totalProgress - lastProgress;
                lastProgress = totalProgress;
                sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
                sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
                sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
                System.out.print(sb.toString());
            }
        }
        private int mbPerSec(long bytes, long timeInMs) {
            double bytesPerMs = ((double) bytes) / timeInMs;
            return (int) ((bytesPerMs * 1000) / (1024 * 1024)); // was 1024 * 2024, which understated throughput
        }
    }
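    /**
     * Streams one writer's output directory into the cluster: the in-process equivalent of
     * running the sstableloader tool against outputDir.
     */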
    private static void load(String cassandraSeed, File outputDir) throws UnknownHostException {
        // if run from localhost this might be shelled out as
        // Runtime.getRuntime().exec("/usr/bin/sstableloader -d 127.0.0.1 " + outputDir + "/" + keySpace + "/" + TABLE_NAME);
        // not sure if that can be run concurrently within the same jvm
        ITransportFactory transportFactory = new TFramedTransportFactory();
        EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
        OutputHandler handler = new OutputHandler.SystemOutput(true, false);
        SSTableLoader loader =
            new SSTableLoader(outputDir, new ExternalClient(Sets.newHashSet(InetAddress.getByName(cassandraSeed)),
                9160, null, null, transportFactory, 7000, 7001, serverEncOptions), handler);
        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(0); // 0 = unthrottled
        StreamResultFuture future;
        try {
            Set<InetAddress> ignore = Sets.newHashSet();
            future = loader.stream(ignore, new ProgressIndicator());
        } catch (Exception e) {
            logger.error("Problem in streaming", e);
            return; // nothing to wait on; falling through here would NPE on future
        }
        logger.info(String.format("Streaming session ID: %s", future.planId));
        try {
            future.get();
        } catch (Exception e) {
            logger.error("Streaming to the following hosts failed:");
            logger.error(loader.getFailedHosts());
            logger.error(e);
        }
    }
    private static void cleanup(File outputDir) {
        // deletion of sstable side files is currently disabled due to a bug in the cassandra loader, see
        // http://mail-archives.apache.org/mod_mbox/incubator-cassandra-user/201412.mbox/%3CCALAMvO_7gU1rOKXO0X33rE3fEnw8bKyY_0mBRU0A-2BVAnesiA@mail.gmail.com%3E
        Iterable<File> children = Files.fileTreeTraverser().children(outputDir);
        Iterable<File> toBeDeleted = filter(children, new Predicate<File>() {
            @Override
            public boolean apply(File input) {
                String nameWithoutExtension = Files.getNameWithoutExtension(input.getName());
                return false; // nameWithoutExtension.contains("Statistics");
                /*
                 * || nameWithoutExtension.contains("Digest") || nameWithoutExtension.contains("TOC") ||
                 * nameWithoutExtension.contains("CRC") || nameWithoutExtension.contains("CompressionInfo");
                 */
            }
        });
        Lists.newArrayList(transform(toBeDeleted, deleteFile));
    }
}