@bobrik
Created July 20, 2016 11:06
package com.cloudflare;

import com.google.common.collect.Iterators;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.Internal;
import net.opentsdb.core.TSDB;
import net.opentsdb.utils.Config;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.hbase.async.KeyValue;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;

public class OpenTSDBInput {
    private OpenTSDBInput() {
    }

    /**
     * Builds a JavaRDD of OpenTSDB data points by scanning the data table
     * directly with HBase's TableInputFormat.
     */
    public static JavaRDD<DataPoint> rdd(JavaSparkContext ctx, Config tsdbConfig, Configuration hbaseConfigOverrides) {
        Configuration srcConf = HBaseConfiguration.create();

        for (Map.Entry<String, String> entry : hbaseConfigOverrides) {
            srcConf.set(entry.getKey(), entry.getValue());
        }

        srcConf.set("hbase.zookeeper.quorum", tsdbConfig.getString("tsd.storage.hbase.zk_quorum"));
        srcConf.set("zookeeper.znode.parent", tsdbConfig.getString("tsd.storage.hbase.zk_basedir"));
        srcConf.set(TableInputFormat.INPUT_TABLE, tsdbConfig.getString("tsd.storage.hbase.data_table"));

        JavaPairRDD<ImmutableBytesWritable, Result> rdd = ctx.newAPIHadoopRDD(srcConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // TSDB's Config is not serializable, so ship a plain map to the
        // executors and rebuild the config there.
        Map<String, String> config = new ExportableConfig(tsdbConfig).export();

        return rdd.mapPartitions(new DataPointPartitionMapper(config))
                .flatMap(dps -> dps);
    }
    /**
     * Exposes the protected properties map of TSDB's Config as a plain,
     * serializable HashMap.
     */
    public static class ExportableConfig extends Config {
        public ExportableConfig(Config parent) {
            super(parent);
        }

        public Map<String, String> export() {
            return new HashMap<>(properties);
        }
    }
    public static class DataPoint implements Serializable {
        private String metric;
        private Map<String, String> tags;
        private Long timestamp;
        private Number value;

        public DataPoint(String metric, Map<String, String> tags, Long timestamp, Number value) {
            this.metric = metric;
            this.tags = tags;
            this.timestamp = timestamp;
            this.value = value;
        }

        public String getMetric() {
            return metric;
        }

        public Map<String, String> getTags() {
            return tags;
        }

        public Long getTimestamp() {
            return timestamp;
        }

        public Number getValue() {
            return value;
        }

        @Override
        public String toString() {
            String line = metric + " " + timestamp + " " + value;

            if (tags.size() > 0) {
                List<String> kvs = new LinkedList<>();
                for (Map.Entry<String, String> tag : tags.entrySet()) {
                    kvs.add(tag.getKey() + "=" + tag.getValue());
                }

                return line + " " + StringUtils.join(kvs, " ");
            }

            return line;
        }
    }
    public static class DataPointPartitionMapper implements FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Iterable<DataPoint>> {
        private Map<String, String> conf;
        private TSDB tsdb;

        public DataPointPartitionMapper(Map<String, String> conf) {
            this.conf = conf;
        }

        // Lazily creates a single TSDB instance per partition on the executor.
        protected TSDB getTSDB() throws IOException {
            if (this.tsdb == null) {
                Config config = new Config(false);
                for (Map.Entry<String, String> entry : this.conf.entrySet()) {
                    config.overrideConfig(entry.getKey(), entry.getValue());
                }

                this.tsdb = new TSDB(config);
            }

            return this.tsdb;
        }

        protected void shutdown() {
            if (this.tsdb != null) {
                try {
                    this.tsdb.shutdown().joinUninterruptibly();
                } catch (Exception e) {
                    throw new RuntimeException("Cannot shutdown OpenTSDB instance", e);
                }

                this.tsdb = null;
            }
        }
        @Override
        public Iterable<Iterable<DataPoint>> call(Iterator<Tuple2<ImmutableBytesWritable, Result>> rows) throws Exception {
            return new Iterable<Iterable<DataPoint>>() {
                @Override
                public Iterator<Iterable<DataPoint>> iterator() {
                    Iterator<Iterable<DataPoint>> i = Iterators.transform(rows, (Tuple2<ImmutableBytesWritable, Result> row) -> process(row));

                    // Wrap the iterator so the TSDB instance is shut down
                    // as soon as the partition is exhausted.
                    return new Iterator<Iterable<DataPoint>>() {
                        @Override
                        public boolean hasNext() {
                            if (!i.hasNext()) {
                                shutdown();
                                return false;
                            }

                            return true;
                        }

                        @Override
                        public Iterable<DataPoint> next() {
                            return i.next();
                        }
                    };
                }
            };
        }
        // Turns one HBase row into data points: the row key yields the metric,
        // base time and tags, and each cell yields one or more values.
        public Iterable<DataPoint> process(Tuple2<ImmutableBytesWritable, Result> row) {
            TSDB tsdb;
            try {
                tsdb = getTSDB();
            } catch (IOException e) {
                throw new RuntimeException("Cannot get OpenTSDB instance", e);
            }

            byte[] key = row._1().get();

            String metric = Internal.metricName(tsdb, key);
            long baseTime = Internal.baseTime(tsdb, key);
            Map<String, String> tags = Internal.getTags(tsdb, key);

            List<DataPoint> dps = new LinkedList<>();

            for (Cell cell : row._2().rawCells()) {
                byte[] family = Arrays.copyOfRange(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyOffset() + cell.getFamilyLength());
                byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
                byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());

                KeyValue kv = new KeyValue(key, family, qualifier, cell.getTimestamp(), value);

                // A 2-byte qualifier is a single second-resolution value; a 4-byte
                // qualifier with the ms flag is a single millisecond-resolution value.
                if (qualifier.length == 2 || (qualifier.length == 4 && Internal.inMilliseconds(qualifier))) {
                    Internal.Cell c = Internal.parseSingleValue(kv);
                    if (c == null) {
                        throw new IllegalDataException("Unable to parse row: " + kv);
                    }

                    dps.add(parseCell(metric, tags, c, baseTime));
                } else {
                    // compacted column: multiple data points packed into one cell
                    ArrayList<Internal.Cell> cells;
                    try {
                        cells = Internal.extractDataPoints(kv);
                    } catch (IllegalDataException e) {
                        throw new IllegalDataException(Bytes.toStringBinary(key), e);
                    }

                    for (Internal.Cell c : cells) {
                        dps.add(parseCell(metric, tags, c, baseTime));
                    }
                }
            }

            return dps;
        }
        protected DataPoint parseCell(String metric, Map<String, String> tags, Internal.Cell cell, long baseTime) {
            return new DataPoint(metric, tags, cell.absoluteTimestamp(baseTime), cell.parseValue());
        }
    }
}
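For reference, a minimal driver sketch (not part of the original gist) showing how `rdd` might be called from a Spark application. The scanner-caching value and output path are hypothetical, and it assumes `opentsdb.conf` is resolvable via TSDB's default lookup and that the OpenTSDB, HBase and asynchbase jars are on the classpath:

```java
import net.opentsdb.utils.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class Example {
    public static void main(String[] args) throws Exception {
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("opentsdb-export"));

        // Auto-loads opentsdb.conf from TSDB's default locations.
        Config tsdbConfig = new Config(true);

        // Optional HBase overrides applied on top of the TSDB settings.
        Configuration overrides = new Configuration(false);
        overrides.set("hbase.client.scanner.caching", "10000"); // hypothetical tuning value

        OpenTSDBInput.rdd(ctx, tsdbConfig, overrides)
                .map(OpenTSDBInput.DataPoint::toString)
                .saveAsTextFile("hdfs:///tmp/opentsdb-export"); // hypothetical output path

        ctx.stop();
    }
}
```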
vcarnogu commented Apr 7, 2017

Thanks, this was super helpful for getting started with processing OpenTSDB data via Spark. I did a lot of research before finding the right approach; you should post it as an article online ;-)
