// Created July 20, 2016 11:06
package com.cloudflare;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.Internal;
import net.opentsdb.core.TSDB;
import net.opentsdb.utils.Config;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.hbase.async.KeyValue;
import scala.Tuple2;
import java.util.*;
public class OpenTSDBInput {
/** Private because this class is only used through its static members and is never instantiated. */
private OpenTSDBInput() {
}
public static JavaRDD<DataPoint> rdd(JavaSparkContext ctx, Config tsdbConfig, Configuration hbaseConfigOverrides) {
Configuration srcConf = HBaseConfiguration.create();
for (Map.Entry<String, String> entry : hbaseConfigOverrides) {
srcConf.set(entry.getKey(), entry.getValue());
srcConf.set("hbase.zookeeper.quorum", tsdbConfig.getString(""));
srcConf.set("zookeeper.znode.parent", tsdbConfig.getString(""));
srcConf.set(TableInputFormat.INPUT_TABLE, tsdbConfig.getString(""));
JavaPairRDD<ImmutableBytesWritable, Result> rdd = ctx.newAPIHadoopRDD(srcConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
Map<String, String> config = new ExportableConfig(tsdbConfig).export();
return rdd.mapPartitions(new DataPointPartitionMapper(config))
.flatMap(dps -> dps);
public static class ExportableConfig extends Config {
public ExportableConfig(Config parent) {
public Map<String, String> export() {
return new HashMap<>(properties);
public static class DataPoint implements Serializable {
private String metric;
private Map<String, String> tags;
private Long timestamp;
private Number value;
public DataPoint(String metric, Map<String, String> tags, Long timestamp, Number value) {
this.metric = metric;
this.tags = tags;
this.timestamp = timestamp;
this.value = value;
public String getMetric() {
return metric;
public Map<String, String> getTags() {
return tags;
public Long getTimestamp() {
return timestamp;
public Number getValue() {
return value;
public String toString() {
String line = metric + " " + timestamp + " " + value;
if (tags.size() > 0) {
List<String> kvs = new LinkedList<>();
for (Map.Entry<String, String> tag : tags.entrySet()) {
kvs.add(tag.getKey() + "=" + tag.getValue());
return line + " " + StringUtils.join(kvs, " ");
return line;
public static class DataPointPartitionMapper implements FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Iterable<DataPoint>> {
private Map<String, String> conf;
private TSDB tsdb;
public DataPointPartitionMapper(Map<String, String> conf) {
this.conf = conf;
protected TSDB getTSDB() throws IOException {
if (this.tsdb == null) {
Config config = new Config(false);
for (Map.Entry<String, String> entry : this.conf.entrySet()) {
config.overrideConfig(entry.getKey(), entry.getValue());
this.tsdb = new TSDB(config);
return this.tsdb;
protected void shutdown() {
if (this.tsdb != null) {
try {
} catch (Exception e) {
throw new RuntimeException("Cannot shutdown OpenTSDB instance", e);
this.tsdb = null;
public Iterable<Iterable<DataPoint>> call(Iterator<Tuple2<ImmutableBytesWritable, Result>> rows) throws Exception {
return new Iterable<Iterable<DataPoint>>() {
public Iterator<Iterable<DataPoint>> iterator() {
Iterator<Iterable<DataPoint>> i = Iterators.transform(rows, (Tuple2<ImmutableBytesWritable, Result> row) -> process(row));
return new Iterator<Iterable<DataPoint>>() {
public boolean hasNext() {
if (!i.hasNext()) {
return false;
return i.hasNext();
public Iterable<DataPoint> next() {
public Iterable<DataPoint> process(Tuple2<ImmutableBytesWritable, Result> row) {
TSDB tsdb;
try {
tsdb = getTSDB();
} catch (IOException e) {
throw new RuntimeException("Cannot get OpenTSDB instance", e);
byte[] key = row._1().get();
String metric = Internal.metricName(tsdb, key);
long baseTime = Internal.baseTime(tsdb, key);
Map<String, String> tags = Internal.getTags(tsdb, key);
List<DataPoint> dps = new LinkedList<>();
for (Cell cell : row._2().rawCells()) {
byte[] family = Arrays.copyOfRange(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyOffset() + cell.getFamilyLength());
byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
KeyValue kv = new KeyValue(key, family, qualifier, cell.getTimestamp(), value);
if (qualifier.length == 2 || qualifier.length == 4 && Internal.inMilliseconds(qualifier)) {
Internal.Cell c = Internal.parseSingleValue(kv);
if (c == null) {
throw new IllegalDataException("Unable to parse row: " + kv);
dps.add(parseCell(metric, tags, c, baseTime));
} else {
// compacted column
ArrayList<Internal.Cell> cells;
try {
cells = Internal.extractDataPoints(kv);
} catch (IllegalDataException e) {
throw new IllegalDataException(Bytes.toStringBinary(key), e);
for (Internal.Cell c : cells) {
dps.add(parseCell(metric, tags, c, baseTime));
return dps;
protected DataPoint parseCell(String metric, Map<String, String> tags, Internal.Cell cell, long baseTime) {
return new DataPoint(metric, tags, cell.absoluteTimestamp(baseTime), cell.parseValue());
// vcarnogu commented Apr 7, 2017
//
// Thanks, this was super helpful for getting started with processing OpenTSDB data via Spark. I did a lot of research while looking for the right approach; you should post it as an article online ;-)

