Skip to content

Instantly share code, notes, and snippets.

@designershao
Last active March 7, 2019 05:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save designershao/d0fa152a27c28438a59025f62c47f322 to your computer and use it in GitHub Desktop.
Save designershao/d0fa152a27c28438a59025f62c47f322 to your computer and use it in GitHub Desktop.
OpenTSDB Limit Query
import com.google.common.base.Strings;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import net.opentsdb.core.*;
import net.opentsdb.utils.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
public class TSDQueryMain {
private static Config config;
private static TSDB tsdb;
/**
* 查询
* @param metric 测点名称
* @param start 开始时间
* @param cursor 游标:上一次请求的最后一条数据的时间戳
* @param callback 回调
*/
public static void Query(String metric, String start, final String cursor, final Callback<Object, Iterator<DataPoint>> callback) {
final TSQuery query = new TSQuery();
query.setStart(start);
final TSSubQuery subQuery = new TSSubQuery();
subQuery.setAggregator("sum"); // DEMO 暂时写死
subQuery.setMetric(metric); // DEMO 暂时写死
subQuery.setDownsample("4m-avg"); // DEMO 暂时写死
final ArrayList<TSSubQuery> subQueries = new ArrayList<TSSubQuery>(1);
subQueries.add(subQuery);
query.setQueries(subQueries);
query.validateAndSetQuery();
Query[] tsdbqueries = query.buildQueries(tsdb);
final ArrayList<DataPoints[]> results = new ArrayList<DataPoints[]>(tsdbqueries.length);
final ArrayList<Deferred<DataPoints[]>> deferreds = new ArrayList<Deferred<DataPoints[]>>(tsdbqueries.length);
for (int i = 0;i<tsdbqueries.length;i++) {
deferreds.add(tsdbqueries[i].runAsync());
}
class QueriesCB implements Callback<Object, ArrayList<DataPoints[]>> {
public Object call(ArrayList<DataPoints[]> dataPoints) throws Exception {
results.addAll(dataPoints);
for (final DataPoints[] dataSets : results) {
for (final DataPoints data : dataSets) {
final SeekableView it = data.iterator();
if (!Strings.isNullOrEmpty(cursor)) {
it.seek(Long.parseLong(cursor)); // 根据游标进行移动
}
callback.call(it);
}
}
return null;
}
}
try {
Deferred.groupInOrder(deferreds)
.addCallback(new QueriesCB())
.join();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 查询
* @param metric 测点名称
* @param start 开始时间
* @param cursor 游标:上一次请求的最后一条数据的时间戳
* @param limit 限制返回条数
* @param callback 回调
*/
public static void Query(String metric, String start, String cursor, final int limit, final Callback<Object, ArrayList<DataPoint>> callback) {
final ArrayList<DataPoint> results = new ArrayList<DataPoint>();
class Cb implements Callback<Object, Iterator<DataPoint>> {
public Object call(Iterator<DataPoint> dataPointIterator) throws Exception {
int i = 1;
while (dataPointIterator.hasNext() && i <= limit) {
final DataPoint dp = dataPointIterator.next();
results.add(dp);
i++;
}
callback.call(results);
return null;
}
}
Query(metric, start,cursor,new Cb());
}
public static void main(String[] args) throws IOException {
config = new Config("your opentsdb.conf path");
tsdb = new TSDB(config);
class Cb implements Callback<Object, ArrayList<DataPoint>> {
public Object call(ArrayList<DataPoint> dataPoints) {
for(int i = 0;i< dataPoints.size(); i ++) {
DataPoint dp = dataPoints.get(i);
System.out.println(" " + dp.timestamp() + " "
+ (dp.isInteger() ? dp.longValue() : dp.doubleValue()));
}
return null;
}
}
Query("proc.loadavg.total_threads", "1day-ago","1551823920000", 10,new Cb());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment